""" BitMart 多周期K线数据抓取脚本 支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据 支持秒级价格数据(通过成交记录API) 支持断点续传,从数据库最新/最早记录继续抓取 """ import time import datetime from pathlib import Path from loguru import logger from peewee import * from bitmart.api_contract import APIContract # 数据库配置(使用脚本所在项目目录下的 models) DB_PATH = Path(__file__).parent / 'models' / 'database.db' db = SqliteDatabase(str(DB_PATH)) # K线周期配置:step值 -> 表名后缀 KLINE_CONFIGS = { 1: '1m', # 1分钟 3: '3m', # 3分钟 5: '5m', # 5分钟 15: '15m', # 15分钟 30: '30m', # 30分钟 60: '1h', # 1小时 } class BitMartETHTrades(Model): """成交记录模型(秒级/毫秒级原始数据)""" id = BigIntegerField(primary_key=True) # 成交ID timestamp = BigIntegerField(index=True) # 成交时间戳(毫秒) price = FloatField() # 成交价格 volume = FloatField() # 成交量 side = IntegerField() # 方向: 1=买, -1=卖 class Meta: database = db table_name = 'bitmart_eth_trades' class BitMartETHSecond(Model): """秒级K线模型(由成交记录聚合而来)""" id = BigIntegerField(primary_key=True) # 时间戳(毫秒,取整到秒) open = FloatField(null=True) high = FloatField(null=True) low = FloatField(null=True) close = FloatField(null=True) volume = FloatField(null=True) trade_count = IntegerField(null=True) # 该秒内成交笔数 class Meta: database = db table_name = 'bitmart_eth_1s' def create_kline_model(step: int): """ 动态创建K线数据模型 :param step: K线周期(分钟) :return: Model类 """ suffix = KLINE_CONFIGS.get(step, f'{step}m') tbl_name = f'bitmart_eth_{suffix}' # 使用 type() 动态创建类,避免闭包问题 attrs = { 'id': BigIntegerField(primary_key=True), 'open': FloatField(null=True), 'high': FloatField(null=True), 'low': FloatField(null=True), 'close': FloatField(null=True), } # 创建 Meta 类 meta_attrs = { 'database': db, 'table_name': tbl_name, } Meta = type('Meta', (), meta_attrs) attrs['Meta'] = Meta # 动态创建 Model 类 model_name = f'BitMartETH{suffix.upper()}' KlineModel = type(model_name, (Model,), attrs) return KlineModel class BitMartMultiKlineCollector: """多周期K线数据抓取器""" def __init__(self): self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" self.memo = "数据抓取" self.contract_symbol = "ETHUSDT" self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) # 存储各周期的模型 self.models = {} # 初始化数据库连接和表 self._init_database() def _init_database(self): """初始化数据库,创建所有周期的表""" DB_PATH.parent.mkdir(parents=True, exist_ok=True) db.connect(reuse_if_open=True) for step in KLINE_CONFIGS.keys(): model = create_kline_model(step) self.models[step] = model # 创建表(如果不存在) db.create_tables([model], safe=True) logger.info(f"初始化表: {model._meta.table_name}") # 创建成交记录表和秒级K线表 db.create_tables([BitMartETHTrades, BitMartETHSecond], safe=True) logger.info(f"初始化表: bitmart_eth_trades (成交记录)") logger.info(f"初始化表: bitmart_eth_1s (秒级K线)") def get_db_time_range(self, step: int): """ 获取数据库中已有数据的时间范围 :param step: K线周期 :return: (earliest_ts, latest_ts) 毫秒时间戳,无数据返回 (None, None) """ model = self.models.get(step) if not model: return None, None try: # 获取最早记录 earliest = model.select(fn.MIN(model.id)).scalar() # 获取最新记录 latest = model.select(fn.MAX(model.id)).scalar() return earliest, latest except Exception as e: logger.error(f"查询数据库时间范围异常: {e}") return None, None def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3): """ 获取K线数据(带重试) :param step: K线周期(分钟) :param start_time: 开始时间戳(秒级) :param end_time: 结束时间戳(秒级) :param max_retries: 最大重试次数 :return: K线数据列表 """ for attempt in range(max_retries): try: start_time = int(start_time) end_time = int(end_time) response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=step, start_time=start_time, end_time=end_time )[0] if response['code'] != 1000: logger.warning(f"API返回错误 (尝试 {attempt+1}/{max_retries}): {response}") if attempt < max_retries - 1: time.sleep(1) continue return [] klines = response.get('data', []) formatted = [] for k in klines: timestamp_ms = int(k["timestamp"]) * 1000 formatted.append({ 'id': timestamp_ms, 'open': float(k["open_price"]), 'high': float(k["high_price"]), 'low': float(k["low_price"]), 'close': float(k["close_price"]) }) formatted.sort(key=lambda x: x['id']) return formatted except Exception as e: logger.error(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e}") if attempt < max_retries - 1: time.sleep(2) continue return [] return [] def save_klines(self, step: int, klines: list): """ 保存K线数据到数据库 :param step: K线周期 :param klines: K线数据列表 :return: 新保存的数量 """ model = self.models.get(step) if not model: logger.error(f"未找到 {step}分钟 的数据模型") return 0 new_count = 0 for kline in klines: try: _, created = model.get_or_create( id=kline['id'], defaults={ 'open': kline['open'], 'high': kline['high'], 'low': kline['low'], 'close': kline['close'], } ) if created: new_count += 1 except Exception as e: logger.error(f"保存K线数据失败 {kline['id']}: {e}") return new_count def get_batch_seconds(self, step: int): """根据周期获取合适的批次大小""" if step == 1: return 3600 * 4 # 1分钟: 每次4小时 elif step == 3: return 3600 * 8 # 3分钟: 每次8小时 elif step == 5: return 3600 * 12 # 5分钟: 每次12小时 elif step == 15: return 3600 * 24 # 15分钟: 每次1天 elif step == 30: return 3600 * 48 # 30分钟: 每次2天 else: return 3600 * 72 # 1小时: 每次3天 def collect_period_range(self, step: int, target_start: int, target_end: int): """ 抓取指定时间范围的K线数据(支持断点续传) :param step: K线周期(分钟) :param target_start: 目标开始时间戳(秒) :param target_end: 目标结束时间戳(秒) :return: 保存的总数量 """ suffix = KLINE_CONFIGS.get(step, f'{step}m') batch_seconds = self.get_batch_seconds(step) # 获取数据库已有数据范围 db_earliest, db_latest = self.get_db_time_range(step) if db_earliest and db_latest: db_earliest_sec = db_earliest // 1000 db_latest_sec = db_latest // 1000 logger.info(f"[{suffix}] 数据库已有数据: " f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_earliest_sec))} ~ " f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_latest_sec))}") else: db_earliest_sec = None db_latest_sec = None logger.info(f"[{suffix}] 数据库暂无数据") total_saved = 0 # === 第一阶段:向前抓取历史数据(从数据库最早记录向前,直到 target_start)=== if db_earliest_sec: backward_end = db_earliest_sec else: backward_end = target_end if backward_end > target_start: logger.info(f"[{suffix}] === 开始向前抓取历史数据 ===") total_backward = backward_end - target_start current_end = backward_end fail_count = 0 max_fail = 5 while current_end > target_start and fail_count < max_fail: current_start = max(current_end - batch_seconds, target_start) # 计算进度 progress = (backward_end - current_end) / total_backward * 100 if total_backward > 0 else 0 start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start)) end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end)) klines = self.get_klines(step, current_start, current_end) if klines: saved = self.save_klines(step, klines) total_saved += saved logger.info(f"[{suffix}] ← 历史 {start_str} ~ {end_str} | " f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%") fail_count = 0 else: fail_count += 1 logger.warning(f"[{suffix}] ← 历史 {start_str} 无数据 (连续失败 {fail_count}/{max_fail})") if fail_count >= max_fail: earliest_date = time.strftime('%Y-%m-%d', time.localtime(current_end)) logger.warning(f"[{suffix}] 已达到API历史数据限制,最早可获取: {earliest_date}") break current_end = current_start time.sleep(0.3) # === 第二阶段:向后抓取最新数据(从数据库最新记录向后,直到 target_end)=== if db_latest_sec: forward_start = db_latest_sec else: # 如果没有数据,从第一阶段结束的地方开始 forward_start = target_start if forward_start < target_end: logger.info(f"[{suffix}] === 开始向后抓取最新数据 ===") total_forward = target_end - forward_start current_start = forward_start fail_count = 0 max_fail = 3 while current_start < target_end and fail_count < max_fail: current_end = min(current_start + batch_seconds, target_end) # 计算进度 progress = (current_start - forward_start) / total_forward * 100 if total_forward > 0 else 0 start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start)) end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end)) klines = self.get_klines(step, current_start, current_end) if klines: saved = self.save_klines(step, klines) total_saved += saved logger.info(f"[{suffix}] → 最新 {start_str} ~ {end_str} | " f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%") fail_count = 0 else: fail_count += 1 logger.warning(f"[{suffix}] → 最新 {start_str} 无数据 (失败 {fail_count}/{max_fail})") current_start = current_end time.sleep(0.3) # 统计最终数据范围 final_earliest, final_latest = self.get_db_time_range(step) if final_earliest and final_latest: logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条 | 数据范围: " f"{time.strftime('%Y-%m-%d', time.localtime(final_earliest//1000))} ~ " f"{time.strftime('%Y-%m-%d', time.localtime(final_latest//1000))}") else: logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条") return total_saved def collect_from_date(self, start_date: str, periods: list = None): """ 从指定日期抓取到当前时间 :param start_date: 起始日期 'YYYY-MM-DD' :param periods: 要抓取的周期列表,如 [1, 5, 15],默认全部 """ if periods is None: periods = list(KLINE_CONFIGS.keys()) # 计算时间范围 start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d') target_start = int(start_dt.timestamp()) target_end = int(time.time()) start_str = start_dt.strftime('%Y-%m-%d') end_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M') logger.info(f"{'='*60}") logger.info(f"目标时间范围: {start_str} ~ {end_str}") logger.info(f"抓取周期: {[KLINE_CONFIGS[p] for p in periods]}") logger.info(f"{'='*60}") results = {} for step in periods: if step not in KLINE_CONFIGS: logger.warning(f"不支持的周期: {step}分钟,跳过") continue logger.info(f"\n{'='*60}") logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线") logger.info(f"{'='*60}") saved = self.collect_period_range(step, target_start, target_end) results[KLINE_CONFIGS[step]] = saved time.sleep(1) # 不同周期之间间隔 # 打印总结 logger.info(f"\n{'='*60}") logger.info("所有周期抓取完成!统计:") for period, count in results.items(): logger.info(f" {period}: 新增 {count} 条") logger.info(f"{'='*60}") return results def get_stats(self): """获取各周期数据统计""" logger.info(f"\n{'='*60}") logger.info("数据库统计:") logger.info(f"{'='*60}") for step, model in self.models.items(): suffix = KLINE_CONFIGS.get(step, f'{step}m') try: count = model.select().count() earliest, latest = self.get_db_time_range(step) if earliest and latest: earliest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(earliest//1000)) latest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(latest//1000)) logger.info(f" {suffix:>4}: {count:>8} 条 | {earliest_str} ~ {latest_str}") else: logger.info(f" {suffix:>4}: {count:>8} 条") except Exception as e: logger.error(f" {suffix}: 查询失败 - {e}") # 成交记录统计 try: trades_count = BitMartETHTrades.select().count() if trades_count > 0: earliest_trade = BitMartETHTrades.select(fn.MIN(BitMartETHTrades.timestamp)).scalar() latest_trade = BitMartETHTrades.select(fn.MAX(BitMartETHTrades.timestamp)).scalar() earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_trade//1000)) latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_trade//1000)) logger.info(f"trades: {trades_count:>8} 条 | {earliest_str} ~ {latest_str}") else: logger.info(f"trades: {trades_count:>8} 条") except Exception as e: logger.error(f"trades: 查询失败 - {e}") # 秒级K线统计 try: second_count = BitMartETHSecond.select().count() if second_count > 0: earliest_sec = BitMartETHSecond.select(fn.MIN(BitMartETHSecond.id)).scalar() latest_sec = BitMartETHSecond.select(fn.MAX(BitMartETHSecond.id)).scalar() earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_sec//1000)) latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_sec//1000)) logger.info(f" 1s: {second_count:>8} 条 | {earliest_str} ~ {latest_str}") else: logger.info(f" 1s: {second_count:>8} 条") except Exception as e: logger.error(f" 1s: 查询失败 - {e}") logger.info(f"{'='*60}") # ==================== 秒级数据相关方法 ==================== def get_trades(self, limit: int = 100): """ 获取最近成交记录 :param limit: 获取条数 :return: 成交记录列表 """ try: response = self.contractAPI.get_trades( contract_symbol=self.contract_symbol, )[0] if response['code'] != 1000: logger.error(f"获取成交记录失败: {response}") return [] trades = response.get('data', {}).get('trades', []) formatted = [] for t in trades: formatted.append({ 'id': int(t.get('trade_id', 0)), 'timestamp': int(t.get('create_time', 0)), 'price': float(t.get('deal_price', 0)), 'volume': float(t.get('deal_vol', 0)), 'side': int(t.get('way', 0)), }) return formatted except Exception as e: logger.error(f"获取成交记录异常: {e}") return [] def save_trades(self, trades: list): """保存成交记录到数据库""" new_count = 0 for trade in trades: try: _, created = BitMartETHTrades.get_or_create( id=trade['id'], defaults={ 'timestamp': trade['timestamp'], 'price': trade['price'], 'volume': trade['volume'], 'side': trade['side'], } ) if created: new_count += 1 except Exception as e: pass # 忽略重复数据 return new_count def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.3): """ 实时持续采集成交记录(秒级数据源) :param duration_seconds: 采集时长(秒),默认1小时 :param interval: 采集间隔(秒),默认0.3秒 """ logger.info(f"{'='*60}") logger.info(f"开始实时采集成交记录") logger.info(f"时长: {duration_seconds}秒 ({duration_seconds/3600:.1f}小时)") logger.info(f"间隔: {interval}秒") logger.info(f"{'='*60}") start_time = time.time() end_time = start_time + duration_seconds total_saved = 0 batch_count = 0 while time.time() < end_time: trades = self.get_trades(limit=100) if trades: saved = self.save_trades(trades) total_saved += saved batch_count += 1 # 每10批显示一次进度 if batch_count % 10 == 0: elapsed = time.time() - start_time remaining = end_time - time.time() latest = trades[-1] ts_str = datetime.datetime.fromtimestamp( latest['timestamp']/1000 ).strftime('%H:%M:%S') logger.info(f"[{ts_str}] 价格: {latest['price']:.2f} | " f"本批新增: {saved} | 累计: {total_saved} | " f"剩余: {remaining/60:.1f}分钟") time.sleep(interval) logger.success(f"采集完成!共新增 {total_saved} 条成交记录") # 自动聚合为秒级K线 logger.info("正在将成交记录聚合为秒级K线...") self.aggregate_trades_to_seconds() return total_saved def aggregate_trades_to_seconds(self, start_ts: int = None, end_ts: int = None): """ 将成交记录聚合为秒级K线数据 :param start_ts: 开始时间戳(毫秒),默认全部 :param end_ts: 结束时间戳(毫秒),默认全部 :return: 聚合的秒级K线数量 """ # 构建查询 query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) if start_ts: query = query.where(BitMartETHTrades.timestamp >= start_ts) if end_ts: query = query.where(BitMartETHTrades.timestamp <= end_ts) # 按秒聚合 second_data = {} trade_count = 0 for trade in query: trade_count += 1 # 取整到秒(毫秒时间戳) second_ts = (trade.timestamp // 1000) * 1000 if second_ts not in second_data: second_data[second_ts] = { 'open': trade.price, 'high': trade.price, 'low': trade.price, 'close': trade.price, 'volume': trade.volume, 'trade_count': 1 } else: second_data[second_ts]['high'] = max(second_data[second_ts]['high'], trade.price) second_data[second_ts]['low'] = min(second_data[second_ts]['low'], trade.price) second_data[second_ts]['close'] = trade.price second_data[second_ts]['volume'] += trade.volume second_data[second_ts]['trade_count'] += 1 # 保存到数据库 saved_count = 0 for ts, ohlc in second_data.items(): try: BitMartETHSecond.insert( id=ts, open=ohlc['open'], high=ohlc['high'], low=ohlc['low'], close=ohlc['close'], volume=ohlc['volume'], trade_count=ohlc['trade_count'] ).on_conflict( conflict_target=[BitMartETHSecond.id], update={ BitMartETHSecond.open: ohlc['open'], BitMartETHSecond.high: ohlc['high'], BitMartETHSecond.low: ohlc['low'], BitMartETHSecond.close: ohlc['close'], BitMartETHSecond.volume: ohlc['volume'], BitMartETHSecond.trade_count: ohlc['trade_count'], } ).execute() saved_count += 1 except Exception as e: logger.error(f"保存秒级K线失败 {ts}: {e}") logger.success(f"聚合完成!{trade_count} 条成交记录 → {saved_count} 条秒级K线") return saved_count def get_second_klines(self, start_ts: int = None, end_ts: int = None): """ 获取秒级K线数据 :param start_ts: 开始时间戳(毫秒) :param end_ts: 结束时间戳(毫秒) :return: 秒级K线列表 """ query = BitMartETHSecond.select().order_by(BitMartETHSecond.id) if start_ts: query = query.where(BitMartETHSecond.id >= start_ts) if end_ts: query = query.where(BitMartETHSecond.id <= end_ts) return [{ 'timestamp': k.id, 'open': k.open, 'high': k.high, 'low': k.low, 'close': k.close, 'volume': k.volume, 'trade_count': k.trade_count } for k in query] def aggregate_trades_custom(self, interval_ms: int = 100, start_ts: int = None, end_ts: int = None): """ 将成交记录聚合为自定义毫秒级K线数据(不保存到数据库,直接返回) :param interval_ms: 聚合周期(毫秒),如 100=100ms, 500=500ms, 1000=1秒 :param start_ts: 开始时间戳(毫秒) :param end_ts: 结束时间戳(毫秒) :return: K线列表 [{'timestamp', 'open', 'high', 'low', 'close', 'volume', 'trade_count'}, ...] """ # 构建查询 query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) if start_ts: query = query.where(BitMartETHTrades.timestamp >= start_ts) if end_ts: query = query.where(BitMartETHTrades.timestamp <= end_ts) # 按指定间隔聚合 interval_data = {} trade_count = 0 for trade in query: trade_count += 1 # 取整到指定间隔 interval_ts = (trade.timestamp // interval_ms) * interval_ms if interval_ts not in interval_data: interval_data[interval_ts] = { 'open': trade.price, 'high': trade.price, 'low': trade.price, 'close': trade.price, 'volume': trade.volume, 'trade_count': 1 } else: interval_data[interval_ts]['high'] = max(interval_data[interval_ts]['high'], trade.price) interval_data[interval_ts]['low'] = min(interval_data[interval_ts]['low'], trade.price) interval_data[interval_ts]['close'] = trade.price interval_data[interval_ts]['volume'] += trade.volume interval_data[interval_ts]['trade_count'] += 1 # 转换为列表 result = [] for ts, ohlc in sorted(interval_data.items()): result.append({ 'timestamp': ts, 'datetime': datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], 'open': ohlc['open'], 'high': ohlc['high'], 'low': ohlc['low'], 'close': ohlc['close'], 'volume': ohlc['volume'], 'trade_count': ohlc['trade_count'] }) logger.info(f"聚合完成: {trade_count} 条成交记录 → {len(result)} 条 {interval_ms}ms K线") return result def get_raw_trades(self, start_ts: int = None, end_ts: int = None, limit: int = None): """ 获取原始成交记录(逐笔数据,毫秒级) :param start_ts: 开始时间戳(毫秒) :param end_ts: 结束时间戳(毫秒) :param limit: 最大返回条数 :return: 成交记录列表 """ query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) if start_ts: query = query.where(BitMartETHTrades.timestamp >= start_ts) if end_ts: query = query.where(BitMartETHTrades.timestamp <= end_ts) if limit: query = query.limit(limit) return [{ 'id': t.id, 'timestamp': t.timestamp, 'datetime': datetime.datetime.fromtimestamp(t.timestamp/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], 'price': t.price, 'volume': t.volume, 'side': '买' if t.side == 1 else '卖' } for t in query] def close(self): """关闭数据库连接""" if not db.is_closed(): db.close() if __name__ == '__main__': collector = BitMartMultiKlineCollector() try: # 查看当前数据统计 collector.get_stats() # ============ 选择要执行的任务 ============ # 任务1: 抓取K线数据(1分钟~1小时周期) # 从 2025-01-01 抓取到当前时间(支持断点续传) collector.collect_from_date( start_date='2010-01-01', periods=[1, 3, 5, 15, 30, 60] # 所有周期 ) # 任务2: 实时采集秒级数据(成交记录) # 注意: 秒级数据只能实时采集,无法获取历史 # collector.collect_trades_realtime( # duration_seconds=3600, # 采集1小时 # interval=0.3 # 每0.3秒请求一次 # ) # 任务3: 将已采集的成交记录聚合为秒级K线 # collector.aggregate_trades_to_seconds() # 再次查看统计 collector.get_stats() finally: collector.close()