Files
codex_jxs_code/抓取币安K线.py
2026-02-26 19:05:17 +08:00

541 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
币安 永续合约 ETHUSDT 多周期K线 + 秒级数据抓取
- K线: 1m、3m、5m、15m、30m、1h
- 秒级: 通过 aggTrades 逐笔成交聚合,每秒一条 (时间, 价格 OHLC)
支持断点续传
"""
import time
import datetime
from pathlib import Path
from loguru import logger
from peewee import *
try:
import requests
except ImportError:
requests = None
# SQLite file shared by every table below: <script dir>/models/database.db
DB_PATH = Path(__file__).parent / 'models' / 'database.db'
# Single shared peewee database handle bound to all models in this module.
db = SqliteDatabase(str(DB_PATH))
# Period in minutes -> suffix used for the local table name (binance_eth_<suffix>).
KLINE_CONFIGS = {
    1: '1m',
    3: '3m',
    5: '5m',
    15: '15m',
    30: '30m',
    60: '1h',
}
# Period in minutes -> Binance REST `interval` parameter.
# The original file duplicated the literal dict; the mapping is identical to
# KLINE_CONFIGS, so derive a copy instead (kept as a separate dict so the API
# naming could diverge from the table naming without touching other code).
BINANCE_INTERVALS = dict(KLINE_CONFIGS)
# Binance USDT-margined futures REST base URL.
BINANCE_BASE = 'https://fapi.binance.com'
class BinanceETHTrades(Model):
    """Raw aggregated trades (Binance aggTrades) for the contract."""
    id = BigIntegerField(primary_key=True)   # aggTrade id (field `a` from the API)
    timestamp = BigIntegerField(index=True)  # trade time (field `T`), epoch milliseconds
    price = FloatField()
    volume = FloatField()
    side = IntegerField()  # 1 = buy, 0 = sell (derived from the `m` maker flag)

    class Meta:
        database = db
        table_name = 'binance_eth_trades'
class BinanceETHSecond(Model):
    """Per-second bars: one row per second; id = millisecond timestamp truncated
    to the second, close = last trade price within that second."""
    id = BigIntegerField(primary_key=True)  # second boundary, epoch milliseconds
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)
    volume = FloatField(null=True)
    trade_count = IntegerField(null=True)   # number of trades aggregated into this second

    class Meta:
        database = db
        table_name = 'binance_eth_1s'
def create_binance_kline_model(step: int):
    """Dynamically build a peewee model for one kline period.

    :param step: period in minutes (normally a key of KLINE_CONFIGS)
    :return: a new Model subclass mapped to table ``binance_eth_<suffix>``
    """
    suffix = KLINE_CONFIGS.get(step, f'{step}m')
    tbl_name = f'binance_eth_{suffix}'
    attrs = {
        'id': BigIntegerField(primary_key=True),  # bar open time, epoch milliseconds
        'open': FloatField(null=True),
        'high': FloatField(null=True),
        'low': FloatField(null=True),
        'close': FloatField(null=True),
    }
    # Inner Meta binds the dynamic model to the shared SQLite database.
    attrs['Meta'] = type('Meta', (), {'database': db, 'table_name': tbl_name})
    # The original composed the class name via no-op .replace("m", "m")
    # .replace("h", "h") calls; the suffix is used verbatim, producing the
    # same names (e.g. BinanceETH1m, BinanceETH1h).
    return type(f'BinanceETH{suffix}', (Model,), attrs)
class BinanceKlineCollector:
    """Kline collector for Binance USDT-margined perpetual futures.

    Fetches 1m/3m/5m/15m/30m/1h klines plus per-second bars aggregated from
    aggTrades, persisting everything into a local SQLite database with
    resume support (continues from the data already stored).
    """

    def __init__(self, symbol: str = "ETHUSDT", proxy: str = None, verify_ssl: bool = True):
        """
        :param symbol: contract symbol, e.g. "ETHUSDT"
        :param proxy: proxy URL such as 'http://127.0.0.1:7890'; may also be
            supplied via the BINANCE_PROXY environment variable
        :param verify_ssl: whether to verify SSL certificates; set to False
            when the network/certificate chain is problematic
        """
        self.symbol = symbol
        # Explicit argument wins; otherwise fall back to the BINANCE_PROXY env var.
        self.proxy = proxy or __import__('os').environ.get('BINANCE_PROXY', '').strip() or None
        self.verify_ssl = verify_ssl
        # One shared HTTP session (connection reuse); None when requests is missing.
        self._session = requests.Session() if requests else None
        self.models = {}  # step in minutes -> dynamically created kline model
        self._init_database()

    def _init_database(self):
        """Create the SQLite file and all tables; register per-period models."""
        DB_PATH.parent.mkdir(parents=True, exist_ok=True)
        db.connect(reuse_if_open=True)
        for step in KLINE_CONFIGS.keys():
            model = create_binance_kline_model(step)
            self.models[step] = model
            db.create_tables([model], safe=True)
            logger.info(f"初始化表: {model._meta.table_name}")
        db.create_tables([BinanceETHTrades, BinanceETHSecond], safe=True)
        logger.info("初始化表: binance_eth_trades, binance_eth_1s")

    def get_db_time_range(self, step: int):
        """Return (earliest_id, latest_id) — epoch ms — for one period's table,
        or (None, None) when the period is unknown or the query fails."""
        model = self.models.get(step)
        if not model:
            return None, None
        try:
            earliest = model.select(fn.MIN(model.id)).scalar()
            latest = model.select(fn.MAX(model.id)).scalar()
            return earliest, latest
        except Exception as e:
            logger.error(f"查询数据库异常: {e}")
            return None, None

    def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3):
        """
        Fetch klines from the Binance futures API.
        :param step: period in minutes
        :param start_time: start timestamp (seconds)
        :param end_time: end timestamp (seconds)
        :return: [{'id', 'open', 'high', 'low', 'close'}, ...] sorted by id;
            empty list on failure after all retries
        """
        if not requests:
            logger.error("需要安装 requests: pip install requests")
            return []
        interval = BINANCE_INTERVALS.get(step, f'{step}m')
        start_ms = int(start_time) * 1000
        end_ms = int(end_time) * 1000
        url = f"{BINANCE_BASE}/fapi/v1/klines"
        kwargs = {'timeout': 30, 'verify': self.verify_ssl}
        if self.proxy:
            kwargs['proxies'] = {'http': self.proxy, 'https': self.proxy}
        sess = self._session
        for attempt in range(max_retries):
            try:
                params = {
                    'symbol': self.symbol,
                    'interval': interval,
                    'startTime': start_ms,
                    'endTime': end_ms,
                    'limit': 1500,
                }
                r = (sess or requests).get(url, params=params, **kwargs)
                r.raise_for_status()
                data = r.json()
                formatted = []
                for k in data:
                    # Each row: [open_time, open, high, low, close, volume, ...]
                    formatted.append({
                        'id': k[0],
                        'open': float(k[1]),
                        'high': float(k[2]),
                        'low': float(k[3]),
                        'close': float(k[4]),
                    })
                formatted.sort(key=lambda x: x['id'])
                return formatted
            except Exception as e:
                # Linearly increasing backoff between retries.
                delay = 3 + attempt * 2
                logger.warning(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e}{delay}s 后重试")
                if attempt < max_retries - 1:
                    time.sleep(delay)
        return []

    def save_klines(self, step: int, klines: list):
        """Insert klines for one period; existing ids are skipped.
        Returns the number of newly inserted rows."""
        model = self.models.get(step)
        if not model:
            return 0
        new_count = 0
        for kline in klines:
            try:
                _, created = model.get_or_create(
                    id=kline['id'],
                    defaults={
                        'open': kline['open'],
                        'high': kline['high'],
                        'low': kline['low'],
                        'close': kline['close'],
                    }
                )
                if created:
                    new_count += 1
            except Exception as e:
                logger.error(f"保存失败 {kline['id']}: {e}")
        return new_count

    def get_batch_seconds(self, step: int):
        """Window size (seconds) per API request for a given period, chosen so
        each window stays within the API's 1500-bar limit."""
        if step == 1:
            return 3600 * 4
        elif step == 3:
            return 3600 * 8
        elif step == 5:
            return 3600 * 12
        elif step == 15:
            return 3600 * 24
        elif step == 30:
            return 3600 * 48
        else:
            return 3600 * 72

    def collect_period_range(self, step: int, target_start: int, target_end: int):
        """Fill one period's table over [target_start, target_end] (seconds):
        first extend backwards before the earliest stored bar, then forwards
        after the latest stored bar. Returns total newly inserted rows."""
        suffix = KLINE_CONFIGS.get(step, f'{step}m')
        batch_seconds = self.get_batch_seconds(step)
        db_earliest, db_latest = self.get_db_time_range(step)
        if db_earliest and db_latest:
            db_earliest_sec = db_earliest // 1000
            db_latest_sec = db_latest // 1000
            logger.info(f"[{suffix}] 数据库已有: "
                        f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_earliest_sec))} ~ "
                        f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_latest_sec))}")
        else:
            db_earliest_sec = db_latest_sec = None
        total_saved = 0
        # Fetch older history backwards (from the earliest stored bar toward target_start).
        backward_end = db_earliest_sec if db_earliest_sec else target_end
        if backward_end > target_start:
            logger.info(f"[{suffix}] === 向前抓取历史 ===")
            total_backward = backward_end - target_start
            current_end = backward_end
            fail_count = 0
            # Stop early after 5 consecutive empty/failed windows.
            while current_end > target_start and fail_count < 5:
                current_start = max(current_end - batch_seconds, target_start)
                klines = self.get_klines(step, current_start, current_end)
                if klines:
                    saved = self.save_klines(step, klines)
                    total_saved += saved
                    progress = (backward_end - current_end) / total_backward * 100 if total_backward > 0 else 0
                    logger.info(f"[{suffix}] ← {time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start))} ~ "
                                f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end))} | "
                                f"获取 {len(klines)} 新增 {saved} | 进度 {progress:.1f}%")
                    fail_count = 0
                else:
                    fail_count += 1
                current_end = current_start
                time.sleep(0.2)
        # Fetch the newest data forwards (from the latest stored bar toward target_end).
        forward_start = db_latest_sec if db_latest_sec else target_start
        if forward_start < target_end:
            logger.info(f"[{suffix}] === 向后抓取最新 ===")
            current_start = forward_start
            fail_count = 0
            while current_start < target_end and fail_count < 3:
                current_end = min(current_start + batch_seconds, target_end)
                klines = self.get_klines(step, current_start, current_end)
                if klines:
                    saved = self.save_klines(step, klines)
                    total_saved += saved
                    logger.info(f"[{suffix}] → {time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start))} ~ "
                                f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end))} | "
                                f"获取 {len(klines)} 新增 {saved}")
                    fail_count = 0
                else:
                    fail_count += 1
                current_start = current_end
                time.sleep(0.2)
        final_earliest, final_latest = self.get_db_time_range(step)
        if final_earliest and final_latest:
            logger.success(f"[{suffix}] 完成!本次新增 {total_saved} | "
                           f"{time.strftime('%Y-%m-%d', time.localtime(final_earliest//1000))} ~ "
                           f"{time.strftime('%Y-%m-%d', time.localtime(final_latest//1000))}")
        return total_saved

    def collect_from_date(self, start_date: str, periods: list = None):
        """Collect every requested period from start_date ('YYYY-MM-DD') up to now.
        Returns {period_suffix: new_row_count}."""
        if periods is None:
            periods = list(KLINE_CONFIGS.keys())
        start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
        target_start = int(start_dt.timestamp())
        target_end = int(time.time())
        logger.info(f"币安 ETHUSDT 永续 | {start_date} ~ 当前 | 周期: {[KLINE_CONFIGS[p] for p in periods]}")
        results = {}
        for step in periods:
            saved = self.collect_period_range(step, target_start, target_end)
            results[KLINE_CONFIGS[step]] = saved
            time.sleep(0.5)
        logger.info("抓取完成: " + ", ".join(f"{k}: {v}" for k, v in results.items()))
        return results

    # ==================== second-level data ====================
    def get_agg_trades(self, start_ms: int, end_ms: int, max_retries: int = 3):
        """
        Fetch aggregated trades (aggTrades); the time window should be <= 1 hour.
        :return: [{'id', 'timestamp', 'price', 'volume', 'side'}, ...]
        """
        if not requests:
            return []
        url = f"{BINANCE_BASE}/fapi/v1/aggTrades"
        kwargs = {'timeout': 30, 'verify': self.verify_ssl}
        if self.proxy:
            kwargs['proxies'] = {'http': self.proxy, 'https': self.proxy}
        sess = self._session
        for attempt in range(max_retries):
            try:
                params = {
                    'symbol': self.symbol,
                    'startTime': start_ms,
                    'endTime': end_ms,
                    'limit': 1000,
                }
                r = (sess or requests).get(url, params=params, **kwargs)
                r.raise_for_status()
                data = r.json()
                out = []
                for t in data:
                    out.append({
                        'id': t['a'],           # aggTrade id
                        'timestamp': t['T'],    # trade time, epoch ms
                        'price': float(t['p']),
                        'volume': float(t['q']),
                        # 'm' is the buyer-is-maker flag: maker-buy => taker sell (0).
                        'side': 0 if t.get('m', False) else 1,
                    })
                return out
            except Exception as e:
                delay = 3 + attempt * 2
                logger.warning(f"获取 aggTrades 异常 ({attempt+1}/{max_retries}): {e}{delay}s 后重试")
                if attempt < max_retries - 1:
                    time.sleep(delay)
        return []

    def save_trades(self, trades: list):
        """Insert trades, skipping existing ids; returns the number inserted.
        Individual insert failures are deliberately ignored (best-effort)."""
        n = 0
        for t in trades:
            try:
                _, created = BinanceETHTrades.get_or_create(
                    id=t['id'],
                    defaults={'timestamp': t['timestamp'], 'price': t['price'],
                              'volume': t['volume'], 'side': t['side']}
                )
                if created:
                    n += 1
            except Exception:
                pass
        return n

    def get_second_db_range(self):
        """Return (min_id, max_id) of the 1-second table, or (None, None)."""
        try:
            lo = BinanceETHSecond.select(fn.MIN(BinanceETHSecond.id)).scalar()
            hi = BinanceETHSecond.select(fn.MAX(BinanceETHSecond.id)).scalar()
            return lo, hi
        except Exception:
            return None, None

    def aggregate_trades_to_seconds(self, start_ms: int = None, end_ms: int = None):
        """Aggregate binance_eth_trades rows into per-second OHLCV bars and
        upsert them into binance_eth_1s. Returns the number of rows written."""
        q = BinanceETHTrades.select().order_by(BinanceETHTrades.timestamp)
        if start_ms:
            q = q.where(BinanceETHTrades.timestamp >= start_ms)
        if end_ms:
            q = q.where(BinanceETHTrades.timestamp <= end_ms)
        sec_data = {}
        for t in q:
            # Truncate the trade timestamp to its second boundary (still in ms).
            sec_ts = (t.timestamp // 1000) * 1000
            if sec_ts not in sec_data:
                sec_data[sec_ts] = {
                    'open': t.price, 'high': t.price, 'low': t.price, 'close': t.price,
                    'volume': t.volume, 'trade_count': 1
                }
            else:
                sec_data[sec_ts]['high'] = max(sec_data[sec_ts]['high'], t.price)
                sec_data[sec_ts]['low'] = min(sec_data[sec_ts]['low'], t.price)
                # Trades arrive ordered by timestamp, so the last price is the close.
                sec_data[sec_ts]['close'] = t.price
                sec_data[sec_ts]['volume'] += t.volume
                sec_data[sec_ts]['trade_count'] += 1
        n = 0
        for ts, d in sec_data.items():
            try:
                # Upsert: replace the bar if that second already exists.
                BinanceETHSecond.insert(
                    id=ts, open=d['open'], high=d['high'], low=d['low'], close=d['close'],
                    volume=d['volume'], trade_count=d['trade_count']
                ).on_conflict(
                    conflict_target=[BinanceETHSecond.id],
                    update={BinanceETHSecond.open: d['open'], BinanceETHSecond.high: d['high'],
                            BinanceETHSecond.low: d['low'], BinanceETHSecond.close: d['close'],
                            BinanceETHSecond.volume: d['volume'],
                            BinanceETHSecond.trade_count: d['trade_count']}
                ).execute()
                n += 1
            except Exception as e:
                logger.error(f"保存秒级失败 {ts}: {e}")
        logger.info(f"聚合: {len(sec_data)} 条秒级数据")
        return n

    def collect_second_data(self, start_date: str, end_date: str = None):
        """
        Collect second-level data: pull aggTrades hour by hour, then aggregate
        into 1-second bars and store them.
        :param start_date: 'YYYY-MM-DD'
        :param end_date: 'YYYY-MM-DD' or None (= now)
        """
        start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
        start_ts = int(start_dt.timestamp())
        end_ts = int(time.time()) if end_date is None else int(
            datetime.datetime.strptime(end_date, '%Y-%m-%d').timestamp())
        start_ms = start_ts * 1000
        end_ms = end_ts * 1000
        # Window of at most 1 hour per request.
        chunk_ms = 3600 * 1000
        lo, hi = self.get_second_db_range()
        if lo and hi:
            # Resume forward from the second after the latest stored bar.
            fetch_start = hi + 1000
        else:
            fetch_start = start_ms
        total_trades = 0
        total_saved = 0
        current = fetch_start
        while current < end_ms:
            chunk_end = min(current + chunk_ms, end_ms)
            trades = self.get_agg_trades(current, chunk_end)
            if trades:
                saved = self.save_trades(trades)
                total_trades += len(trades)
                total_saved += saved
                ts_str = datetime.datetime.fromtimestamp(current/1000).strftime('%Y-%m-%d %H:%M')
                logger.info(f"[1s] {ts_str} | 成交 {len(trades)} 条, 新增 {saved}")
            current = chunk_end
            time.sleep(0.15)
        if total_trades > 0:
            self.aggregate_trades_to_seconds(start_ms=fetch_start, end_ms=end_ms)
        logger.success(f"秒级抓取完成: 成交 {total_trades}, 新增 {total_saved}")
        return total_saved

    def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.5):
        """Poll aggTrades in near real time (paging via fromId) for
        duration_seconds, then aggregate everything into 1-second bars."""
        url = f"{BINANCE_BASE}/fapi/v1/aggTrades"
        kwargs = {'timeout': 10, 'verify': self.verify_ssl}
        if self.proxy:
            kwargs['proxies'] = {'http': self.proxy, 'https': self.proxy}
        sess = self._session
        start = time.time()
        end = start + duration_seconds
        total = 0
        last_id = None
        while time.time() < end:
            try:
                params = {'symbol': self.symbol, 'limit': 1000}
                if last_id:
                    # Continue right after the last trade we have seen.
                    params['fromId'] = last_id + 1
                r = (sess or requests).get(url, params=params, **kwargs)
                data = r.json()
                if data:
                    trades = [{'id': t['a'], 'timestamp': t['T'], 'price': float(t['p']),
                               'volume': float(t['q']), 'side': 0 if t.get('m') else 1} for t in data]
                    saved = self.save_trades(trades)
                    total += saved
                    last_id = data[-1]['a']
                    # NOTE(review): this only logs when total lands exactly on a
                    # multiple of 500, which batch inserts may skip over.
                    if saved > 0 and total % 500 == 0:
                        logger.info(f"[实时] 累计新增 {total} | 价格 {trades[-1]['price']:.2f}")
            except Exception as e:
                logger.warning(f"实时采集异常: {e}")
            time.sleep(interval)
        self.aggregate_trades_to_seconds()
        logger.success(f"实时采集完成: 新增 {total}")
        return total

    def get_stats(self):
        """Log row counts and time coverage for every kline table plus the
        trades and 1-second tables."""
        logger.info("币安 K线 数据库统计:")
        for step, model in self.models.items():
            try:
                count = model.select().count()
                earliest, latest = self.get_db_time_range(step)
                if earliest and latest:
                    s = f" {KLINE_CONFIGS[step]:>4}: {count:>8} 条 | " \
                        f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(earliest//1000))} ~ " \
                        f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(latest//1000))}"
                else:
                    s = f" {KLINE_CONFIGS[step]:>4}: {count:>8}"
                logger.info(s)
            except Exception as e:
                logger.error(f" {KLINE_CONFIGS[step]}: {e}")
        try:
            tc = BinanceETHTrades.select().count()
            sc = BinanceETHSecond.select().count()
            if sc > 0:
                lo = BinanceETHSecond.select(fn.MIN(BinanceETHSecond.id)).scalar()
                hi = BinanceETHSecond.select(fn.MAX(BinanceETHSecond.id)).scalar()
                logger.info(f" trades: {tc:>8} | 1s: {sc:>8} | "
                            f"{datetime.datetime.fromtimestamp(lo/1000).strftime('%Y-%m-%d %H:%M:%S')} ~ "
                            f"{datetime.datetime.fromtimestamp(hi/1000).strftime('%Y-%m-%d %H:%M:%S')}")
            else:
                logger.info(f" trades: {tc:>8} | 1s: {sc:>8}")
        except Exception as e:
            logger.error(f" trades/1s: {e}")

    def close(self):
        """Close the shared database connection if it is open."""
        if not db.is_closed():
            db.close()
if __name__ == '__main__':
    import os
    # Proxy example: export BINANCE_PROXY='http://127.0.0.1:7890'
    # or: collector = BinanceKlineCollector(proxy='http://127.0.0.1:7890', verify_ssl=False)
    collector = BinanceKlineCollector(
        symbol="ETHUSDT",
        proxy=os.environ.get('BINANCE_PROXY') or None,
        verify_ssl=os.environ.get('BINANCE_VERIFY_SSL', '1') != '0',
    )
    try:
        collector.get_stats()
        # Optional: klines
        # collector.collect_from_date(start_date='2020-01-01', periods=[1, 3, 5, 15, 30, 60])
        # Second-level data: historical fetch (resumes from the latest row in the DB)
        collector.collect_second_data(start_date='2025-01-01')
        # Or collect trades in real time for a while
        # collector.collect_trades_realtime(duration_seconds=3600, interval=0.5)
        collector.get_stats()
    finally:
        collector.close()