"""
|
||
币安 永续合约 ETHUSDT 多周期K线 + 秒级数据抓取
|
||
- K线: 1m、3m、5m、15m、30m、1h
|
||
- 秒级: 通过 aggTrades 逐笔成交聚合,每秒一条 (时间, 价格 OHLC)
|
||
支持断点续传
|
||
"""
|
||
|
||
import datetime
import os
import time
from pathlib import Path

from loguru import logger
from peewee import *

try:
    import requests
except ImportError:
    requests = None
|
||
|
||
# SQLite database file lives next to this script under models/.
DB_PATH = Path(__file__).parent / 'models' / 'database.db'
# Single module-level connection shared by all models.
db = SqliteDatabase(str(DB_PATH))
|
||
|
||
# Minutes-per-bar -> table-name suffix for local storage.
KLINE_CONFIGS = {
    1: '1m',
    3: '3m',
    5: '5m',
    15: '15m',
    30: '30m',
    60: '1h',
}

# Minutes-per-bar -> Binance API `interval` string.  The mapping coincides
# exactly with the table suffixes, so derive it instead of repeating the
# literal (the original duplicated the dict verbatim; this keeps the two
# in sync if a period is ever added).
BINANCE_INTERVALS = dict(KLINE_CONFIGS)

# Binance USDT-M futures REST endpoint.
BINANCE_BASE = 'https://fapi.binance.com'
|
||
|
||
|
||
class BinanceETHTrades(Model):
    """Raw tick-by-tick trades (Binance aggTrades payload)."""
    id = BigIntegerField(primary_key=True)   # aggTrade id (payload field 'a')
    timestamp = BigIntegerField(index=True)  # trade time in ms (payload field 'T')
    price = FloatField()
    volume = FloatField()
    side = IntegerField()  # 1 = buy, 0 = sell (derived from the 'm' maker flag)

    class Meta:
        database = db
        table_name = 'binance_eth_trades'
|
||
|
||
|
||
class BinanceETHSecond(Model):
    """Second-level bars: one row per second.

    id = trade timestamp floored to the second (kept in ms);
    close = last trade price observed within that second.
    """
    id = BigIntegerField(primary_key=True)
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)
    volume = FloatField(null=True)
    trade_count = IntegerField(null=True)  # number of trades inside the second

    class Meta:
        database = db
        table_name = 'binance_eth_1s'
|
||
|
||
|
||
def create_binance_kline_model(step: int):
    """Dynamically build a peewee model for one kline period.

    :param step: period in minutes (key of KLINE_CONFIGS)
    :return: a Model subclass bound to table ``binance_eth_<suffix>``
    """
    suffix = KLINE_CONFIGS.get(step, f'{step}m')
    tbl_name = f'binance_eth_{suffix}'
    attrs = {
        'id': BigIntegerField(primary_key=True),  # kline open time (ms)
        'open': FloatField(null=True),
        'high': FloatField(null=True),
        'low': FloatField(null=True),
        'close': FloatField(null=True),
    }
    # Bind database/table through a dynamically created inner Meta class.
    attrs['Meta'] = type('Meta', (), {'database': db, 'table_name': tbl_name})
    # The original chained .replace("m", "m").replace("h", "h") was a no-op,
    # so the class name is derived directly from the suffix (same result).
    return type(f'BinanceETH{suffix}', (Model,), attrs)
|
||
|
||
|
||
class BinanceKlineCollector:
    """Binance perpetual-futures kline/second-data collector."""

    def __init__(self, symbol: str = "ETHUSDT", proxy: str = None, verify_ssl: bool = True):
        """
        :param symbol: contract symbol, e.g. ``ETHUSDT``
        :param proxy: proxy URL such as ``http://127.0.0.1:7890``; may also be
            supplied via the ``BINANCE_PROXY`` environment variable
        :param verify_ssl: verify SSL certificates; set False on broken networks
        """
        self.symbol = symbol
        # Explicit argument wins; otherwise fall back to the env variable.
        # (Was an inline __import__('os') hack; os is now a normal import.)
        self.proxy = proxy or os.environ.get('BINANCE_PROXY', '').strip() or None
        self.verify_ssl = verify_ssl
        # One shared HTTP session for connection pooling, when requests exists.
        self._session = requests.Session() if requests else None
        self.models = {}  # step (minutes) -> dynamically created kline model
        self._init_database()
|
||
|
||
def _init_database(self):
|
||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||
db.connect(reuse_if_open=True)
|
||
for step in KLINE_CONFIGS.keys():
|
||
model = create_binance_kline_model(step)
|
||
self.models[step] = model
|
||
db.create_tables([model], safe=True)
|
||
logger.info(f"初始化表: {model._meta.table_name}")
|
||
db.create_tables([BinanceETHTrades, BinanceETHSecond], safe=True)
|
||
logger.info("初始化表: binance_eth_trades, binance_eth_1s")
|
||
|
||
def get_db_time_range(self, step: int):
|
||
model = self.models.get(step)
|
||
if not model:
|
||
return None, None
|
||
try:
|
||
earliest = model.select(fn.MIN(model.id)).scalar()
|
||
latest = model.select(fn.MAX(model.id)).scalar()
|
||
return earliest, latest
|
||
except Exception as e:
|
||
logger.error(f"查询数据库异常: {e}")
|
||
return None, None
|
||
|
||
def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3):
|
||
"""
|
||
从币安 API 获取K线
|
||
:param step: 周期(分钟)
|
||
:param start_time: 开始时间戳(秒)
|
||
:param end_time: 结束时间戳(秒)
|
||
:return: [{'id', 'open', 'high', 'low', 'close'}, ...]
|
||
"""
|
||
if not requests:
|
||
logger.error("需要安装 requests: pip install requests")
|
||
return []
|
||
|
||
interval = BINANCE_INTERVALS.get(step, f'{step}m')
|
||
start_ms = int(start_time) * 1000
|
||
end_ms = int(end_time) * 1000
|
||
url = f"{BINANCE_BASE}/fapi/v1/klines"
|
||
|
||
kwargs = {'timeout': 30, 'verify': self.verify_ssl}
|
||
if self.proxy:
|
||
kwargs['proxies'] = {'http': self.proxy, 'https': self.proxy}
|
||
sess = self._session
|
||
for attempt in range(max_retries):
|
||
try:
|
||
params = {
|
||
'symbol': self.symbol,
|
||
'interval': interval,
|
||
'startTime': start_ms,
|
||
'endTime': end_ms,
|
||
'limit': 1500,
|
||
}
|
||
r = (sess or requests).get(url, params=params, **kwargs)
|
||
r.raise_for_status()
|
||
data = r.json()
|
||
|
||
formatted = []
|
||
for k in data:
|
||
# [open_time, open, high, low, close, volume, ...]
|
||
formatted.append({
|
||
'id': k[0],
|
||
'open': float(k[1]),
|
||
'high': float(k[2]),
|
||
'low': float(k[3]),
|
||
'close': float(k[4]),
|
||
})
|
||
formatted.sort(key=lambda x: x['id'])
|
||
return formatted
|
||
|
||
except Exception as e:
|
||
delay = 3 + attempt * 2
|
||
logger.warning(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e},{delay}s 后重试")
|
||
if attempt < max_retries - 1:
|
||
time.sleep(delay)
|
||
return []
|
||
|
||
def save_klines(self, step: int, klines: list):
|
||
model = self.models.get(step)
|
||
if not model:
|
||
return 0
|
||
new_count = 0
|
||
for kline in klines:
|
||
try:
|
||
_, created = model.get_or_create(
|
||
id=kline['id'],
|
||
defaults={
|
||
'open': kline['open'],
|
||
'high': kline['high'],
|
||
'low': kline['low'],
|
||
'close': kline['close'],
|
||
}
|
||
)
|
||
if created:
|
||
new_count += 1
|
||
except Exception as e:
|
||
logger.error(f"保存失败 {kline['id']}: {e}")
|
||
return new_count
|
||
|
||
def get_batch_seconds(self, step: int):
|
||
if step == 1:
|
||
return 3600 * 4
|
||
elif step == 3:
|
||
return 3600 * 8
|
||
elif step == 5:
|
||
return 3600 * 12
|
||
elif step == 15:
|
||
return 3600 * 24
|
||
elif step == 30:
|
||
return 3600 * 48
|
||
else:
|
||
return 3600 * 72
|
||
|
||
    def collect_period_range(self, step: int, target_start: int, target_end: int):
        """Fill one period's table over [target_start, target_end] (unix seconds).

        Resume-aware two-pass strategy: first fetch backwards from the earliest
        stored bar down to target_start, then forwards from the latest stored
        bar up to target_end.

        :param step: period in minutes
        :param target_start: range start, unix seconds
        :param target_end: range end, unix seconds
        :return: number of newly inserted bars
        """
        suffix = KLINE_CONFIGS.get(step, f'{step}m')
        batch_seconds = self.get_batch_seconds(step)
        db_earliest, db_latest = self.get_db_time_range(step)

        if db_earliest and db_latest:
            # Stored ids are kline open times in ms; convert to seconds.
            db_earliest_sec = db_earliest // 1000
            db_latest_sec = db_latest // 1000
            logger.info(f"[{suffix}] 数据库已有: "
                        f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_earliest_sec))} ~ "
                        f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_latest_sec))}")
        else:
            db_earliest_sec = db_latest_sec = None

        total_saved = 0

        # Backward pass: fetch history older than what is already stored.
        backward_end = db_earliest_sec if db_earliest_sec else target_end
        if backward_end > target_start:
            logger.info(f"[{suffix}] === 向前抓取历史 ===")
            total_backward = backward_end - target_start
            current_end = backward_end
            fail_count = 0
            # Give up after 5 consecutive empty/failed batches.
            while current_end > target_start and fail_count < 5:
                current_start = max(current_end - batch_seconds, target_start)
                klines = self.get_klines(step, current_start, current_end)
                if klines:
                    saved = self.save_klines(step, klines)
                    total_saved += saved
                    progress = (backward_end - current_end) / total_backward * 100 if total_backward > 0 else 0
                    logger.info(f"[{suffix}] ← {time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start))} ~ "
                                f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end))} | "
                                f"获取 {len(klines)} 新增 {saved} | 进度 {progress:.1f}%")
                    fail_count = 0
                else:
                    fail_count += 1
                current_end = current_start
                time.sleep(0.2)

        # Forward pass: fetch bars newer than what is already stored.
        forward_start = db_latest_sec if db_latest_sec else target_start
        if forward_start < target_end:
            logger.info(f"[{suffix}] === 向后抓取最新 ===")
            current_start = forward_start
            fail_count = 0
            # Give up after 3 consecutive empty/failed batches.
            while current_start < target_end and fail_count < 3:
                current_end = min(current_start + batch_seconds, target_end)
                klines = self.get_klines(step, current_start, current_end)
                if klines:
                    saved = self.save_klines(step, klines)
                    total_saved += saved
                    logger.info(f"[{suffix}] → {time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start))} ~ "
                                f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end))} | "
                                f"获取 {len(klines)} 新增 {saved}")
                    fail_count = 0
                else:
                    fail_count += 1
                current_start = current_end
                time.sleep(0.2)

        final_earliest, final_latest = self.get_db_time_range(step)
        if final_earliest and final_latest:
            logger.success(f"[{suffix}] 完成!本次新增 {total_saved} | "
                           f"{time.strftime('%Y-%m-%d', time.localtime(final_earliest//1000))} ~ "
                           f"{time.strftime('%Y-%m-%d', time.localtime(final_latest//1000))}")
        return total_saved
|
||
|
||
def collect_from_date(self, start_date: str, periods: list = None):
|
||
if periods is None:
|
||
periods = list(KLINE_CONFIGS.keys())
|
||
start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
|
||
target_start = int(start_dt.timestamp())
|
||
target_end = int(time.time())
|
||
|
||
logger.info(f"币安 ETHUSDT 永续 | {start_date} ~ 当前 | 周期: {[KLINE_CONFIGS[p] for p in periods]}")
|
||
results = {}
|
||
for step in periods:
|
||
saved = self.collect_period_range(step, target_start, target_end)
|
||
results[KLINE_CONFIGS[step]] = saved
|
||
time.sleep(0.5)
|
||
logger.info("抓取完成: " + ", ".join(f"{k}: {v}" for k, v in results.items()))
|
||
return results
|
||
|
||
    # ==================== Second-level (1s) data ====================
|
||
|
||
def get_agg_trades(self, start_ms: int, end_ms: int, max_retries: int = 3):
|
||
"""
|
||
获取逐笔成交 (aggTrades),时间范围需 <= 1 小时
|
||
:return: [{'id', 'timestamp', 'price', 'volume', 'side'}, ...]
|
||
"""
|
||
if not requests:
|
||
return []
|
||
url = f"{BINANCE_BASE}/fapi/v1/aggTrades"
|
||
kwargs = {'timeout': 30, 'verify': self.verify_ssl}
|
||
if self.proxy:
|
||
kwargs['proxies'] = {'http': self.proxy, 'https': self.proxy}
|
||
sess = self._session
|
||
for attempt in range(max_retries):
|
||
try:
|
||
params = {
|
||
'symbol': self.symbol,
|
||
'startTime': start_ms,
|
||
'endTime': end_ms,
|
||
'limit': 1000,
|
||
}
|
||
r = (sess or requests).get(url, params=params, **kwargs)
|
||
r.raise_for_status()
|
||
data = r.json()
|
||
out = []
|
||
for t in data:
|
||
out.append({
|
||
'id': t['a'],
|
||
'timestamp': t['T'],
|
||
'price': float(t['p']),
|
||
'volume': float(t['q']),
|
||
'side': 0 if t.get('m', False) else 1,
|
||
})
|
||
return out
|
||
except Exception as e:
|
||
delay = 3 + attempt * 2
|
||
logger.warning(f"获取 aggTrades 异常 ({attempt+1}/{max_retries}): {e},{delay}s 后重试")
|
||
if attempt < max_retries - 1:
|
||
time.sleep(delay)
|
||
return []
|
||
|
||
def save_trades(self, trades: list):
|
||
n = 0
|
||
for t in trades:
|
||
try:
|
||
_, created = BinanceETHTrades.get_or_create(
|
||
id=t['id'],
|
||
defaults={'timestamp': t['timestamp'], 'price': t['price'],
|
||
'volume': t['volume'], 'side': t['side']}
|
||
)
|
||
if created:
|
||
n += 1
|
||
except Exception:
|
||
pass
|
||
return n
|
||
|
||
def get_second_db_range(self):
|
||
try:
|
||
lo = BinanceETHSecond.select(fn.MIN(BinanceETHSecond.id)).scalar()
|
||
hi = BinanceETHSecond.select(fn.MAX(BinanceETHSecond.id)).scalar()
|
||
return lo, hi
|
||
except Exception:
|
||
return None, None
|
||
|
||
    def aggregate_trades_to_seconds(self, start_ms: int = None, end_ms: int = None):
        """Aggregate binance_eth_trades into 1-second bars in binance_eth_1s.

        :param start_ms: optional lower bound on trade timestamp (ms, inclusive)
        :param end_ms: optional upper bound on trade timestamp (ms, inclusive)
        :return: number of second-bars upserted
        """
        # Trades are scanned in timestamp order, so 'open' is the first trade
        # of each second and 'close' ends up being the last.
        q = BinanceETHTrades.select().order_by(BinanceETHTrades.timestamp)
        if start_ms:
            q = q.where(BinanceETHTrades.timestamp >= start_ms)
        if end_ms:
            q = q.where(BinanceETHTrades.timestamp <= end_ms)

        sec_data = {}
        for t in q:
            # Floor the ms timestamp to its containing second (still in ms).
            sec_ts = (t.timestamp // 1000) * 1000
            if sec_ts not in sec_data:
                sec_data[sec_ts] = {
                    'open': t.price, 'high': t.price, 'low': t.price, 'close': t.price,
                    'volume': t.volume, 'trade_count': 1
                }
            else:
                sec_data[sec_ts]['high'] = max(sec_data[sec_ts]['high'], t.price)
                sec_data[sec_ts]['low'] = min(sec_data[sec_ts]['low'], t.price)
                sec_data[sec_ts]['close'] = t.price
                sec_data[sec_ts]['volume'] += t.volume
                sec_data[sec_ts]['trade_count'] += 1

        n = 0
        for ts, d in sec_data.items():
            try:
                # Upsert: re-aggregating an overlapping range overwrites bars.
                BinanceETHSecond.insert(
                    id=ts, open=d['open'], high=d['high'], low=d['low'], close=d['close'],
                    volume=d['volume'], trade_count=d['trade_count']
                ).on_conflict(
                    conflict_target=[BinanceETHSecond.id],
                    update={BinanceETHSecond.open: d['open'], BinanceETHSecond.high: d['high'],
                            BinanceETHSecond.low: d['low'], BinanceETHSecond.close: d['close'],
                            BinanceETHSecond.volume: d['volume'],
                            BinanceETHSecond.trade_count: d['trade_count']}
                ).execute()
                n += 1
            except Exception as e:
                logger.error(f"保存秒级失败 {ts}: {e}")
        logger.info(f"聚合: {len(sec_data)} 条秒级数据")
        return n
|
||
|
||
    def collect_second_data(self, start_date: str, end_date: str = None):
        """
        Collect second-level data: pull aggTrades hour by hour, then
        aggregate them into 1-second bars and store the result.

        :param start_date: 'YYYY-MM-DD'
        :param end_date: 'YYYY-MM-DD' or None (= now)
        :return: number of newly stored trades
        """
        start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
        start_ts = int(start_dt.timestamp())
        end_ts = int(time.time()) if end_date is None else int(
            datetime.datetime.strptime(end_date, '%Y-%m-%d').timestamp())
        start_ms = start_ts * 1000
        end_ms = end_ts * 1000

        # aggTrades allows at most a 1-hour window per request.
        chunk_ms = 3600 * 1000
        lo, hi = self.get_second_db_range()
        if lo and hi:
            # Resume: continue one second after the latest stored bar.
            fetch_start = hi + 1000
        else:
            fetch_start = start_ms

        total_trades = 0
        total_saved = 0
        current = fetch_start
        while current < end_ms:
            chunk_end = min(current + chunk_ms, end_ms)
            trades = self.get_agg_trades(current, chunk_end)
            if trades:
                saved = self.save_trades(trades)
                total_trades += len(trades)
                total_saved += saved
                ts_str = datetime.datetime.fromtimestamp(current/1000).strftime('%Y-%m-%d %H:%M')
                logger.info(f"[1s] {ts_str} | 成交 {len(trades)} 条, 新增 {saved}")
            current = chunk_end
            time.sleep(0.15)

        if total_trades > 0:
            # Only aggregate the window we actually fetched.
            self.aggregate_trades_to_seconds(start_ms=fetch_start, end_ms=end_ms)
        logger.success(f"秒级抓取完成: 成交 {total_trades}, 新增 {total_saved}")
        return total_saved
|
||
|
||
def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.5):
|
||
"""实时采集逐笔成交(秒级数据源)"""
|
||
url = f"{BINANCE_BASE}/fapi/v1/aggTrades"
|
||
kwargs = {'timeout': 10, 'verify': self.verify_ssl}
|
||
if self.proxy:
|
||
kwargs['proxies'] = {'http': self.proxy, 'https': self.proxy}
|
||
sess = self._session
|
||
start = time.time()
|
||
end = start + duration_seconds
|
||
total = 0
|
||
last_id = None
|
||
while time.time() < end:
|
||
try:
|
||
params = {'symbol': self.symbol, 'limit': 1000}
|
||
if last_id:
|
||
params['fromId'] = last_id + 1
|
||
r = (sess or requests).get(url, params=params, **kwargs)
|
||
data = r.json()
|
||
if data:
|
||
trades = [{'id': t['a'], 'timestamp': t['T'], 'price': float(t['p']),
|
||
'volume': float(t['q']), 'side': 0 if t.get('m') else 1} for t in data]
|
||
saved = self.save_trades(trades)
|
||
total += saved
|
||
last_id = data[-1]['a']
|
||
if saved > 0 and total % 500 == 0:
|
||
logger.info(f"[实时] 累计新增 {total} | 价格 {trades[-1]['price']:.2f}")
|
||
except Exception as e:
|
||
logger.warning(f"实时采集异常: {e}")
|
||
time.sleep(interval)
|
||
self.aggregate_trades_to_seconds()
|
||
logger.success(f"实时采集完成: 新增 {total} 条")
|
||
return total
|
||
|
||
    def get_stats(self):
        """Log row counts and covered time ranges for every stored table."""
        logger.info("币安 K线 数据库统计:")
        for step, model in self.models.items():
            try:
                count = model.select().count()
                earliest, latest = self.get_db_time_range(step)
                if earliest and latest:
                    # Ids are ms timestamps; render as local date-times.
                    s = f" {KLINE_CONFIGS[step]:>4}: {count:>8} 条 | " \
                        f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(earliest//1000))} ~ " \
                        f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(latest//1000))}"
                else:
                    s = f" {KLINE_CONFIGS[step]:>4}: {count:>8} 条"
                logger.info(s)
            except Exception as e:
                logger.error(f" {KLINE_CONFIGS[step]}: {e}")
        try:
            tc = BinanceETHTrades.select().count()
            sc = BinanceETHSecond.select().count()
            if sc > 0:
                lo = BinanceETHSecond.select(fn.MIN(BinanceETHSecond.id)).scalar()
                hi = BinanceETHSecond.select(fn.MAX(BinanceETHSecond.id)).scalar()
                logger.info(f" trades: {tc:>8} | 1s: {sc:>8} | "
                            f"{datetime.datetime.fromtimestamp(lo/1000).strftime('%Y-%m-%d %H:%M:%S')} ~ "
                            f"{datetime.datetime.fromtimestamp(hi/1000).strftime('%Y-%m-%d %H:%M:%S')}")
            else:
                logger.info(f" trades: {tc:>8} | 1s: {sc:>8}")
        except Exception as e:
            logger.error(f" trades/1s: {e}")
|
||
|
||
def close(self):
|
||
if not db.is_closed():
|
||
db.close()
|
||
|
||
|
||
if __name__ == '__main__':
    import os
    # Proxy example: export BINANCE_PROXY='http://127.0.0.1:7890'
    # or: collector = BinanceKlineCollector(proxy='http://127.0.0.1:7890', verify_ssl=False)
    collector = BinanceKlineCollector(
        symbol="ETHUSDT",
        proxy=os.environ.get('BINANCE_PROXY') or None,
        verify_ssl=os.environ.get('BINANCE_VERIFY_SSL', '1') != '0',
    )
    try:
        collector.get_stats()

        # Optional: klines
        # collector.collect_from_date(start_date='2020-01-01', periods=[1, 3, 5, 15, 30, 60])

        # Second-level data: fetch history day by day (resumes from latest row)
        collector.collect_second_data(start_date='2025-01-01')

        # Or collect trades in real time for a while
        # collector.collect_trades_realtime(duration_seconds=3600, interval=0.5)

        collector.get_stats()
    finally:
        collector.close()
|