加入 weex

This commit is contained in:
ddrwode
2026-02-03 17:51:44 +08:00
parent ee9e7c29a8
commit 7c9ede446f
6 changed files with 0 additions and 3145 deletions

View File

@@ -1,745 +0,0 @@
"""
BitMart 五分之一回归策略回测(精准版)
使用3分钟K线周期计算触发价格1分钟K线判断触发顺序
========== 策略规则 ==========
1. 触发价格计算基于有效的前一根K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5
- 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5
2. 信号触发条件:
- 当前K线最高价 >= 做多触发价格 → 做多信号
- 当前K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根3分钟K线内只交易一次
4. 精准判断使用1分钟K线
- 当一根3分钟K线同时触及做多和做空价格时
- 使用该3分钟K线对应的3根1分钟K线来判断哪个方向先被触发
- 这样可以更精准地还原真实交易场景
"""
import datetime
import calendar
from pathlib import Path
from typing import List, Dict, Optional
from loguru import logger
from peewee import *
# 数据库配置
DB_PATH = Path(__file__).parent.parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
# ========================= 数据库模型 =========================
class BitMartETH1m(Model):
"""1分钟K线模型"""
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1m'
class BitMartETH3m(Model):
"""3分钟K线模型"""
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_3m'
# 连接数据库
db.connect(reuse_if_open=True)
# ========================= 工具函数 =========================
def is_bullish(c):
"""判断阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(c):
"""判断阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data, current_idx, min_body_size=0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_fifth_levels(prev):
"""
计算前一根K线实体的 1/5 双向触发价格
返回:(做多触发价格, 做空触发价格)
基于收盘价计算(无论阴线阳线):
- 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5实体
- 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5实体
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001: # 十字星,忽略
return None, None
# 基于收盘价的双向触发价格
long_trigger = p_close + body / 5
short_trigger = p_close - body / 5
return long_trigger, short_trigger
def get_3m_data_by_date(date_str: str) -> List[Dict]:
"""按天获取3分钟K线数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = BitMartETH3m.select().where(
BitMartETH3m.id.between(start_ts, end_ts)
).order_by(BitMartETH3m.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
return data
def get_1m_data_by_range(start_ts: int, end_ts: int) -> List[Dict]:
"""
获取指定时间范围内的1分钟K线数据
:param start_ts: 开始时间戳(毫秒)
:param end_ts: 结束时间戳(毫秒)
:return: 1分钟K线数据列表
"""
query = BitMartETH1m.select().where(
BitMartETH1m.id.between(start_ts, end_ts - 1)
).order_by(BitMartETH1m.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
return data
def get_1m_data_for_3m_bar(bar_3m: Dict) -> List[Dict]:
"""
获取3分钟K线对应的3根1分钟K线
:param bar_3m: 3分钟K线数据
:return: 对应的1分钟K线数据列表最多3根
"""
start_ts = bar_3m['id']
end_ts = start_ts + 3 * 60 * 1000 # 3分钟后
return get_1m_data_by_range(start_ts, end_ts)
def determine_trigger_order_by_1m(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float
) -> str:
"""
使用1分钟K线精确判断在3分钟周期内是先触发做多还是做空
:param bars_1m: 3根1分钟K线数据
:param long_trigger: 做多触发价格
:param short_trigger: 做空触发价格
:return: 'long', 'short', 或 None
"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar['high'])
low = float(bar['low'])
open_price = float(bar['open'])
long_triggered = high >= long_trigger
short_triggered = low <= short_trigger
# 如果只触发了一个方向
if long_triggered and not short_triggered:
return 'long'
if short_triggered and not long_triggered:
return 'short'
# 如果两个方向都触发了在同一根1分钟K线内
if long_triggered and short_triggered:
# 根据开盘价判断:
# 如果开盘价更接近做空触发价,说明先往下走,先触发做空
# 如果开盘价更接近做多触发价,说明先往上走,先触发做多
dist_to_long = abs(long_trigger - open_price)
dist_to_short = abs(short_trigger - open_price)
if dist_to_short < dist_to_long:
return 'short'
else:
return 'long'
return None
def check_reverse_signal_in_first_minute(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float,
current_direction: str
) -> bool:
"""
检查反手信号是否在第一分钟触发
:param bars_1m: 3根1分钟K线数据
:param long_trigger: 做多触发价格
:param short_trigger: 做空触发价格
:param current_direction: 当前持仓方向 ('long''short')
:return: True 表示反手信号在第一分钟触发False 表示不是
"""
if not bars_1m:
return False
# 只检查第一分钟K线
first_bar = bars_1m[0]
high = float(first_bar['high'])
low = float(first_bar['low'])
# 如果当前是多仓,检查空信号是否在第一分钟触发
if current_direction == 'long':
return low <= short_trigger
# 如果当前是空仓,检查多信号是否在第一分钟触发
if current_direction == 'short':
return high >= long_trigger
return False
def get_body_percent(candle) -> float:
"""
计算K线实体占价格的百分比
:param candle: K线数据
:return: 实体百分比如0.1表示0.1%
"""
body = abs(float(candle['open']) - float(candle['close']))
price = (float(candle['open']) + float(candle['close'])) / 2
if price == 0:
return 0
return (body / price) * 100
def check_breakout_reverse_signal(
all_data_3m: List[Dict],
current_idx: int,
current_position_direction: str,
min_body_percent: float = 0.1
) -> tuple:
"""
检查"突破上一根K线高低点"的反手信号
规则:
- 持多单时当前K线跌破上一根K线最低点 → 反手开空
条件上一根K线是阴线且实体>0.1%波动
- 持空单时当前K线涨破上一根K线最高点 → 反手开多
条件上一根K线是阳线且实体>0.1%波动
:param all_data_3m: 3分钟K线数据
:param current_idx: 当前K线索引
:param current_position_direction: 当前持仓方向 ('long''short')
:param min_body_percent: 最小实体百分比默认0.1%
:return: (方向, 触发价格, 信号类型) 或 (None, None, None)
"""
if current_idx <= 0 or current_position_direction is None:
return None, None, None
curr = all_data_3m[current_idx]
prev = all_data_3m[current_idx - 1]
c_high = float(curr['high'])
c_low = float(curr['low'])
prev_high = float(prev['high'])
prev_low = float(prev['low'])
# 计算上一根K线的实体百分比
body_percent = get_body_percent(prev)
# 持多单时检查是否跌破上一根K线最低点
if current_position_direction == 'long':
# 条件上一根K线是阴线且实体>min_body_percent%
if is_bearish(prev) and body_percent >= min_body_percent:
if c_low < prev_low:
# 触发反手开空信号
logger.debug(f"突破反手信号持多单当前K线低点{c_low:.2f}跌破上一根阴线低点{prev_low:.2f},实体{body_percent:.3f}%")
return 'short', prev_low, 'breakout'
# 持空单时检查是否涨破上一根K线最高点
elif current_position_direction == 'short':
# 条件上一根K线是阳线且实体>min_body_percent%
if is_bullish(prev) and body_percent >= min_body_percent:
if c_high > prev_high:
# 触发反手开多信号
logger.debug(f"突破反手信号持空单当前K线高点{c_high:.2f}涨破上一根阳线高点{prev_high:.2f},实体{body_percent:.3f}%")
return 'long', prev_high, 'breakout'
return None, None, None
def check_trigger_with_1m(
all_data_3m: List[Dict],
current_idx: int,
min_body_size: float = 0.1,
current_position_direction: str = None,
first_minute_only: bool = True
) -> tuple:
"""
检查当前3分钟K线是否触发了交易信号
如果同时触发两个方向使用1分钟K线精确判断顺序
新增逻辑:如果有持仓且 first_minute_only=True反手信号必须在第一分钟触发才有效
:param all_data_3m: 3分钟K线数据
:param current_idx: 当前K线索引
:param min_body_size: 最小实体大小
:param current_position_direction: 当前持仓方向 ('long', 'short', 或 None)
:param first_minute_only: 是否只在第一分钟触发反手信号才有效
返回:(方向, 触发价格, 有效前一根K线索引, 1分钟数据是否使用)
"""
if current_idx <= 0:
return None, None, None, False
curr = all_data_3m[current_idx]
# 查找实体>=min_body_size的前一根K线
valid_prev_idx, prev = find_valid_prev_bar(all_data_3m, current_idx, min_body_size)
if prev is None:
return None, None, None, False
long_trigger, short_trigger = get_one_fifth_levels(prev)
if long_trigger is None:
return None, None, None, False
c_high = float(curr['high'])
c_low = float(curr['low'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 如果两个方向都触发使用1分钟K线精确判断
if long_triggered and short_triggered:
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m:
direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger)
if direction:
trigger_price = long_trigger if direction == 'long' else short_trigger
# 检查反手信号是否需要在第一分钟触发
if first_minute_only and current_position_direction and direction != current_position_direction:
# 这是一个反手信号,检查是否在第一分钟触发
if not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, current_position_direction):
# 反手信号不是在第一分钟触发,忽略
logger.debug(f"反手信号 {direction} 不在第一分钟触发,忽略")
return None, None, None, False
return direction, trigger_price, valid_prev_idx, True
# 如果没有1分钟数据使用开盘价距离判断
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx, False
else:
return 'long', long_trigger, valid_prev_idx, False
if short_triggered:
# 检查是否是反手信号且需要第一分钟触发
if first_minute_only and current_position_direction == 'long':
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m and not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, 'long'):
logger.debug(f"空信号不在第一分钟触发,忽略(当前持多仓)")
return None, None, None, False
return 'short', short_trigger, valid_prev_idx, False
if long_triggered:
# 检查是否是反手信号且需要第一分钟触发
if first_minute_only and current_position_direction == 'short':
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m and not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, 'short'):
logger.debug(f"多信号不在第一分钟触发,忽略(当前持空仓)")
return None, None, None, False
return 'long', long_trigger, valid_prev_idx, False
return None, None, None, False
# ========================= 回测逻辑 =========================
def backtest_one_third_strategy(
dates: List[str],
min_body_size: float = 0.1,
first_minute_only: bool = True,
enable_breakout_reverse: bool = True,
breakout_min_body_percent: float = 0.1
):
"""
三分之一策略回测(精准版)
:param dates: 日期列表
:param min_body_size: 最小实体大小(绝对值)
:param first_minute_only: 是否只在第一分钟触发反手信号才有效默认True
:param enable_breakout_reverse: 是否启用"突破上一根K线高低点"反手信号默认True
:param breakout_min_body_percent: 突破反手信号的最小实体百分比默认0.1%
:return: (trades, stats)
"""
# 获取所有3分钟K线数据
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_3m_data_by_date(d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条3分钟K线数据")
logger.info(f"反手信号仅第一分钟有效: {first_minute_only}")
logger.info(f"突破反手信号启用: {enable_breakout_reverse},最小实体百分比: {breakout_min_body_percent}%")
if not all_data:
logger.warning("未获取到任何数据,请检查数据库")
return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}}
# 按时间戳排序
all_data.sort(key=lambda x: x['id'])
# 验证排序结果
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
trades: List[Dict] = []
current_position: Optional[Dict] = None
# 统计使用1分钟数据精准判断的次数
precise_count = 0
fallback_count = 0
# 统计突破反手信号触发次数
breakout_reverse_count = 0
# 记录每根K线是否已经执行过突破反手当前K线只执行一次
last_breakout_bar_id = None
idx = 1
while idx < len(all_data):
curr = all_data[idx]
curr_bar_id = curr['id']
# 获取当前持仓方向(用于判断反手信号是否在第一分钟触发)
current_pos_dir = current_position['direction'] if current_position else None
# 检测信号使用1分钟K线精准判断并考虑反手信号第一分钟限制
direction, trigger_price, valid_prev_idx, used_1m = check_trigger_with_1m(
all_data, idx, min_body_size,
current_position_direction=current_pos_dir,
first_minute_only=first_minute_only
)
# 如果没有五分之一信号,且有持仓,检查突破反手信号
signal_type = 'one_fifth' # 信号类型one_fifth五分之一或 breakout突破
if direction is None and current_position is not None and enable_breakout_reverse:
# 检查当前K线是否已经执行过突破反手
if last_breakout_bar_id != curr_bar_id:
breakout_dir, breakout_price, breakout_type = check_breakout_reverse_signal(
all_data, idx, current_pos_dir, breakout_min_body_percent
)
if breakout_dir:
direction = breakout_dir
trigger_price = breakout_price
signal_type = 'breakout'
if used_1m:
precise_count += 1
elif direction and signal_type == 'one_fifth':
fallback_count += 1
# 无持仓 -> 开仓(只用五分之一信号开仓,突破信号不用于开仓)
if current_position is None:
if direction and signal_type == 'one_fifth':
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar_idx': idx
}
stats[direction]['count'] += 1
time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
logger.debug(f"[{time_str}] 开仓{direction} @ {trigger_price:.2f}")
idx += 1
continue
# 有持仓 -> 检查是否需要反向
pos_dir = current_position['direction']
if direction and direction != pos_dir:
# 反向信号,平仓并反手
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
# 记录信号类型
signal_type_str = '突破' if signal_type == 'breakout' else '五分之一'
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
'signal_type': signal_type_str
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
# 如果是突破反手信号记录当前K线ID防止重复执行
if signal_type == 'breakout':
last_breakout_bar_id = curr_bar_id
breakout_reverse_count += 1
# 反手开仓
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar_idx': idx
}
stats[direction]['count'] += 1
time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
logger.debug(f"[{time_str}] 平{pos_dir}反手{direction}({signal_type_str}) @ {trigger_price:.2f} 盈亏={diff:.2f}")
idx += 1
# 尾仓处理最后一根K线收盘价平仓
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"回测完成使用1分钟精准判断 {precise_count} 次,使用开盘价距离判断 {fallback_count} 次,突破反手信号 {breakout_reverse_count}")
return trades, stats
# ========================= 运行回测 =========================
if __name__ == '__main__':
# ==================== 配置参数 ====================
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
MIN_BODY_SIZE = 0.1 # 最小实体大小(绝对值)
FIRST_MINUTE_ONLY = True # 反手信号仅在3分钟K线的第一分钟触发才有效
# 突破反手信号配置
ENABLE_BREAKOUT_REVERSE = True # 是否启用"突破上一根K线高低点"反手信号
BREAKOUT_MIN_BODY_PERCENT = 0.1 # 突破反手信号的最小实体百分比0.1表示0.1%
# ==================== 生成查询日期列表 ====================
dates = []
try:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
if start_dt > end_dt:
logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}")
exit(1)
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"回测日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
except ValueError as e:
logger.error(f"日期格式错误:{e}")
exit(1)
# ==================== 执行回测 ====================
trades, stats = backtest_one_third_strategy(
dates,
MIN_BODY_SIZE,
FIRST_MINUTE_ONLY,
ENABLE_BREAKOUT_REVERSE,
BREAKOUT_MIN_BODY_PERCENT
)
# ==================== 输出交易详情 ====================
logger.info("===== 每笔交易详情 =====")
# 参数设定
contract_size = 10000 # 合约规模
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 平仓手续费率
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_price = t['exit']
direction = t['direction']
# 原始价差
point_diff = t['diff']
# 金额盈利
money_profit = point_diff / entry * contract_size
# 手续费
fee = open_fee_fixed + (contract_size / entry * exit_price * close_fee_rate)
# 净利润
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction} "
f"入={entry:.2f} 出={exit_price:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
# ==================== 汇总统计 ====================
total_net_profit = total_money_profit - total_fee
print(f"\n{'='*60}")
print(f"【BitMart 五分之一策略回测结果3分钟K线 + 1分钟精准判断")
print(f"{'='*60}")
print(f"回测周期:{START_DATE}{END_DATE}")
print(f"最小实体要求:{MIN_BODY_SIZE}")
print(f"反手信号仅第一分钟有效:{'' if FIRST_MINUTE_ONLY else ''}")
print(f"突破反手信号:{'启用' if ENABLE_BREAKOUT_REVERSE else '禁用'}(最小实体{BREAKOUT_MIN_BODY_PERCENT}%")
print(f"{'='*60}")
print(f"总交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}")
print(f"{'='*60}")
print("\n===== 方向统计 =====")
for k, v in stats.items():
name = v['name']
count = v['count']
wins = v['wins']
total_p = v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
# 保存交易记录到CSV
if trades:
import csv
csv_path = Path(__file__).parent / 'backtest_one_third_trades.csv'
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[
'entry_time', 'exit_time', 'direction', 'entry', 'exit',
'point_diff', 'raw_profit', 'fee', 'net_profit', 'signal_type'
])
writer.writeheader()
for t in trades:
writer.writerow({
'entry_time': t['entry_time'],
'exit_time': t['exit_time'],
'direction': t['direction'],
'entry': t['entry'],
'exit': t['exit'],
'point_diff': t['point_diff'],
'raw_profit': t['raw_profit'],
'fee': t['fee'],
'net_profit': t['net_profit'],
'signal_type': t.get('signal_type', '五分之一')
})
print(f"\n交易记录已保存到:{csv_path}")

View File

@@ -1,764 +0,0 @@
"""
量化交易回测系统 - 三分之一回归策略(双向触发版)
========== 策略规则 ==========
1. 触发价格计算基于有效的前一根K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3
2. 信号触发条件:
- 当前K线最高价 >= 做多触发价格 → 做多信号
- 当前K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根K线内只交易一次防止频繁反手
4. 实体过滤:
- 如果前一根K线的实体部分|open - close|< 0.1,继续往前查找
- 直到找到实体>=0.1的K线再用那根K线来计算触发价格
示例1阳线
前一根K线开盘3000收盘3100阳线实体=100
- 做多触发价格 = 3100 + 33 = 3133继续上涨做多
- 做空触发价格 = 3100 - 33 = 3067回调做空
示例2阴线
前一根K线开盘3100收盘3000阴线实体=100
- 做多触发价格 = 3000 + 33 = 3033反弹做多
- 做空触发价格 = 3000 - 33 = 2967继续下跌做空
"""
import datetime
import calendar
import os
from typing import List, Dict, Optional
from loguru import logger
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
import matplotlib
try:
import plotly.graph_objects as go
except Exception:
go = None
from models.bitmart_klines import BitMartETH5M
# 配置中文字体
import matplotlib.font_manager as fm
import warnings
# 忽略matplotlib的字体警告
warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib.font_manager')
warnings.filterwarnings('ignore', message='.*Glyph.*missing.*', category=UserWarning)
# 尝试设置中文字体,按优先级尝试
chinese_fonts = ['SimHei', 'Microsoft YaHei', 'SimSun', 'KaiTi', 'FangSong', 'STSong', 'STHeiti']
available_fonts = [f.name for f in fm.fontManager.ttflist]
# 找到第一个可用的中文字体
font_found = None
for font_name in chinese_fonts:
if font_name in available_fonts:
font_found = font_name
break
if font_found:
plt.rcParams['font.sans-serif'] = [font_found] + ['DejaVu Sans']
logger.info(f"使用中文字体: {font_found}")
else:
# 如果没有找到中文字体,尝试使用系统默认字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS', 'DejaVu Sans']
logger.warning("未找到中文字体,使用默认配置")
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
plt.rcParams['font.size'] = 10 # 设置默认字体大小
# 尝试清除字体缓存(如果可能)
try:
# 不强制重建,避免性能问题
pass
except:
pass
# 获取当前脚本所在目录
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# ========================= 工具函数 =========================
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_third_levels(prev):
"""
计算前一根K线实体的 1/3 双向触发价格
返回:(做多触发价格, 做空触发价格)
基于收盘价计算(无论阴线阳线):
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3实体
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3实体
示例:
阳线 open=3000, close=3100, 实体=100
- 做多触发 = 3100 + 33 = 3133继续涨
- 做空触发 = 3100 - 33 = 3067回调
阴线 open=3100, close=3000, 实体=100
- 做多触发 = 3000 + 33 = 3033反弹
- 做空触发 = 3000 - 33 = 2967继续跌
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001: # 十字星,忽略
return None, None
# 基于收盘价的双向触发价格
long_trigger = p_close + body / 3 # 从收盘价往上涨1/3触发做多
short_trigger = p_close - body / 3 # 从收盘价往下跌1/3触发做空
return long_trigger, short_trigger
def check_trigger(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
检查当前K线是否触发了交易信号双向检测
返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None)
规则:
- 当前K线高点 >= 做多触发价格 → 做多信号
- 当前K线低点 <= 做空触发价格 → 做空信号
- 如果同时触发两个方向,以先触发的为准(这里简化为优先做空,因为下跌更快)
"""
if current_idx <= 0:
return None, None, None
curr = all_data[current_idx]
# 查找实体>=min_body_size的前一根K线
valid_prev_idx, prev = find_valid_prev_bar(all_data, current_idx, min_body_size)
if prev is None:
return None, None, None
long_trigger, short_trigger = get_one_third_levels(prev)
if long_trigger is None:
return None, None, None
# 使用影线部分high/low来判断
c_high = float(curr['high'])
c_low = float(curr['low'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 如果两个方向都触发,需要判断哪个先触发
# 简化处理:比较触发价格距离开盘价的远近,更近的先触发
if long_triggered and short_triggered:
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx
else:
return 'long', long_trigger, valid_prev_idx
if short_triggered:
return 'short', short_trigger, valid_prev_idx
if long_triggered:
return 'long', long_trigger, valid_prev_idx
return None, None, None
def get_data_by_date(model, date_str: str):
"""按天获取指定表的数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_one_third_strategy(dates: List[str]):
"""三分之一回归策略回测(优化版)"""
all_data: List[Dict] = []
for d in dates:
day_data = get_data_by_date(BitMartETH5M, d)
all_data.extend(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {len(all_data)} 条K线数据")
if not all_data:
logger.warning("未获取到任何数据")
return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}}
all_data.sort(key=lambda x: x['id'])
if len(all_data) > 1:
first_time = datetime.datetime.fromtimestamp(all_data[0]['id'] / 1000)
last_time = datetime.datetime.fromtimestamp(all_data[-1]['id'] / 1000)
logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M')}{last_time.strftime('%Y-%m-%d %H:%M')}")
# 验证排序打印前5条数据
logger.info("===== 前5条数据验证排序=====")
for i in range(min(5, len(all_data))):
d = all_data[i]
t = datetime.datetime.fromtimestamp(d['id'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
k_type = "阳线" if is_bullish(d) else ("阴线" if is_bearish(d) else "十字星")
logger.info(f" [{i}] {t} | {k_type} | O={d['open']:.2f} H={d['high']:.2f} L={d['low']:.2f} C={d['close']:.2f}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
# 额外统计信息
extra_stats = {
'same_dir_ignored': 0, # 同向信号被忽略次数
'no_signal_bars': 0, # 无信号K线数
'total_bars': len(all_data) - 1, # 总K线数排除第一根
}
trades: List[Dict] = []
current_position: Optional[Dict] = None
last_trade_bar: Optional[int] = None # 记录上次交易的K线索引防止同一K线重复交易
for idx in range(1, len(all_data)):
curr = all_data[idx]
# 使用check_trigger函数它会自动查找实体>=0.1的前一根K线
direction, trigger_price, valid_prev_idx = check_trigger(all_data, idx, min_body_size=0.1)
# 获取有效的前一根K线用于日志输出
valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None
# 无信号时跳过
if direction is None:
extra_stats['no_signal_bars'] += 1
continue
# 同一K线内已交易跳过与交易代码逻辑一致
if last_trade_bar == idx:
continue
# 空仓时,有信号就开仓
if current_position is None:
if valid_prev is not None:
# 打印开仓时的K线信息
prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M')
curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星")
curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星")
prev_body = get_body_size(valid_prev)
logger.info(f"【开仓】{direction} @ {trigger_price:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}")
logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}")
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx
}
stats[direction]['count'] += 1
last_trade_bar = idx # 记录交易K线
continue
# 有仓位时,检查信号
pos_dir = current_position['direction']
# 同向信号,忽略(与交易代码逻辑一致)
if direction == pos_dir:
extra_stats['same_dir_ignored'] += 1
continue
# 反向信号,平仓反手
if valid_prev is not None:
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
# 打印平仓时的K线信息
prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M')
curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星")
curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星")
prev_body = get_body_size(valid_prev)
logger.info(f"【平仓反手】{pos_dir} -> {direction} @ {exit_price:.2f}, 盈亏: {diff:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}")
logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}")
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': curr['id'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
'hold_bars': idx - current_position['entry_bar'] # 持仓K线数
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
# 反手开仓
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx
}
stats[direction]['count'] += 1
last_trade_bar = idx # 记录交易K线
# 尾仓处理
if current_position:
last = all_data[-1]
last_idx = len(all_data) - 1
exit_price = float(last['close'])
pos_dir = current_position['direction']
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': last['id'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
'hold_bars': last_idx - current_position['entry_bar'], # 持仓K线数
'is_tail': True # 标记为尾仓平仓
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"【尾仓平仓】{pos_dir} @ {exit_price:.2f}, 盈亏: {diff:.2f}")
# 打印额外统计信息
logger.info(f"\n===== 信号统计 =====")
logger.info(f"总K线数: {extra_stats['total_bars']}")
logger.info(f"无信号K线: {extra_stats['no_signal_bars']} ({extra_stats['no_signal_bars']/extra_stats['total_bars']*100:.1f}%)")
logger.info(f"同向信号忽略: {extra_stats['same_dir_ignored']}")
return trades, stats, all_data, extra_stats
# ========================= 绘图函数 =========================
def plot_trades(all_data: List[Dict], trades: List[Dict], save_path: str = None):
"""
绘制K线图并标注交易点位
"""
if not all_data:
logger.warning("没有数据可绘制")
return
# 转换为 DataFrame
df = pd.DataFrame(all_data)
df['datetime'] = pd.to_datetime(df['id'], unit='ms')
df.set_index('datetime', inplace=True)
df = df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'})
df = df[['Open', 'High', 'Low', 'Close']]
# 准备标记点
buy_signals = [] # 做多开仓
sell_signals = [] # 做空开仓
buy_exits = [] # 做多平仓
sell_exits = [] # 做空平仓
for trade in trades:
entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms')
exit_time = pd.to_datetime(trade['exit_time_ms'], unit='ms')
direction = trade['direction']
entry_price = trade['entry']
exit_price = trade['exit']
if direction == '做多':
buy_signals.append((entry_time, entry_price))
buy_exits.append((exit_time, exit_price))
else:
sell_signals.append((entry_time, entry_price))
sell_exits.append((exit_time, exit_price))
# 创建标记序列
buy_markers = pd.Series(index=df.index, dtype=float)
sell_markers = pd.Series(index=df.index, dtype=float)
buy_exit_markers = pd.Series(index=df.index, dtype=float)
sell_exit_markers = pd.Series(index=df.index, dtype=float)
for t, p in buy_signals:
if t in buy_markers.index:
buy_markers[t] = p
for t, p in sell_signals:
if t in sell_markers.index:
sell_markers[t] = p
for t, p in buy_exits:
if t in buy_exit_markers.index:
buy_exit_markers[t] = p
for t, p in sell_exits:
if t in sell_exit_markers.index:
sell_exit_markers[t] = p
# 添加标记
add_plots = []
if buy_markers.notna().any():
add_plots.append(mpf.make_addplot(buy_markers, type='scatter', markersize=100,
marker='^', color='green', label='做多开仓'))
if sell_markers.notna().any():
add_plots.append(mpf.make_addplot(sell_markers, type='scatter', markersize=100,
marker='v', color='red', label='做空开仓'))
if buy_exit_markers.notna().any():
add_plots.append(mpf.make_addplot(buy_exit_markers, type='scatter', markersize=80,
marker='x', color='darkgreen', label='做多平仓'))
if sell_exit_markers.notna().any():
add_plots.append(mpf.make_addplot(sell_exit_markers, type='scatter', markersize=80,
marker='x', color='darkred', label='做空平仓'))
# 绘制K线图更接近交易所风格
market_colors = mpf.make_marketcolors(
up='#26a69a', # 常见交易所绿色
down='#ef5350', # 常见交易所红色
edge='inherit',
wick='inherit',
volume='inherit'
)
style = mpf.make_mpf_style(
base_mpf_style='binance',
marketcolors=market_colors,
gridstyle='-',
gridcolor='#e6e6e6'
)
fig, axes = mpf.plot(
df,
type='candle',
style=style,
title='三分之一回归策略回测',
ylabel='价格',
addplot=add_plots if add_plots else None,
figsize=(16, 9),
returnfig=True
)
# 添加图例
axes[0].legend(['做多开仓 ▲', '做空开仓 ▼', '做多平仓 ✕', '做空平仓 ✕'], loc='upper left')
# 标注开仓细节(方向、价格、时间)
if trades:
ax = axes[0]
max_annotate = 60 # 过多会拥挤,可按需调大/调小
annotated = 0
for i, trade in enumerate(trades):
if annotated >= max_annotate:
break
entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms')
if entry_time not in df.index:
continue
entry_price = trade['entry']
direction = trade['direction']
color = 'green' if direction == '做多' else 'red'
text = f"{direction} @ {entry_price:.2f}\n{entry_time.strftime('%m-%d %H:%M')}"
y_offset = 20 if (i % 2 == 0) else -30
ax.annotate(
text,
xy=(entry_time, entry_price),
xytext=(0, y_offset),
textcoords='offset points',
ha='center',
va='bottom' if y_offset > 0 else 'top',
fontsize=8,
color=color,
arrowprops=dict(arrowstyle='->', color=color, lw=0.6, alpha=0.6)
)
annotated += 1
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches='tight')
logger.info(f"图表已保存到: {save_path}")
plt.show()
def plot_trades_interactive(all_data: List[Dict], trades: List[Dict], html_path: str = None):
"""
交互式K线图TradingView风格支持缩放、时间区间/价格区间平移缩放)
"""
if not all_data:
logger.warning("没有数据可绘制")
return
if go is None:
logger.warning("未安装 plotly无法绘制交互式图。请先安装pip install plotly")
return
df = pd.DataFrame(all_data)
df['datetime'] = pd.to_datetime(df['id'], unit='ms')
df.sort_values('datetime', inplace=True)
fig = go.Figure(
data=[
go.Candlestick(
x=df['datetime'],
open=df['open'],
high=df['high'],
low=df['low'],
close=df['close'],
increasing_line_color='#26a69a',
decreasing_line_color='#ef5350',
name='K线'
)
]
)
# 标注开仓点
if trades:
entry_x = []
entry_y = []
entry_text = []
entry_color = []
for t in trades:
entry_x.append(pd.to_datetime(t['entry_time_ms'], unit='ms'))
entry_y.append(t['entry'])
entry_text.append(f"{t['direction']} @ {t['entry']:.2f}<br>{t['entry_time']}")
entry_color.append('#26a69a' if t['direction'] == '做多' else '#ef5350')
fig.add_trace(
go.Scatter(
x=entry_x,
y=entry_y,
mode='markers',
marker=dict(size=8, color=entry_color),
name='开仓',
text=entry_text,
hoverinfo='text'
)
)
# TradingView风格深色背景 + 交互缩放
fig.update_layout(
title='三分之一回归策略回测(交互式)',
xaxis=dict(
rangeslider=dict(visible=True),
type='date',
showgrid=False
),
yaxis=dict(
showgrid=False,
fixedrange=False
),
plot_bgcolor='#0b0e11',
paper_bgcolor='#0b0e11',
font=dict(color='#d1d4dc'),
hovermode='x unified',
dragmode='zoom'
)
fig.show()
if html_path:
fig.write_html(html_path)
logger.info(f"交互图已保存到: {html_path}")
# ========================= 主程序 =========================
if __name__ == '__main__':
# ==================== 配置参数 ====================
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
# ==================== 生成日期列表 ====================
dates = []
if START_DATE and END_DATE:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"查询日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
# ==================== 执行回测 ====================
trades, stats, all_data, extra_stats = backtest_one_third_strategy(dates)
# ==================== 输出结果 ====================
logger.info("===== 每笔交易详情 =====")
contract_size = 10000
open_fee_fixed = 5
close_fee_rate = 0.0005
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_p = t['exit']
direction = t['direction']
point_diff = t['diff']
money_profit = point_diff / entry * contract_size
fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate)
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction} "
f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f}"
)
# ==================== 汇总统计 ====================
total_net_profit = total_money_profit - total_fee
# 计算额外统计
win_count = len([t for t in trades if t['diff'] > 0])
lose_count = len([t for t in trades if t['diff'] <= 0])
total_win_rate = (win_count / len(trades) * 100) if trades else 0
# 计算平均持仓K线数
hold_bars_list = [t.get('hold_bars', 0) for t in trades if 'hold_bars' in t]
avg_hold_bars = sum(hold_bars_list) / len(hold_bars_list) if hold_bars_list else 0
# 计算最大连续亏损
max_consecutive_loss = 0
current_consecutive_loss = 0
for t in trades:
if t['diff'] <= 0:
current_consecutive_loss += 1
max_consecutive_loss = max(max_consecutive_loss, current_consecutive_loss)
else:
current_consecutive_loss = 0
# 计算最大回撤
cumulative_profit = 0
peak = 0
max_drawdown = 0
for t in trades:
cumulative_profit += t.get('net_profit', t['diff'])
peak = max(peak, cumulative_profit)
drawdown = peak - cumulative_profit
max_drawdown = max(max_drawdown, drawdown)
print(f"\n{'='*60}")
print(f"【三分之一回归策略 回测结果】")
print(f"{'='*60}")
print(f"交易笔数:{len(trades)}")
print(f"盈利笔数:{win_count} 亏损笔数:{lose_count}")
print(f"总胜率:{total_win_rate:.2f}%")
print(f"平均持仓K线数{avg_hold_bars:.1f}")
print(f"最大连续亏损:{max_consecutive_loss}")
print(f"{'='*60}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利:{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}")
print(f"最大回撤:{max_drawdown:.2f}")
print(f"{'='*60}")
print("\n===== 方向统计 =====")
for k, v in stats.items():
count = v['count']
wins = v['wins']
total_p = v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{v['name']}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
print("\n===== 信号统计 =====")
print(f"总K线数: {extra_stats['total_bars']}")
print(f"无信号K线: {extra_stats['no_signal_bars']} ({extra_stats['no_signal_bars']/extra_stats['total_bars']*100:.1f}%)")
print(f"同向信号忽略: {extra_stats['same_dir_ignored']}")
# ==================== 绘制图表 ====================
if trades and all_data:
# 如果数据太多只绘制最近一部分比如最近500根K线
max_bars = 500
if len(all_data) > max_bars:
logger.info(f"数据量较大({len(all_data)}条),只绘制最近 {max_bars} 根K线")
plot_data = all_data[-max_bars:]
# 过滤出在这个时间范围内的交易
min_time = datetime.datetime.fromtimestamp(plot_data[0]['id'] / 1000)
plot_trades_filtered = [t for t in trades if t['entry_time'] >= min_time]
else:
plot_data = all_data
plot_trades_filtered = trades
save_path = os.path.join(SCRIPT_DIR, '回测图表.png')
plot_trades(plot_data, plot_trades_filtered, save_path=save_path)
# 交互式版本TradingView风格支持时间区间/价格缩放
html_path = os.path.join(SCRIPT_DIR, '回测图表_交互式.html')
plot_trades_interactive(plot_data, plot_trades_filtered, html_path=html_path)

View File

@@ -1,418 +0,0 @@
"""
量化交易回测系统 - 30分钟K线策略回测BitMart数据源
========== 策略规则 ==========
重要所有开仓和平仓操作都在下一根K线的开盘价执行
1. 开仓条件信号出现时下一根K线开盘价开仓
- 阳包阴(涨包跌):前一根是跌(阴线),后一根是涨(阳线),且涨的收盘价 > 跌的开盘价
-> 下一根K线开盘价开多
- 阴包阳(跌包涨):前一根是涨(阳线),后一根是跌(阴线),且跌的收盘价 < 涨的开盘价
-> 下一根K线开盘价开空
2. 平仓条件所有平仓都在下一根K线开盘价执行
- 持有多单时:遇到两根连续的阴线 -> 下一根K线开盘价平仓
- 持有空单时:遇到两根连续的阳线 -> 下一根K线开盘价平仓
- 遇到反向信号下一根K线开盘价平仓并反手开仓
3. 续持条件:
- 遇到同向信号:续持
- 未满足平仓条件:续持
"""
import datetime
import calendar
from dataclasses import dataclass
from typing import List, Dict, Optional
from loguru import logger
from models.bitmart import BitMart30
# ========================= 工具函数 =========================
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
def check_signal(prev, curr):
"""
包住形态信号判定(优化版):
只看两种信号,严格按照收盘价与开盘价的比较:
1. 跌包涨(前涨后跌)-> 做空:
- 前一根是涨阳线close > open
- 后一根是跌阴线close < open
- 且:跌的收盘价 < 涨的开盘价curr['close'] < prev['open']
2. 涨包跌(前跌后涨)-> 做多:
- 前一根是跌阴线close < open
- 后一根是涨阳线close > open
- 且:涨的收盘价 > 跌的开盘价curr['close'] > prev['open']
"""
p_open = float(prev['open'])
c_close = float(curr['close'])
# 跌包涨(前涨后跌) -> 做空:跌的收盘价 < 涨的开盘价
if is_bullish(prev) and is_bearish(curr) and c_close < p_open:
return "short", "bull_bear_engulf"
# 涨包跌(前跌后涨) -> 做多:涨的收盘价 > 跌的开盘价
if is_bearish(prev) and is_bullish(curr) and c_close > p_open:
return "long", "bear_bull_engulf"
return None, None
def get_data_by_date(model, date_str: str):
"""
按天获取指定表的数据30分钟K线
数据格式:时间戳(毫秒级) 开盘价 最高价 最低价 收盘价
例如1767461400000 3106.68 3109.1 3106.22 3107.22
注意返回的数据已按时间戳id升序排序
"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
# 将日期转换为毫秒级时间戳进行查询
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
# 查询时按时间戳升序排序
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
# 确保数据已排序
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_15m_trend_optimized(dates: List[str]):
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_data_by_date(BitMart30, d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条K线数据")
if not all_data:
logger.warning("未获取到任何数据,请检查:")
logger.warning("1. 数据库连接是否正常")
logger.warning("2. 查询的日期范围是否在数据范围内")
logger.warning("3. 时间戳格式是否正确(毫秒级)")
return [], {
'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
}
# 重要合并所有数据后必须先按时间戳id排序
all_data.sort(key=lambda x: x['id'])
# 验证排序结果
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
stats = {
'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
}
trades: List[Dict] = []
current_position: Optional[Dict] = None # 开仓信息
consecutive_opposite_count = 0 # 连续反色K线计数
idx = 1
while idx < len(all_data) - 1:
prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1]
direction, signal_key = check_signal(prev, curr)
# 空仓 -> 碰到信号则开仓下一根K线开盘价
if current_position is None:
if direction:
entry_price = float(next_bar['open'])
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': entry_price,
'entry_time': next_bar['id']
}
consecutive_opposite_count = 0 # 重置连续反色计数
stats[signal_key]['count'] += 1
idx += 1
continue
# 有仓位状态:检查平仓条件
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
# 1. 反向信号 -> 下一根K线开盘价平仓并反手开仓
if direction and direction != pos_dir:
exit_price = float(next_bar['open'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
# 反手开仓
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': exit_price,
'entry_time': next_bar['id']
}
consecutive_opposite_count = 0 # 重置连续反色计数
stats[signal_key]['count'] += 1
idx += 1
continue
# 2. 检查连续反色K线平仓条件下一根K线开盘价平仓
# 持有多单:检查是否连续两根阴线
if pos_dir == 'long' and is_bearish(curr):
consecutive_opposite_count += 1
# 如果已经连续两根阴线下一根K线开盘价平仓
if consecutive_opposite_count >= 2:
exit_price = float(next_bar['open'])
diff = exit_price - current_position['entry_price']
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = None
consecutive_opposite_count = 0
idx += 1
continue
else:
# 只有一根阴线,续持
idx += 1
continue
# 持有空单:检查是否连续两根阳线
elif pos_dir == 'short' and is_bullish(curr):
consecutive_opposite_count += 1
# 如果已经连续两根阳线下一根K线开盘价平仓
if consecutive_opposite_count >= 2:
exit_price = float(next_bar['open'])
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = None
consecutive_opposite_count = 0
idx += 1
continue
else:
# 只有一根阳线,续持
idx += 1
continue
# 3. 同向K线或同向信号 -> 续持,重置连续反色计数
if (pos_dir == 'long' and is_bullish(curr)) or (pos_dir == 'short' and is_bearish(curr)):
consecutive_opposite_count = 0 # 重置连续反色计数
# 同向信号 -> 续持
if direction and direction == pos_dir:
consecutive_opposite_count = 0 # 重置连续反色计数
idx += 1
continue
idx += 1
# 尾仓:最后一根收盘价平仓
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[current_position['signal_key']]['total_profit'] += diff
if diff > 0: stats[current_position['signal_key']]['wins'] += 1
return trades, stats
# ========================= 运行示例(优化版盈利计算) =========================
if __name__ == '__main__':
# ==================== 配置参数:指定查询时间范围 ====================
# 方式1指定开始和结束日期推荐
START_DATE = "2025-01-01" # 开始日期格式YYYY-MM-DD
END_DATE = "2025-12-31" # 结束日期格式YYYY-MM-DD
# 方式2如果上面两个为空则使用年份和月份范围
START_YEAR = None # 开始年份例如2025
START_MONTH = None # 开始月份例如1
END_YEAR = None # 结束年份例如2025
END_MONTH = None # 结束月份例如12
# ==================== 生成查询日期列表 ====================
dates = []
# 优先使用指定的日期范围
if START_DATE and END_DATE:
try:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
if start_dt > end_dt:
logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}")
exit(1)
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"使用指定日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
except ValueError as e:
logger.error(f"日期格式错误:{e},请使用 YYYY-MM-DD 格式")
exit(1)
# 如果未指定日期范围,使用年份和月份范围
elif START_YEAR and END_YEAR:
start_m = START_MONTH if START_MONTH else 1
end_m = END_MONTH if END_MONTH else 12
for year in range(START_YEAR, END_YEAR + 1):
month_start = start_m if year == START_YEAR else 1
month_end = end_m if year == END_YEAR else 12
for month in range(month_start, month_end + 1):
days_in_month = calendar.monthrange(year, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"{year}-{month:02d}-{day:02d}")
logger.info(f"使用年份月份范围:{START_YEAR}{start_m}月 到 {END_YEAR}{end_m}月,共 {len(dates)}")
# 如果都没有指定,使用默认范围
else:
logger.warning("未指定日期范围使用默认2025年1-12月")
for month in range(1, 13):
days_in_month = calendar.monthrange(2025, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"2025-{month:02d}-{day:02d}")
if dates:
logger.info(f"准备查询 {len(dates)} 天的数据,日期范围:{dates[0]}{dates[-1]}")
else:
logger.error("未生成任何查询日期,请检查配置参数")
exit(1)
trades, stats = backtest_15m_trend_optimized(dates)
logger.info("===== 每笔交易详情 =====")
# === 参数设定 ===
contract_size = 10000 # 合约规模1手对应多少基础货币
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率
total_points_profit = 0 # 累计点差
total_money_profit = 0 # 累计金额盈利
total_fee = 0 # 累计手续费
for t in trades:
entry = t['entry']
exit = t['exit']
direction = t['direction']
# === 1⃣ 原始价差(点差) ===
point_diff = (exit - entry) if direction == '做多' else (entry - exit)
# === 2⃣ 金额盈利(考虑合约规模) ===
money_profit = point_diff / entry * contract_size # 利润以基础货币计例如USD
# === 3⃣ 手续费计算 ===
# 开仓 + 平仓手续费(按比例计算 + 固定)
fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate)
# === 4⃣ 净利润 ===
net_profit = money_profit - fee
# 保存计算结果
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
# if net_profit < -400:
logger.info(
f"{t['entry_time']} {direction}({t['signal']}) "
f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
# === 汇总统计 ===
total_net_profit = total_money_profit - total_fee
print(f"\n【BitMart 回测结果】")
print(f"一共交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}\n")
print(total_money_profit - total_fee * 0.1)
print("===== 信号统计 =====")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")

View File

@@ -1,418 +0,0 @@
"""
量化交易回测系统 - 三分之一策略5分钟K线 + 1分钟精准判断
========== 策略规则(与 交易/bitmart-三分之一策略交易.py 一致)==========
1. 触发价格计算基于有效的前一根5分钟K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3
2. 信号触发条件:
- 当前5分钟K线最高价 >= 做多触发价格 → 做多信号
- 当前5分钟K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根5分钟K线内只交易一次
4. 精准判断使用1分钟K线
- 当一根5分钟K线同时触及做多和做空价格时
- 使用该5分钟K线对应的5根1分钟K线来判断哪个方向先被触发
- 使回测更贴近真实成交顺序
"""
import datetime
import calendar
from typing import List, Dict, Optional
from loguru import logger
from models.bitmart_klines import BitMartETH5M, BitMartETH1M
# ========================= 工具函数 =========================
def is_bullish(c):
"""阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(c):
"""阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if get_body_size(prev) >= min_body_size:
return i, prev
return None, None
def get_one_third_levels(prev: Dict):
"""
计算前一根K线实体的 1/3 双向触发价格
返回:(做多触发价格, 做空触发价格)
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001:
return None, None
long_trigger = p_close + body / 3
short_trigger = p_close - body / 3
return long_trigger, short_trigger
def get_1m_data_by_range(start_ts_ms: int, end_ts_ms: int) -> List[Dict]:
"""
获取指定时间范围内的1分钟K线数据毫秒时间戳
"""
query = BitMartETH1M.select().where(
BitMartETH1M.id.between(start_ts_ms, end_ts_ms - 1)
).order_by(BitMartETH1M.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
def get_1m_data_for_5m_bar(bar_5m: Dict) -> List[Dict]:
"""获取一根5分钟K线对应的5根1分钟K线"""
start_ts = bar_5m['id']
end_ts = start_ts + 5 * 60 * 1000
return get_1m_data_by_range(start_ts, end_ts)
def determine_trigger_order_by_1m(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float
) -> Optional[str]:
"""
使用1分钟K线判断在一根5分钟周期内先触发做多还是做空。
按时间顺序遍历每根1分钟K线先触及哪个方向则返回该方向
若同一根1分钟K线内两个方向都触及用开盘价距离判断。
"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar['high'])
low = float(bar['low'])
open_price = float(bar['open'])
long_triggered = high >= long_trigger
short_triggered = low <= short_trigger
if long_triggered and not short_triggered:
return 'long'
if short_triggered and not long_triggered:
return 'short'
if long_triggered and short_triggered:
dist_to_long = abs(long_trigger - open_price)
dist_to_short = abs(short_trigger - open_price)
return 'short' if dist_to_short <= dist_to_long else 'long'
return None
def check_trigger_with_1m(
all_data_5m: List[Dict],
current_idx: int,
min_body_size: float = 0.1
) -> tuple:
"""
检查当前5分钟K线是否触发交易信号。
若同时触发多空则用该5分钟内的1分钟K线判断先后顺序。
返回:(方向, 触发价格, 有效前一根K线索引, 是否使用了1分钟精准判断, 是否双触)
"""
if current_idx <= 0:
return None, None, None, False, False
curr = all_data_5m[current_idx]
valid_prev_idx, prev = find_valid_prev_bar(all_data_5m, current_idx, min_body_size)
if prev is None:
return None, None, None, False, False
long_trigger, short_trigger = get_one_third_levels(prev)
if long_trigger is None:
return None, None, None, False, False
c_high = float(curr['high'])
c_low = float(curr['low'])
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
both_triggered = long_triggered and short_triggered
if both_triggered:
bars_1m = get_1m_data_for_5m_bar(curr)
if bars_1m:
direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger)
if direction:
trigger_price = long_trigger if direction == 'long' else short_trigger
return direction, trigger_price, valid_prev_idx, True, True
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx, False, True
return 'long', long_trigger, valid_prev_idx, False, True
if short_triggered:
return 'short', short_trigger, valid_prev_idx, False, False
if long_triggered:
return 'long', long_trigger, valid_prev_idx, False, False
return None, None, None, False, False
def get_data_by_date(model, date_str: str) -> List[Dict]:
"""
按天获取指定表的K线数据毫秒时间戳
返回格式id(ms), open, high, low, close已按 id 升序。
"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_one_third_precise(dates: List[str], min_body_size: float = 0.1):
"""
三分之一策略回测5分钟K线 + 1分钟精准判断
风格与 回测数据-30分钟版 一致:按日期拉取、合并排序、统计与输出。
"""
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_data_by_date(BitMartETH5M, d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条5分钟K线数据")
if not all_data:
logger.warning("未获取到任何数据,请检查:")
logger.warning("1. 数据库连接与 bitmart_eth_5m / bitmart_eth_1m 表是否存在")
logger.warning("2. 是否已用 抓取多周期K线.py 抓取过 1 分钟和 5 分钟数据")
return [], {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}, {'precise_1m_count': 0, 'fallback_count': 0}
all_data.sort(key=lambda x: x['id'])
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
extra = {'precise_1m_count': 0, 'fallback_count': 0}
trades: List[Dict] = []
current_position: Optional[Dict] = None
last_trade_bar: Optional[int] = None
for idx in range(1, len(all_data)):
curr = all_data[idx]
direction, trigger_price, valid_prev_idx, used_1m, both_triggered = check_trigger_with_1m(all_data, idx, min_body_size)
valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None
if used_1m:
extra['precise_1m_count'] += 1
elif both_triggered and direction:
extra['fallback_count'] += 1
if direction is None:
continue
if last_trade_bar == idx:
continue
if current_position is None:
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx,
}
stats[direction]['count'] += 1
last_trade_bar = idx
continue
pos_dir = current_position['direction']
if direction == pos_dir:
continue
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': curr['id'],
'signal': '三分之一',
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx,
}
stats[direction]['count'] += 1
last_trade_bar = idx
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': last['id'],
'signal': '三分之一',
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"回测完成使用1分钟精准判断 {extra['precise_1m_count']} 次,使用开盘价距离判断 {extra['fallback_count']}")
return trades, stats, extra
# ========================= 运行示例(与 回测数据-30分钟版 风格一致)=========================
if __name__ == '__main__':
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
START_YEAR = None
START_MONTH = None
END_YEAR = None
END_MONTH = None
dates = []
if START_DATE and END_DATE:
try:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
if start_dt > end_dt:
logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}")
exit(1)
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"使用指定日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
except ValueError as e:
logger.error(f"日期格式错误:{e},请使用 YYYY-MM-DD 格式")
exit(1)
elif START_YEAR and END_YEAR:
start_m = START_MONTH if START_MONTH else 1
end_m = END_MONTH if END_MONTH else 12
for year in range(START_YEAR, END_YEAR + 1):
month_start = start_m if year == START_YEAR else 1
month_end = end_m if year == END_YEAR else 12
for month in range(month_start, month_end + 1):
days_in_month = calendar.monthrange(year, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"{year}-{month:02d}-{day:02d}")
logger.info(f"使用年份月份范围:{START_YEAR}{start_m}月 到 {END_YEAR}{end_m}月,共 {len(dates)}")
else:
logger.warning("未指定日期范围使用默认2025年1-12月")
for month in range(1, 13):
days_in_month = calendar.monthrange(2025, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"2025-{month:02d}-{day:02d}")
if not dates:
logger.error("未生成任何查询日期,请检查配置参数")
exit(1)
trades, stats, extra = backtest_one_third_precise(dates, min_body_size=0.1)
logger.info("===== 每笔交易详情 =====")
contract_size = 10000
open_fee_fixed = 5
close_fee_rate = 0.0005
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_p = t['exit']
direction = t['direction']
point_diff = (exit_p - entry) if direction == '做多' else (entry - exit_p)
money_profit = point_diff / entry * contract_size
fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate)
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction}({t['signal']}) "
f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
total_net_profit = total_money_profit - total_fee
print("\n【BitMart 三分之一策略回测结果5分钟K线 + 1分钟精准判断")
print(f"一共交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}\n")
print("===== 信号统计 =====")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
print(f"使用1分钟K线精准判断双触次数: {extra['precise_1m_count']}")
print(f"使用开盘价距离判断次数: {extra['fallback_count']}")

View File

@@ -1,418 +0,0 @@
"""
量化交易回测系统 - 五分之一策略3分钟K线 + 1分钟精准判断
========== 策略规则 ==========
1. 触发价格计算基于有效的前一根3分钟K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5
- 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5
2. 信号触发条件:
- 当前3分钟K线最高价 >= 做多触发价格 → 做多信号
- 当前3分钟K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根3分钟K线内只交易一次
4. 精准判断使用1分钟K线
- 当一根3分钟K线同时触及做多和做空价格时
- 使用该3分钟K线对应的3根1分钟K线来判断哪个方向先被触发
- 使回测更贴近真实成交顺序
"""
import datetime
import calendar
from typing import List, Dict, Optional
from loguru import logger
from models.bitmart_klines import BitMartETH3M, BitMartETH1M
# ========================= 工具函数 =========================
def is_bullish(c):
"""阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(c):
"""阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if get_body_size(prev) >= min_body_size:
return i, prev
return None, None
def get_one_fifth_levels(prev: Dict):
"""
计算前一根K线实体的 1/5 双向触发价格
返回:(做多触发价格, 做空触发价格)
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001:
return None, None
long_trigger = p_close + body / 5
short_trigger = p_close - body / 5
return long_trigger, short_trigger
def get_1m_data_by_range(start_ts_ms: int, end_ts_ms: int) -> List[Dict]:
"""
获取指定时间范围内的1分钟K线数据毫秒时间戳
"""
query = BitMartETH1M.select().where(
BitMartETH1M.id.between(start_ts_ms, end_ts_ms - 1)
).order_by(BitMartETH1M.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
def get_1m_data_for_3m_bar(bar_3m: Dict) -> List[Dict]:
"""获取一根3分钟K线对应的3根1分钟K线"""
start_ts = bar_3m['id']
end_ts = start_ts + 3 * 60 * 1000
return get_1m_data_by_range(start_ts, end_ts)
def determine_trigger_order_by_1m(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float
) -> Optional[str]:
"""
使用1分钟K线判断在一根3分钟周期内先触发做多还是做空。
按时间顺序遍历每根1分钟K线先触及哪个方向则返回该方向
若同一根1分钟K线内两个方向都触及用开盘价距离判断。
"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar['high'])
low = float(bar['low'])
open_price = float(bar['open'])
long_triggered = high >= long_trigger
short_triggered = low <= short_trigger
if long_triggered and not short_triggered:
return 'long'
if short_triggered and not long_triggered:
return 'short'
if long_triggered and short_triggered:
dist_to_long = abs(long_trigger - open_price)
dist_to_short = abs(short_trigger - open_price)
return 'short' if dist_to_short <= dist_to_long else 'long'
return None
def check_trigger_with_1m(
all_data_3m: List[Dict],
current_idx: int,
min_body_size: float = 0.1
) -> tuple:
"""
检查当前3分钟K线是否触发交易信号。
若同时触发多空则用该3分钟内的1分钟K线判断先后顺序。
返回:(方向, 触发价格, 有效前一根K线索引, 是否使用了1分钟精准判断, 是否双触)
"""
if current_idx <= 0:
return None, None, None, False, False
curr = all_data_3m[current_idx]
valid_prev_idx, prev = find_valid_prev_bar(all_data_3m, current_idx, min_body_size)
if prev is None:
return None, None, None, False, False
long_trigger, short_trigger = get_one_fifth_levels(prev)
if long_trigger is None:
return None, None, None, False, False
c_high = float(curr['high'])
c_low = float(curr['low'])
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
both_triggered = long_triggered and short_triggered
if both_triggered:
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m:
direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger)
if direction:
trigger_price = long_trigger if direction == 'long' else short_trigger
return direction, trigger_price, valid_prev_idx, True, True
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx, False, True
return 'long', long_trigger, valid_prev_idx, False, True
if short_triggered:
return 'short', short_trigger, valid_prev_idx, False, False
if long_triggered:
return 'long', long_trigger, valid_prev_idx, False, False
return None, None, None, False, False
def get_data_by_date(model, date_str: str) -> List[Dict]:
"""
按天获取指定表的K线数据毫秒时间戳
返回格式id(ms), open, high, low, close已按 id 升序。
"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_one_fifth_precise(dates: List[str], min_body_size: float = 0.1):
"""
五分之一策略回测3分钟K线 + 1分钟精准判断
风格与 回测数据-30分钟版 一致:按日期拉取、合并排序、统计与输出。
"""
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_data_by_date(BitMartETH3M, d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条3分钟K线数据")
if not all_data:
logger.warning("未获取到任何数据,请检查:")
logger.warning("1. 数据库连接与 bitmart_eth_3m / bitmart_eth_1m 表是否存在")
logger.warning("2. 是否已用 抓取多周期K线.py 抓取过 1 分钟和 3 分钟数据")
return [], {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}, {'precise_1m_count': 0, 'fallback_count': 0}
all_data.sort(key=lambda x: x['id'])
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
extra = {'precise_1m_count': 0, 'fallback_count': 0}
trades: List[Dict] = []
current_position: Optional[Dict] = None
last_trade_bar: Optional[int] = None
for idx in range(1, len(all_data)):
curr = all_data[idx]
direction, trigger_price, valid_prev_idx, used_1m, both_triggered = check_trigger_with_1m(all_data, idx, min_body_size)
valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None
if used_1m:
extra['precise_1m_count'] += 1
elif both_triggered and direction:
extra['fallback_count'] += 1
if direction is None:
continue
if last_trade_bar == idx:
continue
if current_position is None:
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx,
}
stats[direction]['count'] += 1
last_trade_bar = idx
continue
pos_dir = current_position['direction']
if direction == pos_dir:
continue
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': curr['id'],
'signal': '五分之一',
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx,
}
stats[direction]['count'] += 1
last_trade_bar = idx
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': last['id'],
'signal': '五分之一',
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"回测完成使用1分钟精准判断 {extra['precise_1m_count']} 次,使用开盘价距离判断 {extra['fallback_count']}")
return trades, stats, extra
# ========================= 运行示例(与 回测数据-30分钟版 风格一致)=========================
if __name__ == '__main__':
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
START_YEAR = None
START_MONTH = None
END_YEAR = None
END_MONTH = None
dates = []
if START_DATE and END_DATE:
try:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
if start_dt > end_dt:
logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}")
exit(1)
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"使用指定日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
except ValueError as e:
logger.error(f"日期格式错误:{e},请使用 YYYY-MM-DD 格式")
exit(1)
elif START_YEAR and END_YEAR:
start_m = START_MONTH if START_MONTH else 1
end_m = END_MONTH if END_MONTH else 12
for year in range(START_YEAR, END_YEAR + 1):
month_start = start_m if year == START_YEAR else 1
month_end = end_m if year == END_YEAR else 12
for month in range(month_start, month_end + 1):
days_in_month = calendar.monthrange(year, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"{year}-{month:02d}-{day:02d}")
logger.info(f"使用年份月份范围:{START_YEAR}{start_m}月 到 {END_YEAR}{end_m}月,共 {len(dates)}")
else:
logger.warning("未指定日期范围使用默认2025年1-12月")
for month in range(1, 13):
days_in_month = calendar.monthrange(2025, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"2025-{month:02d}-{day:02d}")
if not dates:
logger.error("未生成任何查询日期,请检查配置参数")
exit(1)
trades, stats, extra = backtest_one_fifth_precise(dates, min_body_size=0.1)
logger.info("===== 每笔交易详情 =====")
contract_size = 10000
open_fee_fixed = 5
close_fee_rate = 0.0005
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_p = t['exit']
direction = t['direction']
point_diff = (exit_p - entry) if direction == '做多' else (entry - exit_p)
money_profit = point_diff / entry * contract_size
fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate)
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction}({t['signal']}) "
f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
total_net_profit = total_money_profit - total_fee
print("\n【BitMart 五分之一策略回测结果3分钟K线 + 1分钟精准判断")
print(f"一共交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}\n")
print("===== 信号统计 =====")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
print(f"使用1分钟K线精准判断双触次数: {extra['precise_1m_count']}")
print(f"使用开盘价距离判断次数: {extra['fallback_count']}")

View File

@@ -1,382 +0,0 @@
import time
import datetime
import openBrowser
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
class BitmartFuturesTransaction:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=30, desc="等待K线", ncols=80)
self.last_kline_time = None
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式(你的“成本开仓”需求)
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
def get_klines(self):
"""获取最近3根30分钟K线step=30"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新3根
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=30, # 30分钟
start_time=end_time - 3600 * 10, # 取最近10小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted # 最近3根: kline_1 (最老), kline_2, kline_3 (最新)
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(error=True, msg="获取K线异常")
return None
def get_current_price(self):
"""获取当前最新价格,用于计算张数"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 3, # 取最近10小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
# 获取当前持仓方向
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
# 设置杠杆和全仓
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def take_over_browser(self):
"""接管浏览器"""
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
return True
except:
return False
def close_extra_tabs(self):
"""关闭多余 tab"""
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click()
return True
except:
return False
def 平仓(self):
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价最多或者做空1是做多-1是做空
limitPriceShortOrder 限价最多或者做空
"""
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, text, error=False):
logger.info(text)
def close_extra_tabs_in_browser(self):
try:
for _, i in enumerate(self.page.get_tabs()):
if _ == 0:
continue
i.close()
return True
except:
pass
return False
def get_now_time(self):
# 获取当前时间戳
current_timestamp = time.time()
# 将当前时间戳转换为 datetime 对象
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的整点或 30 分时刻
if current_datetime.minute < 30:
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
else:
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
# 将目标 datetime 对象转换为时间戳
target_timestamp = target_datetime.timestamp()
return int(target_timestamp)
def is_bullish(self, c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(self, c): # 阴线
return float(c['close']) < float(c['open'])
def check_signal(self, prev, curr):
"""
包住形态信号判定(优化版):
只看两种信号,严格按照收盘价与开盘价的比较:
1. 阳包阴(涨包跌,前跌后涨)-> 做多:
- 前一根是跌阴线close < open
- 后一根是涨阳线close > open
- 且:涨的收盘价 > 跌的开盘价curr['close'] > prev['open']
2. 阴包阳(跌包涨,前涨后跌)-> 做空:
- 前一根是涨阳线close > open
- 后一根是跌阴线close < open
- 且:跌的收盘价 < 涨的开盘价curr['close'] < prev['open']
"""
p_open = float(prev['open'])
c_close = float(curr['close'])
# 阳包阴(涨包跌,前跌后涨) -> 做多:涨的收盘价 > 跌的开盘价
if self.is_bearish(prev) and self.is_bullish(curr) and c_close > p_open:
return "long", "bear_bull_engulf"
# 阴包阳(跌包涨,前涨后跌) -> 做空:跌的收盘价 < 涨的开盘价
if self.is_bullish(prev) and self.is_bearish(curr) and c_close < p_open:
return "short", "bull_bear_engulf"
return None, None
def action(self):
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
# 1. 打开浏览器
if not self.openBrowser():
self.ding("打开 TGE 失败!", error=True)
return
logger.info("TGE 端口获取成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功!!!')
else:
logger.info('关闭多余标签页失败!!!')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc进度条说明ncols长度
self.time_start = None # 时间状态 避免同一个时段,发生太多消息
while True:
# 获取当前时间
current_time = time.localtime()
current_minute = current_time.tm_min
if current_minute < 30:
self.pbar.n = current_minute
self.pbar.refresh()
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
if self.time_start == self.get_now_time():
time.sleep(5)
continue
new_price_datas = self.get_klines()
if not new_price_datas:
logger.info("获取最新价格有问题!!!")
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
# 判断抓取的数据是否正确
if self.get_now_time() != self.kline_3["id"]:
continue
self.time_start = self.get_now_time()
if self.get_position_status():
logger.info("获取仓位信息成功!!!")
else:
logger.info("获取仓位信息失败!!!")
self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0)
continue
if self.start == 1:
if self.is_bearish(self.kline_1) and self.is_bearish(self.kline_2):
self.平仓()
elif self.start == -1:
if self.is_bullish(self.kline_1) and self.is_bullish(self.kline_2):
self.平仓()
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
if self.direction == "long":
if self.start == -1:
self.平仓()
self.开单(marketPriceLongOrder=1, size=self.get_available_balance() * self.risk_percent)
elif self.start == 0:
self.开单(marketPriceLongOrder=1, size=self.get_available_balance() * self.risk_percent)
if self.direction == "short":
if self.start == 1:
self.平仓()
self.开单(marketPriceLongOrder=-1, size=self.get_available_balance() * self.risk_percent)
elif self.start == 0:
self.开单(marketPriceLongOrder=-1, size=self.get_available_balance() * self.risk_percent)
self.pbar.reset() # 重置进度条
if __name__ == '__main__':
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()