diff --git a/bitmart/三分之一策略交易.py b/bitmart/四分之一,五分钟,反手条件充足.py similarity index 100% rename from bitmart/三分之一策略交易.py rename to bitmart/四分之一,五分钟,反手条件充足.py diff --git a/bitmart/抓取多周期K线.py b/bitmart/抓取多周期K线.py index ff19b00..8d02539 100644 --- a/bitmart/抓取多周期K线.py +++ b/bitmart/抓取多周期K线.py @@ -1,7 +1,8 @@ """ BitMart 多周期K线数据抓取脚本 支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据 -自动创建对应的数据库表 +支持秒级价格数据(通过成交记录API) +支持断点续传,从数据库最新/最早记录继续抓取 """ import time @@ -26,6 +27,34 @@ KLINE_CONFIGS = { } +class BitMartETHTrades(Model): + """成交记录模型(秒级/毫秒级原始数据)""" + id = BigIntegerField(primary_key=True) # 成交ID + timestamp = BigIntegerField(index=True) # 成交时间戳(毫秒) + price = FloatField() # 成交价格 + volume = FloatField() # 成交量 + side = IntegerField() # 方向: 1=买, -1=卖 + + class Meta: + database = db + table_name = 'bitmart_eth_trades' + + +class BitMartETHSecond(Model): + """秒级K线模型(由成交记录聚合而来)""" + id = BigIntegerField(primary_key=True) # 时间戳(毫秒,取整到秒) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + volume = FloatField(null=True) + trade_count = IntegerField(null=True) # 该秒内成交笔数 + + class Meta: + database = db + table_name = 'bitmart_eth_1s' + + def create_kline_model(step: int): """ 动态创建K线数据模型 @@ -85,67 +114,100 @@ class BitMartMultiKlineCollector: # 创建表(如果不存在) db.create_tables([model], safe=True) logger.info(f"初始化表: {model._meta.table_name}") + + # 创建成交记录表和秒级K线表 + db.create_tables([BitMartETHTrades, BitMartETHSecond], safe=True) + logger.info(f"初始化表: bitmart_eth_trades (成交记录)") + logger.info(f"初始化表: bitmart_eth_1s (秒级K线)") - def get_klines(self, step: int, start_time: int, end_time: int): + def get_db_time_range(self, step: int): """ - 获取K线数据 + 获取数据库中已有数据的时间范围 + :param step: K线周期 + :return: (earliest_ts, latest_ts) 毫秒时间戳,无数据返回 (None, None) + """ + model = self.models.get(step) + if not model: + return None, None + + try: + # 获取最早记录 + earliest = model.select(fn.MIN(model.id)).scalar() + # 获取最新记录 + latest = model.select(fn.MAX(model.id)).scalar() + return earliest, latest + except Exception as e: + logger.error(f"查询数据库时间范围异常: {e}") + return None, None + + def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3): + """ + 获取K线数据(带重试) :param step: K线周期(分钟) :param start_time: 开始时间戳(秒级) :param end_time: 结束时间戳(秒级) + :param max_retries: 最大重试次数 :return: K线数据列表 """ - try: - # 确保是整数 - start_time = int(start_time) - end_time = int(end_time) - - logger.debug(f"API请求: step={step}, start={start_time}, end={end_time}") - - response = self.contractAPI.get_kline( - contract_symbol=self.contract_symbol, - step=step, - start_time=start_time, - end_time=end_time - )[0] - - if response['code'] != 1000: - logger.error(f"获取 {step}分钟 K线失败: {response}") + for attempt in range(max_retries): + try: + start_time = int(start_time) + end_time = int(end_time) + + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=step, + start_time=start_time, + end_time=end_time + )[0] + + if response['code'] != 1000: + logger.warning(f"API返回错误 (尝试 {attempt+1}/{max_retries}): {response}") + if attempt < max_retries - 1: + time.sleep(1) + continue + return [] + + klines = response.get('data', []) + formatted = [] + for k in klines: + timestamp_ms = int(k["timestamp"]) * 1000 + formatted.append({ + 'id': timestamp_ms, + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + + formatted.sort(key=lambda x: x['id']) + return formatted + + except Exception as e: + logger.error(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e}") + if attempt < max_retries - 1: + time.sleep(2) + continue return [] - - klines = response.get('data', []) - formatted = [] - for k in klines: - timestamp_ms = int(k["timestamp"]) * 1000 - formatted.append({ - 'id': timestamp_ms, - 'open': float(k["open_price"]), - 'high': float(k["high_price"]), - 'low': float(k["low_price"]), - 'close': float(k["close_price"]) - }) - - formatted.sort(key=lambda x: x['id']) - return formatted - except Exception as e: - logger.error(f"获取 {step}分钟 K线异常: {e}") - return [] + + return [] def save_klines(self, step: int, klines: list): """ 保存K线数据到数据库 :param step: K线周期 :param klines: K线数据列表 - :return: 保存的数量 + :return: 新保存的数量 """ model = self.models.get(step) if not model: logger.error(f"未找到 {step}分钟 的数据模型") return 0 - saved_count = 0 + new_count = 0 for kline in klines: try: - model.get_or_create( + _, created = model.get_or_create( id=kline['id'], defaults={ 'open': kline['open'], @@ -154,87 +216,164 @@ class BitMartMultiKlineCollector: 'close': kline['close'], } ) - saved_count += 1 + if created: + new_count += 1 except Exception as e: - logger.error(f"保存 {step}分钟 K线数据失败 {kline['id']}: {e}") + logger.error(f"保存K线数据失败 {kline['id']}: {e}") - return saved_count + return new_count - def collect_single_period(self, step: int, start_date: str = None, days: int = None): + def get_batch_seconds(self, step: int): + """根据周期获取合适的批次大小""" + if step == 1: + return 3600 * 4 # 1分钟: 每次4小时 + elif step == 3: + return 3600 * 8 # 3分钟: 每次8小时 + elif step == 5: + return 3600 * 12 # 5分钟: 每次12小时 + elif step == 15: + return 3600 * 24 # 15分钟: 每次1天 + elif step == 30: + return 3600 * 48 # 30分钟: 每次2天 + else: + return 3600 * 72 # 1小时: 每次3天 + + def collect_period_range(self, step: int, target_start: int, target_end: int): """ - 抓取单个周期的历史数据(从当前时间向前抓取,直到遇到API限制) + 抓取指定时间范围的K线数据(支持断点续传) :param step: K线周期(分钟) - :param start_date: 起始日期 'YYYY-MM-DD'(目标,可能无法达到) - :param days: 抓取天数(目标,可能无法达到) + :param target_start: 目标开始时间戳(秒) + :param target_end: 目标结束时间戳(秒) + :return: 保存的总数量 """ suffix = KLINE_CONFIGS.get(step, f'{step}m') - now = int(time.time()) + batch_seconds = self.get_batch_seconds(step) - if start_date: - start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d') - target_start_time = int(start_dt.timestamp()) - logger.info(f"开始抓取 {suffix} K线数据: 目标从 {start_date} 开始(从现在向前抓取)") - elif days: - target_start_time = now - 3600 * 24 * days - logger.info(f"开始抓取 {suffix} K线数据: 目标最近 {days} 天") - else: - target_start_time = now - 3600 * 24 * 30 - logger.info(f"开始抓取 {suffix} K线数据: 目标最近 30 天") + # 获取数据库已有数据范围 + db_earliest, db_latest = self.get_db_time_range(step) - # 根据周期调整批次大小 - if step <= 5: - batch_seconds = 3600 * 6 # 小周期每次6小时 - elif step <= 30: - batch_seconds = 3600 * 24 # 中周期每次1天 + if db_earliest and db_latest: + db_earliest_sec = db_earliest // 1000 + db_latest_sec = db_latest // 1000 + logger.info(f"[{suffix}] 数据库已有数据: " + f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_earliest_sec))} ~ " + f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_latest_sec))}") else: - batch_seconds = 3600 * 24 * 3 # 大周期每次3天 + db_earliest_sec = None + db_latest_sec = None + logger.info(f"[{suffix}] 数据库暂无数据") total_saved = 0 - fail_count = 0 - max_fail = 3 - # 从当前时间向前抓取 - current_end = now - while current_end > target_start_time: - current_start = current_end - batch_seconds - - # 打印时间范围 - start_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start)) - end_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end)) - logger.info(f"[{suffix}] 抓取: {start_str} -> {end_str}") - - klines = self.get_klines(step, current_start, current_end) - if klines: - saved = self.save_klines(step, klines) - total_saved += saved - logger.info(f"[{suffix}] 保存 {saved} 条,累计 {total_saved} 条") - fail_count = 0 - else: - fail_count += 1 - logger.warning(f"[{suffix}] 未获取到数据 (连续失败 {fail_count} 次)") - if fail_count >= max_fail: - earliest = time.strftime('%Y-%m-%d', time.localtime(current_end)) - logger.warning(f"[{suffix}] 达到API历史数据限制,最早可用数据约 {earliest}") - break - - current_end = current_start - time.sleep(0.3) # API请求间隔 + # === 第一阶段:向前抓取历史数据(从数据库最早记录向前,直到 target_start)=== + if db_earliest_sec: + backward_end = db_earliest_sec + else: + backward_end = target_end + + if backward_end > target_start: + logger.info(f"[{suffix}] === 开始向前抓取历史数据 ===") + total_backward = backward_end - target_start + + current_end = backward_end + fail_count = 0 + max_fail = 5 + + while current_end > target_start and fail_count < max_fail: + current_start = max(current_end - batch_seconds, target_start) + + # 计算进度 + progress = (backward_end - current_end) / total_backward * 100 if total_backward > 0 else 0 + start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start)) + end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end)) + + klines = self.get_klines(step, current_start, current_end) + if klines: + saved = self.save_klines(step, klines) + total_saved += saved + logger.info(f"[{suffix}] ← 历史 {start_str} ~ {end_str} | " + f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%") + fail_count = 0 + else: + fail_count += 1 + logger.warning(f"[{suffix}] ← 历史 {start_str} 无数据 (连续失败 {fail_count}/{max_fail})") + if fail_count >= max_fail: + earliest_date = time.strftime('%Y-%m-%d', time.localtime(current_end)) + logger.warning(f"[{suffix}] 已达到API历史数据限制,最早可获取: {earliest_date}") + break + + current_end = current_start + time.sleep(0.3) + + # === 第二阶段:向后抓取最新数据(从数据库最新记录向后,直到 target_end)=== + if db_latest_sec: + forward_start = db_latest_sec + else: + # 如果没有数据,从第一阶段结束的地方开始 + forward_start = target_start + + if forward_start < target_end: + logger.info(f"[{suffix}] === 开始向后抓取最新数据 ===") + total_forward = target_end - forward_start + + current_start = forward_start + fail_count = 0 + max_fail = 3 + + while current_start < target_end and fail_count < max_fail: + current_end = min(current_start + batch_seconds, target_end) + + # 计算进度 + progress = (current_start - forward_start) / total_forward * 100 if total_forward > 0 else 0 + start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start)) + end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end)) + + klines = self.get_klines(step, current_start, current_end) + if klines: + saved = self.save_klines(step, klines) + total_saved += saved + logger.info(f"[{suffix}] → 最新 {start_str} ~ {end_str} | " + f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%") + fail_count = 0 + else: + fail_count += 1 + logger.warning(f"[{suffix}] → 最新 {start_str} 无数据 (失败 {fail_count}/{max_fail})") + + current_start = current_end + time.sleep(0.3) + + # 统计最终数据范围 + final_earliest, final_latest = self.get_db_time_range(step) + if final_earliest and final_latest: + logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条 | 数据范围: " + f"{time.strftime('%Y-%m-%d', time.localtime(final_earliest//1000))} ~ " + f"{time.strftime('%Y-%m-%d', time.localtime(final_latest//1000))}") + else: + logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条") - logger.success(f"[{suffix}] 抓取完成,共保存 {total_saved} 条数据") return total_saved - def collect_all_periods(self, start_date: str = None, days: int = None, - periods: list = None): + def collect_from_date(self, start_date: str, periods: list = None): """ - 抓取所有周期的历史数据 + 从指定日期抓取到当前时间 :param start_date: 起始日期 'YYYY-MM-DD' - :param days: 抓取天数 :param periods: 要抓取的周期列表,如 [1, 5, 15],默认全部 """ if periods is None: periods = list(KLINE_CONFIGS.keys()) - logger.info(f"开始抓取多周期K线数据,周期: {[KLINE_CONFIGS[p] for p in periods]}") + # 计算时间范围 + start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d') + target_start = int(start_dt.timestamp()) + target_end = int(time.time()) + + start_str = start_dt.strftime('%Y-%m-%d') + end_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M') + + logger.info(f"{'='*60}") + logger.info(f"目标时间范围: {start_str} ~ {end_str}") + logger.info(f"抓取周期: {[KLINE_CONFIGS[p] for p in periods]}") + logger.info(f"{'='*60}") results = {} for step in periods: @@ -242,23 +381,347 @@ class BitMartMultiKlineCollector: logger.warning(f"不支持的周期: {step}分钟,跳过") continue - logger.info(f"\n{'='*50}") + logger.info(f"\n{'='*60}") logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线") - logger.info(f"{'='*50}") + logger.info(f"{'='*60}") - saved = self.collect_single_period(step, start_date, days) + saved = self.collect_period_range(step, target_start, target_end) results[KLINE_CONFIGS[step]] = saved time.sleep(1) # 不同周期之间间隔 - logger.info(f"\n{'='*50}") + # 打印总结 + logger.info(f"\n{'='*60}") logger.info("所有周期抓取完成!统计:") for period, count in results.items(): - logger.info(f" {period}: {count} 条") - logger.info(f"{'='*50}") + logger.info(f" {period}: 新增 {count} 条") + logger.info(f"{'='*60}") return results + def get_stats(self): + """获取各周期数据统计""" + logger.info(f"\n{'='*60}") + logger.info("数据库统计:") + logger.info(f"{'='*60}") + + for step, model in self.models.items(): + suffix = KLINE_CONFIGS.get(step, f'{step}m') + try: + count = model.select().count() + earliest, latest = self.get_db_time_range(step) + if earliest and latest: + earliest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(earliest//1000)) + latest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(latest//1000)) + logger.info(f" {suffix:>4}: {count:>8} 条 | {earliest_str} ~ {latest_str}") + else: + logger.info(f" {suffix:>4}: {count:>8} 条") + except Exception as e: + logger.error(f" {suffix}: 查询失败 - {e}") + + # 成交记录统计 + try: + trades_count = BitMartETHTrades.select().count() + if trades_count > 0: + earliest_trade = BitMartETHTrades.select(fn.MIN(BitMartETHTrades.timestamp)).scalar() + latest_trade = BitMartETHTrades.select(fn.MAX(BitMartETHTrades.timestamp)).scalar() + earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_trade//1000)) + latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_trade//1000)) + logger.info(f"trades: {trades_count:>8} 条 | {earliest_str} ~ {latest_str}") + else: + logger.info(f"trades: {trades_count:>8} 条") + except Exception as e: + logger.error(f"trades: 查询失败 - {e}") + + # 秒级K线统计 + try: + second_count = BitMartETHSecond.select().count() + if second_count > 0: + earliest_sec = BitMartETHSecond.select(fn.MIN(BitMartETHSecond.id)).scalar() + latest_sec = BitMartETHSecond.select(fn.MAX(BitMartETHSecond.id)).scalar() + earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_sec//1000)) + latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_sec//1000)) + logger.info(f" 1s: {second_count:>8} 条 | {earliest_str} ~ {latest_str}") + else: + logger.info(f" 1s: {second_count:>8} 条") + except Exception as e: + logger.error(f" 1s: 查询失败 - {e}") + + logger.info(f"{'='*60}") + + # ==================== 秒级数据相关方法 ==================== + + def get_trades(self, limit: int = 100): + """ + 获取最近成交记录 + :param limit: 获取条数 + :return: 成交记录列表 + """ + try: + response = self.contractAPI.get_trades( + contract_symbol=self.contract_symbol, + )[0] + + if response['code'] != 1000: + logger.error(f"获取成交记录失败: {response}") + return [] + + trades = response.get('data', {}).get('trades', []) + formatted = [] + for t in trades: + formatted.append({ + 'id': int(t.get('trade_id', 0)), + 'timestamp': int(t.get('create_time', 0)), + 'price': float(t.get('deal_price', 0)), + 'volume': float(t.get('deal_vol', 0)), + 'side': int(t.get('way', 0)), + }) + + return formatted + except Exception as e: + logger.error(f"获取成交记录异常: {e}") + return [] + + def save_trades(self, trades: list): + """保存成交记录到数据库""" + new_count = 0 + for trade in trades: + try: + _, created = BitMartETHTrades.get_or_create( + id=trade['id'], + defaults={ + 'timestamp': trade['timestamp'], + 'price': trade['price'], + 'volume': trade['volume'], + 'side': trade['side'], + } + ) + if created: + new_count += 1 + except Exception as e: + pass # 忽略重复数据 + return new_count + + def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.3): + """ + 实时持续采集成交记录(秒级数据源) + :param duration_seconds: 采集时长(秒),默认1小时 + :param interval: 采集间隔(秒),默认0.3秒 + """ + logger.info(f"{'='*60}") + logger.info(f"开始实时采集成交记录") + logger.info(f"时长: {duration_seconds}秒 ({duration_seconds/3600:.1f}小时)") + logger.info(f"间隔: {interval}秒") + logger.info(f"{'='*60}") + + start_time = time.time() + end_time = start_time + duration_seconds + total_saved = 0 + batch_count = 0 + + while time.time() < end_time: + trades = self.get_trades(limit=100) + if trades: + saved = self.save_trades(trades) + total_saved += saved + batch_count += 1 + + # 每10批显示一次进度 + if batch_count % 10 == 0: + elapsed = time.time() - start_time + remaining = end_time - time.time() + latest = trades[-1] + ts_str = datetime.datetime.fromtimestamp( + latest['timestamp']/1000 + ).strftime('%H:%M:%S') + logger.info(f"[{ts_str}] 价格: {latest['price']:.2f} | " + f"本批新增: {saved} | 累计: {total_saved} | " + f"剩余: {remaining/60:.1f}分钟") + + time.sleep(interval) + + logger.success(f"采集完成!共新增 {total_saved} 条成交记录") + + # 自动聚合为秒级K线 + logger.info("正在将成交记录聚合为秒级K线...") + self.aggregate_trades_to_seconds() + + return total_saved + + def aggregate_trades_to_seconds(self, start_ts: int = None, end_ts: int = None): + """ + 将成交记录聚合为秒级K线数据 + :param start_ts: 开始时间戳(毫秒),默认全部 + :param end_ts: 结束时间戳(毫秒),默认全部 + :return: 聚合的秒级K线数量 + """ + # 构建查询 + query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) + if start_ts: + query = query.where(BitMartETHTrades.timestamp >= start_ts) + if end_ts: + query = query.where(BitMartETHTrades.timestamp <= end_ts) + + # 按秒聚合 + second_data = {} + trade_count = 0 + + for trade in query: + trade_count += 1 + # 取整到秒(毫秒时间戳) + second_ts = (trade.timestamp // 1000) * 1000 + + if second_ts not in second_data: + second_data[second_ts] = { + 'open': trade.price, + 'high': trade.price, + 'low': trade.price, + 'close': trade.price, + 'volume': trade.volume, + 'trade_count': 1 + } + else: + second_data[second_ts]['high'] = max(second_data[second_ts]['high'], trade.price) + second_data[second_ts]['low'] = min(second_data[second_ts]['low'], trade.price) + second_data[second_ts]['close'] = trade.price + second_data[second_ts]['volume'] += trade.volume + second_data[second_ts]['trade_count'] += 1 + + # 保存到数据库 + saved_count = 0 + for ts, ohlc in second_data.items(): + try: + BitMartETHSecond.insert( + id=ts, + open=ohlc['open'], + high=ohlc['high'], + low=ohlc['low'], + close=ohlc['close'], + volume=ohlc['volume'], + trade_count=ohlc['trade_count'] + ).on_conflict( + conflict_target=[BitMartETHSecond.id], + update={ + BitMartETHSecond.open: ohlc['open'], + BitMartETHSecond.high: ohlc['high'], + BitMartETHSecond.low: ohlc['low'], + BitMartETHSecond.close: ohlc['close'], + BitMartETHSecond.volume: ohlc['volume'], + BitMartETHSecond.trade_count: ohlc['trade_count'], + } + ).execute() + saved_count += 1 + except Exception as e: + logger.error(f"保存秒级K线失败 {ts}: {e}") + + logger.success(f"聚合完成!{trade_count} 条成交记录 → {saved_count} 条秒级K线") + return saved_count + + def get_second_klines(self, start_ts: int = None, end_ts: int = None): + """ + 获取秒级K线数据 + :param start_ts: 开始时间戳(毫秒) + :param end_ts: 结束时间戳(毫秒) + :return: 秒级K线列表 + """ + query = BitMartETHSecond.select().order_by(BitMartETHSecond.id) + if start_ts: + query = query.where(BitMartETHSecond.id >= start_ts) + if end_ts: + query = query.where(BitMartETHSecond.id <= end_ts) + + return [{ + 'timestamp': k.id, + 'open': k.open, + 'high': k.high, + 'low': k.low, + 'close': k.close, + 'volume': k.volume, + 'trade_count': k.trade_count + } for k in query] + + def aggregate_trades_custom(self, interval_ms: int = 100, start_ts: int = None, end_ts: int = None): + """ + 将成交记录聚合为自定义毫秒级K线数据(不保存到数据库,直接返回) + :param interval_ms: 聚合周期(毫秒),如 100=100ms, 500=500ms, 1000=1秒 + :param start_ts: 开始时间戳(毫秒) + :param end_ts: 结束时间戳(毫秒) + :return: K线列表 [{'timestamp', 'open', 'high', 'low', 'close', 'volume', 'trade_count'}, ...] + """ + # 构建查询 + query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) + if start_ts: + query = query.where(BitMartETHTrades.timestamp >= start_ts) + if end_ts: + query = query.where(BitMartETHTrades.timestamp <= end_ts) + + # 按指定间隔聚合 + interval_data = {} + trade_count = 0 + + for trade in query: + trade_count += 1 + # 取整到指定间隔 + interval_ts = (trade.timestamp // interval_ms) * interval_ms + + if interval_ts not in interval_data: + interval_data[interval_ts] = { + 'open': trade.price, + 'high': trade.price, + 'low': trade.price, + 'close': trade.price, + 'volume': trade.volume, + 'trade_count': 1 + } + else: + interval_data[interval_ts]['high'] = max(interval_data[interval_ts]['high'], trade.price) + interval_data[interval_ts]['low'] = min(interval_data[interval_ts]['low'], trade.price) + interval_data[interval_ts]['close'] = trade.price + interval_data[interval_ts]['volume'] += trade.volume + interval_data[interval_ts]['trade_count'] += 1 + + # 转换为列表 + result = [] + for ts, ohlc in sorted(interval_data.items()): + result.append({ + 'timestamp': ts, + 'datetime': datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], + 'open': ohlc['open'], + 'high': ohlc['high'], + 'low': ohlc['low'], + 'close': ohlc['close'], + 'volume': ohlc['volume'], + 'trade_count': ohlc['trade_count'] + }) + + logger.info(f"聚合完成: {trade_count} 条成交记录 → {len(result)} 条 {interval_ms}ms K线") + return result + + def get_raw_trades(self, start_ts: int = None, end_ts: int = None, limit: int = None): + """ + 获取原始成交记录(逐笔数据,毫秒级) + :param start_ts: 开始时间戳(毫秒) + :param end_ts: 结束时间戳(毫秒) + :param limit: 最大返回条数 + :return: 成交记录列表 + """ + query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) + if start_ts: + query = query.where(BitMartETHTrades.timestamp >= start_ts) + if end_ts: + query = query.where(BitMartETHTrades.timestamp <= end_ts) + if limit: + query = query.limit(limit) + + return [{ + 'id': t.id, + 'timestamp': t.timestamp, + 'datetime': datetime.datetime.fromtimestamp(t.timestamp/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], + 'price': t.price, + 'volume': t.volume, + 'side': '买' if t.side == 1 else '卖' + } for t in query] + def close(self): """关闭数据库连接""" if not db.is_closed(): @@ -269,11 +732,30 @@ if __name__ == '__main__': collector = BitMartMultiKlineCollector() try: - # 抓取尽可能多的历史数据(从现在向前,直到遇到API限制自动停止) - # 目标:2025-01-01,但实际能抓取多少取决于 BitMart API 的历史数据限制 - collector.collect_all_periods( - start_date='2025-01-01', # 目标起始日期 + # 查看当前数据统计 + collector.get_stats() + + # ============ 选择要执行的任务 ============ + + # 任务1: 抓取K线数据(1分钟~1小时周期) + # 从 2025-01-01 抓取到当前时间(支持断点续传) + collector.collect_from_date( + start_date='2025-01-01', periods=[1, 3, 5, 15, 30, 60] # 所有周期 ) + + # 任务2: 实时采集秒级数据(成交记录) + # 注意: 秒级数据只能实时采集,无法获取历史 + # collector.collect_trades_realtime( + # duration_seconds=3600, # 采集1小时 + # interval=0.3 # 每0.3秒请求一次 + # ) + + # 任务3: 将已采集的成交记录聚合为秒级K线 + # collector.aggregate_trades_to_seconds() + + # 再次查看统计 + collector.get_stats() + finally: collector.close() diff --git a/models/database.db b/models/database.db index c07bbea..f8678eb 100644 Binary files a/models/database.db and b/models/database.db differ