This commit is contained in:
27942
2026-01-27 16:45:08 +08:00
parent e48e11de45
commit c472a0c0c9
4 changed files with 346 additions and 19 deletions

29
bitmart/test_api.py Normal file
View File

@@ -0,0 +1,29 @@
"""测试 BitMart API K线获取"""
import time
from bitmart.api_contract import APIContract
api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
memo = "数据抓取"
contractAPI = APIContract(api_key, secret_key, memo, timeout=(5, 15))
# 测试获取最近1小时的15分钟K线
end_time = int(time.time())
start_time = end_time - 3600 # 1小时前
print(f"当前时间戳: {end_time}")
print(f"开始时间戳: {start_time}")
print(f"当前时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))}")
print(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")
try:
response = contractAPI.get_kline(
contract_symbol="ETHUSDT",
step=15,
start_time=start_time,
end_time=end_time
)
print(f"\n响应: {response}")
except Exception as e:
print(f"\n错误: {e}")

View File

@@ -0,0 +1,279 @@
"""
BitMart 多周期K线数据抓取脚本
支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据
自动创建对应的数据库表
"""
import time
import datetime
from pathlib import Path
from loguru import logger
from peewee import *
from bitmart.api_contract import APIContract
# 数据库配置
DB_PATH = Path(__file__).parent.parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
# K线周期配置step值 -> 表名后缀
KLINE_CONFIGS = {
1: '1m', # 1分钟
3: '3m', # 3分钟
5: '5m', # 5分钟
15: '15m', # 15分钟
30: '30m', # 30分钟
60: '1h', # 1小时
}
def create_kline_model(step: int):
"""
动态创建K线数据模型
:param step: K线周期分钟
:return: Model类
"""
suffix = KLINE_CONFIGS.get(step, f'{step}m')
tbl_name = f'bitmart_eth_{suffix}'
# 使用 type() 动态创建类,避免闭包问题
attrs = {
'id': BigIntegerField(primary_key=True),
'open': FloatField(null=True),
'high': FloatField(null=True),
'low': FloatField(null=True),
'close': FloatField(null=True),
}
# 创建 Meta 类
meta_attrs = {
'database': db,
'table_name': tbl_name,
}
Meta = type('Meta', (), meta_attrs)
attrs['Meta'] = Meta
# 动态创建 Model 类
model_name = f'BitMartETH{suffix.upper()}'
KlineModel = type(model_name, (Model,), attrs)
return KlineModel
class BitMartMultiKlineCollector:
"""多周期K线数据抓取器"""
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "数据抓取"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
# 存储各周期的模型
self.models = {}
# 初始化数据库连接和表
self._init_database()
def _init_database(self):
"""初始化数据库,创建所有周期的表"""
db.connect(reuse_if_open=True)
for step in KLINE_CONFIGS.keys():
model = create_kline_model(step)
self.models[step] = model
# 创建表(如果不存在)
db.create_tables([model], safe=True)
logger.info(f"初始化表: {model._meta.table_name}")
def get_klines(self, step: int, start_time: int, end_time: int):
"""
获取K线数据
:param step: K线周期分钟
:param start_time: 开始时间戳(秒级)
:param end_time: 结束时间戳(秒级)
:return: K线数据列表
"""
try:
# 确保是整数
start_time = int(start_time)
end_time = int(end_time)
logger.debug(f"API请求: step={step}, start={start_time}, end={end_time}")
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=step,
start_time=start_time,
end_time=end_time
)[0]
if response['code'] != 1000:
logger.error(f"获取 {step}分钟 K线失败: {response}")
return []
klines = response.get('data', [])
formatted = []
for k in klines:
timestamp_ms = int(k["timestamp"]) * 1000
formatted.append({
'id': timestamp_ms,
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
logger.error(f"获取 {step}分钟 K线异常: {e}")
return []
def save_klines(self, step: int, klines: list):
"""
保存K线数据到数据库
:param step: K线周期
:param klines: K线数据列表
:return: 保存的数量
"""
model = self.models.get(step)
if not model:
logger.error(f"未找到 {step}分钟 的数据模型")
return 0
saved_count = 0
for kline in klines:
try:
model.get_or_create(
id=kline['id'],
defaults={
'open': kline['open'],
'high': kline['high'],
'low': kline['low'],
'close': kline['close'],
}
)
saved_count += 1
except Exception as e:
logger.error(f"保存 {step}分钟 K线数据失败 {kline['id']}: {e}")
return saved_count
def collect_single_period(self, step: int, start_date: str = None, days: int = None):
"""
抓取单个周期的历史数据从当前时间向前抓取直到遇到API限制
:param step: K线周期分钟
:param start_date: 起始日期 'YYYY-MM-DD'(目标,可能无法达到)
:param days: 抓取天数(目标,可能无法达到)
"""
suffix = KLINE_CONFIGS.get(step, f'{step}m')
now = int(time.time())
if start_date:
start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
target_start_time = int(start_dt.timestamp())
logger.info(f"开始抓取 {suffix} K线数据: 目标从 {start_date} 开始(从现在向前抓取)")
elif days:
target_start_time = now - 3600 * 24 * days
logger.info(f"开始抓取 {suffix} K线数据: 目标最近 {days}")
else:
target_start_time = now - 3600 * 24 * 30
logger.info(f"开始抓取 {suffix} K线数据: 目标最近 30 天")
# 根据周期调整批次大小
if step <= 5:
batch_seconds = 3600 * 6 # 小周期每次6小时
elif step <= 30:
batch_seconds = 3600 * 24 # 中周期每次1天
else:
batch_seconds = 3600 * 24 * 3 # 大周期每次3天
total_saved = 0
fail_count = 0
max_fail = 3
# 从当前时间向前抓取
current_end = now
while current_end > target_start_time:
current_start = current_end - batch_seconds
# 打印时间范围
start_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start))
end_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end))
logger.info(f"[{suffix}] 抓取: {start_str} -> {end_str}")
klines = self.get_klines(step, current_start, current_end)
if klines:
saved = self.save_klines(step, klines)
total_saved += saved
logger.info(f"[{suffix}] 保存 {saved} 条,累计 {total_saved}")
fail_count = 0
else:
fail_count += 1
logger.warning(f"[{suffix}] 未获取到数据 (连续失败 {fail_count} 次)")
if fail_count >= max_fail:
earliest = time.strftime('%Y-%m-%d', time.localtime(current_end))
logger.warning(f"[{suffix}] 达到API历史数据限制最早可用数据约 {earliest}")
break
current_end = current_start
time.sleep(0.3) # API请求间隔
logger.success(f"[{suffix}] 抓取完成,共保存 {total_saved} 条数据")
return total_saved
def collect_all_periods(self, start_date: str = None, days: int = None,
periods: list = None):
"""
抓取所有周期的历史数据
:param start_date: 起始日期 'YYYY-MM-DD'
:param days: 抓取天数
:param periods: 要抓取的周期列表,如 [1, 5, 15],默认全部
"""
if periods is None:
periods = list(KLINE_CONFIGS.keys())
logger.info(f"开始抓取多周期K线数据周期: {[KLINE_CONFIGS[p] for p in periods]}")
results = {}
for step in periods:
if step not in KLINE_CONFIGS:
logger.warning(f"不支持的周期: {step}分钟,跳过")
continue
logger.info(f"\n{'='*50}")
logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线")
logger.info(f"{'='*50}")
saved = self.collect_single_period(step, start_date, days)
results[KLINE_CONFIGS[step]] = saved
time.sleep(1) # 不同周期之间间隔
logger.info(f"\n{'='*50}")
logger.info("所有周期抓取完成!统计:")
for period, count in results.items():
logger.info(f" {period}: {count}")
logger.info(f"{'='*50}")
return results
def close(self):
"""关闭数据库连接"""
if not db.is_closed():
db.close()
if __name__ == '__main__':
collector = BitMartMultiKlineCollector()
try:
# 抓取尽可能多的历史数据从现在向前直到遇到API限制自动停止
# 目标2025-01-01但实际能抓取多少取决于 BitMart API 的历史数据限制
collector.collect_all_periods(
start_date='2025-01-01', # 目标起始日期
periods=[1, 3, 5, 15, 30, 60] # 所有周期
)
finally:
collector.close()

View File

@@ -1,6 +1,6 @@
"""
BitMart 30分钟K线数据抓取脚本
从 BitMart API 获取30分钟K线数据并存储到数据库
BitMart 15分钟K线数据抓取脚本
从 BitMart API 获取15分钟K线数据并存储到数据库
"""
import time
@@ -33,7 +33,7 @@ class BitMartDataCollector:
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=15, # 30分钟
step=15, # 15分钟
start_time=start_time,
end_time=end_time
)[0]
@@ -88,23 +88,37 @@ class BitMartDataCollector:
return saved_count
def collect_historical_data(self, days=30):
def collect_historical_data(self, start_date=None, days=None):
"""
抓取历史数据
:param days: 抓取最近多少天的数据
抓取历史数据(从指定日期到现在)
:param start_date: 起始日期字符串,格式 'YYYY-MM-DD',如 '2025-01-01'
:param days: 如果不指定 start_date则抓取最近多少天的数据
"""
logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 {days} 天的30分钟K线数据")
import datetime
end_time = int(time.time())
start_time = end_time - 3600 * 24 * days
now = int(time.time())
# 分批获取每次获取7天的数据
batch_days = 7
if start_date:
# 解析起始日期
start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
target_start_time = int(start_dt.timestamp())
logger.info(f"开始抓取 BitMart {self.contract_symbol}{start_date} 到现在的15分钟K线数据")
elif days:
target_start_time = now - 3600 * 24 * days
logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 {days} 天的15分钟K线数据")
else:
target_start_time = now - 3600 * 24 * 30 # 默认30天
logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 30 天的15分钟K线数据")
# 分批获取每次获取5天的数据15分钟K线数据量较大
batch_days = 5
total_saved = 0
fail_count = 0
max_fail = 3 # 连续失败超过3次则停止
current_start = start_time
while current_start < end_time:
current_end = min(current_start + 3600 * 24 * batch_days, end_time)
current_end = now
while current_end > target_start_time:
current_start = max(current_end - 3600 * 24 * batch_days, target_start_time)
logger.info(f"抓取时间段: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start))} "
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end))}")
@@ -114,10 +128,15 @@ class BitMartDataCollector:
saved = self.save_klines(klines)
total_saved += saved
logger.info(f"本批次保存 {saved} 条数据,累计 {total_saved}")
fail_count = 0 # 重置失败计数
else:
logger.warning("本批次未获取到数据")
fail_count += 1
logger.warning(f"本批次未获取到数据 (连续失败 {fail_count} 次)")
if fail_count >= max_fail:
logger.error(f"连续 {max_fail} 次获取数据失败,可能已达到 API 历史数据限制,停止抓取")
break
current_start = current_end
current_end = current_start
time.sleep(1) # 避免请求过快
logger.success(f"数据抓取完成,共保存 {total_saved} 条K线数据")
@@ -126,7 +145,7 @@ class BitMartDataCollector:
"""
实时抓取最新数据(用于定时任务)
"""
logger.info("开始抓取 BitMart 最新30分钟K线数据")
logger.info("开始抓取 BitMart 最新15分钟K线数据")
# 获取最近1小时的数据确保能获取到最新的K线
end_time = int(time.time())
@@ -143,8 +162,8 @@ class BitMartDataCollector:
if __name__ == '__main__':
collector = BitMartDataCollector()
# 抓取最近30天的历史数据
collector.collect_historical_data(days=500)
# 抓取从 2025-01-01 到现在的15分钟K线历史数据
collector.collect_historical_data(start_date='2025-01-01')
# 如果需要实时抓取,可以取消下面的注释
# collector.collect_realtime_data()

Binary file not shown.