763 lines
30 KiB
Python
763 lines
30 KiB
Python
"""
|
||
BitMart 多周期K线数据抓取脚本
|
||
支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据
|
||
支持秒级价格数据(通过成交记录API)
|
||
支持断点续传,从数据库最新/最早记录继续抓取
|
||
"""
|
||
|
||
import time
|
||
import datetime
|
||
from pathlib import Path
|
||
from loguru import logger
|
||
from peewee import *
|
||
from bitmart.api_contract import APIContract
|
||
|
||
# Database location: <directory of this script>/models/database.db
DB_PATH = Path(__file__).parent.joinpath('models', 'database.db')

# Single SQLite handle shared by every model defined in this module.
db = SqliteDatabase(str(DB_PATH))

# Supported K-line periods: BitMart `step` value (minutes) -> table-name suffix.
# 1 -> 1m, 3 -> 3m, 5 -> 5m, 15 -> 15m, 30 -> 30m, 60 -> 1h
KLINE_CONFIGS = dict(zip(
    (1, 3, 5, 15, 30, 60),
    ('1m', '3m', '5m', '15m', '30m', '1h'),
))
|
||
|
||
|
||
class BitMartETHTrades(Model):
    """Raw individual trade record (per-trade data, millisecond timestamps)."""

    id = BigIntegerField(primary_key=True)  # trade ID
    timestamp = BigIntegerField(index=True)  # trade time, epoch milliseconds
    price = FloatField()  # trade price
    volume = FloatField()  # trade volume
    side = IntegerField()  # direction: 1 = buy, -1 = sell

    class Meta:
        database = db
        table_name = 'bitmart_eth_trades'
|
||
|
||
|
||
class BitMartETHSecond(Model):
    """One-second K-line bar, aggregated from the stored trade records."""

    id = BigIntegerField(primary_key=True)  # bar start: epoch ms floored to a whole second
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)
    volume = FloatField(null=True)  # total traded volume within the second
    trade_count = IntegerField(null=True)  # number of trades within the second

    class Meta:
        database = db
        table_name = 'bitmart_eth_1s'
|
||
|
||
|
||
def create_kline_model(step: int):
    """Dynamically build the peewee model class for one K-line period.

    The table is named ``bitmart_eth_<suffix>``, where the suffix comes from
    ``KLINE_CONFIGS`` (or ``'<step>m'`` for a step that is not configured).
    The class itself is named ``BitMartETH<SUFFIX>``.

    :param step: K-line period in minutes
    :return: a new ``Model`` subclass bound to the module-level ``db``
    """
    suffix = KLINE_CONFIGS.get(step, f'{step}m')

    # type() is used instead of a class statement; fields are created in
    # column order (id first) because peewee orders columns by creation.
    namespace = {'id': BigIntegerField(primary_key=True)}
    namespace.update(
        (column, FloatField(null=True))
        for column in ('open', 'high', 'low', 'close')
    )
    namespace['Meta'] = type('Meta', (), {
        'database': db,
        'table_name': 'bitmart_eth_' + suffix,
    })

    return type('BitMartETH' + suffix.upper(), (Model,), namespace)
|
||
|
||
|
||
class BitMartMultiKlineCollector:
    """Multi-period BitMart ETHUSDT K-line collector backed by SQLite.

    Besides the 1m..1h K-lines fetched from the contract K-line API, it can
    poll recent trades and aggregate them into 1-second (or custom
    millisecond) bars.
    """

    # OHLC column names shared by every K-line table.
    _OHLC_FIELDS = ('open', 'high', 'low', 'close')

    def __init__(self):
        import os

        # NOTE(review): these credentials used to be hard-coded only. They can
        # now be supplied via the environment; the literals remain solely as a
        # backward-compatible fallback -- rotate them and remove them from
        # source control.
        self.api_key = os.environ.get(
            "BITMART_API_KEY", "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8")
        self.secret_key = os.environ.get(
            "BITMART_SECRET_KEY",
            "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5")
        self.memo = os.environ.get("BITMART_MEMO", "数据抓取")
        self.contract_symbol = "ETHUSDT"
        self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))

        # step (minutes) -> dynamically created K-line model
        self.models = {}

        # Connect to the database and make sure every table exists.
        self._init_database()

    def _init_database(self):
        """Create the DB directory, connect, and create all tables if missing."""
        DB_PATH.parent.mkdir(parents=True, exist_ok=True)
        db.connect(reuse_if_open=True)

        for step in KLINE_CONFIGS:
            model = create_kline_model(step)
            self.models[step] = model
            db.create_tables([model], safe=True)  # no-op if the table exists
            logger.info(f"初始化表: {model._meta.table_name}")

        # Raw trades table and the 1-second bars aggregated from it.
        db.create_tables([BitMartETHTrades, BitMartETHSecond], safe=True)
        logger.info("初始化表: bitmart_eth_trades (成交记录)")
        logger.info("初始化表: bitmart_eth_1s (秒级K线)")

    @staticmethod
    def _fmt_ts(ts_sec, fmt='%Y-%m-%d %H:%M'):
        """Format an epoch-seconds timestamp in local time."""
        return time.strftime(fmt, time.localtime(ts_sec))

    def get_db_time_range(self, step: int):
        """Return the stored time range for one K-line period.

        :param step: K-line period in minutes
        :return: ``(earliest_ms, latest_ms)``; ``(None, None)`` when the step is
            unknown, the table is empty, or the query fails (logged)
        """
        model = self.models.get(step)
        if model is None:
            return None, None

        try:
            earliest = model.select(fn.MIN(model.id)).scalar()
            latest = model.select(fn.MAX(model.id)).scalar()
        except Exception as e:
            logger.error(f"查询数据库时间范围异常: {e}")
            return None, None
        return earliest, latest

    def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3):
        """Fetch K-lines from the API, retrying on API errors and exceptions.

        :param step: K-line period in minutes
        :param start_time: range start, epoch seconds
        :param end_time: range end, epoch seconds
        :param max_retries: maximum number of attempts
        :return: list of ``{'id': ms, 'open', 'high', 'low', 'close'}`` dicts
            sorted by ``id``; ``[]`` once every attempt has failed
        """
        for attempt in range(max_retries):
            try:
                start_time = int(start_time)
                end_time = int(end_time)

                response = self.contractAPI.get_kline(
                    contract_symbol=self.contract_symbol,
                    step=step,
                    start_time=start_time,
                    end_time=end_time,
                )[0]

                if response['code'] != 1000:
                    logger.warning(f"API返回错误 (尝试 {attempt+1}/{max_retries}): {response}")
                    if attempt < max_retries - 1:
                        time.sleep(1)
                        continue
                    return []

                # `or []` also covers an explicit null "data" field.
                klines = response.get('data') or []
                formatted = [
                    {
                        'id': int(k["timestamp"]) * 1000,  # API seconds -> ms
                        'open': float(k["open_price"]),
                        'high': float(k["high_price"]),
                        'low': float(k["low_price"]),
                        'close': float(k["close_price"]),
                    }
                    for k in klines
                ]
                formatted.sort(key=lambda x: x['id'])
                return formatted

            except Exception as e:
                logger.error(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(2)
                    continue
                return []

        return []

    def save_klines(self, step: int, klines: list):
        """Upsert K-lines for one period and return how many rows were new.

        Existing rows are overwritten with the freshly fetched OHLC values.
        This matters for the bar that was still forming when it was last
        stored: the forward pass restarts at the newest stored bar, so that
        bar is fetched again and now gets its final values instead of keeping
        the provisional ones forever.

        :param step: K-line period in minutes
        :param klines: rows as returned by ``get_klines``
        :return: number of rows that did not exist before
        """
        model = self.models.get(step)
        if model is None:
            logger.error(f"未找到 {step}分钟 的数据模型")
            return 0
        if not klines:
            return 0

        try:
            ids = list({k['id'] for k in klines})
            existing = {row.id for row in model.select(model.id).where(model.id.in_(ids))}
        except Exception as e:
            logger.error(f"保存K线数据失败: {e}")
            return 0

        new_count = 0
        # One transaction per batch instead of one commit per row.
        with db.atomic():
            for kline in klines:
                try:
                    (model
                     .insert(id=kline['id'], **{name: kline[name] for name in self._OHLC_FIELDS})
                     .on_conflict(
                         conflict_target=[model.id],
                         update={getattr(model, name): kline[name] for name in self._OHLC_FIELDS},
                     )
                     .execute())
                except Exception as e:
                    logger.error(f"保存K线数据失败 {kline['id']}: {e}")
                    continue
                if kline['id'] not in existing:
                    existing.add(kline['id'])  # count duplicates within a batch once
                    new_count += 1

        return new_count

    def get_batch_seconds(self, step: int):
        """Return the request window (seconds) used per API call for *step*.

        1m -> 4h, 3m -> 8h, 5m -> 12h, 15m -> 1 day, 30m -> 2 days, anything
        else (including 1h) -> 3 days.
        """
        hours = {1: 4, 3: 8, 5: 12, 15: 24, 30: 48}.get(step, 72)
        return 3600 * hours

    def collect_period_range(self, step: int, target_start: int, target_end: int):
        """Fetch K-lines for one period over a time range, resuming from the DB.

        Phase 1 walks backwards from the earliest stored bar (or from
        ``target_end`` when nothing is stored) down to ``target_start`` and
        stops after 5 consecutive empty batches. Phase 2 walks forwards from
        the newest stored bar up to ``target_end`` and stops after 3.

        :param step: K-line period in minutes
        :param target_start: desired range start, epoch seconds
        :param target_end: desired range end, epoch seconds
        :return: number of newly stored bars
        """
        suffix = KLINE_CONFIGS.get(step, f'{step}m')
        batch_seconds = self.get_batch_seconds(step)

        # Existing coverage in the database.
        db_earliest, db_latest = self.get_db_time_range(step)
        if db_earliest is not None and db_latest is not None:
            db_earliest_sec = db_earliest // 1000
            db_latest_sec = db_latest // 1000
            logger.info(f"[{suffix}] 数据库已有数据: "
                        f"{self._fmt_ts(db_earliest_sec)} ~ "
                        f"{self._fmt_ts(db_latest_sec)}")
        else:
            db_earliest_sec = None
            db_latest_sec = None
            logger.info(f"[{suffix}] 数据库暂无数据")

        total_saved = 0

        # === Phase 1: history, from the earliest stored bar back to target_start ===
        backward_end = target_end if db_earliest_sec is None else db_earliest_sec

        if backward_end > target_start:
            logger.info(f"[{suffix}] === 开始向前抓取历史数据 ===")
            total_backward = backward_end - target_start  # > 0 inside this branch

            current_end = backward_end
            fail_count = 0
            max_fail = 5

            while current_end > target_start and fail_count < max_fail:
                current_start = max(current_end - batch_seconds, target_start)

                progress = (backward_end - current_end) / total_backward * 100
                start_str = self._fmt_ts(current_start)
                end_str = self._fmt_ts(current_end)

                klines = self.get_klines(step, current_start, current_end)
                if klines:
                    saved = self.save_klines(step, klines)
                    total_saved += saved
                    logger.info(f"[{suffix}] ← 历史 {start_str} ~ {end_str} | "
                                f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%")
                    fail_count = 0
                else:
                    fail_count += 1
                    logger.warning(f"[{suffix}] ← 历史 {start_str} 无数据 (连续失败 {fail_count}/{max_fail})")
                    if fail_count >= max_fail:
                        # Repeated empty batches: treat as the API's history limit.
                        earliest_date = self._fmt_ts(current_end, '%Y-%m-%d')
                        logger.warning(f"[{suffix}] 已达到API历史数据限制,最早可获取: {earliest_date}")
                        break

                current_end = current_start
                time.sleep(0.3)

        # === Phase 2: newest data, from the latest stored bar up to target_end ===
        # With an empty table, phase 1 already started at target_end and covered
        # everything it could, so phase 2 has nothing left to do. (It used to
        # restart at target_start and walk the whole range a second time.)
        forward_start = target_end if db_latest_sec is None else db_latest_sec

        if forward_start < target_end:
            logger.info(f"[{suffix}] === 开始向后抓取最新数据 ===")
            total_forward = target_end - forward_start  # > 0 inside this branch

            current_start = forward_start
            fail_count = 0
            max_fail = 3

            while current_start < target_end and fail_count < max_fail:
                current_end = min(current_start + batch_seconds, target_end)

                progress = (current_start - forward_start) / total_forward * 100
                start_str = self._fmt_ts(current_start)
                end_str = self._fmt_ts(current_end)

                klines = self.get_klines(step, current_start, current_end)
                if klines:
                    saved = self.save_klines(step, klines)
                    total_saved += saved
                    logger.info(f"[{suffix}] → 最新 {start_str} ~ {end_str} | "
                                f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%")
                    fail_count = 0
                else:
                    fail_count += 1
                    logger.warning(f"[{suffix}] → 最新 {start_str} 无数据 (失败 {fail_count}/{max_fail})")

                current_start = current_end
                time.sleep(0.3)

        # Report the final stored range.
        final_earliest, final_latest = self.get_db_time_range(step)
        if final_earliest is not None and final_latest is not None:
            logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条 | 数据范围: "
                           f"{self._fmt_ts(final_earliest // 1000, '%Y-%m-%d')} ~ "
                           f"{self._fmt_ts(final_latest // 1000, '%Y-%m-%d')}")
        else:
            logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条")

        return total_saved

    def collect_from_date(self, start_date: str, periods: list = None):
        """Fetch K-lines from *start_date* (local midnight) up to now.

        :param start_date: start date as ``'YYYY-MM-DD'``
        :param periods: K-line steps to fetch, e.g. ``[1, 5, 15]``; all
            configured periods when ``None``. Unsupported steps are skipped
            with a warning.
        :return: ``{suffix: newly stored rows}`` for each fetched period
        """
        if periods is None:
            periods = list(KLINE_CONFIGS)

        start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
        target_start = int(start_dt.timestamp())
        target_end = int(time.time())

        start_str = start_dt.strftime('%Y-%m-%d')
        end_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')

        logger.info(f"{'='*60}")
        logger.info(f"目标时间范围: {start_str} ~ {end_str}")
        # Only list supported periods; indexing KLINE_CONFIGS directly raised
        # KeyError here before the per-period skip below could run.
        logger.info(f"抓取周期: {[KLINE_CONFIGS[p] for p in periods if p in KLINE_CONFIGS]}")
        logger.info(f"{'='*60}")

        results = {}
        for step in periods:
            if step not in KLINE_CONFIGS:
                logger.warning(f"不支持的周期: {step}分钟,跳过")
                continue

            logger.info(f"\n{'='*60}")
            logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线")
            logger.info(f"{'='*60}")

            saved = self.collect_period_range(step, target_start, target_end)
            results[KLINE_CONFIGS[step]] = saved

            time.sleep(1)  # pause between periods

        # Summary
        logger.info(f"\n{'='*60}")
        logger.info("所有周期抓取完成!统计:")
        for period, count in results.items():
            logger.info(f"  {period}: 新增 {count} 条")
        logger.info(f"{'='*60}")

        return results

    def get_stats(self):
        """Log row counts and time ranges for every table."""
        logger.info(f"\n{'='*60}")
        logger.info("数据库统计:")
        logger.info(f"{'='*60}")

        for step, model in self.models.items():
            suffix = KLINE_CONFIGS.get(step, f'{step}m')
            try:
                count = model.select().count()
                earliest, latest = self.get_db_time_range(step)
                if earliest is not None and latest is not None:
                    earliest_str = self._fmt_ts(earliest // 1000)
                    latest_str = self._fmt_ts(latest // 1000)
                    logger.info(f"  {suffix:>4}: {count:>8} 条 | {earliest_str} ~ {latest_str}")
                else:
                    logger.info(f"  {suffix:>4}: {count:>8} 条")
            except Exception as e:
                logger.error(f"  {suffix}: 查询失败 - {e}")

        # Trade records
        try:
            trades_count = BitMartETHTrades.select().count()
            if trades_count > 0:
                earliest_trade = BitMartETHTrades.select(fn.MIN(BitMartETHTrades.timestamp)).scalar()
                latest_trade = BitMartETHTrades.select(fn.MAX(BitMartETHTrades.timestamp)).scalar()
                earliest_str = self._fmt_ts(earliest_trade // 1000, '%Y-%m-%d %H:%M:%S')
                latest_str = self._fmt_ts(latest_trade // 1000, '%Y-%m-%d %H:%M:%S')
                logger.info(f"trades: {trades_count:>8} 条 | {earliest_str} ~ {latest_str}")
            else:
                logger.info(f"trades: {trades_count:>8} 条")
        except Exception as e:
            logger.error(f"trades: 查询失败 - {e}")

        # 1-second bars
        try:
            second_count = BitMartETHSecond.select().count()
            if second_count > 0:
                earliest_sec = BitMartETHSecond.select(fn.MIN(BitMartETHSecond.id)).scalar()
                latest_sec = BitMartETHSecond.select(fn.MAX(BitMartETHSecond.id)).scalar()
                earliest_str = self._fmt_ts(earliest_sec // 1000, '%Y-%m-%d %H:%M:%S')
                latest_str = self._fmt_ts(latest_sec // 1000, '%Y-%m-%d %H:%M:%S')
                logger.info(f"    1s: {second_count:>8} 条 | {earliest_str} ~ {latest_str}")
            else:
                logger.info(f"    1s: {second_count:>8} 条")
        except Exception as e:
            logger.error(f"    1s: 查询失败 - {e}")

        logger.info(f"{'='*60}")

    # ==================== Second-level (trade based) data ====================

    def get_trades(self, limit: int = 100):
        """Fetch the most recent trades from the API.

        :param limit: requested number of trades. NOTE(review): it is not
            forwarded to ``APIContract.get_trades`` -- confirm whether the SDK
            accepts a limit before relying on it.
        :return: list of ``{'id', 'timestamp', 'price', 'volume', 'side'}``
            dicts; ``[]`` on any API error or exception (logged)
        """
        try:
            response = self.contractAPI.get_trades(
                contract_symbol=self.contract_symbol,
            )[0]

            if response['code'] != 1000:
                logger.error(f"获取成交记录失败: {response}")
                return []

            # Accept both {"data": {"trades": [...]}} and {"data": [...]}.
            data = response.get('data') or {}
            trades = (data.get('trades') or []) if isinstance(data, dict) else data

            return [
                {
                    'id': int(t.get('trade_id', 0)),
                    'timestamp': int(t.get('create_time', 0)),
                    'price': float(t.get('deal_price', 0)),
                    'volume': float(t.get('deal_vol', 0)),
                    'side': int(t.get('way', 0)),
                }
                for t in trades
            ]

        except Exception as e:
            logger.error(f"获取成交记录异常: {e}")
            return []

    def save_trades(self, trades: list):
        """Store trades that are not in the database yet.

        Duplicates (same trade id) are handled by ``get_or_create``; any other
        failure is logged -- it used to be silently discarded -- and the row
        is skipped.

        :param trades: rows as returned by ``get_trades``
        :return: number of newly stored trades
        """
        new_count = 0
        # One transaction per batch instead of one commit per row.
        with db.atomic():
            for trade in trades:
                try:
                    _, created = BitMartETHTrades.get_or_create(
                        id=trade['id'],
                        defaults={
                            'timestamp': trade['timestamp'],
                            'price': trade['price'],
                            'volume': trade['volume'],
                            'side': trade['side'],
                        }
                    )
                except Exception as e:
                    logger.error(f"保存成交记录失败 {trade.get('id')}: {e}")
                    continue
                if created:
                    new_count += 1
        return new_count

    def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.3):
        """Poll recent trades for a while, then aggregate them into 1s bars.

        Only the trades returned by each poll are captured; trades that happen
        between polls and fall outside that window are not recovered.

        :param duration_seconds: how long to poll, in seconds (default 1 hour)
        :param interval: pause between polls, in seconds (default 0.3)
        :return: number of newly stored trades
        """
        logger.info(f"{'='*60}")
        logger.info("开始实时采集成交记录")
        logger.info(f"时长: {duration_seconds}秒 ({duration_seconds/3600:.1f}小时)")
        logger.info(f"间隔: {interval}秒")
        logger.info(f"{'='*60}")

        start_time = time.time()
        end_time = start_time + duration_seconds
        total_saved = 0
        batch_count = 0

        while time.time() < end_time:
            trades = self.get_trades(limit=100)
            if trades:
                saved = self.save_trades(trades)
                total_saved += saved
                batch_count += 1

                # Progress every 10 batches.
                if batch_count % 10 == 0:
                    remaining = end_time - time.time()
                    # NOTE(review): assumes the API lists trades oldest-first
                    # so the last one is the newest -- TODO confirm.
                    latest = trades[-1]
                    ts_str = datetime.datetime.fromtimestamp(
                        latest['timestamp'] / 1000
                    ).strftime('%H:%M:%S')
                    logger.info(f"[{ts_str}] 价格: {latest['price']:.2f} | "
                                f"本批新增: {saved} | 累计: {total_saved} | "
                                f"剩余: {remaining/60:.1f}分钟")

            time.sleep(interval)

        logger.success(f"采集完成!共新增 {total_saved} 条成交记录")

        logger.info("正在将成交记录聚合为秒级K线...")
        self.aggregate_trades_to_seconds()

        return total_saved

    def _bucket_trades(self, interval_ms, start_ts=None, end_ts=None):
        """Aggregate stored trades into OHLCV buckets of ``interval_ms``.

        :param interval_ms: bucket width in milliseconds (must be positive)
        :param start_ts: inclusive lower bound on trade timestamp (ms), optional
        :param end_ts: inclusive upper bound on trade timestamp (ms), optional
        :return: ``(buckets, trade_count)``; ``buckets`` maps each bucket start
            (ms, floored to a multiple of ``interval_ms``) to an OHLCV dict with
            ``trade_count``, in ascending time order
        :raises ValueError: if ``interval_ms`` is not positive
        """
        if interval_ms <= 0:
            raise ValueError(f"interval_ms must be positive, got {interval_ms}")

        # Trade id is a deterministic tie-break for trades in the same millisecond.
        query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp, BitMartETHTrades.id)
        if start_ts is not None:
            query = query.where(BitMartETHTrades.timestamp >= start_ts)
        if end_ts is not None:
            query = query.where(BitMartETHTrades.timestamp <= end_ts)

        buckets = {}
        trade_count = 0
        # iterator() streams rows instead of caching every model instance.
        for trade in query.iterator():
            trade_count += 1
            bucket_ts = (trade.timestamp // interval_ms) * interval_ms
            bar = buckets.get(bucket_ts)
            if bar is None:
                buckets[bucket_ts] = {
                    'open': trade.price,
                    'high': trade.price,
                    'low': trade.price,
                    'close': trade.price,
                    'volume': trade.volume,
                    'trade_count': 1,
                }
            else:
                bar['high'] = max(bar['high'], trade.price)
                bar['low'] = min(bar['low'], trade.price)
                bar['close'] = trade.price
                bar['volume'] += trade.volume
                bar['trade_count'] += 1

        return buckets, trade_count

    def aggregate_trades_to_seconds(self, start_ts: int = None, end_ts: int = None):
        """Aggregate stored trades into 1-second bars and upsert them.

        Bounds that are not aligned to whole seconds produce partial first/last
        bars, which overwrite any stored full bar for those seconds.

        :param start_ts: inclusive start (ms); all trades when ``None``
        :param end_ts: inclusive end (ms); all trades when ``None``
        :return: number of bars written
        """
        second_data, trade_count = self._bucket_trades(1000, start_ts, end_ts)

        saved_count = 0
        with db.atomic():
            for ts, ohlc in second_data.items():
                try:
                    BitMartETHSecond.insert(
                        id=ts,
                        open=ohlc['open'],
                        high=ohlc['high'],
                        low=ohlc['low'],
                        close=ohlc['close'],
                        volume=ohlc['volume'],
                        trade_count=ohlc['trade_count']
                    ).on_conflict(
                        conflict_target=[BitMartETHSecond.id],
                        update={
                            BitMartETHSecond.open: ohlc['open'],
                            BitMartETHSecond.high: ohlc['high'],
                            BitMartETHSecond.low: ohlc['low'],
                            BitMartETHSecond.close: ohlc['close'],
                            BitMartETHSecond.volume: ohlc['volume'],
                            BitMartETHSecond.trade_count: ohlc['trade_count'],
                        }
                    ).execute()
                except Exception as e:
                    logger.error(f"保存秒级K线失败 {ts}: {e}")
                else:
                    saved_count += 1

        logger.success(f"聚合完成!{trade_count} 条成交记录 → {saved_count} 条秒级K线")
        return saved_count

    def get_second_klines(self, start_ts: int = None, end_ts: int = None):
        """Return stored 1-second bars in ascending time order.

        :param start_ts: inclusive start (ms), optional
        :param end_ts: inclusive end (ms), optional
        :return: list of dicts with timestamp/open/high/low/close/volume/trade_count
        """
        query = BitMartETHSecond.select().order_by(BitMartETHSecond.id)
        if start_ts is not None:
            query = query.where(BitMartETHSecond.id >= start_ts)
        if end_ts is not None:
            query = query.where(BitMartETHSecond.id <= end_ts)

        return [{
            'timestamp': k.id,
            'open': k.open,
            'high': k.high,
            'low': k.low,
            'close': k.close,
            'volume': k.volume,
            'trade_count': k.trade_count
        } for k in query]

    def aggregate_trades_custom(self, interval_ms: int = 100, start_ts: int = None, end_ts: int = None):
        """Aggregate stored trades into bars of any millisecond width (not saved).

        :param interval_ms: bar width in ms, e.g. 100, 500, 1000 (must be positive)
        :param start_ts: inclusive start (ms), optional
        :param end_ts: inclusive end (ms), optional
        :return: list of ``{'timestamp', 'datetime', 'open', 'high', 'low',
            'close', 'volume', 'trade_count'}`` sorted by timestamp
        :raises ValueError: if ``interval_ms`` is not positive
        """
        interval_data, trade_count = self._bucket_trades(interval_ms, start_ts, end_ts)

        result = [
            {
                'timestamp': ts,
                'datetime': datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
                'open': ohlc['open'],
                'high': ohlc['high'],
                'low': ohlc['low'],
                'close': ohlc['close'],
                'volume': ohlc['volume'],
                'trade_count': ohlc['trade_count']
            }
            for ts, ohlc in sorted(interval_data.items())
        ]

        logger.info(f"聚合完成: {trade_count} 条成交记录 → {len(result)} 条 {interval_ms}ms K线")
        return result

    def get_raw_trades(self, start_ts: int = None, end_ts: int = None, limit: int = None):
        """Return stored raw trades (one row per trade) in time order.

        :param start_ts: inclusive start (ms), optional
        :param end_ts: inclusive end (ms), optional
        :param limit: maximum number of rows; a falsy value means no limit
        :return: list of dicts; ``side`` is '买' when the stored side is 1,
            otherwise '卖'
        """
        query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp, BitMartETHTrades.id)
        if start_ts is not None:
            query = query.where(BitMartETHTrades.timestamp >= start_ts)
        if end_ts is not None:
            query = query.where(BitMartETHTrades.timestamp <= end_ts)
        if limit:
            query = query.limit(limit)

        return [{
            'id': t.id,
            'timestamp': t.timestamp,
            'datetime': datetime.datetime.fromtimestamp(t.timestamp/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
            'price': t.price,
            'volume': t.volume,
            'side': '买' if t.side == 1 else '卖'
        } for t in query]

    def close(self):
        """Close the shared database connection if it is open."""
        if not db.is_closed():
            db.close()
|
||
|
||
|
||
if __name__ == '__main__':
    collector = BitMartMultiKlineCollector()

    try:
        # Show what is already stored.
        collector.get_stats()

        # ============ Choose the tasks to run ============

        # Task 1: fetch K-lines (1m to 1h periods) from start_date up to now.
        # Resumable: each period continues from the data already in the DB.
        # The very early start date is deliberate -- the backward pass stops
        # on its own after repeated empty batches (the API history limit).
        collector.collect_from_date(
            start_date='2010-01-01',
            periods=[1, 3, 5, 15, 30, 60],  # all periods
        )

        # Task 2: collect second-level data (trades) in real time.
        # Note: trades can only be collected live; history is not available.
        # collector.collect_trades_realtime(
        #     duration_seconds=3600,  # collect for 1 hour
        #     interval=0.3,           # poll every 0.3 seconds
        # )

        # Task 3: aggregate already-collected trades into 1-second bars.
        # collector.aggregate_trades_to_seconds()

        # Show the statistics again.
        collector.get_stats()

    finally:
        collector.close()
|