chore: initial commit with config ignores
This commit is contained in:
25
.gitignore
vendored
Normal file
25
.gitignore
vendored
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
# IDE
|
||||||
|
.idea/
|
||||||
|
.vscode/
|
||||||
|
|
||||||
|
# Python cache/build artifacts
|
||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*.so
|
||||||
|
*.egg-info/
|
||||||
|
.pytest_cache/
|
||||||
|
|
||||||
|
# Virtual environments
|
||||||
|
.venv/
|
||||||
|
venv/
|
||||||
|
env/
|
||||||
|
|
||||||
|
# Local environment/config files
|
||||||
|
.env
|
||||||
|
.env.*
|
||||||
|
*.local
|
||||||
|
local_settings.py
|
||||||
|
|
||||||
|
# Local runtime data
|
||||||
|
models/database.db
|
||||||
|
*.log
|
||||||
16
main.py
Normal file
16
main.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
# 这是一个示例 Python 脚本。
|
||||||
|
|
||||||
|
# 按 ⌃R 执行或将其替换为您的代码。
|
||||||
|
# 按 双击 ⇧ 在所有地方搜索类、文件、工具窗口、操作和设置。
|
||||||
|
|
||||||
|
|
||||||
|
def print_hi(name):
|
||||||
|
# 在下面的代码行中使用断点来调试脚本。
|
||||||
|
print(f'Hi, {name}') # 按 ⌘F8 切换断点。
|
||||||
|
|
||||||
|
|
||||||
|
# 按装订区域中的绿色按钮以运行脚本。
|
||||||
|
if __name__ == '__main__':
|
||||||
|
print_hi('PyCharm')
|
||||||
|
|
||||||
|
# 访问 https://www.jetbrains.com/help/pycharm/ 获取 PyCharm 帮助
|
||||||
58
models/__init__.py
Normal file
58
models/__init__.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from peewee import *
|
||||||
|
|
||||||
|
# 连接到 SQLite 数据库,如果文件不存在会自动创建
|
||||||
|
db = SqliteDatabase(fr'{Path(__file__).parent}/database.db')
|
||||||
|
|
||||||
|
import pymysql
|
||||||
|
|
||||||
|
from peewee import *
|
||||||
|
from playhouse.pool import PooledMySQLDatabase
|
||||||
|
|
||||||
|
pymysql.install_as_MySQLdb()
|
||||||
|
|
||||||
|
# 数据库配置
|
||||||
|
db_config = {
|
||||||
|
'database': 'lm',
|
||||||
|
'user': 'lm',
|
||||||
|
'password': 'HhyAsGbrrbsJfpyy',
|
||||||
|
'host': '192.168.1.87',
|
||||||
|
'port': 3306
|
||||||
|
}
|
||||||
|
|
||||||
|
# 全局数据库实例
|
||||||
|
db1 = MySQLDatabase(
|
||||||
|
db_config['database'],
|
||||||
|
user=db_config['user'],
|
||||||
|
password=db_config['password'],
|
||||||
|
host=db_config['host'],
|
||||||
|
port=db_config['port']
|
||||||
|
)
|
||||||
|
|
||||||
|
# class BaseModel(Model):
|
||||||
|
# class Meta:
|
||||||
|
# database = db1
|
||||||
|
#
|
||||||
|
# def save(self, *args, **kwargs):
|
||||||
|
# """在调用 save 时自动连接和关闭(若无事务)"""
|
||||||
|
# db.connect(reuse_if_open=True)
|
||||||
|
# try:
|
||||||
|
# result = super().save(*args, **kwargs)
|
||||||
|
# finally:
|
||||||
|
# # 若当前没有事务且连接仍然打开,则关闭连接
|
||||||
|
# if not db.in_transaction() and not db.is_closed():
|
||||||
|
# db.close()
|
||||||
|
# return result
|
||||||
|
#
|
||||||
|
# @classmethod
|
||||||
|
# def get_or_create(cls, defaults=None, **kwargs):
|
||||||
|
# """在调用 get_or_create 时自动连接和关闭(若无事务)"""
|
||||||
|
# db.connect(reuse_if_open=True)
|
||||||
|
# try:
|
||||||
|
# obj, created = super().get_or_create(defaults=defaults, **kwargs)
|
||||||
|
# finally:
|
||||||
|
# # 若当前没有事务且连接仍然打开,则关闭连接
|
||||||
|
# if not db.in_transaction() and not db.is_closed():
|
||||||
|
# db.close()
|
||||||
|
# return obj, created
|
||||||
21
models/bitmart.py
Normal file
21
models/bitmart.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
from peewee import *
|
||||||
|
|
||||||
|
from models import db
|
||||||
|
|
||||||
|
|
||||||
|
class BitMart30(Model):
|
||||||
|
id = IntegerField(primary_key=True) # 时间戳(毫秒级)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_30'
|
||||||
|
|
||||||
|
|
||||||
|
# 连接到数据库
|
||||||
|
db.connect()
|
||||||
|
# 创建表(如果表不存在)
|
||||||
|
db.create_tables([BitMart30])
|
||||||
21
models/bitmart_15.py
Normal file
21
models/bitmart_15.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
from peewee import *
|
||||||
|
|
||||||
|
from models import db
|
||||||
|
|
||||||
|
|
||||||
|
class BitMart15(Model):
|
||||||
|
id = IntegerField(primary_key=True) # 时间戳(毫秒级)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_15'
|
||||||
|
|
||||||
|
|
||||||
|
# 连接到数据库
|
||||||
|
db.connect()
|
||||||
|
# 创建表(如果表不存在)
|
||||||
|
db.create_tables([BitMart15])
|
||||||
97
models/bitmart_klines.py
Normal file
97
models/bitmart_klines.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
"""
|
||||||
|
BitMart 多周期K线数据模型
|
||||||
|
包含 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据表
|
||||||
|
"""
|
||||||
|
|
||||||
|
from peewee import *
|
||||||
|
from models import db
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== 1分钟 K线 ====================
|
||||||
|
class BitMartETH1M(Model):
|
||||||
|
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_eth_1m'
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== 3分钟 K线 ====================
|
||||||
|
class BitMartETH3M(Model):
|
||||||
|
id = BigIntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_eth_3m'
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== 5分钟 K线 ====================
|
||||||
|
class BitMartETH5M(Model):
|
||||||
|
id = BigIntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_eth_5m'
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== 15分钟 K线 ====================
|
||||||
|
class BitMartETH15M(Model):
|
||||||
|
id = BigIntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_eth_15m'
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== 30分钟 K线 ====================
|
||||||
|
class BitMartETH30M(Model):
|
||||||
|
id = BigIntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_eth_30m'
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== 1小时 K线 ====================
|
||||||
|
class BitMartETH1H(Model):
|
||||||
|
id = BigIntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_eth_1h'
|
||||||
|
|
||||||
|
|
||||||
|
# 连接数据库并创建表
|
||||||
|
db.connect(reuse_if_open=True)
|
||||||
|
db.create_tables([
|
||||||
|
BitMartETH1M,
|
||||||
|
BitMartETH3M,
|
||||||
|
BitMartETH5M,
|
||||||
|
BitMartETH15M,
|
||||||
|
BitMartETH30M,
|
||||||
|
BitMartETH1H,
|
||||||
|
], safe=True)
|
||||||
21
models/ips.py
Normal file
21
models/ips.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
from peewee import *
|
||||||
|
|
||||||
|
from models import db1
|
||||||
|
|
||||||
|
|
||||||
|
class Ips(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
host = CharField(null=True)
|
||||||
|
port = CharField(null=True)
|
||||||
|
username = CharField(null=True)
|
||||||
|
password = CharField(null=True)
|
||||||
|
start = IntegerField(null=True)
|
||||||
|
country = CharField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db1
|
||||||
|
table_name = 'ips'
|
||||||
|
|
||||||
|
|
||||||
|
# if __name__ == '__main__':
|
||||||
|
# Ips.create_table()
|
||||||
57
models/mexc.py
Normal file
57
models/mexc.py
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
from peewee import *
|
||||||
|
|
||||||
|
from models import db
|
||||||
|
|
||||||
|
|
||||||
|
class Mexc1(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'mexc_1'
|
||||||
|
|
||||||
|
|
||||||
|
class Mexc15(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'mexc_15'
|
||||||
|
|
||||||
|
|
||||||
|
class Mexc30(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'mexc_30'
|
||||||
|
|
||||||
|
|
||||||
|
class Mexc1Hour(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'mexc_1_hour'
|
||||||
|
|
||||||
|
|
||||||
|
# 连接到数据库
|
||||||
|
db.connect()
|
||||||
|
# 创建表(如果表不存在)
|
||||||
|
db.create_tables([Mexc1, Mexc15, Mexc30, Mexc1Hour])
|
||||||
71
models/weex.py
Normal file
71
models/weex.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
from peewee import *
|
||||||
|
|
||||||
|
from models import db
|
||||||
|
|
||||||
|
|
||||||
|
class Weex15(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'weex_15'
|
||||||
|
|
||||||
|
|
||||||
|
class Weex1(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'weex_1'
|
||||||
|
|
||||||
|
|
||||||
|
class Weex1Hour(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'weex_1_hour'
|
||||||
|
|
||||||
|
class Weex30(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'weex_30'
|
||||||
|
|
||||||
|
class Weex30Copy(Model):
|
||||||
|
id = IntegerField(primary_key=True)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'weex_30_copy1'
|
||||||
|
|
||||||
|
|
||||||
|
# 连接到数据库
|
||||||
|
db.connect()
|
||||||
|
#
|
||||||
|
# # 创建表(如果表不存在)
|
||||||
|
# db.create_tables([Weex15])
|
||||||
|
db.create_tables([Weex30])
|
||||||
|
|
||||||
|
|
||||||
22
models/xstart.py
Normal file
22
models/xstart.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
from peewee import *
|
||||||
|
|
||||||
|
from models import db1
|
||||||
|
from models.ips import Ips
|
||||||
|
|
||||||
|
|
||||||
|
class Xstart(Model):
|
||||||
|
id = AutoField(primary_key=True) # 自增主键
|
||||||
|
bit_id = CharField(null=True)
|
||||||
|
start = IntegerField(null=True)
|
||||||
|
x_id = IntegerField(null=True)
|
||||||
|
ip_id = IntegerField(null=True)
|
||||||
|
url_id = CharField(null=True)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db1 # 所属数据库
|
||||||
|
table_name = 'xstart'
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
Xstart.create_table()
|
||||||
24
models/xtoken.py
Normal file
24
models/xtoken.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
from peewee import *
|
||||||
|
# 假设 db 已经在其他地方定义并连接到数据库
|
||||||
|
from models import db1
|
||||||
|
|
||||||
|
|
||||||
|
class XToken(Model):
|
||||||
|
id = AutoField(primary_key=True) # 自增主键
|
||||||
|
hub_id = IntegerField(null=True) # hub_id 字段,整型,可为空
|
||||||
|
start = IntegerField(null=True) # start 字段,整型,可为空
|
||||||
|
account_start = IntegerField(null=True) # account_start 字段,整型,可为空
|
||||||
|
user_name = CharField(max_length=255, null=True) # user_name 字段,最大长度 255,可为空
|
||||||
|
password = CharField(max_length=255, null=True) # password 字段,最大长度 255,可为空
|
||||||
|
email = CharField(max_length=255, null=True) # email 字段,最大长度 255,可为空
|
||||||
|
two_fa = CharField(max_length=255, null=True) # 2fa 字段,由于 2fa 是 Python 中的无效标识符,这里使用 two_fa 替代,最大长度 255,可为空
|
||||||
|
token = CharField(max_length=255, null=True) # token 字段,最大长度 255,可为空
|
||||||
|
email_pwd = CharField(max_length=255, null=True) # token 字段,最大长度 255,可为空
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db1 # 所属数据库
|
||||||
|
table_name = 'x_token' # 表名
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
XToken.create_table()
|
||||||
762
抓取多周期K线.py
Normal file
762
抓取多周期K线.py
Normal file
@@ -0,0 +1,762 @@
|
|||||||
|
"""
|
||||||
|
BitMart 多周期K线数据抓取脚本
|
||||||
|
支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据
|
||||||
|
支持秒级价格数据(通过成交记录API)
|
||||||
|
支持断点续传,从数据库最新/最早记录继续抓取
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from loguru import logger
|
||||||
|
from peewee import *
|
||||||
|
from bitmart.api_contract import APIContract
|
||||||
|
|
||||||
|
# 数据库配置(使用脚本所在项目目录下的 models)
|
||||||
|
DB_PATH = Path(__file__).parent / 'models' / 'database.db'
|
||||||
|
db = SqliteDatabase(str(DB_PATH))
|
||||||
|
|
||||||
|
# K线周期配置:step值 -> 表名后缀
|
||||||
|
KLINE_CONFIGS = {
|
||||||
|
1: '1m', # 1分钟
|
||||||
|
3: '3m', # 3分钟
|
||||||
|
5: '5m', # 5分钟
|
||||||
|
15: '15m', # 15分钟
|
||||||
|
30: '30m', # 30分钟
|
||||||
|
60: '1h', # 1小时
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class BitMartETHTrades(Model):
|
||||||
|
"""成交记录模型(秒级/毫秒级原始数据)"""
|
||||||
|
id = BigIntegerField(primary_key=True) # 成交ID
|
||||||
|
timestamp = BigIntegerField(index=True) # 成交时间戳(毫秒)
|
||||||
|
price = FloatField() # 成交价格
|
||||||
|
volume = FloatField() # 成交量
|
||||||
|
side = IntegerField() # 方向: 1=买, -1=卖
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_eth_trades'
|
||||||
|
|
||||||
|
|
||||||
|
class BitMartETHSecond(Model):
|
||||||
|
"""秒级K线模型(由成交记录聚合而来)"""
|
||||||
|
id = BigIntegerField(primary_key=True) # 时间戳(毫秒,取整到秒)
|
||||||
|
open = FloatField(null=True)
|
||||||
|
high = FloatField(null=True)
|
||||||
|
low = FloatField(null=True)
|
||||||
|
close = FloatField(null=True)
|
||||||
|
volume = FloatField(null=True)
|
||||||
|
trade_count = IntegerField(null=True) # 该秒内成交笔数
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
database = db
|
||||||
|
table_name = 'bitmart_eth_1s'
|
||||||
|
|
||||||
|
|
||||||
|
def create_kline_model(step: int):
|
||||||
|
"""
|
||||||
|
动态创建K线数据模型
|
||||||
|
:param step: K线周期(分钟)
|
||||||
|
:return: Model类
|
||||||
|
"""
|
||||||
|
suffix = KLINE_CONFIGS.get(step, f'{step}m')
|
||||||
|
tbl_name = f'bitmart_eth_{suffix}'
|
||||||
|
|
||||||
|
# 使用 type() 动态创建类,避免闭包问题
|
||||||
|
attrs = {
|
||||||
|
'id': BigIntegerField(primary_key=True),
|
||||||
|
'open': FloatField(null=True),
|
||||||
|
'high': FloatField(null=True),
|
||||||
|
'low': FloatField(null=True),
|
||||||
|
'close': FloatField(null=True),
|
||||||
|
}
|
||||||
|
|
||||||
|
# 创建 Meta 类
|
||||||
|
meta_attrs = {
|
||||||
|
'database': db,
|
||||||
|
'table_name': tbl_name,
|
||||||
|
}
|
||||||
|
Meta = type('Meta', (), meta_attrs)
|
||||||
|
attrs['Meta'] = Meta
|
||||||
|
|
||||||
|
# 动态创建 Model 类
|
||||||
|
model_name = f'BitMartETH{suffix.upper()}'
|
||||||
|
KlineModel = type(model_name, (Model,), attrs)
|
||||||
|
|
||||||
|
return KlineModel
|
||||||
|
|
||||||
|
|
||||||
|
class BitMartMultiKlineCollector:
|
||||||
|
"""多周期K线数据抓取器"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
|
||||||
|
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
|
||||||
|
self.memo = "数据抓取"
|
||||||
|
self.contract_symbol = "ETHUSDT"
|
||||||
|
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
|
||||||
|
|
||||||
|
# 存储各周期的模型
|
||||||
|
self.models = {}
|
||||||
|
|
||||||
|
# 初始化数据库连接和表
|
||||||
|
self._init_database()
|
||||||
|
|
||||||
|
def _init_database(self):
|
||||||
|
"""初始化数据库,创建所有周期的表"""
|
||||||
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
db.connect(reuse_if_open=True)
|
||||||
|
|
||||||
|
for step in KLINE_CONFIGS.keys():
|
||||||
|
model = create_kline_model(step)
|
||||||
|
self.models[step] = model
|
||||||
|
# 创建表(如果不存在)
|
||||||
|
db.create_tables([model], safe=True)
|
||||||
|
logger.info(f"初始化表: {model._meta.table_name}")
|
||||||
|
|
||||||
|
# 创建成交记录表和秒级K线表
|
||||||
|
db.create_tables([BitMartETHTrades, BitMartETHSecond], safe=True)
|
||||||
|
logger.info(f"初始化表: bitmart_eth_trades (成交记录)")
|
||||||
|
logger.info(f"初始化表: bitmart_eth_1s (秒级K线)")
|
||||||
|
|
||||||
|
def get_db_time_range(self, step: int):
|
||||||
|
"""
|
||||||
|
获取数据库中已有数据的时间范围
|
||||||
|
:param step: K线周期
|
||||||
|
:return: (earliest_ts, latest_ts) 毫秒时间戳,无数据返回 (None, None)
|
||||||
|
"""
|
||||||
|
model = self.models.get(step)
|
||||||
|
if not model:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 获取最早记录
|
||||||
|
earliest = model.select(fn.MIN(model.id)).scalar()
|
||||||
|
# 获取最新记录
|
||||||
|
latest = model.select(fn.MAX(model.id)).scalar()
|
||||||
|
return earliest, latest
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"查询数据库时间范围异常: {e}")
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def get_klines(self, step: int, start_time: int, end_time: int, max_retries: int = 3):
|
||||||
|
"""
|
||||||
|
获取K线数据(带重试)
|
||||||
|
:param step: K线周期(分钟)
|
||||||
|
:param start_time: 开始时间戳(秒级)
|
||||||
|
:param end_time: 结束时间戳(秒级)
|
||||||
|
:param max_retries: 最大重试次数
|
||||||
|
:return: K线数据列表
|
||||||
|
"""
|
||||||
|
for attempt in range(max_retries):
|
||||||
|
try:
|
||||||
|
start_time = int(start_time)
|
||||||
|
end_time = int(end_time)
|
||||||
|
|
||||||
|
response = self.contractAPI.get_kline(
|
||||||
|
contract_symbol=self.contract_symbol,
|
||||||
|
step=step,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time
|
||||||
|
)[0]
|
||||||
|
|
||||||
|
if response['code'] != 1000:
|
||||||
|
logger.warning(f"API返回错误 (尝试 {attempt+1}/{max_retries}): {response}")
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
time.sleep(1)
|
||||||
|
continue
|
||||||
|
return []
|
||||||
|
|
||||||
|
klines = response.get('data', [])
|
||||||
|
formatted = []
|
||||||
|
for k in klines:
|
||||||
|
timestamp_ms = int(k["timestamp"]) * 1000
|
||||||
|
formatted.append({
|
||||||
|
'id': timestamp_ms,
|
||||||
|
'open': float(k["open_price"]),
|
||||||
|
'high': float(k["high_price"]),
|
||||||
|
'low': float(k["low_price"]),
|
||||||
|
'close': float(k["close_price"])
|
||||||
|
})
|
||||||
|
|
||||||
|
formatted.sort(key=lambda x: x['id'])
|
||||||
|
return formatted
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"获取K线异常 (尝试 {attempt+1}/{max_retries}): {e}")
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
time.sleep(2)
|
||||||
|
continue
|
||||||
|
return []
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
def save_klines(self, step: int, klines: list):
|
||||||
|
"""
|
||||||
|
保存K线数据到数据库
|
||||||
|
:param step: K线周期
|
||||||
|
:param klines: K线数据列表
|
||||||
|
:return: 新保存的数量
|
||||||
|
"""
|
||||||
|
model = self.models.get(step)
|
||||||
|
if not model:
|
||||||
|
logger.error(f"未找到 {step}分钟 的数据模型")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
new_count = 0
|
||||||
|
for kline in klines:
|
||||||
|
try:
|
||||||
|
_, created = model.get_or_create(
|
||||||
|
id=kline['id'],
|
||||||
|
defaults={
|
||||||
|
'open': kline['open'],
|
||||||
|
'high': kline['high'],
|
||||||
|
'low': kline['low'],
|
||||||
|
'close': kline['close'],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if created:
|
||||||
|
new_count += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"保存K线数据失败 {kline['id']}: {e}")
|
||||||
|
|
||||||
|
return new_count
|
||||||
|
|
||||||
|
def get_batch_seconds(self, step: int):
|
||||||
|
"""根据周期获取合适的批次大小"""
|
||||||
|
if step == 1:
|
||||||
|
return 3600 * 4 # 1分钟: 每次4小时
|
||||||
|
elif step == 3:
|
||||||
|
return 3600 * 8 # 3分钟: 每次8小时
|
||||||
|
elif step == 5:
|
||||||
|
return 3600 * 12 # 5分钟: 每次12小时
|
||||||
|
elif step == 15:
|
||||||
|
return 3600 * 24 # 15分钟: 每次1天
|
||||||
|
elif step == 30:
|
||||||
|
return 3600 * 48 # 30分钟: 每次2天
|
||||||
|
else:
|
||||||
|
return 3600 * 72 # 1小时: 每次3天
|
||||||
|
|
||||||
|
def collect_period_range(self, step: int, target_start: int, target_end: int):
|
||||||
|
"""
|
||||||
|
抓取指定时间范围的K线数据(支持断点续传)
|
||||||
|
:param step: K线周期(分钟)
|
||||||
|
:param target_start: 目标开始时间戳(秒)
|
||||||
|
:param target_end: 目标结束时间戳(秒)
|
||||||
|
:return: 保存的总数量
|
||||||
|
"""
|
||||||
|
suffix = KLINE_CONFIGS.get(step, f'{step}m')
|
||||||
|
batch_seconds = self.get_batch_seconds(step)
|
||||||
|
|
||||||
|
# 获取数据库已有数据范围
|
||||||
|
db_earliest, db_latest = self.get_db_time_range(step)
|
||||||
|
|
||||||
|
if db_earliest and db_latest:
|
||||||
|
db_earliest_sec = db_earliest // 1000
|
||||||
|
db_latest_sec = db_latest // 1000
|
||||||
|
logger.info(f"[{suffix}] 数据库已有数据: "
|
||||||
|
f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_earliest_sec))} ~ "
|
||||||
|
f"{time.strftime('%Y-%m-%d %H:%M', time.localtime(db_latest_sec))}")
|
||||||
|
else:
|
||||||
|
db_earliest_sec = None
|
||||||
|
db_latest_sec = None
|
||||||
|
logger.info(f"[{suffix}] 数据库暂无数据")
|
||||||
|
|
||||||
|
total_saved = 0
|
||||||
|
|
||||||
|
# === 第一阶段:向前抓取历史数据(从数据库最早记录向前,直到 target_start)===
|
||||||
|
if db_earliest_sec:
|
||||||
|
backward_end = db_earliest_sec
|
||||||
|
else:
|
||||||
|
backward_end = target_end
|
||||||
|
|
||||||
|
if backward_end > target_start:
|
||||||
|
logger.info(f"[{suffix}] === 开始向前抓取历史数据 ===")
|
||||||
|
total_backward = backward_end - target_start
|
||||||
|
|
||||||
|
current_end = backward_end
|
||||||
|
fail_count = 0
|
||||||
|
max_fail = 5
|
||||||
|
|
||||||
|
while current_end > target_start and fail_count < max_fail:
|
||||||
|
current_start = max(current_end - batch_seconds, target_start)
|
||||||
|
|
||||||
|
# 计算进度
|
||||||
|
progress = (backward_end - current_end) / total_backward * 100 if total_backward > 0 else 0
|
||||||
|
start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start))
|
||||||
|
end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end))
|
||||||
|
|
||||||
|
klines = self.get_klines(step, current_start, current_end)
|
||||||
|
if klines:
|
||||||
|
saved = self.save_klines(step, klines)
|
||||||
|
total_saved += saved
|
||||||
|
logger.info(f"[{suffix}] ← 历史 {start_str} ~ {end_str} | "
|
||||||
|
f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%")
|
||||||
|
fail_count = 0
|
||||||
|
else:
|
||||||
|
fail_count += 1
|
||||||
|
logger.warning(f"[{suffix}] ← 历史 {start_str} 无数据 (连续失败 {fail_count}/{max_fail})")
|
||||||
|
if fail_count >= max_fail:
|
||||||
|
earliest_date = time.strftime('%Y-%m-%d', time.localtime(current_end))
|
||||||
|
logger.warning(f"[{suffix}] 已达到API历史数据限制,最早可获取: {earliest_date}")
|
||||||
|
break
|
||||||
|
|
||||||
|
current_end = current_start
|
||||||
|
time.sleep(0.3)
|
||||||
|
|
||||||
|
# === 第二阶段:向后抓取最新数据(从数据库最新记录向后,直到 target_end)===
|
||||||
|
if db_latest_sec:
|
||||||
|
forward_start = db_latest_sec
|
||||||
|
else:
|
||||||
|
# 如果没有数据,从第一阶段结束的地方开始
|
||||||
|
forward_start = target_start
|
||||||
|
|
||||||
|
if forward_start < target_end:
|
||||||
|
logger.info(f"[{suffix}] === 开始向后抓取最新数据 ===")
|
||||||
|
total_forward = target_end - forward_start
|
||||||
|
|
||||||
|
current_start = forward_start
|
||||||
|
fail_count = 0
|
||||||
|
max_fail = 3
|
||||||
|
|
||||||
|
while current_start < target_end and fail_count < max_fail:
|
||||||
|
current_end = min(current_start + batch_seconds, target_end)
|
||||||
|
|
||||||
|
# 计算进度
|
||||||
|
progress = (current_start - forward_start) / total_forward * 100 if total_forward > 0 else 0
|
||||||
|
start_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_start))
|
||||||
|
end_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(current_end))
|
||||||
|
|
||||||
|
klines = self.get_klines(step, current_start, current_end)
|
||||||
|
if klines:
|
||||||
|
saved = self.save_klines(step, klines)
|
||||||
|
total_saved += saved
|
||||||
|
logger.info(f"[{suffix}] → 最新 {start_str} ~ {end_str} | "
|
||||||
|
f"获取 {len(klines)} 条, 新增 {saved} 条 | 进度 {progress:.1f}%")
|
||||||
|
fail_count = 0
|
||||||
|
else:
|
||||||
|
fail_count += 1
|
||||||
|
logger.warning(f"[{suffix}] → 最新 {start_str} 无数据 (失败 {fail_count}/{max_fail})")
|
||||||
|
|
||||||
|
current_start = current_end
|
||||||
|
time.sleep(0.3)
|
||||||
|
|
||||||
|
# 统计最终数据范围
|
||||||
|
final_earliest, final_latest = self.get_db_time_range(step)
|
||||||
|
if final_earliest and final_latest:
|
||||||
|
logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条 | 数据范围: "
|
||||||
|
f"{time.strftime('%Y-%m-%d', time.localtime(final_earliest//1000))} ~ "
|
||||||
|
f"{time.strftime('%Y-%m-%d', time.localtime(final_latest//1000))}")
|
||||||
|
else:
|
||||||
|
logger.success(f"[{suffix}] 抓取完成!本次新增 {total_saved} 条")
|
||||||
|
|
||||||
|
return total_saved
|
||||||
|
|
||||||
|
def collect_from_date(self, start_date: str, periods: list = None):
|
||||||
|
"""
|
||||||
|
从指定日期抓取到当前时间
|
||||||
|
:param start_date: 起始日期 'YYYY-MM-DD'
|
||||||
|
:param periods: 要抓取的周期列表,如 [1, 5, 15],默认全部
|
||||||
|
"""
|
||||||
|
if periods is None:
|
||||||
|
periods = list(KLINE_CONFIGS.keys())
|
||||||
|
|
||||||
|
# 计算时间范围
|
||||||
|
start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
|
||||||
|
target_start = int(start_dt.timestamp())
|
||||||
|
target_end = int(time.time())
|
||||||
|
|
||||||
|
start_str = start_dt.strftime('%Y-%m-%d')
|
||||||
|
end_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
|
||||||
|
|
||||||
|
logger.info(f"{'='*60}")
|
||||||
|
logger.info(f"目标时间范围: {start_str} ~ {end_str}")
|
||||||
|
logger.info(f"抓取周期: {[KLINE_CONFIGS[p] for p in periods]}")
|
||||||
|
logger.info(f"{'='*60}")
|
||||||
|
|
||||||
|
results = {}
|
||||||
|
for step in periods:
|
||||||
|
if step not in KLINE_CONFIGS:
|
||||||
|
logger.warning(f"不支持的周期: {step}分钟,跳过")
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.info(f"\n{'='*60}")
|
||||||
|
logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线")
|
||||||
|
logger.info(f"{'='*60}")
|
||||||
|
|
||||||
|
saved = self.collect_period_range(step, target_start, target_end)
|
||||||
|
results[KLINE_CONFIGS[step]] = saved
|
||||||
|
|
||||||
|
time.sleep(1) # 不同周期之间间隔
|
||||||
|
|
||||||
|
# 打印总结
|
||||||
|
logger.info(f"\n{'='*60}")
|
||||||
|
logger.info("所有周期抓取完成!统计:")
|
||||||
|
for period, count in results.items():
|
||||||
|
logger.info(f" {period}: 新增 {count} 条")
|
||||||
|
logger.info(f"{'='*60}")
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def get_stats(self):
|
||||||
|
"""获取各周期数据统计"""
|
||||||
|
logger.info(f"\n{'='*60}")
|
||||||
|
logger.info("数据库统计:")
|
||||||
|
logger.info(f"{'='*60}")
|
||||||
|
|
||||||
|
for step, model in self.models.items():
|
||||||
|
suffix = KLINE_CONFIGS.get(step, f'{step}m')
|
||||||
|
try:
|
||||||
|
count = model.select().count()
|
||||||
|
earliest, latest = self.get_db_time_range(step)
|
||||||
|
if earliest and latest:
|
||||||
|
earliest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(earliest//1000))
|
||||||
|
latest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(latest//1000))
|
||||||
|
logger.info(f" {suffix:>4}: {count:>8} 条 | {earliest_str} ~ {latest_str}")
|
||||||
|
else:
|
||||||
|
logger.info(f" {suffix:>4}: {count:>8} 条")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f" {suffix}: 查询失败 - {e}")
|
||||||
|
|
||||||
|
# 成交记录统计
|
||||||
|
try:
|
||||||
|
trades_count = BitMartETHTrades.select().count()
|
||||||
|
if trades_count > 0:
|
||||||
|
earliest_trade = BitMartETHTrades.select(fn.MIN(BitMartETHTrades.timestamp)).scalar()
|
||||||
|
latest_trade = BitMartETHTrades.select(fn.MAX(BitMartETHTrades.timestamp)).scalar()
|
||||||
|
earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_trade//1000))
|
||||||
|
latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_trade//1000))
|
||||||
|
logger.info(f"trades: {trades_count:>8} 条 | {earliest_str} ~ {latest_str}")
|
||||||
|
else:
|
||||||
|
logger.info(f"trades: {trades_count:>8} 条")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"trades: 查询失败 - {e}")
|
||||||
|
|
||||||
|
# 秒级K线统计
|
||||||
|
try:
|
||||||
|
second_count = BitMartETHSecond.select().count()
|
||||||
|
if second_count > 0:
|
||||||
|
earliest_sec = BitMartETHSecond.select(fn.MIN(BitMartETHSecond.id)).scalar()
|
||||||
|
latest_sec = BitMartETHSecond.select(fn.MAX(BitMartETHSecond.id)).scalar()
|
||||||
|
earliest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(earliest_sec//1000))
|
||||||
|
latest_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(latest_sec//1000))
|
||||||
|
logger.info(f" 1s: {second_count:>8} 条 | {earliest_str} ~ {latest_str}")
|
||||||
|
else:
|
||||||
|
logger.info(f" 1s: {second_count:>8} 条")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f" 1s: 查询失败 - {e}")
|
||||||
|
|
||||||
|
logger.info(f"{'='*60}")
|
||||||
|
|
||||||
|
# ==================== 秒级数据相关方法 ====================
|
||||||
|
|
||||||
|
def get_trades(self, limit: int = 100):
|
||||||
|
"""
|
||||||
|
获取最近成交记录
|
||||||
|
:param limit: 获取条数
|
||||||
|
:return: 成交记录列表
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
response = self.contractAPI.get_trades(
|
||||||
|
contract_symbol=self.contract_symbol,
|
||||||
|
)[0]
|
||||||
|
|
||||||
|
if response['code'] != 1000:
|
||||||
|
logger.error(f"获取成交记录失败: {response}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
trades = response.get('data', {}).get('trades', [])
|
||||||
|
formatted = []
|
||||||
|
for t in trades:
|
||||||
|
formatted.append({
|
||||||
|
'id': int(t.get('trade_id', 0)),
|
||||||
|
'timestamp': int(t.get('create_time', 0)),
|
||||||
|
'price': float(t.get('deal_price', 0)),
|
||||||
|
'volume': float(t.get('deal_vol', 0)),
|
||||||
|
'side': int(t.get('way', 0)),
|
||||||
|
})
|
||||||
|
|
||||||
|
return formatted
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"获取成交记录异常: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def save_trades(self, trades: list):
|
||||||
|
"""保存成交记录到数据库"""
|
||||||
|
new_count = 0
|
||||||
|
for trade in trades:
|
||||||
|
try:
|
||||||
|
_, created = BitMartETHTrades.get_or_create(
|
||||||
|
id=trade['id'],
|
||||||
|
defaults={
|
||||||
|
'timestamp': trade['timestamp'],
|
||||||
|
'price': trade['price'],
|
||||||
|
'volume': trade['volume'],
|
||||||
|
'side': trade['side'],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if created:
|
||||||
|
new_count += 1
|
||||||
|
except Exception as e:
|
||||||
|
pass # 忽略重复数据
|
||||||
|
return new_count
|
||||||
|
|
||||||
|
def collect_trades_realtime(self, duration_seconds: int = 3600, interval: float = 0.3):
|
||||||
|
"""
|
||||||
|
实时持续采集成交记录(秒级数据源)
|
||||||
|
:param duration_seconds: 采集时长(秒),默认1小时
|
||||||
|
:param interval: 采集间隔(秒),默认0.3秒
|
||||||
|
"""
|
||||||
|
logger.info(f"{'='*60}")
|
||||||
|
logger.info(f"开始实时采集成交记录")
|
||||||
|
logger.info(f"时长: {duration_seconds}秒 ({duration_seconds/3600:.1f}小时)")
|
||||||
|
logger.info(f"间隔: {interval}秒")
|
||||||
|
logger.info(f"{'='*60}")
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
end_time = start_time + duration_seconds
|
||||||
|
total_saved = 0
|
||||||
|
batch_count = 0
|
||||||
|
|
||||||
|
while time.time() < end_time:
|
||||||
|
trades = self.get_trades(limit=100)
|
||||||
|
if trades:
|
||||||
|
saved = self.save_trades(trades)
|
||||||
|
total_saved += saved
|
||||||
|
batch_count += 1
|
||||||
|
|
||||||
|
# 每10批显示一次进度
|
||||||
|
if batch_count % 10 == 0:
|
||||||
|
elapsed = time.time() - start_time
|
||||||
|
remaining = end_time - time.time()
|
||||||
|
latest = trades[-1]
|
||||||
|
ts_str = datetime.datetime.fromtimestamp(
|
||||||
|
latest['timestamp']/1000
|
||||||
|
).strftime('%H:%M:%S')
|
||||||
|
logger.info(f"[{ts_str}] 价格: {latest['price']:.2f} | "
|
||||||
|
f"本批新增: {saved} | 累计: {total_saved} | "
|
||||||
|
f"剩余: {remaining/60:.1f}分钟")
|
||||||
|
|
||||||
|
time.sleep(interval)
|
||||||
|
|
||||||
|
logger.success(f"采集完成!共新增 {total_saved} 条成交记录")
|
||||||
|
|
||||||
|
# 自动聚合为秒级K线
|
||||||
|
logger.info("正在将成交记录聚合为秒级K线...")
|
||||||
|
self.aggregate_trades_to_seconds()
|
||||||
|
|
||||||
|
return total_saved
|
||||||
|
|
||||||
|
def aggregate_trades_to_seconds(self, start_ts: int = None, end_ts: int = None):
|
||||||
|
"""
|
||||||
|
将成交记录聚合为秒级K线数据
|
||||||
|
:param start_ts: 开始时间戳(毫秒),默认全部
|
||||||
|
:param end_ts: 结束时间戳(毫秒),默认全部
|
||||||
|
:return: 聚合的秒级K线数量
|
||||||
|
"""
|
||||||
|
# 构建查询
|
||||||
|
query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp)
|
||||||
|
if start_ts:
|
||||||
|
query = query.where(BitMartETHTrades.timestamp >= start_ts)
|
||||||
|
if end_ts:
|
||||||
|
query = query.where(BitMartETHTrades.timestamp <= end_ts)
|
||||||
|
|
||||||
|
# 按秒聚合
|
||||||
|
second_data = {}
|
||||||
|
trade_count = 0
|
||||||
|
|
||||||
|
for trade in query:
|
||||||
|
trade_count += 1
|
||||||
|
# 取整到秒(毫秒时间戳)
|
||||||
|
second_ts = (trade.timestamp // 1000) * 1000
|
||||||
|
|
||||||
|
if second_ts not in second_data:
|
||||||
|
second_data[second_ts] = {
|
||||||
|
'open': trade.price,
|
||||||
|
'high': trade.price,
|
||||||
|
'low': trade.price,
|
||||||
|
'close': trade.price,
|
||||||
|
'volume': trade.volume,
|
||||||
|
'trade_count': 1
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
second_data[second_ts]['high'] = max(second_data[second_ts]['high'], trade.price)
|
||||||
|
second_data[second_ts]['low'] = min(second_data[second_ts]['low'], trade.price)
|
||||||
|
second_data[second_ts]['close'] = trade.price
|
||||||
|
second_data[second_ts]['volume'] += trade.volume
|
||||||
|
second_data[second_ts]['trade_count'] += 1
|
||||||
|
|
||||||
|
# 保存到数据库
|
||||||
|
saved_count = 0
|
||||||
|
for ts, ohlc in second_data.items():
|
||||||
|
try:
|
||||||
|
BitMartETHSecond.insert(
|
||||||
|
id=ts,
|
||||||
|
open=ohlc['open'],
|
||||||
|
high=ohlc['high'],
|
||||||
|
low=ohlc['low'],
|
||||||
|
close=ohlc['close'],
|
||||||
|
volume=ohlc['volume'],
|
||||||
|
trade_count=ohlc['trade_count']
|
||||||
|
).on_conflict(
|
||||||
|
conflict_target=[BitMartETHSecond.id],
|
||||||
|
update={
|
||||||
|
BitMartETHSecond.open: ohlc['open'],
|
||||||
|
BitMartETHSecond.high: ohlc['high'],
|
||||||
|
BitMartETHSecond.low: ohlc['low'],
|
||||||
|
BitMartETHSecond.close: ohlc['close'],
|
||||||
|
BitMartETHSecond.volume: ohlc['volume'],
|
||||||
|
BitMartETHSecond.trade_count: ohlc['trade_count'],
|
||||||
|
}
|
||||||
|
).execute()
|
||||||
|
saved_count += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"保存秒级K线失败 {ts}: {e}")
|
||||||
|
|
||||||
|
logger.success(f"聚合完成!{trade_count} 条成交记录 → {saved_count} 条秒级K线")
|
||||||
|
return saved_count
|
||||||
|
|
||||||
|
def get_second_klines(self, start_ts: int = None, end_ts: int = None):
|
||||||
|
"""
|
||||||
|
获取秒级K线数据
|
||||||
|
:param start_ts: 开始时间戳(毫秒)
|
||||||
|
:param end_ts: 结束时间戳(毫秒)
|
||||||
|
:return: 秒级K线列表
|
||||||
|
"""
|
||||||
|
query = BitMartETHSecond.select().order_by(BitMartETHSecond.id)
|
||||||
|
if start_ts:
|
||||||
|
query = query.where(BitMartETHSecond.id >= start_ts)
|
||||||
|
if end_ts:
|
||||||
|
query = query.where(BitMartETHSecond.id <= end_ts)
|
||||||
|
|
||||||
|
return [{
|
||||||
|
'timestamp': k.id,
|
||||||
|
'open': k.open,
|
||||||
|
'high': k.high,
|
||||||
|
'low': k.low,
|
||||||
|
'close': k.close,
|
||||||
|
'volume': k.volume,
|
||||||
|
'trade_count': k.trade_count
|
||||||
|
} for k in query]
|
||||||
|
|
||||||
|
def aggregate_trades_custom(self, interval_ms: int = 100, start_ts: int = None, end_ts: int = None):
|
||||||
|
"""
|
||||||
|
将成交记录聚合为自定义毫秒级K线数据(不保存到数据库,直接返回)
|
||||||
|
:param interval_ms: 聚合周期(毫秒),如 100=100ms, 500=500ms, 1000=1秒
|
||||||
|
:param start_ts: 开始时间戳(毫秒)
|
||||||
|
:param end_ts: 结束时间戳(毫秒)
|
||||||
|
:return: K线列表 [{'timestamp', 'open', 'high', 'low', 'close', 'volume', 'trade_count'}, ...]
|
||||||
|
"""
|
||||||
|
# 构建查询
|
||||||
|
query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp)
|
||||||
|
if start_ts:
|
||||||
|
query = query.where(BitMartETHTrades.timestamp >= start_ts)
|
||||||
|
if end_ts:
|
||||||
|
query = query.where(BitMartETHTrades.timestamp <= end_ts)
|
||||||
|
|
||||||
|
# 按指定间隔聚合
|
||||||
|
interval_data = {}
|
||||||
|
trade_count = 0
|
||||||
|
|
||||||
|
for trade in query:
|
||||||
|
trade_count += 1
|
||||||
|
# 取整到指定间隔
|
||||||
|
interval_ts = (trade.timestamp // interval_ms) * interval_ms
|
||||||
|
|
||||||
|
if interval_ts not in interval_data:
|
||||||
|
interval_data[interval_ts] = {
|
||||||
|
'open': trade.price,
|
||||||
|
'high': trade.price,
|
||||||
|
'low': trade.price,
|
||||||
|
'close': trade.price,
|
||||||
|
'volume': trade.volume,
|
||||||
|
'trade_count': 1
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
interval_data[interval_ts]['high'] = max(interval_data[interval_ts]['high'], trade.price)
|
||||||
|
interval_data[interval_ts]['low'] = min(interval_data[interval_ts]['low'], trade.price)
|
||||||
|
interval_data[interval_ts]['close'] = trade.price
|
||||||
|
interval_data[interval_ts]['volume'] += trade.volume
|
||||||
|
interval_data[interval_ts]['trade_count'] += 1
|
||||||
|
|
||||||
|
# 转换为列表
|
||||||
|
result = []
|
||||||
|
for ts, ohlc in sorted(interval_data.items()):
|
||||||
|
result.append({
|
||||||
|
'timestamp': ts,
|
||||||
|
'datetime': datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
|
||||||
|
'open': ohlc['open'],
|
||||||
|
'high': ohlc['high'],
|
||||||
|
'low': ohlc['low'],
|
||||||
|
'close': ohlc['close'],
|
||||||
|
'volume': ohlc['volume'],
|
||||||
|
'trade_count': ohlc['trade_count']
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.info(f"聚合完成: {trade_count} 条成交记录 → {len(result)} 条 {interval_ms}ms K线")
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get_raw_trades(self, start_ts: int = None, end_ts: int = None, limit: int = None):
|
||||||
|
"""
|
||||||
|
获取原始成交记录(逐笔数据,毫秒级)
|
||||||
|
:param start_ts: 开始时间戳(毫秒)
|
||||||
|
:param end_ts: 结束时间戳(毫秒)
|
||||||
|
:param limit: 最大返回条数
|
||||||
|
:return: 成交记录列表
|
||||||
|
"""
|
||||||
|
query = BitMartETHTrades.select().order_by(BitMartETHTrades.timestamp)
|
||||||
|
if start_ts:
|
||||||
|
query = query.where(BitMartETHTrades.timestamp >= start_ts)
|
||||||
|
if end_ts:
|
||||||
|
query = query.where(BitMartETHTrades.timestamp <= end_ts)
|
||||||
|
if limit:
|
||||||
|
query = query.limit(limit)
|
||||||
|
|
||||||
|
return [{
|
||||||
|
'id': t.id,
|
||||||
|
'timestamp': t.timestamp,
|
||||||
|
'datetime': datetime.datetime.fromtimestamp(t.timestamp/1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
|
||||||
|
'price': t.price,
|
||||||
|
'volume': t.volume,
|
||||||
|
'side': '买' if t.side == 1 else '卖'
|
||||||
|
} for t in query]
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""关闭数据库连接"""
|
||||||
|
if not db.is_closed():
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
collector = BitMartMultiKlineCollector()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 查看当前数据统计
|
||||||
|
collector.get_stats()
|
||||||
|
|
||||||
|
# ============ 选择要执行的任务 ============
|
||||||
|
|
||||||
|
# 任务1: 抓取K线数据(1分钟~1小时周期)
|
||||||
|
# 从 2025-01-01 抓取到当前时间(支持断点续传)
|
||||||
|
collector.collect_from_date(
|
||||||
|
start_date='2010-01-01',
|
||||||
|
periods=[1, 3, 5, 15, 30, 60] # 所有周期
|
||||||
|
)
|
||||||
|
|
||||||
|
# 任务2: 实时采集秒级数据(成交记录)
|
||||||
|
# 注意: 秒级数据只能实时采集,无法获取历史
|
||||||
|
# collector.collect_trades_realtime(
|
||||||
|
# duration_seconds=3600, # 采集1小时
|
||||||
|
# interval=0.3 # 每0.3秒请求一次
|
||||||
|
# )
|
||||||
|
|
||||||
|
# 任务3: 将已采集的成交记录聚合为秒级K线
|
||||||
|
# collector.aggregate_trades_to_seconds()
|
||||||
|
|
||||||
|
# 再次查看统计
|
||||||
|
collector.get_stats()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
collector.close()
|
||||||
Reference in New Issue
Block a user