From 343f3a403c317c100d5fe89097fe3facfcd79241 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Sat, 21 Feb 2026 18:25:34 +0800 Subject: [PATCH] chore: initial commit with config ignores --- .gitignore | 25 ++ main.py | 16 + models/__init__.py | 58 +++ models/bitmart.py | 21 ++ models/bitmart_15.py | 21 ++ models/bitmart_klines.py | 97 +++++ models/ips.py | 21 ++ models/mexc.py | 57 +++ models/weex.py | 71 ++++ models/xstart.py | 22 ++ models/xtoken.py | 24 ++ 抓取多周期K线.py | 762 +++++++++++++++++++++++++++++++++++++++ 12 files changed, 1195 insertions(+) create mode 100644 .gitignore create mode 100644 main.py create mode 100644 models/__init__.py create mode 100644 models/bitmart.py create mode 100644 models/bitmart_15.py create mode 100644 models/bitmart_klines.py create mode 100644 models/ips.py create mode 100644 models/mexc.py create mode 100644 models/weex.py create mode 100644 models/xstart.py create mode 100644 models/xtoken.py create mode 100644 抓取多周期K线.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..84a4c05 --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# IDE +.idea/ +.vscode/ + +# Python cache/build artifacts +__pycache__/ +*.py[cod] +*.so +*.egg-info/ +.pytest_cache/ + +# Virtual environments +.venv/ +venv/ +env/ + +# Local environment/config files +.env +.env.* +*.local +local_settings.py + +# Local runtime data +models/database.db +*.log diff --git a/main.py b/main.py new file mode 100644 index 0000000..b56c695 --- /dev/null +++ b/main.py @@ -0,0 +1,16 @@ +# 这是一个示例 Python 脚本。 + +# 按 ⌃R 执行或将其替换为您的代码。 +# 按 双击 ⇧ 在所有地方搜索类、文件、工具窗口、操作和设置。 + + +def print_hi(name): + # 在下面的代码行中使用断点来调试脚本。 + print(f'Hi, {name}') # 按 ⌘F8 切换断点。 + + +# 按装订区域中的绿色按钮以运行脚本。 +if __name__ == '__main__': + print_hi('PyCharm') + +# 访问 https://www.jetbrains.com/help/pycharm/ 获取 PyCharm 帮助 diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..ca256ce --- /dev/null +++ b/models/__init__.py @@ -0,0 +1,58 @@ +from pathlib import Path + +from peewee import * + +# 连接到 SQLite 数据库,如果文件不存在会自动创建 +db = SqliteDatabase(fr'{Path(__file__).parent}/database.db') + +import pymysql + +from peewee import * +from playhouse.pool import PooledMySQLDatabase + +pymysql.install_as_MySQLdb() + +# 数据库配置 +db_config = { + 'database': 'lm', + 'user': 'lm', + 'password': 'HhyAsGbrrbsJfpyy', + 'host': '192.168.1.87', + 'port': 3306 +} + +# 全局数据库实例 +db1 = MySQLDatabase( + db_config['database'], + user=db_config['user'], + password=db_config['password'], + host=db_config['host'], + port=db_config['port'] +) + +# class BaseModel(Model): +# class Meta: +# database = db1 +# +# def save(self, *args, **kwargs): +# """在调用 save 时自动连接和关闭(若无事务)""" +# db.connect(reuse_if_open=True) +# try: +# result = super().save(*args, **kwargs) +# finally: +# # 若当前没有事务且连接仍然打开,则关闭连接 +# if not db.in_transaction() and not db.is_closed(): +# db.close() +# return result +# +# @classmethod +# def get_or_create(cls, defaults=None, **kwargs): +# """在调用 get_or_create 时自动连接和关闭(若无事务)""" +# db.connect(reuse_if_open=True) +# try: +# obj, created = super().get_or_create(defaults=defaults, **kwargs) +# finally: +# # 若当前没有事务且连接仍然打开,则关闭连接 +# if not db.in_transaction() and not db.is_closed(): +# db.close() +# return obj, created diff --git a/models/bitmart.py b/models/bitmart.py new file mode 100644 index 0000000..106fb7a --- /dev/null +++ b/models/bitmart.py @@ -0,0 +1,21 @@ +from peewee import * + +from models import db + + +class BitMart30(Model): + id = IntegerField(primary_key=True) # 时间戳(毫秒级) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_30' + + +# 连接到数据库 +db.connect() +# 创建表(如果表不存在) +db.create_tables([BitMart30]) diff --git a/models/bitmart_15.py b/models/bitmart_15.py new file mode 100644 index 0000000..5360111 --- /dev/null +++ b/models/bitmart_15.py @@ -0,0 +1,21 @@ +from peewee import * + +from models import db + + +class BitMart15(Model): + id = IntegerField(primary_key=True) # 时间戳(毫秒级) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_15' + + +# 连接到数据库 +db.connect() +# 创建表(如果表不存在) +db.create_tables([BitMart15]) diff --git a/models/bitmart_klines.py b/models/bitmart_klines.py new file mode 100644 index 0000000..d80f162 --- /dev/null +++ b/models/bitmart_klines.py @@ -0,0 +1,97 @@ +""" +BitMart 多周期K线数据模型 +包含 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据表 +""" + +from peewee import * +from models import db + + +# ==================== 1分钟 K线 ==================== +class BitMartETH1M(Model): + id = BigIntegerField(primary_key=True) # 时间戳(毫秒级) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_1m' + + +# ==================== 3分钟 K线 ==================== +class BitMartETH3M(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_3m' + + +# ==================== 5分钟 K线 ==================== +class BitMartETH5M(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_5m' + + +# ==================== 15分钟 K线 ==================== +class BitMartETH15M(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_15m' + + +# ==================== 30分钟 K线 ==================== +class BitMartETH30M(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_30m' + + +# ==================== 1小时 K线 ==================== +class BitMartETH1H(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_1h' + + +# 连接数据库并创建表 +db.connect(reuse_if_open=True) +db.create_tables([ + BitMartETH1M, + BitMartETH3M, + BitMartETH5M, + BitMartETH15M, + BitMartETH30M, + BitMartETH1H, +], safe=True) diff --git a/models/ips.py b/models/ips.py new file mode 100644 index 0000000..d3e2ac7 --- /dev/null +++ b/models/ips.py @@ -0,0 +1,21 @@ +from peewee import * + +from models import db1 + + +class Ips(Model): + id = IntegerField(primary_key=True) + host = CharField(null=True) + port = CharField(null=True) + username = CharField(null=True) + password = CharField(null=True) + start = IntegerField(null=True) + country = CharField(null=True) + + class Meta: + database = db1 + table_name = 'ips' + + +# if __name__ == '__main__': +# Ips.create_table() diff --git a/models/mexc.py b/models/mexc.py new file mode 100644 index 0000000..54e05e9 --- /dev/null +++ b/models/mexc.py @@ -0,0 +1,57 @@ +from peewee import * + +from models import db + + +class Mexc1(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'mexc_1' + + +class Mexc15(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'mexc_15' + + +class Mexc30(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'mexc_30' + + +class Mexc1Hour(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'mexc_1_hour' + + +# 连接到数据库 +db.connect() +# 创建表(如果表不存在) +db.create_tables([Mexc1, Mexc15, Mexc30, Mexc1Hour]) diff --git a/models/weex.py b/models/weex.py new file mode 100644 index 0000000..7cfb894 --- /dev/null +++ b/models/weex.py @@ -0,0 +1,71 @@ +from peewee import * + +from models import db + + +class Weex15(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'weex_15' + + +class Weex1(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'weex_1' + + +class Weex1Hour(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'weex_1_hour' + +class Weex30(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'weex_30' + +class Weex30Copy(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'weex_30_copy1' + + +# 连接到数据库 +db.connect() +# +# # 创建表(如果表不存在) +# db.create_tables([Weex15]) +db.create_tables([Weex30]) + + diff --git a/models/xstart.py b/models/xstart.py new file mode 100644 index 0000000..69dce38 --- /dev/null +++ b/models/xstart.py @@ -0,0 +1,22 @@ +from peewee import * + +from models import db1 +from models.ips import Ips + + +class Xstart(Model): + id = AutoField(primary_key=True) # 自增主键 + bit_id = CharField(null=True) + start = IntegerField(null=True) + x_id = IntegerField(null=True) + ip_id = IntegerField(null=True) + url_id = CharField(null=True) + + class Meta: + database = db1 # 所属数据库 + table_name = 'xstart' + + +if __name__ == '__main__': + + Xstart.create_table() \ No newline at end of file diff --git a/models/xtoken.py b/models/xtoken.py new file mode 100644 index 0000000..89041bf --- /dev/null +++ b/models/xtoken.py @@ -0,0 +1,24 @@ +from peewee import * +# 假设 db 已经在其他地方定义并连接到数据库 +from models import db1 + + +class XToken(Model): + id = AutoField(primary_key=True) # 自增主键 + hub_id = IntegerField(null=True) # hub_id 字段,整型,可为空 + start = IntegerField(null=True) # start 字段,整型,可为空 + account_start = IntegerField(null=True) # account_start 字段,整型,可为空 + user_name = CharField(max_length=255, null=True) # user_name 字段,最大长度 255,可为空 + password = CharField(max_length=255, null=True) # password 字段,最大长度 255,可为空 + email = CharField(max_length=255, null=True) # email 字段,最大长度 255,可为空 + two_fa = CharField(max_length=255, null=True) # 2fa 字段,由于 2fa 是 Python 中的无效标识符,这里使用 two_fa 替代,最大长度 255,可为空 + token = CharField(max_length=255, null=True) # token 字段,最大长度 255,可为空 + email_pwd = CharField(max_length=255, null=True) # token 字段,最大长度 255,可为空 + + class Meta: + database = db1 # 所属数据库 + table_name = 'x_token' # 表名 + + +if __name__ == '__main__': + XToken.create_table() diff --git a/抓取多周期K线.py b/抓取多周期K线.py new file mode 100644 index 0000000..64eeb78 --- /dev/null +++ b/抓取多周期K线.py @@ -0,0 +1,762 @@ +""" +BitMart 多周期K线数据抓取脚本 +支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据 +支持秒级价格数据(通过成交记录API) +支持断点续传,从数据库最新/最早记录继续抓取 +""" + +import time +import datetime +from pathlib import Path +from loguru import logger +from peewee import * +from bitmart.api_contract import APIContract + +# 数据库配置(使用脚本所在项目目录下的 models) +DB_PATH = Path(__file__).parent / 'models' / 'database.db' +db = SqliteDatabase(str(DB_PATH)) + +# K线周期配置:step值 -> 表名后缀 +KLINE_CONFIGS = { + 1: '1m', # 1分钟 + 3: '3m', # 3分钟 + 5: '5m', # 5分钟 + 15: '15m', # 15分钟 + 30: '30m', # 30分钟 + 60: '1h', # 1小时 +} + + +class BitMartETHTrades(Model): + """成交记录模型(秒级/毫秒级原始数据)""" + id = BigIntegerField(primary_key=True) # 成交ID + timestamp = BigIntegerField(index=True) # 成交时间戳(毫秒) + price = FloatField() # 成交价格 + volume = FloatField() # 成交量 + side = IntegerField() # 方向: 1=买, -1=卖 + + class Meta: + database = db + table_name = 'bitmart_eth_trades' + + +class BitMartETHSecond(Model): + """秒级K线模型(由成交记录聚合而来)""" + id = BigIntegerField(primary_key=True) # 时间戳(毫秒,取整到秒) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + volume = FloatField(null=True) + trade_count = IntegerField(null=True) # 该秒内成交笔数 + + class Meta: + database = db + table_name = 'bitmart_eth_1s' + + +def create_kline_model(step: int): + """ + 动态创建K线数据模型 + :param step: K线周期(分钟) + :return: Model类 + """ + suffix = KLINE_CONFIGS.get(step, f'{step}m') + tbl_name = f'bitmart_eth_{suffix}' + + # 使用 type() 动态创建类,避免闭包问题 + attrs = { + 'id': BigIntegerField(primary_key=True), + 'open': FloatField(null=True), + 'high': FloatField(null=True), + 'low': FloatField(null=True), + 'close': FloatField(null=True), + } + + # 创建 Meta 类 + meta_attrs = { + 'database': db, + 'table_name': tbl_name, + } + Meta = type('Meta', (), meta_attrs) + attrs['Meta'] = Meta + + # 动态创建 Model 类 + model_name = f'BitMartETH{suffix.upper()}' + KlineModel = type(model_name, (Model,), attrs) + + return KlineModel + + +class BitMartMultiKlineCollector: + """多周期K线数据抓取器""" + + def __init__(self): + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "数据抓取" + self.contract_symbol = "ETHUSDT" + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + # 存储各周期的模型 + self.models = {} + + # 初始化数据库连接和表 + self._init_database() + + def _init_database(self): + """初始化数据库,创建所有周期的表""" + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + db.connect(reuse_if_open=True) + + for step in KLINE_CONFIGS.keys(): + model = create_kline_model(step) + self.models[step] = model + # 创建表(如果不存在) + db.create_tables([model], safe=True) + logger.info(f"初始化表: {model._meta.table_name}") + + # 创建成交记录表和秒级K线表 + db.create_tables([BitMartETHTrades, BitMartETHSecond], safe=True) + logger.info(f"初始化表: bitmart_eth_trades (成交记录)") + logger.info(f"初始化表: bitmart_eth_1s (秒级K线)") + + def get_db_time_range(self, step: int): + """ + 获取数据库中已有数据的时间范围 + :param step: K线周期 + :return: (earliest_ts, latest_ts) 毫秒时间戳,无数据返回 (None, None) + """ + model = self.models.get(step) + if not model: + return None, None + + try: + # 获取最早记录 + earliest = model.select(fn.MIN(model.id)).scalar() + # 获取最新记录 + latest = model.select(fn.MAX(model.id)).scalar() + return earliest, latest + except Exception as e: + logger.error(f"查询数据库时间范围异常: {e}") + return None, None + + def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3): + """ + 获取K线数据(带重试) + :param step: K线周期(分钟) + :param start_time: 开始时间戳(秒级) + :param end_time: 结束时间戳(秒级) + :param max_retries: 最大重试次数 + :return: K线数据列表 + """ + for attempt in range(max_retries): + try: + start_time = int(start_time) + end_time = int(end_time) + + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=step, + start_time=start_time, + end_time=end_time + )[0] + + if response['code'] != 1000: + logger.warning(f"API返回错误 (尝试 {attempt+1}/{max_retries}): {response}") + if attempt < max_retries - 1: + time.sleep(1) + continue + return [] + + klines = response.get('data', []) + formatted = [] + for k in klines: + timestamp_ms = int(k["timestamp"]) * 1000 + formatted.append({ + 'id': timestamp_ms, + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + + formatted.sort(key=lambda x: x['id']) + return formatted + + except Exception as e: + logger.error(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e}") + if attempt < max_retries - 1: + time.sleep(2) + continue + return [] + + return [] + + def save_klines(self, step: int, klines: list): + """ + 保存K线数据到数据库 + :param step: K线周期 + :param klines: K线数据列表 + :return: 新保存的数量 + """ + model = self.models.get(step) + if not model: + logger.error(f"未找到 {step}分钟 的数据模型") + return 0 + + new_count = 0 + for kline in klines: + try: + _, created = model.get_or_create( + id=kline['id'], + defaults={ + 'open': kline['open'], + 'high': kline['high'], + 'low': kline['low'], + 'close': kline['close'], + } + ) + if created: + new_count += 1 + except Exception as e: + logger.error(f"保存K线数据失败 {kline['id']}: {e}") + + return new_count + + def get_batch_seconds(self, step: int): + """根据周期获取合适的批次大小""" + if step == 1: + return 3600 * 4 # 1分钟: 每次4小时 + elif step == 3: + return 3600 * 8 # 3分钟: 每次8小时 + elif step == 5: + return 3600 * 12 # 5分钟: 每次12小时 + elif step == 15: + return 3600 * 24 # 15分钟: 每次1天 + elif step == 30: + return 3600 * 48 # 30分钟: 每次2天 + else: + return 3600 * 72 # 1小时: 每次3天 + + def collect_period_range(self, step: int, target_start: int, target_end: int): + """ + 抓取指定时间范围的K线数据(支持断点续传) + :param step: K线周期(分钟) + :param target_start: 目标开始时间戳(秒) + :param target_end: 目标结束时间戳(秒) + :return: 保存的总数量 + """ + suffix = KLINE_CONFIGS.get(step, f'{step}m') + batch_seconds = self.get_batch_seconds(step) + + # 获取数据库已有数据范围 + db_earliest, db_latest = self.get_db_time_range(step) + + if db_earliest and db_latest: + db_earliest_sec = db_earliest // 1000 + db_latest_sec = db_latest // 1000 + logger.info(f"[{suffix}] 数据库已有数据: " + f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_earliest_sec))} ~ " + f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_latest_sec))}") + else: + db_earliest_sec = None + db_latest_sec = None + logger.info(f"[{suffix}] 数据库暂无数据") + + total_saved = 0 + + # === 第一阶段:向前抓取历史数据(从数据库最早记录向前,直到 target_start)=== + if db_earliest_sec: + backward_end = db_earliest_sec + else: + backward_end = target_end + + if backward_end > target_start: + logger.info(f"[{suffix}] === 开始向前抓取历史数据 ===") + total_backward = backward_end - target_start + + current_end = backward_end + fail_count = 0 + max_fail = 5 + + while current_end > target_start and fail_count < max_fail: + current_start = max(current_end - batch_seconds, target_start) + + # 计算进度 + progress = (backward_end - current_end) / total_backward * 100 if total_backward > 0 else 0 + start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start)) + end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end)) + + klines = self.get_klines(step, current_start, current_end) + if klines: + saved = self.save_klines(step, klines) + total_saved += saved + logger.info(f"[{suffix}] ← 历史 {start_str} ~ {end_str} | " + f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%") + fail_count = 0 + else: + fail_count += 1 + logger.warning(f"[{suffix}] ← 历史 {start_str} 无数据 (连续失败 {fail_count}/{max_fail})") + if fail_count >= max_fail: + earliest_date = time.strftime('%Y-%m-%d', time.localtime(current_end)) + logger.warning(f"[{suffix}] 已达到API历史数据限制,最早可获取: {earliest_date}") + break + + current_end = current_start + time.sleep(0.3) + + # === 第二阶段:向后抓取最新数据(从数据库最新记录向后,直到 target_end)=== + if db_latest_sec: + forward_start = db_latest_sec + else: + # 如果没有数据,从第一阶段结束的地方开始 + forward_start = target_start + + if forward_start < target_end: + logger.info(f"[{suffix}] === 开始向后抓取最新数据 ===") + total_forward = target_end - forward_start + + current_start = forward_start + fail_count = 0 + max_fail = 3 + + while current_start < target_end and fail_count < max_fail: + current_end = min(current_start + batch_seconds, target_end) + + # 计算进度 + progress = (current_start - forward_start) / total_forward * 100 if total_forward > 0 else 0 + start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start)) + end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end)) + + klines = self.get_klines(step, current_start, current_end) + if klines: + saved = self.save_klines(step, klines) + total_saved += saved + logger.info(f"[{suffix}] → 最新 {start_str} ~ {end_str} | " + f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%") + fail_count = 0 + else: + fail_count += 1 + logger.warning(f"[{suffix}] → 最新 {start_str} 无数据 (失败 {fail_count}/{max_fail})") + + current_start = current_end + time.sleep(0.3) + + # 统计最终数据范围 + final_earliest, final_latest = self.get_db_time_range(step) + if final_earliest and final_latest: + logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条 | 数据范围: " + f"{time.strftime('%Y-%m-%d', time.localtime(final_earliest//1000))} ~ " + f"{time.strftime('%Y-%m-%d', time.localtime(final_latest//1000))}") + else: + logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条") + + return total_saved + + def collect_from_date(self, start_date: str, periods: list = None): + """ + 从指定日期抓取到当前时间 + :param start_date: 起始日期 'YYYY-MM-DD' + :param periods: 要抓取的周期列表,如 [1, 5, 15],默认全部 + """ + if periods is None: + periods = list(KLINE_CONFIGS.keys()) + + # 计算时间范围 + start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d') + target_start = int(start_dt.timestamp()) + target_end = int(time.time()) + + start_str = start_dt.strftime('%Y-%m-%d') + end_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M') + + logger.info(f"{'='*60}") + logger.info(f"目标时间范围: {start_str} ~ {end_str}") + logger.info(f"抓取周期: {[KLINE_CONFIGS[p] for p in periods]}") + logger.info(f"{'='*60}") + + results = {} + for step in periods: + if step not in KLINE_CONFIGS: + logger.warning(f"不支持的周期: {step}分钟,跳过") + continue + + logger.info(f"\n{'='*60}") + logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线") + logger.info(f"{'='*60}") + + saved = self.collect_period_range(step, target_start, target_end) + results[KLINE_CONFIGS[step]] = saved + + time.sleep(1) # 不同周期之间间隔 + + # 打印总结 + logger.info(f"\n{'='*60}") + logger.info("所有周期抓取完成!统计:") + for period, count in results.items(): + logger.info(f" {period}: 新增 {count} 条") + logger.info(f"{'='*60}") + + return results + + def get_stats(self): + """获取各周期数据统计""" + logger.info(f"\n{'='*60}") + logger.info("数据库统计:") + logger.info(f"{'='*60}") + + for step, model in self.models.items(): + suffix = KLINE_CONFIGS.get(step, f'{step}m') + try: + count = model.select().count() + earliest, latest = self.get_db_time_range(step) + if earliest and latest: + earliest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(earliest//1000)) + latest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(latest//1000)) + logger.info(f" {suffix:>4}: {count:>8} 条 | {earliest_str} ~ {latest_str}") + else: + logger.info(f" {suffix:>4}: {count:>8} 条") + except Exception as e: + logger.error(f" {suffix}: 查询失败 - {e}") + + # 成交记录统计 + try: + trades_count = BitMartETHTrades.select().count() + if trades_count > 0: + earliest_trade = BitMartETHTrades.select(fn.MIN(BitMartETHTrades.timestamp)).scalar() + latest_trade = BitMartETHTrades.select(fn.MAX(BitMartETHTrades.timestamp)).scalar() + earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_trade//1000)) + latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_trade//1000)) + logger.info(f"trades: {trades_count:>8} 条 | {earliest_str} ~ {latest_str}") + else: + logger.info(f"trades: {trades_count:>8} 条") + except Exception as e: + logger.error(f"trades: 查询失败 - {e}") + + # 秒级K线统计 + try: + second_count = BitMartETHSecond.select().count() + if second_count > 0: + earliest_sec = BitMartETHSecond.select(fn.MIN(BitMartETHSecond.id)).scalar() + latest_sec = BitMartETHSecond.select(fn.MAX(BitMartETHSecond.id)).scalar() + earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_sec//1000)) + latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_sec//1000)) + logger.info(f" 1s: {second_count:>8} 条 | {earliest_str} ~ {latest_str}") + else: + logger.info(f" 1s: {second_count:>8} 条") + except Exception as e: + logger.error(f" 1s: 查询失败 - {e}") + + logger.info(f"{'='*60}") + + # ==================== 秒级数据相关方法 ==================== + + def get_trades(self, limit: int = 100): + """ + 获取最近成交记录 + :param limit: 获取条数 + :return: 成交记录列表 + """ + try: + response = self.contractAPI.get_trades( + contract_symbol=self.contract_symbol, + )[0] + + if response['code'] != 1000: + logger.error(f"获取成交记录失败: {response}") + return [] + + trades = response.get('data', {}).get('trades', []) + formatted = [] + for t in trades: + formatted.append({ + 'id': int(t.get('trade_id', 0)), + 'timestamp': int(t.get('create_time', 0)), + 'price': float(t.get('deal_price', 0)), + 'volume': float(t.get('deal_vol', 0)), + 'side': int(t.get('way', 0)), + }) + + return formatted + except Exception as e: + logger.error(f"获取成交记录异常: {e}") + return [] + + def save_trades(self, trades: list): + """保存成交记录到数据库""" + new_count = 0 + for trade in trades: + try: + _, created = BitMartETHTrades.get_or_create( + id=trade['id'], + defaults={ + 'timestamp': trade['timestamp'], + 'price': trade['price'], + 'volume': trade['volume'], + 'side': trade['side'], + } + ) + if created: + new_count += 1 + except Exception as e: + pass # 忽略重复数据 + return new_count + + def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.3): + """ + 实时持续采集成交记录(秒级数据源) + :param duration_seconds: 采集时长(秒),默认1小时 + :param interval: 采集间隔(秒),默认0.3秒 + """ + logger.info(f"{'='*60}") + logger.info(f"开始实时采集成交记录") + logger.info(f"时长: {duration_seconds}秒 ({duration_seconds/3600:.1f}小时)") + logger.info(f"间隔: {interval}秒") + logger.info(f"{'='*60}") + + start_time = time.time() + end_time = start_time + duration_seconds + total_saved = 0 + batch_count = 0 + + while time.time() < end_time: + trades = self.get_trades(limit=100) + if trades: + saved = self.save_trades(trades) + total_saved += saved + batch_count += 1 + + # 每10批显示一次进度 + if batch_count % 10 == 0: + elapsed = time.time() - start_time + remaining = end_time - time.time() + latest = trades[-1] + ts_str = datetime.datetime.fromtimestamp( + latest['timestamp']/1000 + ).strftime('%H:%M:%S') + logger.info(f"[{ts_str}] 价格: {latest['price']:.2f} | " + f"本批新增: {saved} | 累计: {total_saved} | " + f"剩余: {remaining/60:.1f}分钟") + + time.sleep(interval) + + logger.success(f"采集完成!共新增 {total_saved} 条成交记录") + + # 自动聚合为秒级K线 + logger.info("正在将成交记录聚合为秒级K线...") + self.aggregate_trades_to_seconds() + + return total_saved + + def aggregate_trades_to_seconds(self, start_ts: int = None, end_ts: int = None): + """ + 将成交记录聚合为秒级K线数据 + :param start_ts: 开始时间戳(毫秒),默认全部 + :param end_ts: 结束时间戳(毫秒),默认全部 + :return: 聚合的秒级K线数量 + """ + # 构建查询 + query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) + if start_ts: + query = query.where(BitMartETHTrades.timestamp >= start_ts) + if end_ts: + query = query.where(BitMartETHTrades.timestamp <= end_ts) + + # 按秒聚合 + second_data = {} + trade_count = 0 + + for trade in query: + trade_count += 1 + # 取整到秒(毫秒时间戳) + second_ts = (trade.timestamp // 1000) * 1000 + + if second_ts not in second_data: + second_data[second_ts] = { + 'open': trade.price, + 'high': trade.price, + 'low': trade.price, + 'close': trade.price, + 'volume': trade.volume, + 'trade_count': 1 + } + else: + second_data[second_ts]['high'] = max(second_data[second_ts]['high'], trade.price) + second_data[second_ts]['low'] = min(second_data[second_ts]['low'], trade.price) + second_data[second_ts]['close'] = trade.price + second_data[second_ts]['volume'] += trade.volume + second_data[second_ts]['trade_count'] += 1 + + # 保存到数据库 + saved_count = 0 + for ts, ohlc in second_data.items(): + try: + BitMartETHSecond.insert( + id=ts, + open=ohlc['open'], + high=ohlc['high'], + low=ohlc['low'], + close=ohlc['close'], + volume=ohlc['volume'], + trade_count=ohlc['trade_count'] + ).on_conflict( + conflict_target=[BitMartETHSecond.id], + update={ + BitMartETHSecond.open: ohlc['open'], + BitMartETHSecond.high: ohlc['high'], + BitMartETHSecond.low: ohlc['low'], + BitMartETHSecond.close: ohlc['close'], + BitMartETHSecond.volume: ohlc['volume'], + BitMartETHSecond.trade_count: ohlc['trade_count'], + } + ).execute() + saved_count += 1 + except Exception as e: + logger.error(f"保存秒级K线失败 {ts}: {e}") + + logger.success(f"聚合完成!{trade_count} 条成交记录 → {saved_count} 条秒级K线") + return saved_count + + def get_second_klines(self, start_ts: int = None, end_ts: int = None): + """ + 获取秒级K线数据 + :param start_ts: 开始时间戳(毫秒) + :param end_ts: 结束时间戳(毫秒) + :return: 秒级K线列表 + """ + query = BitMartETHSecond.select().order_by(BitMartETHSecond.id) + if start_ts: + query = query.where(BitMartETHSecond.id >= start_ts) + if end_ts: + query = query.where(BitMartETHSecond.id <= end_ts) + + return [{ + 'timestamp': k.id, + 'open': k.open, + 'high': k.high, + 'low': k.low, + 'close': k.close, + 'volume': k.volume, + 'trade_count': k.trade_count + } for k in query] + + def aggregate_trades_custom(self, interval_ms: int = 100, start_ts: int = None, end_ts: int = None): + """ + 将成交记录聚合为自定义毫秒级K线数据(不保存到数据库,直接返回) + :param interval_ms: 聚合周期(毫秒),如 100=100ms, 500=500ms, 1000=1秒 + :param start_ts: 开始时间戳(毫秒) + :param end_ts: 结束时间戳(毫秒) + :return: K线列表 [{'timestamp', 'open', 'high', 'low', 'close', 'volume', 'trade_count'}, ...] + """ + # 构建查询 + query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) + if start_ts: + query = query.where(BitMartETHTrades.timestamp >= start_ts) + if end_ts: + query = query.where(BitMartETHTrades.timestamp <= end_ts) + + # 按指定间隔聚合 + interval_data = {} + trade_count = 0 + + for trade in query: + trade_count += 1 + # 取整到指定间隔 + interval_ts = (trade.timestamp // interval_ms) * interval_ms + + if interval_ts not in interval_data: + interval_data[interval_ts] = { + 'open': trade.price, + 'high': trade.price, + 'low': trade.price, + 'close': trade.price, + 'volume': trade.volume, + 'trade_count': 1 + } + else: + interval_data[interval_ts]['high'] = max(interval_data[interval_ts]['high'], trade.price) + interval_data[interval_ts]['low'] = min(interval_data[interval_ts]['low'], trade.price) + interval_data[interval_ts]['close'] = trade.price + interval_data[interval_ts]['volume'] += trade.volume + interval_data[interval_ts]['trade_count'] += 1 + + # 转换为列表 + result = [] + for ts, ohlc in sorted(interval_data.items()): + result.append({ + 'timestamp': ts, + 'datetime': datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], + 'open': ohlc['open'], + 'high': ohlc['high'], + 'low': ohlc['low'], + 'close': ohlc['close'], + 'volume': ohlc['volume'], + 'trade_count': ohlc['trade_count'] + }) + + logger.info(f"聚合完成: {trade_count} 条成交记录 → {len(result)} 条 {interval_ms}ms K线") + return result + + def get_raw_trades(self, start_ts: int = None, end_ts: int = None, limit: int = None): + """ + 获取原始成交记录(逐笔数据,毫秒级) + :param start_ts: 开始时间戳(毫秒) + :param end_ts: 结束时间戳(毫秒) + :param limit: 最大返回条数 + :return: 成交记录列表 + """ + query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp) + if start_ts: + query = query.where(BitMartETHTrades.timestamp >= start_ts) + if end_ts: + query = query.where(BitMartETHTrades.timestamp <= end_ts) + if limit: + query = query.limit(limit) + + return [{ + 'id': t.id, + 'timestamp': t.timestamp, + 'datetime': datetime.datetime.fromtimestamp(t.timestamp/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], + 'price': t.price, + 'volume': t.volume, + 'side': '买' if t.side == 1 else '卖' + } for t in query] + + def close(self): + """关闭数据库连接""" + if not db.is_closed(): + db.close() + + +if __name__ == '__main__': + collector = BitMartMultiKlineCollector() + + try: + # 查看当前数据统计 + collector.get_stats() + + # ============ 选择要执行的任务 ============ + + # 任务1: 抓取K线数据(1分钟~1小时周期) + # 从 2025-01-01 抓取到当前时间(支持断点续传) + collector.collect_from_date( + start_date='2010-01-01', + periods=[1, 3, 5, 15, 30, 60] # 所有周期 + ) + + # 任务2: 实时采集秒级数据(成交记录) + # 注意: 秒级数据只能实时采集,无法获取历史 + # collector.collect_trades_realtime( + # duration_seconds=3600, # 采集1小时 + # interval=0.3 # 每0.3秒请求一次 + # ) + + # 任务3: 将已采集的成交记录聚合为秒级K线 + # collector.aggregate_trades_to_seconds() + + # 再次查看统计 + collector.get_stats() + + finally: + collector.close()