""" 基于 ccxt 的币安永续 ETHUSDT K线 + 秒级数据抓取 - K线: 1m、3m、5m、15m、30m、1h - 秒级: fetch_trades 逐笔成交聚合 支持代理,与 抓取币安K线.py 共用同一数据库表 """ import time import datetime import os from pathlib import Path from loguru import logger from peewee import * try: import ccxt except ImportError: ccxt = None DB_PATH = Path(__file__).parent / 'models' / 'database.db' db = SqliteDatabase(str(DB_PATH)) KLINE_CONFIGS = {1: '1m', 3: '3m', 5: '5m', 15: '15m', 30: '30m', 60: '1h'} CCXT_INTERVALS = {1: '1m', 3: '3m', 5: '5m', 15: '15m', 30: '30m', 60: '1h'} class BinanceETHTrades(Model): id = BigIntegerField(primary_key=True) timestamp = BigIntegerField(index=True) price = FloatField() volume = FloatField() side = IntegerField() class Meta: database = db table_name = 'binance_eth_trades' class BinanceETHSecond(Model): id = BigIntegerField(primary_key=True) open = FloatField(null=True) high = FloatField(null=True) low = FloatField(null=True) close = FloatField(null=True) volume = FloatField(null=True) trade_count = IntegerField(null=True) class Meta: database = db table_name = 'binance_eth_1s' def create_kline_model(step: int): suffix = KLINE_CONFIGS.get(step, f'{step}m') tbl_name = f'binance_eth_{suffix}' attrs = { 'id': BigIntegerField(primary_key=True), 'open': FloatField(null=True), 'high': FloatField(null=True), 'low': FloatField(null=True), 'close': FloatField(null=True), } meta = type('Meta', (), {'database': db, 'table_name': tbl_name}) attrs['Meta'] = meta return type(f'BinanceETH{suffix}', (Model,), attrs) class BinanceCCXTCollector: """基于 ccxt 的币安永续 K 线抓取""" def __init__(self, symbol: str = "ETHUSDT", proxy: str = None): self.symbol = symbol self.ccxt_symbol = "ETH/USDT:USDT" self.proxy = proxy or os.environ.get('BINANCE_PROXY', '').strip() or None self.models = {} self.exchange = None self._init_exchange() self._init_database() def _init_exchange(self): if not ccxt: raise ImportError("请安装 ccxt: pip install ccxt") options = {'defaultType': 'future', 'adjustForTimeDifference': True} config = {'timeout': 30000, 'enableRateLimit': True} if self.proxy: config['proxy'] = self.proxy self.exchange = ccxt.binanceusdm(config=config) self.exchange.options.update(options) logger.info(f"ccxt 交易所: binanceusdm" + (f" 代理: {self.proxy}" if self.proxy else "")) def _init_database(self): DB_PATH.parent.mkdir(parents=True, exist_ok=True) db.connect(reuse_if_open=True) for step in KLINE_CONFIGS.keys(): model = create_kline_model(step) self.models[step] = model db.create_tables([model], safe=True) db.create_tables([BinanceETHTrades, BinanceETHSecond], safe=True) logger.info("数据库表已就绪") def get_db_time_range(self, step: int): model = self.models.get(step) if not model: return None, None try: lo = model.select(fn.MIN(model.id)).scalar() hi = model.select(fn.MAX(model.id)).scalar() return lo, hi except Exception: return None, None def fetch_klines(self, step: int, start_sec: int, end_sec: int, max_retries: int = 5): """ccxt fetch_ohlcv,按批次拉取""" interval = CCXT_INTERVALS.get(step, f'{step}m') since_ms = int(start_sec) * 1000 end_ms = int(end_sec) * 1000 out = [] while since_ms < end_ms: for attempt in range(max_retries): try: ohlcv = self.exchange.fetch_ohlcv( self.ccxt_symbol, interval, since=since_ms, limit=1500, ) if not ohlcv: break for row in ohlcv: ts, o, h, l, c = int(row[0]), row[1], row[2], row[3], row[4] if ts >= end_ms: break out.append({'id': ts, 'open': float(o), 'high': float(h), 'low': float(l), 'close': float(c)}) since_ms = int(ohlcv[-1][0]) + 1 time.sleep(0.15) break except Exception as e: delay = 2 + attempt * 2 logger.warning(f"fetch_ohlcv 异常 ({attempt+1}/{max_retries}): {e},{delay}s 后重试") if attempt < max_retries - 1: time.sleep(delay) else: return [] return out def save_klines(self, step: int, klines: list): model = self.models.get(step) if not model: return 0 n = 0 for k in klines: try: _, created = model.get_or_create( id=k['id'], defaults={'open': k['open'], 'high': k['high'], 'low': k['low'], 'close': k['close']} ) if created: n += 1 except Exception as e: logger.error(f"保存失败 {k['id']}: {e}") return n def get_batch_seconds(self, step: int): return {1: 3600*4, 3: 3600*8, 5: 3600*12, 15: 3600*24, 30: 3600*48}.get(step, 3600*72) def collect_period_range(self, step: int, target_start: int, target_end: int): suffix = KLINE_CONFIGS.get(step, f'{step}m') batch = self.get_batch_seconds(step) db_lo, db_hi = self.get_db_time_range(step) db_lo_sec = db_lo // 1000 if db_lo else None db_hi_sec = db_hi // 1000 if db_hi else None total = 0 backward_end = db_lo_sec if db_lo_sec else target_end if backward_end > target_start: logger.info(f"[{suffix}] 向前抓取历史") current = backward_end fail = 0 while current > target_start and fail < 5: start = max(current - batch, target_start) klines = self.fetch_klines(step, start, current) if klines: saved = self.save_klines(step, klines) total += saved logger.info(f"[{suffix}] ← {time.strftime('%Y-%m-%d %H:%M', time.localtime(start))} ~ " f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(current))} | " f"获取 {len(klines)} 新增 {saved}") fail = 0 else: fail += 1 current = start time.sleep(0.3) forward_start = db_hi_sec if db_hi_sec else target_start if forward_start < target_end: logger.info(f"[{suffix}] 向后抓取最新") current = forward_start fail = 0 while current < target_end and fail < 3: end = min(current + batch, target_end) klines = self.fetch_klines(step, current, end) if klines: saved = self.save_klines(step, klines) total += saved logger.info(f"[{suffix}] → {time.strftime('%Y-%m-%d %H:%M', time.localtime(current))} ~ " f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(end))} | " f"获取 {len(klines)} 新增 {saved}") fail = 0 else: fail += 1 current = end time.sleep(0.3) lo, hi = self.get_db_time_range(step) if lo and hi: logger.success(f"[{suffix}] 完成 新增 {total} | " f"{time.strftime('%Y-%m-%d', time.localtime(lo//1000))} ~ " f"{time.strftime('%Y-%m-%d', time.localtime(hi//1000))}") return total def collect_from_date(self, start_date: str, periods: list = None): if periods is None: periods = list(KLINE_CONFIGS.keys()) start_ts = int(datetime.datetime.strptime(start_date, '%Y-%m-%d').timestamp()) end_ts = int(time.time()) logger.info(f"币安 ETHUSDT 永续(ccxt) | {start_date} ~ 当前 | 周期: {[KLINE_CONFIGS[p] for p in periods]}") results = {} for step in periods: results[KLINE_CONFIGS[step]] = self.collect_period_range(step, start_ts, end_ts) time.sleep(0.5) return results # ==================== 秒级 ==================== def fetch_trades(self, since_ms: int = None, limit: int = 1000, max_retries: int = 5): for attempt in range(max_retries): try: trades = self.exchange.fetch_trades( self.ccxt_symbol, since=since_ms, limit=limit, ) out = [] for t in trades: out.append({ 'id': t['id'], 'timestamp': t['timestamp'], 'price': float(t['price']), 'volume': float(t['amount']), 'side': 1 if t.get('side') == 'buy' else 0, }) return out except Exception as e: delay = 2 + attempt * 2 logger.warning(f"fetch_trades 异常 ({attempt+1}/{max_retries}): {e},{delay}s 后重试") if attempt < max_retries - 1: time.sleep(delay) return [] def save_trades(self, trades: list): n = 0 for t in trades: try: _, created = BinanceETHTrades.get_or_create( id=t['id'], defaults={'timestamp': t['timestamp'], 'price': t['price'], 'volume': t['volume'], 'side': t['side']} ) if created: n += 1 except Exception: pass return n def aggregate_to_seconds(self, start_ms: int = None, end_ms: int = None): q = BinanceETHTrades.select().order_by(BinanceETHTrades.timestamp) if start_ms: q = q.where(BinanceETHTrades.timestamp >= start_ms) if end_ms: q = q.where(BinanceETHTrades.timestamp <= end_ms) sec_data = {} for t in q: ts = (t.timestamp // 1000) * 1000 if ts not in sec_data: sec_data[ts] = { 'open': t.price, 'high': t.price, 'low': t.price, 'close': t.price, 'volume': t.volume, 'trade_count': 1, } else: sec_data[ts]['high'] = max(sec_data[ts]['high'], t.price) sec_data[ts]['low'] = min(sec_data[ts]['low'], t.price) sec_data[ts]['close'] = t.price sec_data[ts]['volume'] += t.volume sec_data[ts]['trade_count'] += 1 for ts, d in sec_data.items(): try: BinanceETHSecond.insert( id=ts, open=d['open'], high=d['high'], low=d['low'], close=d['close'], volume=d['volume'], trade_count=d['trade_count'] ).on_conflict( conflict_target=[BinanceETHSecond.id], update={ BinanceETHSecond.open: d['open'], BinanceETHSecond.high: d['high'], BinanceETHSecond.low: d['low'], BinanceETHSecond.close: d['close'], BinanceETHSecond.volume: d['volume'], BinanceETHSecond.trade_count: d['trade_count'], } ).execute() except Exception as e: logger.error(f"保存秒级失败 {ts}: {e}") logger.info(f"聚合 {len(sec_data)} 条秒级数据") def collect_second_data(self, start_date: str, end_date: str = None): start_ts = int(datetime.datetime.strptime(start_date, '%Y-%m-%d').timestamp()) end_ts = int(time.time()) if end_date is None else int( datetime.datetime.strptime(end_date, '%Y-%m-%d').timestamp()) try: hi = BinanceETHSecond.select(fn.MAX(BinanceETHSecond.id)).scalar() fetch_start = (hi + 1000) if hi else start_ts * 1000 except Exception: fetch_start = start_ts * 1000 total_saved = 0 since = fetch_start while since < end_ts * 1000: trades = self.fetch_trades(since_ms=since, limit=1000) if trades: saved = self.save_trades(trades) total_saved += saved since = trades[-1]['timestamp'] + 1 logger.info(f"[1s] {datetime.datetime.fromtimestamp(since/1000).strftime('%Y-%m-%d %H:%M:%S')} | " f"成交 {len(trades)} 新增 {saved}") else: since += 3600 * 1000 time.sleep(0.2) if total_saved > 0: self.aggregate_to_seconds(start_ms=fetch_start, end_ms=end_ts*1000) logger.success(f"秒级抓取完成 新增 {total_saved}") return total_saved def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.5): end = time.time() + duration_seconds total = 0 since = None while time.time() < end: try: trades = self.fetch_trades(since_ms=since, limit=1000) if trades: total += self.save_trades(trades) since = trades[-1]['timestamp'] + 1 except Exception as e: logger.warning(f"实时采集异常: {e}") time.sleep(interval) self.aggregate_to_seconds() logger.success(f"实时采集完成 新增 {total}") return total def get_stats(self): logger.info("币安(ccxt) 数据库统计:") for step, model in self.models.items(): try: c = model.select().count() lo, hi = self.get_db_time_range(step) s = f" {KLINE_CONFIGS[step]:>4}: {c:>8} 条" if lo and hi: s += f" | {time.strftime('%Y-%m-%d %H:%M', time.localtime(lo//1000))} ~ " \ f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(hi//1000))}" logger.info(s) except Exception as e: logger.error(f" {KLINE_CONFIGS[step]}: {e}") try: tc = BinanceETHTrades.select().count() sc = BinanceETHSecond.select().count() if sc > 0: lo = BinanceETHSecond.select(fn.MIN(BinanceETHSecond.id)).scalar() hi = BinanceETHSecond.select(fn.MAX(BinanceETHSecond.id)).scalar() logger.info(f" trades: {tc:>8} | 1s: {sc:>8} | " f"{datetime.datetime.fromtimestamp(lo/1000).strftime('%Y-%m-%d %H:%M:%S')} ~ " f"{datetime.datetime.fromtimestamp(hi/1000).strftime('%Y-%m-%d %H:%M:%S')}") else: logger.info(f" trades: {tc:>8} | 1s: {sc:>8}") except Exception as e: logger.error(f" trades/1s: {e}") def close(self): if not db.is_closed(): db.close() if __name__ == '__main__': proxy = os.environ.get('BINANCE_PROXY') or None collector = BinanceCCXTCollector(symbol="ETHUSDT", proxy=proxy) try: collector.get_stats() # K线(不指定 periods 则抓取全部:1m、3m、5m、15m、30m、1h) collector.collect_from_date(start_date='2025-01-01') # 秒级 collector.collect_second_data(start_date='2025-01-01') collector.get_stats() finally: collector.close()