"""
Binance perpetual futures (ETHUSDT) multi-period kline and second-level collector.

- Klines: 1m, 3m, 5m, 15m, 30m, 1h
- Second-level bars: aggTrades are aggregated into one OHLC row per second

Collection is resumable: every run continues from what the database already holds.
"""
import datetime
import os
import time
from pathlib import Path

from loguru import logger
from peewee import *

try:
    import requests
except ImportError:
    requests = None

DB_PATH = Path(__file__).parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))

# Kline period in minutes -> table-name suffix.
KLINE_CONFIGS = {
    1: '1m',
    3: '3m',
    5: '5m',
    15: '15m',
    30: '30m',
    60: '1h',
}

# Kline period in minutes -> value of the `interval` request parameter.
BINANCE_INTERVALS = {
    1: '1m',
    3: '3m',
    5: '5m',
    15: '15m',
    30: '30m',
    60: '1h',
}

BINANCE_BASE = 'https://fapi.binance.com'

# Page size requested from the aggTrades endpoint.
AGG_TRADES_LIMIT = 1000
# Pause (seconds) between consecutive aggTrades pages of one time window.
AGG_TRADES_PAGE_PAUSE = 0.15


def _fmt_local(ts_sec, fmt='%Y-%m-%d %H:%M'):
    """Format a Unix timestamp given in seconds as a local-time string."""
    return time.strftime(fmt, time.localtime(ts_sec))


class BinanceETHTrades(Model):
    """Raw aggregated trades as returned by the aggTrades endpoint."""
    id = BigIntegerField(primary_key=True)   # aggregate trade id (field 'a')
    timestamp = BigIntegerField(index=True)  # trade time in milliseconds (field 'T')
    price = FloatField()
    volume = FloatField()
    side = IntegerField()  # 1 = buy, 0 = sell (0 when the aggTrade 'm' flag is set)

    class Meta:
        database = db
        table_name = 'binance_eth_trades'


class BinanceETHSecond(Model):
    """One row per second; ``id`` is the millisecond timestamp floored to the second."""
    id = BigIntegerField(primary_key=True)
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)
    volume = FloatField(null=True)
    trade_count = IntegerField(null=True)

    class Meta:
        database = db
        table_name = 'binance_eth_1s'


def create_binance_kline_model(step: int):
    """Build a peewee model class for the kline table of the given period.

    :param step: kline period in minutes
    :return: a ``Model`` subclass bound to table ``binance_eth_<suffix>``
    """
    suffix = KLINE_CONFIGS.get(step, f'{step}m')
    meta = type('Meta', (), {'database': db, 'table_name': f'binance_eth_{suffix}'})
    attrs = {
        'id': BigIntegerField(primary_key=True),
        'open': FloatField(null=True),
        'high': FloatField(null=True),
        'low': FloatField(null=True),
        'close': FloatField(null=True),
        'Meta': meta,
    }
    return type(f'BinanceETH{suffix}', (Model,), attrs)


class BinanceKlineCollector:
    """Collector for Binance perpetual-futures klines and second-level data."""

    def __init__(self, symbol: str = "ETHUSDT", proxy: str = None, verify_ssl: bool = True):
        """
        :param symbol: contract symbol
        :param proxy: proxy URL such as 'http://127.0.0.1:7890'; falls back to the
            BINANCE_PROXY environment variable when not given
        :param verify_ssl: whether to verify TLS certificates
        """
        self.symbol = symbol
        self.proxy = proxy or os.environ.get('BINANCE_PROXY', '').strip() or None
        self.verify_ssl = verify_ssl
        self._session = requests.Session() if requests else None
        self.models = {}
        self._init_database()

    def _init_database(self):
        """Connect to the database and create every table that does not exist yet."""
        DB_PATH.parent.mkdir(parents=True, exist_ok=True)
        db.connect(reuse_if_open=True)
        for step in KLINE_CONFIGS:
            model = create_binance_kline_model(step)
            self.models[step] = model
            db.create_tables([model], safe=True)
            logger.info(f"初始化表: {model._meta.table_name}")
        db.create_tables([BinanceETHTrades, BinanceETHSecond], safe=True)
        logger.info("初始化表: binance_eth_trades, binance_eth_1s")

    def _http_get(self, url: str, params: dict, timeout: int = 30):
        """Issue a GET with the collector's proxy and TLS settings applied."""
        kwargs = {'timeout': timeout, 'verify': self.verify_ssl}
        if self.proxy:
            kwargs['proxies'] = {'http': self.proxy, 'https': self.proxy}
        return (self._session or requests).get(url, params=params, **kwargs)

    def get_db_time_range(self, step: int):
        """Return ``(min_id, max_id)`` of the kline table for ``step``.

        Both values are ``None`` when the period is unknown, the table is empty,
        or the query fails.
        """
        model = self.models.get(step)
        if not model:
            return None, None
        try:
            earliest = model.select(fn.MIN(model.id)).scalar()
            latest = model.select(fn.MAX(model.id)).scalar()
            return earliest, latest
        except Exception as e:
            logger.error(f"查询数据库异常: {e}")
            return None, None

    def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3):
        """Fetch klines from the Binance API.

        :param step: period in minutes
        :param start_time: start timestamp in seconds
        :param end_time: end timestamp in seconds
        :param max_retries: number of attempts before giving up
        :return: ``[{'id', 'open', 'high', 'low', 'close'}, ...]`` sorted by ``id``
            (``id`` is the first element of each API row); ``[]`` when every
            attempt failed
        """
        if not requests:
            logger.error("需要安装 requests: pip install requests")
            return []

        url = f"{BINANCE_BASE}/fapi/v1/klines"
        params = {
            'symbol': self.symbol,
            'interval': BINANCE_INTERVALS.get(step, f'{step}m'),
            'startTime': int(start_time) * 1000,
            'endTime': int(end_time) * 1000,
            'limit': 1500,
        }

        for attempt in range(max_retries):
            try:
                r = self._http_get(url, params)
                r.raise_for_status()
                # API row layout: [open_time, open, high, low, close, volume, ...]
                rows = [
                    {
                        'id': k[0],
                        'open': float(k[1]),
                        'high': float(k[2]),
                        'low': float(k[3]),
                        'close': float(k[4]),
                    }
                    for k in r.json()
                ]
                rows.sort(key=lambda row: row['id'])
                return rows
            except Exception as e:
                delay = 3 + attempt * 2
                logger.warning(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e},{delay}s 后重试")
                if attempt < max_retries - 1:
                    time.sleep(delay)
        return []

    def save_klines(self, step: int, klines: list):
        """Store klines and return how many rows were newly inserted.

        A row that already exists with different values is updated in place: the
        newest candle of an earlier run was stored while still in progress and
        would otherwise keep its partial values forever. Updated rows are not
        counted in the return value.
        """
        model = self.models.get(step)
        if not model:
            return 0

        new_count = 0
        # A single transaction avoids one commit per row.
        with db.atomic():
            for kline in klines:
                values = {
                    'open': kline['open'],
                    'high': kline['high'],
                    'low': kline['low'],
                    'close': kline['close'],
                }
                try:
                    row, created = model.get_or_create(id=kline['id'], defaults=values)
                    if created:
                        new_count += 1
                    elif any(getattr(row, name) != value for name, value in values.items()):
                        model.update(**values).where(model.id == kline['id']).execute()
                except Exception as e:
                    logger.error(f"保存失败 {kline['id']}: {e}")
        return new_count

    def get_batch_seconds(self, step: int):
        """Return the time span in seconds requested per API call for ``step``."""
        hours_by_step = {1: 4, 3: 8, 5: 12, 15: 24, 30: 48}
        return 3600 * hours_by_step.get(step, 72)

    def collect_period_range(self, step: int, target_start: int, target_end: int):
        """Fill the kline table for ``step`` so it covers the target range.

        Walks backward from the earliest stored candle to ``target_start``, then
        forward from the latest stored candle to ``target_end``.

        :param step: period in minutes
        :param target_start: range start, timestamp in seconds
        :param target_end: range end, timestamp in seconds
        :return: number of rows newly inserted during this call
        """
        suffix = KLINE_CONFIGS.get(step, f'{step}m')
        batch_seconds = self.get_batch_seconds(step)

        db_earliest, db_latest = self.get_db_time_range(step)
        had_data = bool(db_earliest and db_latest)
        if had_data:
            db_earliest_sec = db_earliest // 1000
            db_latest_sec = db_latest // 1000
            logger.info(f"[{suffix}] 数据库已有: "
                        f"{_fmt_local(db_earliest_sec)} ~ {_fmt_local(db_latest_sec)}")
        else:
            db_earliest_sec = db_latest_sec = None

        total_saved = 0

        # Backward pass: history older than what is stored.
        backward_end = db_earliest_sec if db_earliest_sec else target_end
        if backward_end > target_start:
            logger.info(f"[{suffix}] === 向前抓取历史 ===")
            total_backward = backward_end - target_start
            current_end = backward_end
            fail_count = 0
            while current_end > target_start and fail_count < 5:
                current_start = max(current_end - batch_seconds, target_start)
                klines = self.get_klines(step, current_start, current_end)
                if klines:
                    saved = self.save_klines(step, klines)
                    total_saved += saved
                    # Measured up to current_start so the batch just stored is included.
                    progress = (backward_end - current_start) / total_backward * 100
                    logger.info(f"[{suffix}] ← {_fmt_local(current_start)} ~ "
                                f"{_fmt_local(current_end)} | "
                                f"获取 {len(klines)} 新增 {saved} | 进度 {progress:.1f}%")
                    fail_count = 0
                else:
                    fail_count += 1
                current_end = current_start
                time.sleep(0.2)

        # With an empty table the backward pass already walked the whole target
        # range; re-read the stored range so the forward pass does not fetch it
        # all a second time.
        if not had_data:
            _, refreshed_latest = self.get_db_time_range(step)
            if refreshed_latest:
                db_latest_sec = refreshed_latest // 1000

        # Forward pass: data newer than what is stored.
        forward_start = db_latest_sec if db_latest_sec else target_start
        if forward_start < target_end:
            logger.info(f"[{suffix}] === 向后抓取最新 ===")
            current_start = forward_start
            fail_count = 0
            while current_start < target_end and fail_count < 3:
                current_end = min(current_start + batch_seconds, target_end)
                klines = self.get_klines(step, current_start, current_end)
                if klines:
                    saved = self.save_klines(step, klines)
                    total_saved += saved
                    logger.info(f"[{suffix}] → {_fmt_local(current_start)} ~ "
                                f"{_fmt_local(current_end)} | "
                                f"获取 {len(klines)} 新增 {saved}")
                    fail_count = 0
                else:
                    fail_count += 1
                current_start = current_end
                time.sleep(0.2)

        final_earliest, final_latest = self.get_db_time_range(step)
        if final_earliest and final_latest:
            logger.success(f"[{suffix}] 完成!本次新增 {total_saved} | "
                           f"{_fmt_local(final_earliest // 1000, '%Y-%m-%d')} ~ "
                           f"{_fmt_local(final_latest // 1000, '%Y-%m-%d')}")
        return total_saved

    def collect_from_date(self, start_date: str, periods: list = None):
        """Collect klines for every requested period from ``start_date`` until now.

        :param start_date: 'YYYY-MM-DD', parsed as a naive local datetime
        :param periods: periods in minutes; defaults to every key of KLINE_CONFIGS
        :return: ``{suffix: newly_inserted_count}``
        """
        if periods is None:
            periods = list(KLINE_CONFIGS)

        start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
        target_start = int(start_dt.timestamp())
        target_end = int(time.time())

        # Same fallback suffix as collect_period_range, so a period missing from
        # KLINE_CONFIGS does not raise KeyError here.
        labels = {p: KLINE_CONFIGS.get(p, f'{p}m') for p in periods}
        logger.info(f"币安 ETHUSDT 永续 | {start_date} ~ 当前 | 周期: {[labels[p] for p in periods]}")

        results = {}
        for step in periods:
            results[labels[step]] = self.collect_period_range(step, target_start, target_end)
            time.sleep(0.5)

        logger.info("抓取完成: " + ", ".join(f"{k}: {v}" for k, v in results.items()))
        return results

    # ==================== Second-level data ====================

    def _fetch_agg_trades_page(self, params: dict, max_retries: int):
        """Fetch one aggTrades page with retries; return ``[]`` when all attempts fail."""
        url = f"{BINANCE_BASE}/fapi/v1/aggTrades"
        for attempt in range(max_retries):
            try:
                r = self._http_get(url, params)
                r.raise_for_status()
                return [
                    {
                        'id': t['a'],
                        'timestamp': t['T'],
                        'price': float(t['p']),
                        'volume': float(t['q']),
                        'side': 0 if t.get('m', False) else 1,
                    }
                    for t in r.json()
                ]
            except Exception as e:
                delay = 3 + attempt * 2
                logger.warning(f"获取 aggTrades 异常 ({attempt+1}/{max_retries}): {e},{delay}s 后重试")
                if attempt < max_retries - 1:
                    time.sleep(delay)
        return []

    def get_agg_trades(self, start_ms: int, end_ms: int, max_retries: int = 3):
        """Fetch every aggregated trade in ``[start_ms, end_ms]``.

        The first request selects the window by time. While a page comes back
        full, the next page is requested by ``fromId`` alone, so trades beyond
        the first ``AGG_TRADES_LIMIT`` of the window are no longer dropped.
        Paging stops at the first short page or the first trade past ``end_ms``.

        :param start_ms: window start in milliseconds
        :param end_ms: window end in milliseconds; keep the window within one hour
        :param max_retries: attempts per page
        :return: ``[{'id', 'timestamp', 'price', 'volume', 'side'}, ...]``; whatever
            was collected so far when a page fails
        """
        if not requests:
            return []

        params = {
            'symbol': self.symbol,
            'startTime': start_ms,
            'endTime': end_ms,
            'limit': AGG_TRADES_LIMIT,
        }
        collected = []
        while True:
            page = self._fetch_agg_trades_page(params, max_retries)
            if not page:
                break
            # fromId pages are not bounded by time, so trim at the window end.
            in_window = [t for t in page if t['timestamp'] <= end_ms]
            collected.extend(in_window)
            if len(page) < AGG_TRADES_LIMIT or len(in_window) < len(page):
                break
            params = {
                'symbol': self.symbol,
                'fromId': page[-1]['id'] + 1,
                'limit': AGG_TRADES_LIMIT,
            }
            time.sleep(AGG_TRADES_PAGE_PAUSE)
        return collected

    def save_trades(self, trades: list):
        """Insert trades that are not stored yet; return the number inserted."""
        inserted = 0
        # A single transaction avoids one commit per row.
        with db.atomic():
            for t in trades:
                try:
                    _, created = BinanceETHTrades.get_or_create(
                        id=t['id'],
                        defaults={
                            'timestamp': t['timestamp'],
                            'price': t['price'],
                            'volume': t['volume'],
                            'side': t['side'],
                        },
                    )
                    if created:
                        inserted += 1
                except Exception as e:
                    # Best effort per row, but no longer silent.
                    logger.error(f"保存成交失败 {t['id']}: {e}")
        return inserted

    def get_second_db_range(self):
        """Return ``(min_id, max_id)`` of the second-level table, or ``(None, None)``."""
        try:
            lo = BinanceETHSecond.select(fn.MIN(BinanceETHSecond.id)).scalar()
            hi = BinanceETHSecond.select(fn.MAX(BinanceETHSecond.id)).scalar()
            return lo, hi
        except Exception as e:
            logger.error(f"查询数据库异常: {e}")
            return None, None

    def aggregate_trades_to_seconds(self, start_ms: int = None, end_ms: int = None):
        """Aggregate stored trades into per-second OHLC rows in ``binance_eth_1s``.

        Existing rows for the same second are overwritten.

        :param start_ms: optional inclusive lower bound on trade timestamp (ms)
        :param end_ms: optional inclusive upper bound on trade timestamp (ms)
        :return: number of second rows written
        """
        q = BinanceETHTrades.select().order_by(BinanceETHTrades.timestamp)
        if start_ms:
            q = q.where(BinanceETHTrades.timestamp >= start_ms)
        if end_ms:
            q = q.where(BinanceETHTrades.timestamp <= end_ms)

        sec_data = {}
        for t in q.iterator():
            sec_ts = (t.timestamp // 1000) * 1000
            bucket = sec_data.get(sec_ts)
            if bucket is None:
                sec_data[sec_ts] = {
                    'open': t.price,
                    'high': t.price,
                    'low': t.price,
                    'close': t.price,
                    'volume': t.volume,
                    'trade_count': 1,
                }
            else:
                bucket['high'] = max(bucket['high'], t.price)
                bucket['low'] = min(bucket['low'], t.price)
                bucket['close'] = t.price
                bucket['volume'] += t.volume
                bucket['trade_count'] += 1

        written = 0
        with db.atomic():
            for ts, d in sec_data.items():
                try:
                    BinanceETHSecond.insert(
                        id=ts,
                        open=d['open'],
                        high=d['high'],
                        low=d['low'],
                        close=d['close'],
                        volume=d['volume'],
                        trade_count=d['trade_count'],
                    ).on_conflict(
                        conflict_target=[BinanceETHSecond.id],
                        update={
                            BinanceETHSecond.open: d['open'],
                            BinanceETHSecond.high: d['high'],
                            BinanceETHSecond.low: d['low'],
                            BinanceETHSecond.close: d['close'],
                            BinanceETHSecond.volume: d['volume'],
                            BinanceETHSecond.trade_count: d['trade_count'],
                        },
                    ).execute()
                    written += 1
                except Exception as e:
                    logger.error(f"保存秒级失败 {ts}: {e}")
        logger.info(f"聚合: {len(sec_data)} 条秒级数据")
        return written

    def collect_second_data(self, start_date: str, end_date: str = None):
        """Collect second-level data: pull aggTrades hour by hour and aggregate them.

        When the second-level table already has rows, collection continues one
        second after its latest row instead of at ``start_date``. Each chunk is
        aggregated right after it is stored, so an interrupted run resumes from
        the last completed chunk.

        :param start_date: 'YYYY-MM-DD', parsed as a naive local datetime
        :param end_date: 'YYYY-MM-DD', or None for the current time
        :return: number of trades newly inserted
        """
        start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
        start_ms = int(start_dt.timestamp()) * 1000
        if end_date is None:
            end_ts = int(time.time())
        else:
            end_ts = int(datetime.datetime.strptime(end_date, '%Y-%m-%d').timestamp())
        end_ms = end_ts * 1000

        chunk_ms = 3600 * 1000  # at most one hour per window

        lo, hi = self.get_second_db_range()
        # Forward fill only: continue after the newest stored second.
        fetch_start = hi + 1000 if (lo and hi) else start_ms

        total_trades = 0
        total_saved = 0
        current = fetch_start
        while current < end_ms:
            chunk_end = min(current + chunk_ms, end_ms)
            # Windows are [current, chunk_end - 1]: strictly shorter than one hour
            # and disjoint from the next chunk.
            window_end = chunk_end - 1
            trades = self.get_agg_trades(current, window_end)
            if trades:
                saved = self.save_trades(trades)
                total_trades += len(trades)
                total_saved += saved
                ts_str = datetime.datetime.fromtimestamp(current / 1000).strftime('%Y-%m-%d %H:%M')
                logger.info(f"[1s] {ts_str} | 成交 {len(trades)} 条, 新增 {saved}")
                self.aggregate_trades_to_seconds(start_ms=current, end_ms=window_end)
            current = chunk_end
            time.sleep(0.15)

        logger.success(f"秒级抓取完成: 成交 {total_trades}, 新增 {total_saved}")
        return total_saved

    def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.5):
        """Poll the most recent aggTrades for a while, then aggregate to seconds.

        :param duration_seconds: how long to keep polling
        :param interval: pause between polls in seconds
        :return: number of trades newly inserted
        """
        if not requests:
            logger.error("需要安装 requests: pip install requests")
            return 0

        url = f"{BINANCE_BASE}/fapi/v1/aggTrades"
        deadline = time.time() + duration_seconds
        total = 0
        last_id = None

        while time.time() < deadline:
            try:
                params = {'symbol': self.symbol, 'limit': 1000}
                if last_id is not None:
                    params['fromId'] = last_id + 1
                r = self._http_get(url, params, timeout=10)
                r.raise_for_status()
                data = r.json()
                if data:
                    trades = [
                        {
                            'id': t['a'],
                            'timestamp': t['T'],
                            'price': float(t['p']),
                            'volume': float(t['q']),
                            'side': 0 if t.get('m') else 1,
                        }
                        for t in data
                    ]
                    saved = self.save_trades(trades)
                    previous_total = total
                    total += saved
                    last_id = data[-1]['a']
                    # Log each time the running total crosses a multiple of 500.
                    if total // 500 > previous_total // 500:
                        logger.info(f"[实时] 累计新增 {total} | 价格 {trades[-1]['price']:.2f}")
            except Exception as e:
                logger.warning(f"实时采集异常: {e}")
            time.sleep(interval)

        # No bounds are passed, so this re-aggregates every stored trade.
        self.aggregate_trades_to_seconds()
        logger.success(f"实时采集完成: 新增 {total} 条")
        return total

    def get_stats(self):
        """Log row counts and covered time ranges for every table."""
        logger.info("币安 K线 数据库统计:")
        for step, model in self.models.items():
            try:
                count = model.select().count()
                earliest, latest = self.get_db_time_range(step)
                line = f" {KLINE_CONFIGS[step]:>4}: {count:>8} 条"
                if earliest and latest:
                    line += (f" | {_fmt_local(earliest // 1000)} ~ "
                             f"{_fmt_local(latest // 1000)}")
                logger.info(line)
            except Exception as e:
                logger.error(f" {KLINE_CONFIGS[step]}: {e}")

        try:
            tc = BinanceETHTrades.select().count()
            sc = BinanceETHSecond.select().count()
            if sc > 0:
                lo = BinanceETHSecond.select(fn.MIN(BinanceETHSecond.id)).scalar()
                hi = BinanceETHSecond.select(fn.MAX(BinanceETHSecond.id)).scalar()
                logger.info(f" trades: {tc:>8} | 1s: {sc:>8} | "
                            f"{datetime.datetime.fromtimestamp(lo/1000).strftime('%Y-%m-%d %H:%M:%S')} ~ "
                            f"{datetime.datetime.fromtimestamp(hi/1000).strftime('%Y-%m-%d %H:%M:%S')}")
            else:
                logger.info(f" trades: {tc:>8} | 1s: {sc:>8}")
        except Exception as e:
            logger.error(f" trades/1s: {e}")

    def close(self):
        """Release the HTTP session and close the database connection."""
        if self._session is not None:
            self._session.close()
        if not db.is_closed():
            db.close()


if __name__ == '__main__':
    # Proxy example: export BINANCE_PROXY='http://127.0.0.1:7890'
    # or: collector = BinanceKlineCollector(proxy='http://127.0.0.1:7890', verify_ssl=False)
    collector = BinanceKlineCollector(
        symbol="ETHUSDT",
        proxy=os.environ.get('BINANCE_PROXY') or None,
        verify_ssl=os.environ.get('BINANCE_VERIFY_SSL', '1') != '0',
    )
    try:
        collector.get_stats()
        # Optional: klines
        # collector.collect_from_date(start_date='2020-01-01', periods=[1, 3, 5, 15, 30, 60])
        # Second-level data: historical collection, continuing from the newest stored second
        collector.collect_second_data(start_date='2025-01-01')
        # Or collect in real time for a while
        # collector.collect_trades_realtime(duration_seconds=3600, interval=0.5)
        collector.get_stats()
    finally:
        collector.close()