# -*- coding: utf-8 -*-
"""
Bitmart multi-timeframe K-line (candlestick) fetcher, for backtesting and live use.

Uses the same API calling convention as bitmart/回测.py and
bitmart/抓取多周期K线.py.
"""
import time
import csv
import os
from typing import List, Dict, Optional

try:
    from loguru import logger
except ImportError:
    import logging
    logger = logging.getLogger(__name__)

# The project root is lm_code; the bitmart package lives directly under it.
import sys
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPT_DIR)
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)

# Default API credentials (same as bitmart/回测.py; per the original author's
# note, a backtest that only pulls data does not need a valid key).
# APIContract is imported lazily inside fetch_multi_timeframe so that importing
# this module does not require the bitmart package.
# NOTE(review): these look like real credentials committed to source control --
# consider rotating them and loading them from the environment or a config file.
DEFAULT_API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
DEFAULT_SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
DEFAULT_MEMO = "合约交易"
CONTRACT_SYMBOL = "ETHUSDT"

# Paging / pacing constants used by fetch_klines.
_MAX_BARS_PER_REQUEST = 500    # page size assumed by the original author (~500 bars per call)
_REQUEST_PAUSE_SECONDS = 0.25  # pause after every processed page
_RETRY_DELAY_SECONDS = 60      # pause before retrying a request that raised


def _format_bar(k: dict) -> dict:
    """Convert one raw API bar into the unified format.

    :param k: raw bar with keys ``timestamp``, ``open_price``, ``high_price``,
        ``low_price``, ``close_price`` and optionally ``volume``
    :return: dict with ``id`` (the timestamp as int; seconds according to the
        original author's note) and float ``open``/``high``/``low``/``close``/
        ``volume`` (``volume`` defaults to 0 when absent)
    """
    return {
        "id": int(k["timestamp"]),
        "open": float(k["open_price"]),
        "high": float(k["high_price"]),
        "low": float(k["low_price"]),
        "close": float(k["close_price"]),
        "volume": float(k.get("volume", 0)),
    }


def fetch_klines(
    contract_api: "APIContract",
    step: int,
    start_time: int,
    end_time: int,
    symbol: str = CONTRACT_SYMBOL,
    max_retries: Optional[int] = None,
) -> List[Dict]:
    """
    Fetch K-lines of one timeframe by paging through ``[start_time, end_time)``.

    A response whose ``code`` is not 1000 is logged and its window is skipped.
    A request that raises is retried for the same window after a 60 second pause.

    :param contract_api: bitmart.api_contract.APIContract instance (anything
        exposing a compatible ``get_kline`` works)
    :param step: K-line period in minutes, e.g. 5/15/60; must be positive
    :param start_time: start timestamp (seconds)
    :param end_time: end timestamp (seconds)
    :param symbol: contract symbol
    :param max_retries: maximum number of consecutive retries for a request
        that raises; ``None`` (default) retries forever, as before
    :return: [{"id", "open", "high", "low", "close", "volume"}, ...] sorted by
        ascending id, without duplicate ids
    :raises ValueError: if ``step`` is not positive
    :raises Exception: the last request error, once ``max_retries`` is exceeded
    """
    if step <= 0:
        # A non-positive step yields a zero/negative window and the loop below
        # would never advance.
        raise ValueError(f"step must be a positive number of minutes, got {step!r}")

    all_data: List[Dict] = []
    existing_ids = set()
    latest_id: Optional[int] = None  # largest bar id collected so far
    request_interval = step * 60 * _MAX_BARS_PER_REQUEST
    current_start = start_time
    failures = 0  # consecutive failed attempts for the current window

    while current_start < end_time:
        current_end = min(current_start + request_interval, end_time)
        try:
            response = contract_api.get_kline(
                contract_symbol=symbol,
                step=step,
                start_time=current_start,
                end_time=current_end,
            )[0]
            code = response.get("code")
            msg = response.get("msg")
            # The API may send an explicit null instead of an empty list.
            data = response.get("data") or []
        except Exception as e:
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            logger.warning(f"get_kline 异常 step={step} {e},60秒后重试")
            time.sleep(_RETRY_DELAY_SECONDS)
            continue
        failures = 0

        if code != 1000:
            # Business-level error: log it and move on to the next window.
            logger.warning(f"get_kline code={code}, msg={msg}")
            time.sleep(1)
            current_start = current_end
            continue

        for k in data:
            k_id = int(k["timestamp"])
            if k_id in existing_ids:
                continue
            existing_ids.add(k_id)
            all_data.append(_format_bar(k))
            if latest_id is None or k_id > latest_id:
                latest_id = k_id

        next_start = current_end
        if len(data) >= _MAX_BARS_PER_REQUEST and latest_id is not None:
            # Full page: the window may hold more bars than one response
            # carries, so resume right after the newest bar seen so far.
            resume_at = latest_id + 1
            # Only accept a cursor that moves forward; otherwise the same
            # window would be requested forever.
            if resume_at > current_start:
                next_start = resume_at
        current_start = next_start
        time.sleep(_REQUEST_PAUSE_SECONDS)

    all_data.sort(key=lambda x: x["id"])
    return all_data


def fetch_multi_timeframe(
    start_time: int,
    end_time: int,
    steps: Optional[List[int]] = None,
    api_key: str = DEFAULT_API_KEY,
    secret_key: str = DEFAULT_SECRET_KEY,
    memo: str = DEFAULT_MEMO,
    symbol: str = CONTRACT_SYMBOL,
) -> Dict[int, List[Dict]]:
    """
    Fetch K-lines for several timeframes, for use in backtests.

    :param start_time: start timestamp (seconds)
    :param end_time: end timestamp (seconds)
    :param steps: K-line periods in minutes; ``None`` or an empty list means
        ``[5, 15, 60]``
    :param api_key: API key passed to APIContract
    :param secret_key: API secret passed to APIContract
    :param memo: API memo passed to APIContract
    :param symbol: contract symbol
    :return: { step: [kline_list] }
    """
    steps = steps or [5, 15, 60]
    # Imported here rather than at module level so the rest of this module
    # stays usable without the bitmart package.
    from bitmart.api_contract import APIContract

    api = APIContract(api_key, secret_key, memo, timeout=(5, 15))
    result = {}
    for step in steps:
        logger.info(f"抓取 {step} 分钟 K 线: {start_time} ~ {end_time}")
        result[step] = fetch_klines(api, step, start_time, end_time, symbol)
        logger.info(f" -> {len(result[step])} 条")
    return result


def save_klines_csv(klines: List[Dict], path: str) -> None:
    """Write K-lines to a CSV file (id, open, high, low, close, volume).

    Nothing is written when ``klines`` is empty.

    :param klines: bars in the unified format; keys missing from a bar are
        written as empty cells
    :param path: destination file path (overwritten if it exists)
    """
    if not klines:
        return
    cols = ["id", "open", "high", "low", "close", "volume"]
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=cols)
        w.writeheader()
        w.writerows({k: row.get(k) for k in cols} for row in klines)
    logger.info(f"已保存 {len(klines)} 条到 {path}")


def load_klines_csv(path: str) -> List[Dict]:
    """Load K-lines from a CSV file.

    :param path: CSV file with at least the columns id, open, high, low, close
    :return: bars in the unified format sorted by ascending id; an empty list
        when ``path`` is not an existing file. A blank or missing ``volume``
        cell is read as 0.
    """
    if not os.path.isfile(path):
        return []
    # newline="" lets the csv module handle line endings itself.
    with open(path, "r", newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))
    out = [
        {
            "id": int(row["id"]),
            "open": float(row["open"]),
            "high": float(row["high"]),
            "low": float(row["low"]),
            "close": float(row["close"]),
            # `or 0` covers both a missing column/cell (None) and a blank cell ("").
            "volume": float(row.get("volume") or 0),
        }
        for row in rows
    ]
    out.sort(key=lambda x: x["id"])
    return out


if __name__ == "__main__":
    import datetime

    # Example: fetch roughly the last 30 days of 5/15/60-minute data and save it.
    end_ts = int(time.time())
    start_ts = end_ts - 30 * 24 * 3600
    data_dir = os.path.join(SCRIPT_DIR, "data")
    os.makedirs(data_dir, exist_ok=True)
    data = fetch_multi_timeframe(start_ts, end_ts, steps=[5, 15, 60])
    for step, klines in data.items():
        path = os.path.join(data_dir, f"kline_{step}m.csv")
        save_klines_csv(klines, path)