Files
codex_jxs_code/抓取多周期K线.py
2026-02-21 18:25:34 +08:00

763 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
BitMart 多周期K线数据抓取脚本
支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据
支持秒级价格数据通过成交记录API
支持断点续传,从数据库最新/最早记录继续抓取
"""
import time
import datetime
from pathlib import Path
from loguru import logger
from peewee import *
from bitmart.api_contract import APIContract
# 数据库配置(使用脚本所在项目目录下的 models
DB_PATH = Path(__file__).parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
# K线周期配置step值 -> 表名后缀
KLINE_CONFIGS = {
1: '1m', # 1分钟
3: '3m', # 3分钟
5: '5m', # 5分钟
15: '15m', # 15分钟
30: '30m', # 30分钟
60: '1h', # 1小时
}
class BitMartETHTrades(Model):
"""成交记录模型(秒级/毫秒级原始数据)"""
id = BigIntegerField(primary_key=True) # 成交ID
timestamp = BigIntegerField(index=True) # 成交时间戳(毫秒)
price = FloatField() # 成交价格
volume = FloatField() # 成交量
side = IntegerField() # 方向: 1=买, -1=卖
class Meta:
database = db
table_name = 'bitmart_eth_trades'
class BitMartETHSecond(Model):
"""秒级K线模型由成交记录聚合而来"""
id = BigIntegerField(primary_key=True) # 时间戳(毫秒,取整到秒)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
volume = FloatField(null=True)
trade_count = IntegerField(null=True) # 该秒内成交笔数
class Meta:
database = db
table_name = 'bitmart_eth_1s'
def create_kline_model(step: int):
"""
动态创建K线数据模型
:param step: K线周期分钟
:return: Model类
"""
suffix = KLINE_CONFIGS.get(step, f'{step}m')
tbl_name = f'bitmart_eth_{suffix}'
# 使用 type() 动态创建类,避免闭包问题
attrs = {
'id': BigIntegerField(primary_key=True),
'open': FloatField(null=True),
'high': FloatField(null=True),
'low': FloatField(null=True),
'close': FloatField(null=True),
}
# 创建 Meta 类
meta_attrs = {
'database': db,
'table_name': tbl_name,
}
Meta = type('Meta', (), meta_attrs)
attrs['Meta'] = Meta
# 动态创建 Model 类
model_name = f'BitMartETH{suffix.upper()}'
KlineModel = type(model_name, (Model,), attrs)
return KlineModel
class BitMartMultiKlineCollector:
"""多周期K线数据抓取器"""
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "数据抓取"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
# 存储各周期的模型
self.models = {}
# 初始化数据库连接和表
self._init_database()
def _init_database(self):
"""初始化数据库,创建所有周期的表"""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
db.connect(reuse_if_open=True)
for step in KLINE_CONFIGS.keys():
model = create_kline_model(step)
self.models[step] = model
# 创建表(如果不存在)
db.create_tables([model], safe=True)
logger.info(f"初始化表: {model._meta.table_name}")
# 创建成交记录表和秒级K线表
db.create_tables([BitMartETHTrades, BitMartETHSecond], safe=True)
logger.info(f"初始化表: bitmart_eth_trades (成交记录)")
logger.info(f"初始化表: bitmart_eth_1s (秒级K线)")
def get_db_time_range(self, step: int):
"""
获取数据库中已有数据的时间范围
:param step: K线周期
:return: (earliest_ts, latest_ts) 毫秒时间戳,无数据返回 (None, None)
"""
model = self.models.get(step)
if not model:
return None, None
try:
# 获取最早记录
earliest = model.select(fn.MIN(model.id)).scalar()
# 获取最新记录
latest = model.select(fn.MAX(model.id)).scalar()
return earliest, latest
except Exception as e:
logger.error(f"查询数据库时间范围异常: {e}")
return None, None
def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3):
"""
获取K线数据带重试
:param step: K线周期分钟
:param start_time: 开始时间戳(秒级)
:param end_time: 结束时间戳(秒级)
:param max_retries: 最大重试次数
:return: K线数据列表
"""
for attempt in range(max_retries):
try:
start_time = int(start_time)
end_time = int(end_time)
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=step,
start_time=start_time,
end_time=end_time
)[0]
if response['code'] != 1000:
logger.warning(f"API返回错误 (尝试 {attempt+1}/{max_retries}): {response}")
if attempt < max_retries - 1:
time.sleep(1)
continue
return []
klines = response.get('data', [])
formatted = []
for k in klines:
timestamp_ms = int(k["timestamp"]) * 1000
formatted.append({
'id': timestamp_ms,
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
logger.error(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(2)
continue
return []
return []
def save_klines(self, step: int, klines: list):
"""
保存K线数据到数据库
:param step: K线周期
:param klines: K线数据列表
:return: 新保存的数量
"""
model = self.models.get(step)
if not model:
logger.error(f"未找到 {step}分钟 的数据模型")
return 0
new_count = 0
for kline in klines:
try:
_, created = model.get_or_create(
id=kline['id'],
defaults={
'open': kline['open'],
'high': kline['high'],
'low': kline['low'],
'close': kline['close'],
}
)
if created:
new_count += 1
except Exception as e:
logger.error(f"保存K线数据失败 {kline['id']}: {e}")
return new_count
def get_batch_seconds(self, step: int):
"""根据周期获取合适的批次大小"""
if step == 1:
return 3600 * 4 # 1分钟: 每次4小时
elif step == 3:
return 3600 * 8 # 3分钟: 每次8小时
elif step == 5:
return 3600 * 12 # 5分钟: 每次12小时
elif step == 15:
return 3600 * 24 # 15分钟: 每次1天
elif step == 30:
return 3600 * 48 # 30分钟: 每次2天
else:
return 3600 * 72 # 1小时: 每次3天
def collect_period_range(self, step: int, target_start: int, target_end: int):
"""
抓取指定时间范围的K线数据支持断点续传
:param step: K线周期分钟
:param target_start: 目标开始时间戳(秒)
:param target_end: 目标结束时间戳(秒)
:return: 保存的总数量
"""
suffix = KLINE_CONFIGS.get(step, f'{step}m')
batch_seconds = self.get_batch_seconds(step)
# 获取数据库已有数据范围
db_earliest, db_latest = self.get_db_time_range(step)
if db_earliest and db_latest:
db_earliest_sec = db_earliest // 1000
db_latest_sec = db_latest // 1000
logger.info(f"[{suffix}] 数据库已有数据: "
f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_earliest_sec))} ~ "
f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_latest_sec))}")
else:
db_earliest_sec = None
db_latest_sec = None
logger.info(f"[{suffix}] 数据库暂无数据")
total_saved = 0
# === 第一阶段:向前抓取历史数据(从数据库最早记录向前,直到 target_start===
if db_earliest_sec:
backward_end = db_earliest_sec
else:
backward_end = target_end
if backward_end > target_start:
logger.info(f"[{suffix}] === 开始向前抓取历史数据 ===")
total_backward = backward_end - target_start
current_end = backward_end
fail_count = 0
max_fail = 5
while current_end > target_start and fail_count < max_fail:
current_start = max(current_end - batch_seconds, target_start)
# 计算进度
progress = (backward_end - current_end) / total_backward * 100 if total_backward > 0 else 0
start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start))
end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end))
klines = self.get_klines(step, current_start, current_end)
if klines:
saved = self.save_klines(step, klines)
total_saved += saved
logger.info(f"[{suffix}] ← 历史 {start_str} ~ {end_str} | "
f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%")
fail_count = 0
else:
fail_count += 1
logger.warning(f"[{suffix}] ← 历史 {start_str} 无数据 (连续失败 {fail_count}/{max_fail})")
if fail_count >= max_fail:
earliest_date = time.strftime('%Y-%m-%d', time.localtime(current_end))
logger.warning(f"[{suffix}] 已达到API历史数据限制最早可获取: {earliest_date}")
break
current_end = current_start
time.sleep(0.3)
# === 第二阶段:向后抓取最新数据(从数据库最新记录向后,直到 target_end===
if db_latest_sec:
forward_start = db_latest_sec
else:
# 如果没有数据,从第一阶段结束的地方开始
forward_start = target_start
if forward_start < target_end:
logger.info(f"[{suffix}] === 开始向后抓取最新数据 ===")
total_forward = target_end - forward_start
current_start = forward_start
fail_count = 0
max_fail = 3
while current_start < target_end and fail_count < max_fail:
current_end = min(current_start + batch_seconds, target_end)
# 计算进度
progress = (current_start - forward_start) / total_forward * 100 if total_forward > 0 else 0
start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start))
end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end))
klines = self.get_klines(step, current_start, current_end)
if klines:
saved = self.save_klines(step, klines)
total_saved += saved
logger.info(f"[{suffix}] → 最新 {start_str} ~ {end_str} | "
f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%")
fail_count = 0
else:
fail_count += 1
logger.warning(f"[{suffix}] → 最新 {start_str} 无数据 (失败 {fail_count}/{max_fail})")
current_start = current_end
time.sleep(0.3)
# 统计最终数据范围
final_earliest, final_latest = self.get_db_time_range(step)
if final_earliest and final_latest:
logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条 | 数据范围: "
f"{time.strftime('%Y-%m-%d', time.localtime(final_earliest//1000))} ~ "
f"{time.strftime('%Y-%m-%d', time.localtime(final_latest//1000))}")
else:
logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved}")
return total_saved
def collect_from_date(self, start_date: str, periods: list = None):
"""
从指定日期抓取到当前时间
:param start_date: 起始日期 'YYYY-MM-DD'
:param periods: 要抓取的周期列表,如 [1, 5, 15],默认全部
"""
if periods is None:
periods = list(KLINE_CONFIGS.keys())
# 计算时间范围
start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
target_start = int(start_dt.timestamp())
target_end = int(time.time())
start_str = start_dt.strftime('%Y-%m-%d')
end_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
logger.info(f"{'='*60}")
logger.info(f"目标时间范围: {start_str} ~ {end_str}")
logger.info(f"抓取周期: {[KLINE_CONFIGS[p] for p in periods]}")
logger.info(f"{'='*60}")
results = {}
for step in periods:
if step not in KLINE_CONFIGS:
logger.warning(f"不支持的周期: {step}分钟,跳过")
continue
logger.info(f"\n{'='*60}")
logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线")
logger.info(f"{'='*60}")
saved = self.collect_period_range(step, target_start, target_end)
results[KLINE_CONFIGS[step]] = saved
time.sleep(1) # 不同周期之间间隔
# 打印总结
logger.info(f"\n{'='*60}")
logger.info("所有周期抓取完成!统计:")
for period, count in results.items():
logger.info(f" {period}: 新增 {count}")
logger.info(f"{'='*60}")
return results
def get_stats(self):
"""获取各周期数据统计"""
logger.info(f"\n{'='*60}")
logger.info("数据库统计:")
logger.info(f"{'='*60}")
for step, model in self.models.items():
suffix = KLINE_CONFIGS.get(step, f'{step}m')
try:
count = model.select().count()
earliest, latest = self.get_db_time_range(step)
if earliest and latest:
earliest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(earliest//1000))
latest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(latest//1000))
logger.info(f" {suffix:>4}: {count:>8} 条 | {earliest_str} ~ {latest_str}")
else:
logger.info(f" {suffix:>4}: {count:>8}")
except Exception as e:
logger.error(f" {suffix}: 查询失败 - {e}")
# 成交记录统计
try:
trades_count = BitMartETHTrades.select().count()
if trades_count > 0:
earliest_trade = BitMartETHTrades.select(fn.MIN(BitMartETHTrades.timestamp)).scalar()
latest_trade = BitMartETHTrades.select(fn.MAX(BitMartETHTrades.timestamp)).scalar()
earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_trade//1000))
latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_trade//1000))
logger.info(f"trades: {trades_count:>8} 条 | {earliest_str} ~ {latest_str}")
else:
logger.info(f"trades: {trades_count:>8}")
except Exception as e:
logger.error(f"trades: 查询失败 - {e}")
# 秒级K线统计
try:
second_count = BitMartETHSecond.select().count()
if second_count > 0:
earliest_sec = BitMartETHSecond.select(fn.MIN(BitMartETHSecond.id)).scalar()
latest_sec = BitMartETHSecond.select(fn.MAX(BitMartETHSecond.id)).scalar()
earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_sec//1000))
latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_sec//1000))
logger.info(f" 1s: {second_count:>8} 条 | {earliest_str} ~ {latest_str}")
else:
logger.info(f" 1s: {second_count:>8}")
except Exception as e:
logger.error(f" 1s: 查询失败 - {e}")
logger.info(f"{'='*60}")
# ==================== 秒级数据相关方法 ====================
def get_trades(self, limit: int = 100):
"""
获取最近成交记录
:param limit: 获取条数
:return: 成交记录列表
"""
try:
response = self.contractAPI.get_trades(
contract_symbol=self.contract_symbol,
)[0]
if response['code'] != 1000:
logger.error(f"获取成交记录失败: {response}")
return []
trades = response.get('data', {}).get('trades', [])
formatted = []
for t in trades:
formatted.append({
'id': int(t.get('trade_id', 0)),
'timestamp': int(t.get('create_time', 0)),
'price': float(t.get('deal_price', 0)),
'volume': float(t.get('deal_vol', 0)),
'side': int(t.get('way', 0)),
})
return formatted
except Exception as e:
logger.error(f"获取成交记录异常: {e}")
return []
def save_trades(self, trades: list):
"""保存成交记录到数据库"""
new_count = 0
for trade in trades:
try:
_, created = BitMartETHTrades.get_or_create(
id=trade['id'],
defaults={
'timestamp': trade['timestamp'],
'price': trade['price'],
'volume': trade['volume'],
'side': trade['side'],
}
)
if created:
new_count += 1
except Exception as e:
pass # 忽略重复数据
return new_count
def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.3):
"""
实时持续采集成交记录(秒级数据源)
:param duration_seconds: 采集时长默认1小时
:param interval: 采集间隔默认0.3秒
"""
logger.info(f"{'='*60}")
logger.info(f"开始实时采集成交记录")
logger.info(f"时长: {duration_seconds}秒 ({duration_seconds/3600:.1f}小时)")
logger.info(f"间隔: {interval}")
logger.info(f"{'='*60}")
start_time = time.time()
end_time = start_time + duration_seconds
total_saved = 0
batch_count = 0
while time.time() < end_time:
trades = self.get_trades(limit=100)
if trades:
saved = self.save_trades(trades)
total_saved += saved
batch_count += 1
# 每10批显示一次进度
if batch_count % 10 == 0:
elapsed = time.time() - start_time
remaining = end_time - time.time()
latest = trades[-1]
ts_str = datetime.datetime.fromtimestamp(
latest['timestamp']/1000
).strftime('%H:%M:%S')
logger.info(f"[{ts_str}] 价格: {latest['price']:.2f} | "
f"本批新增: {saved} | 累计: {total_saved} | "
f"剩余: {remaining/60:.1f}分钟")
time.sleep(interval)
logger.success(f"采集完成!共新增 {total_saved} 条成交记录")
# 自动聚合为秒级K线
logger.info("正在将成交记录聚合为秒级K线...")
self.aggregate_trades_to_seconds()
return total_saved
def aggregate_trades_to_seconds(self, start_ts: int = None, end_ts: int = None):
"""
将成交记录聚合为秒级K线数据
:param start_ts: 开始时间戳(毫秒),默认全部
:param end_ts: 结束时间戳(毫秒),默认全部
:return: 聚合的秒级K线数量
"""
# 构建查询
query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp)
if start_ts:
query = query.where(BitMartETHTrades.timestamp >= start_ts)
if end_ts:
query = query.where(BitMartETHTrades.timestamp <= end_ts)
# 按秒聚合
second_data = {}
trade_count = 0
for trade in query:
trade_count += 1
# 取整到秒(毫秒时间戳)
second_ts = (trade.timestamp // 1000) * 1000
if second_ts not in second_data:
second_data[second_ts] = {
'open': trade.price,
'high': trade.price,
'low': trade.price,
'close': trade.price,
'volume': trade.volume,
'trade_count': 1
}
else:
second_data[second_ts]['high'] = max(second_data[second_ts]['high'], trade.price)
second_data[second_ts]['low'] = min(second_data[second_ts]['low'], trade.price)
second_data[second_ts]['close'] = trade.price
second_data[second_ts]['volume'] += trade.volume
second_data[second_ts]['trade_count'] += 1
# 保存到数据库
saved_count = 0
for ts, ohlc in second_data.items():
try:
BitMartETHSecond.insert(
id=ts,
open=ohlc['open'],
high=ohlc['high'],
low=ohlc['low'],
close=ohlc['close'],
volume=ohlc['volume'],
trade_count=ohlc['trade_count']
).on_conflict(
conflict_target=[BitMartETHSecond.id],
update={
BitMartETHSecond.open: ohlc['open'],
BitMartETHSecond.high: ohlc['high'],
BitMartETHSecond.low: ohlc['low'],
BitMartETHSecond.close: ohlc['close'],
BitMartETHSecond.volume: ohlc['volume'],
BitMartETHSecond.trade_count: ohlc['trade_count'],
}
).execute()
saved_count += 1
except Exception as e:
logger.error(f"保存秒级K线失败 {ts}: {e}")
logger.success(f"聚合完成!{trade_count} 条成交记录 → {saved_count} 条秒级K线")
return saved_count
def get_second_klines(self, start_ts: int = None, end_ts: int = None):
"""
获取秒级K线数据
:param start_ts: 开始时间戳(毫秒)
:param end_ts: 结束时间戳(毫秒)
:return: 秒级K线列表
"""
query = BitMartETHSecond.select().order_by(BitMartETHSecond.id)
if start_ts:
query = query.where(BitMartETHSecond.id >= start_ts)
if end_ts:
query = query.where(BitMartETHSecond.id <= end_ts)
return [{
'timestamp': k.id,
'open': k.open,
'high': k.high,
'low': k.low,
'close': k.close,
'volume': k.volume,
'trade_count': k.trade_count
} for k in query]
def aggregate_trades_custom(self, interval_ms: int = 100, start_ts: int = None, end_ts: int = None):
"""
将成交记录聚合为自定义毫秒级K线数据不保存到数据库直接返回
:param interval_ms: 聚合周期(毫秒),如 100=100ms, 500=500ms, 1000=1秒
:param start_ts: 开始时间戳(毫秒)
:param end_ts: 结束时间戳(毫秒)
:return: K线列表 [{'timestamp', 'open', 'high', 'low', 'close', 'volume', 'trade_count'}, ...]
"""
# 构建查询
query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp)
if start_ts:
query = query.where(BitMartETHTrades.timestamp >= start_ts)
if end_ts:
query = query.where(BitMartETHTrades.timestamp <= end_ts)
# 按指定间隔聚合
interval_data = {}
trade_count = 0
for trade in query:
trade_count += 1
# 取整到指定间隔
interval_ts = (trade.timestamp // interval_ms) * interval_ms
if interval_ts not in interval_data:
interval_data[interval_ts] = {
'open': trade.price,
'high': trade.price,
'low': trade.price,
'close': trade.price,
'volume': trade.volume,
'trade_count': 1
}
else:
interval_data[interval_ts]['high'] = max(interval_data[interval_ts]['high'], trade.price)
interval_data[interval_ts]['low'] = min(interval_data[interval_ts]['low'], trade.price)
interval_data[interval_ts]['close'] = trade.price
interval_data[interval_ts]['volume'] += trade.volume
interval_data[interval_ts]['trade_count'] += 1
# 转换为列表
result = []
for ts, ohlc in sorted(interval_data.items()):
result.append({
'timestamp': ts,
'datetime': datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
'open': ohlc['open'],
'high': ohlc['high'],
'low': ohlc['low'],
'close': ohlc['close'],
'volume': ohlc['volume'],
'trade_count': ohlc['trade_count']
})
logger.info(f"聚合完成: {trade_count} 条成交记录 → {len(result)}{interval_ms}ms K线")
return result
def get_raw_trades(self, start_ts: int = None, end_ts: int = None, limit: int = None):
"""
获取原始成交记录(逐笔数据,毫秒级)
:param start_ts: 开始时间戳(毫秒)
:param end_ts: 结束时间戳(毫秒)
:param limit: 最大返回条数
:return: 成交记录列表
"""
query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp)
if start_ts:
query = query.where(BitMartETHTrades.timestamp >= start_ts)
if end_ts:
query = query.where(BitMartETHTrades.timestamp <= end_ts)
if limit:
query = query.limit(limit)
return [{
'id': t.id,
'timestamp': t.timestamp,
'datetime': datetime.datetime.fromtimestamp(t.timestamp/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
'price': t.price,
'volume': t.volume,
'side': '' if t.side == 1 else ''
} for t in query]
def close(self):
"""关闭数据库连接"""
if not db.is_closed():
db.close()
if __name__ == '__main__':
collector = BitMartMultiKlineCollector()
try:
# 查看当前数据统计
collector.get_stats()
# ============ 选择要执行的任务 ============
# 任务1: 抓取K线数据1分钟~1小时周期
# 从 2025-01-01 抓取到当前时间(支持断点续传)
collector.collect_from_date(
start_date='2010-01-01',
periods=[1, 3, 5, 15, 30, 60] # 所有周期
)
# 任务2: 实时采集秒级数据(成交记录)
# 注意: 秒级数据只能实时采集,无法获取历史
# collector.collect_trades_realtime(
# duration_seconds=3600, # 采集1小时
# interval=0.3 # 每0.3秒请求一次
# )
# 任务3: 将已采集的成交记录聚合为秒级K线
# collector.aggregate_trades_to_seconds()
# 再次查看统计
collector.get_stats()
finally:
collector.close()