Files
lm_code/bitmart/抓取数据_30分钟.py
2026-01-27 16:45:08 +08:00

170 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
BitMart 15分钟K线数据抓取脚本
从 BitMart API 获取15分钟K线数据并存储到数据库
"""
import time
from loguru import logger
from bitmart.api_contract import APIContract
from models.bitmart_15 import BitMart15
class BitMartDataCollector:
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "数据抓取"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
def get_klines(self, start_time=None, end_time=None, limit=200):
"""
获取K线数据
:param start_time: 开始时间戳(秒级)
:param end_time: 结束时间戳(秒级)
:param limit: 获取数量限制
:return: K线数据列表
"""
try:
if not end_time:
end_time = int(time.time())
if not start_time:
start_time = end_time - 3600 * 24 * 1 # 默认获取最近7天
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=15, # 15分钟
start_time=start_time,
end_time=end_time
)[0]
if response['code'] != 1000:
logger.error(f"获取K线失败: {response}")
return []
klines = response.get('data', [])
formatted = []
for k in klines:
# BitMart API 返回的时间戳是秒级,需要转换为毫秒级
# 根据 bitmart/框架.py 中的使用方式API返回的是秒级时间戳
timestamp_ms = int(k["timestamp"]) * 1000
formatted.append({
'id': timestamp_ms,
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
# 按时间戳排序
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
logger.error(f"获取K线异常: {e}")
return []
def save_klines(self, klines):
"""
保存K线数据到数据库
:param klines: K线数据列表
:return: 保存的数量
"""
saved_count = 0
for kline in klines:
try:
BitMart15.get_or_create(
id=kline['id'],
defaults={
'open': kline['open'],
'high': kline['high'],
'low': kline['low'],
'close': kline['close'],
}
)
saved_count += 1
except Exception as e:
logger.error(f"保存K线数据失败 {kline['id']}: {e}")
return saved_count
def collect_historical_data(self, start_date=None, days=None):
"""
抓取历史数据(从指定日期到现在)
:param start_date: 起始日期字符串,格式 'YYYY-MM-DD',如 '2025-01-01'
:param days: 如果不指定 start_date则抓取最近多少天的数据
"""
import datetime
now = int(time.time())
if start_date:
# 解析起始日期
start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
target_start_time = int(start_dt.timestamp())
logger.info(f"开始抓取 BitMart {self.contract_symbol}{start_date} 到现在的15分钟K线数据")
elif days:
target_start_time = now - 3600 * 24 * days
logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 {days} 天的15分钟K线数据")
else:
target_start_time = now - 3600 * 24 * 30 # 默认30天
logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 30 天的15分钟K线数据")
# 分批获取每次获取5天的数据15分钟K线数据量较大
batch_days = 5
total_saved = 0
fail_count = 0
max_fail = 3 # 连续失败超过3次则停止
current_end = now
while current_end > target_start_time:
current_start = max(current_end - 3600 * 24 * batch_days, target_start_time)
logger.info(f"抓取时间段: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start))} "
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end))}")
klines = self.get_klines(start_time=current_start, end_time=current_end)
if klines:
saved = self.save_klines(klines)
total_saved += saved
logger.info(f"本批次保存 {saved} 条数据,累计 {total_saved}")
fail_count = 0 # 重置失败计数
else:
fail_count += 1
logger.warning(f"本批次未获取到数据 (连续失败 {fail_count} 次)")
if fail_count >= max_fail:
logger.error(f"连续 {max_fail} 次获取数据失败,可能已达到 API 历史数据限制,停止抓取")
break
current_end = current_start
time.sleep(1) # 避免请求过快
logger.success(f"数据抓取完成,共保存 {total_saved} 条K线数据")
def collect_realtime_data(self):
"""
实时抓取最新数据(用于定时任务)
"""
logger.info("开始抓取 BitMart 最新15分钟K线数据")
# 获取最近1小时的数据确保能获取到最新的K线
end_time = int(time.time())
start_time = end_time - 3600 * 2 # 最近2小时
klines = self.get_klines(start_time=start_time, end_time=end_time)
if klines:
saved = self.save_klines(klines)
logger.success(f"保存 {saved} 条最新K线数据")
else:
logger.warning("未获取到最新数据")
if __name__ == '__main__':
collector = BitMartDataCollector()
# 抓取从 2025-01-01 到现在的15分钟K线历史数据
collector.collect_historical_data(start_date='2025-01-01')
# 如果需要实时抓取,可以取消下面的注释
# collector.collect_realtime_data()