This commit is contained in:
27942
2026-01-04 18:24:42 +08:00
parent 123dbf4649
commit 0af8dc562b
6 changed files with 323 additions and 76 deletions

View File

@@ -0,0 +1,150 @@
"""
BitMart 30分钟K线数据抓取脚本
从 BitMart API 获取30分钟K线数据并存储到数据库
"""
import time
from loguru import logger
from bitmart.api_contract import APIContract
from models.bitmart import BitMart30
class BitMartDataCollector:
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "数据抓取"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
def get_klines(self, start_time=None, end_time=None, limit=200):
"""
获取K线数据
:param start_time: 开始时间戳(秒级)
:param end_time: 结束时间戳(秒级)
:param limit: 获取数量限制
:return: K线数据列表
"""
try:
if not end_time:
end_time = int(time.time())
if not start_time:
start_time = end_time - 3600 * 24 * 7 # 默认获取最近7天
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=30, # 30分钟
start_time=start_time,
end_time=end_time
)[0]
if response['code'] != 1000:
logger.error(f"获取K线失败: {response}")
return []
klines = response.get('data', [])
formatted = []
for k in klines:
# BitMart API 返回的时间戳是秒级,需要转换为毫秒级
# 根据 bitmart/框架.py 中的使用方式API返回的是秒级时间戳
timestamp_ms = int(k["timestamp"]) * 1000
formatted.append({
'id': timestamp_ms,
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
# 按时间戳排序
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
logger.error(f"获取K线异常: {e}")
return []
def save_klines(self, klines):
"""
保存K线数据到数据库
:param klines: K线数据列表
:return: 保存的数量
"""
saved_count = 0
for kline in klines:
try:
BitMart30.get_or_create(
id=kline['id'],
defaults={
'open': kline['open'],
'high': kline['high'],
'low': kline['low'],
'close': kline['close'],
}
)
saved_count += 1
except Exception as e:
logger.error(f"保存K线数据失败 {kline['id']}: {e}")
return saved_count
def collect_historical_data(self, days=30):
"""
抓取历史数据
:param days: 抓取最近多少天的数据
"""
logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 {days} 天的30分钟K线数据")
end_time = int(time.time())
start_time = end_time - 3600 * 24 * days
# 分批获取每次获取7天的数据
batch_days = 7
total_saved = 0
current_start = start_time
while current_start < end_time:
current_end = min(current_start + 3600 * 24 * batch_days, end_time)
logger.info(f"抓取时间段: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start))} "
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end))}")
klines = self.get_klines(start_time=current_start, end_time=current_end)
if klines:
saved = self.save_klines(klines)
total_saved += saved
logger.info(f"本批次保存 {saved} 条数据,累计 {total_saved}")
else:
logger.warning("本批次未获取到数据")
current_start = current_end
time.sleep(1) # 避免请求过快
logger.success(f"数据抓取完成,共保存 {total_saved} 条K线数据")
def collect_realtime_data(self):
"""
实时抓取最新数据(用于定时任务)
"""
logger.info("开始抓取 BitMart 最新30分钟K线数据")
# 获取最近1小时的数据确保能获取到最新的K线
end_time = int(time.time())
start_time = end_time - 3600 * 2 # 最近2小时
klines = self.get_klines(start_time=start_time, end_time=end_time)
if klines:
saved = self.save_klines(klines)
logger.success(f"保存 {saved} 条最新K线数据")
else:
logger.warning("未获取到最新数据")
if __name__ == '__main__':
collector = BitMartDataCollector()
# 抓取最近30天的历史数据
collector.collect_historical_data(days=30)
# 如果需要实时抓取,可以取消下面的注释
# collector.collect_realtime_data()

21
models/bitmart.py Normal file
View File

@@ -0,0 +1,21 @@
from peewee import *
from models import db
class BitMart30(Model):
id = IntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_30'
# 连接到数据库
db.connect()
# 创建表(如果表不存在)
db.create_tables([BitMart30])

Binary file not shown.

View File

@@ -6,26 +6,24 @@ from models.weex import Weex1, Weex1Hour, Weex15, Weex30
if __name__ == '__main__':
# bit_port = openBrowser(id="8dcb4f744cf64ab190e465e153088515")
bit_port = openBrowser(id="f2320f57e24c45529a009e1541e25961")
response = requests.post(
f"http://127.0.0.1:50326/api/browser/start",
json={"envId": 146473},
headers={
"Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
"Content-Type": "application/json"
}
)
# response = requests.post(
# f"http://127.0.0.1:50326/api/browser/start",
# json={"envId": 146473},
# headers={
# "Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
# "Content-Type": "application/json"
# }
# )
co = ChromiumOptions()
co.set_local_port(port=response.json()["data"]["port"])
co.set_local_port(port=bit_port)
page = ChromiumPage(addr_or_opts=co)
page.set.window.max()
page.listen.start("public/quote/v1/getKlineV2")
page.listen.start("public/quote/v1/getKlineV2")
page.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT")

Binary file not shown.

View File

@@ -1,5 +1,23 @@
"""
量化交易回测系统 - 仅15分钟K线 & 信号续持/反手/单根反色平仓逻辑(完整版
量化交易回测系统 - 30分钟K线策略回测Weex数据源
========== 策略规则 ==========
重要所有开仓和平仓操作都在下一根K线的开盘价执行
1. 开仓条件信号出现时下一根K线开盘价开仓
- 阳包阴(涨包跌):前一根是跌(阴线),后一根是涨(阳线),且涨的收盘价 > 跌的开盘价
-> 下一根K线开盘价开多
- 阴包阳(跌包涨):前一根是涨(阳线),后一根是跌(阴线),且跌的收盘价 < 涨的开盘价
-> 下一根K线开盘价开空
2. 平仓条件所有平仓都在下一根K线开盘价执行
- 持有多单时:遇到两根连续的阴线 -> 下一根K线开盘价平仓
- 持有空单时:遇到两根连续的阳线 -> 下一根K线开盘价平仓
- 遇到反向信号下一根K线开盘价平仓并反手开仓
3. 续持条件:
- 遇到同向信号:续持
- 未满足平仓条件:续持
"""
import datetime
@@ -23,9 +41,17 @@ def is_bearish(c): # 阴线
def check_signal(prev, curr):
"""
包住形态信号判定(优化版):
只看两种信号:
- 跌包涨(前涨后跌):前一根是涨(阳线),后一根是跌(阴线),且跌的收盘价 < 涨的开盘价 -> 做空
- 涨包跌(前跌后涨):前一根是跌(阴线),后一根是涨(阳线),且涨的收盘价 > 跌的开盘价 -> 做
只看两种信号,严格按照收盘价与开盘价的比较
1. 跌包涨(前涨后跌)-> 做空:
- 前一根是涨阳线close > open
- 后一根是跌阴线close < open
- 且:跌的收盘价 < 涨的开盘价curr['close'] < prev['open']
2. 涨包跌(前跌后涨)-> 做多:
- 前一根是跌阴线close < open
- 后一根是涨阳线close > open
- 且:涨的收盘价 > 跌的开盘价curr['close'] > prev['open']
"""
p_open = float(prev['open'])
c_close = float(curr['close'])
@@ -42,18 +68,32 @@ def check_signal(prev, curr):
def get_data_by_date(model, date_str: str):
"""按天获取指定表的数据15分钟"""
"""
按天获取指定表的数据30分钟K线
数据格式:时间戳(毫秒级) 开盘价 最高价 最低价 收盘价
例如1767461400000 3106.68 3109.1 3106.22 3107.22
注意返回的数据已按时间戳id升序排序
"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
# 将日期转换为毫秒级时间戳进行查询
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
# 查询时按时间戳升序排序
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
# 确保数据已排序
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
@@ -77,82 +117,76 @@ def backtest_15m_trend_optimized(dates: List[str]):
trades: List[Dict] = []
current_position: Optional[Dict] = None # 开仓信息
consecutive_opposite_count = 0 # 连续反色K线计数
idx = 1
while idx < len(all_data) - 1:
prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1]
direction, signal_key = check_signal(prev, curr)
# 空仓 -> 碰到信号则开仓
if current_position is None and direction:
entry_price = float(next_bar['open'])
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': entry_price,
'entry_time': next_bar['id']
}
stats[signal_key]['count'] += 1
idx += 1
continue
if current_position:
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
# 反向信号 -> 下一根开盘平仓 + 同价反手
if direction and direction != pos_dir:
exit_price = float(next_bar['open'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
# 空仓 -> 碰到信号则开仓下一根K线开盘价
if current_position is None:
if direction:
entry_price = float(next_bar['open'])
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': exit_price,
'entry_price': entry_price,
'entry_time': next_bar['id']
}
consecutive_opposite_count = 0 # 重置连续反色计数
stats[signal_key]['count'] += 1
idx += 1
continue
idx += 1
continue
# 同向信号 -> 续持
if direction and direction == pos_dir:
idx += 1
continue
# 有仓位状态:检查平仓条件
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
# 单根反色K线 -> 判断后续是否能组成信号
curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr))
if curr_is_opposite:
can_peek = idx + 1 < len(all_data)
if can_peek:
lookahead_dir, _ = check_signal(curr, all_data[idx + 1])
if lookahead_dir is not None:
idx += 1
continue # 后续可组成信号,等待信号处理
# 1. 反向信号 -> 下一根K线开盘价平仓并反手开仓
if direction and direction != pos_dir:
exit_price = float(next_bar['open'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
# 否则按收盘价平
exit_price = float(next_bar['close'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
# 反手开
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': exit_price,
'entry_time': next_bar['id']
}
consecutive_opposite_count = 0 # 重置连续反色计数
stats[signal_key]['count'] += 1
idx += 1
continue
# 2. 检查连续反色K线平仓条件下一根K线开盘价平仓
# 持有多单:检查是否连续两根阴线
if pos_dir == 'long' and is_bearish(curr):
consecutive_opposite_count += 1
# 如果已经连续两根阴线下一根K线开盘价平仓
if consecutive_opposite_count >= 2:
exit_price = float(next_bar['open'])
diff = exit_price - current_position['entry_price']
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(all_data[idx + 1]['id'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'direction': '做多',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
@@ -160,6 +194,50 @@ def backtest_15m_trend_optimized(dates: List[str]):
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = None
consecutive_opposite_count = 0
idx += 1
continue
else:
# 只有一根阴线,续持
idx += 1
continue
# 持有空单:检查是否连续两根阳线
elif pos_dir == 'short' and is_bullish(curr):
consecutive_opposite_count += 1
# 如果已经连续两根阳线下一根K线开盘价平仓
if consecutive_opposite_count >= 2:
exit_price = float(next_bar['open'])
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = None
consecutive_opposite_count = 0
idx += 1
continue
else:
# 只有一根阳线,续持
idx += 1
continue
# 3. 同向K线或同向信号 -> 续持,重置连续反色计数
if (pos_dir == 'long' and is_bullish(curr)) or (pos_dir == 'short' and is_bearish(curr)):
consecutive_opposite_count = 0 # 重置连续反色计数
# 同向信号 -> 续持
if direction and direction == pos_dir:
consecutive_opposite_count = 0 # 重置连续反色计数
idx += 1
continue
idx += 1
@@ -188,7 +266,7 @@ def backtest_15m_trend_optimized(dates: List[str]):
# ========================= 运行示例(优化版盈利计算) =========================
if __name__ == '__main__':
dates = []
for month in range(1, 11):
for month in range(1, 13):
# 获取该月的实际天数
days_in_month = calendar.monthrange(2025, month)[1]
for day in range(1, days_in_month + 1):