This commit is contained in:
ddrwode
2026-01-31 10:35:25 +08:00
parent c85f370e32
commit 970080a2e6
13 changed files with 14982 additions and 0 deletions

View File

@@ -0,0 +1,172 @@
# -*- coding: utf-8 -*-
"""
Bitmart 多周期K线数据抓取 - 供回测与实盘使用
与 bitmart/回测.py、bitmart/抓取多周期K线.py 相同的 API 调用方式
"""
import time
import csv
import os
from typing import List, Dict, Optional
try:
from loguru import logger
except ImportError:
import logging
logger = logging.getLogger(__name__)
# 项目根目录为 lm_codebitmart 包在根目录
import sys
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPT_DIR)
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
# 默认 API与 bitmart/回测.py 一致,回测仅拉数据可不填有效 key
# APIContract 在 fetch_klines / fetch_multi_timeframe 内按需导入,以便 --no-api 回测不依赖 bitmart
DEFAULT_API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
DEFAULT_SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
DEFAULT_MEMO = "合约交易"
CONTRACT_SYMBOL = "ETHUSDT"
def _format_bar(k: dict) -> dict:
"""API 单根K线 -> 统一格式id 为秒时间戳"""
return {
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
"volume": float(k.get("volume", 0)),
}
def fetch_klines(
contract_api: "APIContract",
step: int,
start_time: int,
end_time: int,
symbol: str = CONTRACT_SYMBOL,
) -> List[Dict]:
"""
拉取指定周期的K线与 bitmart 回测抓取方式一致)。
:param contract_api: bitmart.api_contract.APIContract 实例
:param step: K线周期分钟如 5/15/60
:param start_time: 开始时间戳(秒)
:param end_time: 结束时间戳(秒)
:param symbol: 合约符号
:return: [{"id", "open", "high", "low", "close", "volume"}, ...],按 id 升序
"""
all_data: List[Dict] = []
existing_ids = set()
request_interval = step * 60 * 500 # 每次最多约 500 根
current_start = start_time
while current_start < end_time:
current_end = min(current_start + request_interval, end_time)
try:
response = contract_api.get_kline(
contract_symbol=symbol,
step=step,
start_time=current_start,
end_time=current_end,
)[0]
if response.get("code") != 1000:
logger.warning(f"get_kline code={response.get('code')}, msg={response.get('msg')}")
time.sleep(1)
current_start = current_end
continue
data = response.get("data", [])
except Exception as e:
logger.warning(f"get_kline 异常 step={step} {e}60秒后重试")
time.sleep(60)
continue
for k in data:
k_id = int(k["timestamp"])
if k_id in existing_ids:
continue
existing_ids.add(k_id)
all_data.append(_format_bar(k))
if len(data) < 500:
current_start = current_end
else:
all_data.sort(key=lambda x: x["id"])
current_start = all_data[-1]["id"] + 1
time.sleep(0.25)
all_data.sort(key=lambda x: x["id"])
return all_data
def fetch_multi_timeframe(
start_time: int,
end_time: int,
steps: List[int] = None,
api_key: str = DEFAULT_API_KEY,
secret_key: str = DEFAULT_SECRET_KEY,
memo: str = DEFAULT_MEMO,
symbol: str = CONTRACT_SYMBOL,
) -> Dict[int, List[Dict]]:
"""
拉取多周期K线5/15/60供回测使用。
:return: { step: [kline_list] }
"""
steps = steps or [5, 15, 60]
from bitmart.api_contract import APIContract
api = APIContract(api_key, secret_key, memo, timeout=(5, 15))
result = {}
for step in steps:
logger.info(f"抓取 {step} 分钟 K 线: {start_time} ~ {end_time}")
result[step] = fetch_klines(api, step, start_time, end_time, symbol)
logger.info(f" -> {len(result[step])}")
return result
def save_klines_csv(klines: List[Dict], path: str) -> None:
"""将K线保存为 CSVid, open, high, low, close, volume"""
if not klines:
return
cols = ["id", "open", "high", "low", "close", "volume"]
with open(path, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=cols)
w.writeheader()
for row in klines:
w.writerow({k: row.get(k) for k in cols})
logger.info(f"已保存 {len(klines)} 条到 {path}")
def load_klines_csv(path: str) -> List[Dict]:
"""从 CSV 加载K线"""
if not os.path.isfile(path):
return []
with open(path, "r", encoding="utf-8") as f:
r = csv.DictReader(f)
rows = list(r)
out = []
for row in rows:
out.append({
"id": int(row["id"]),
"open": float(row["open"]),
"high": float(row["high"]),
"low": float(row["low"]),
"close": float(row["close"]),
"volume": float(row.get("volume", 0)),
})
out.sort(key=lambda x: x["id"])
return out
if __name__ == "__main__":
import datetime
# 示例:拉取最近约 30 天 5/15/60 分钟数据并保存
end_ts = int(time.time())
start_ts = end_ts - 30 * 24 * 3600
data_dir = os.path.join(SCRIPT_DIR, "data")
os.makedirs(data_dir, exist_ok=True)
data = fetch_multi_timeframe(start_ts, end_ts, steps=[5, 15, 60])
for step, klines in data.items():
path = os.path.join(data_dir, f"kline_{step}m.csv")
save_klines_csv(klines, path)