This commit is contained in:
ddrwode
2026-01-18 20:31:41 +08:00
parent 51f3401ea4
commit 1680a3571e
2 changed files with 414 additions and 0 deletions

414
mexc/30分钟.py Normal file
View File

@@ -0,0 +1,414 @@
"""
量化交易回测系统 - 30分钟K线策略回测Weex数据源
========== 策略规则 ==========
重要所有开仓和平仓操作都在下一根K线的开盘价执行
【策略流程】
1. 开仓条件信号出现时下一根K线开盘价开仓
- 阳包阴(涨包跌)信号 -> 开多
* 前一根是跌(阴线),后一根是涨(阳线)
* 且:涨的收盘价 > 跌的开盘价
- 阴包阳(跌包涨)信号 -> 开空
* 前一根是涨(阳线),后一根是跌(阴线)
* 且:跌的收盘价 < 涨的开盘价
2. 平仓条件所有平仓都在下一根K线开盘价执行
- 持有多单时:遇到两根连续的阴线 -> 下一根K线开盘价平仓
- 持有空单时:遇到两根连续的阳线 -> 下一根K线开盘价平仓
- 遇到反向信号下一根K线开盘价平仓并反手开仓
* 例如:持有多单时遇到阴包阳信号 -> 平多开空
3. 续持条件:
- 遇到同向信号:续持
- 未满足平仓条件:续持
【数据处理流程】
1. 从数据库读取数据
2. 数据排序(按时间戳升序)
3. 开始回测
"""
import datetime
import calendar
from dataclasses import dataclass
from typing import List, Dict, Optional
from loguru import logger
from models.mexc import Mexc30
# ========================= 工具函数 =========================
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
def check_signal(prev, curr):
"""
包住形态信号判定(优化版):
只看两种信号,严格按照收盘价与开盘价的比较:
1. 阳包阴(涨包跌,前跌后涨)-> 做多:
- 前一根是跌阴线close < open
- 后一根是涨阳线close > open
- 且:涨的收盘价 > 跌的开盘价curr['close'] > prev['open']
2. 阴包阳(跌包涨,前涨后跌)-> 做空:
- 前一根是涨阳线close > open
- 后一根是跌阴线close < open
- 且:跌的收盘价 < 涨的开盘价curr['close'] < prev['open']
"""
p_open = float(prev['open'])
c_close = float(curr['close'])
# 阳包阴(涨包跌,前跌后涨) -> 做多:涨的收盘价 > 跌的开盘价
if is_bearish(prev) and is_bullish(curr) and c_close > p_open:
return "long", "bear_bull_engulf"
# 阴包阳(跌包涨,前涨后跌) -> 做空:跌的收盘价 < 涨的开盘价
if is_bullish(prev) and is_bearish(curr) and c_close < p_open:
return "short", "bull_bear_engulf"
return None, None
def get_data_by_date(model, date_str: str):
"""
按天获取指定表的数据30分钟K线
数据格式:时间戳(毫秒级) 开盘价 最高价 最低价 收盘价
例如1767461400000 3106.68 3109.1 3106.22 3107.22
注意返回的数据已按时间戳id升序排序
"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
# 将日期转换为毫秒级时间戳进行查询
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
# 查询时按时间戳升序排序
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
# 确保数据已排序
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_15m_trend_optimized(dates: List[str]):
"""
回测策略逻辑:
1. 开仓条件信号出现时下一根K线开盘价开仓
- 阳包阴(涨包跌)信号 -> 开多
- 阴包阳(跌包涨)信号 -> 开空
2. 平仓条件所有平仓都在下一根K线开盘价执行
- 持有多单时:遇到两根连续的阴线 -> 下一根K线开盘价平仓
- 持有空单时:遇到两根连续的阳线 -> 下一根K线开盘价平仓
- 遇到反向信号下一根K线开盘价平仓并反手开仓
(例如:持有多单时遇到阴包阳信号 -> 平多开空)
3. 续持条件:
- 遇到同向信号:续持
- 未满足平仓条件:续持
"""
# ==================== 步骤1从数据库读取数据 ====================
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_data_by_date(Mexc30, d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条K线数据")
if not all_data:
logger.warning("未获取到任何数据,请检查数据库连接和日期范围")
return [], {
'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
}
# ==================== 步骤2数据去重 + 排序(重要!必须先处理再回测) ====================
# 以时间戳(id)为唯一键去重,保留最先出现的一条
unique_by_id: Dict[int, Dict] = {}
for item in all_data:
if item['id'] not in unique_by_id:
unique_by_id[item['id']] = item
all_data = list(unique_by_id.values())
all_data.sort(key=lambda x: x['id'])
# 验证排序结果
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
# ==================== 步骤3开始回测 ====================
stats = {
'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
}
trades: List[Dict] = []
current_position: Optional[Dict] = None # 开仓信息
consecutive_opposite_count = 0 # 连续反色K线计数
idx = 1
while idx < len(all_data) - 1:
prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1]
direction, signal_key = check_signal(prev, curr)
# ==================== 空仓状态:遇到信号则开仓 ====================
# 策略:阳包阴信号 -> 开多,阴包阳信号 -> 开空
if current_position is None:
if direction:
# 信号出现prev和curr形成信号在下一根K线next_bar的开盘价开仓
entry_price = float(next_bar['open'])
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': entry_price,
'entry_time': next_bar['id']
}
consecutive_opposite_count = 0 # 重置连续反色计数
stats[signal_key]['count'] += 1
logger.debug(f"开仓: {stats[signal_key]['name']} {'做多' if direction == 'long' else '做空'} @ {entry_price:.2f}")
idx += 1
continue
# ==================== 有仓位状态:检查平仓条件 ====================
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
# 1. 反向信号 -> 下一根K线开盘价平仓并反手开仓
# 策略:遇到反向信号(如持有多单时遇到阴包阳),平仓并反手开仓
if direction and direction != pos_dir:
exit_price = float(next_bar['open'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
# 反手开仓下一根K线开盘价
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': exit_price,
'entry_time': next_bar['id']
}
consecutive_opposite_count = 0 # 重置连续反色计数
stats[signal_key]['count'] += 1
logger.debug(f"反向信号反手: 平{'做多' if pos_dir == 'long' else '做空'} @ {exit_price:.2f}, 开{'做多' if direction == 'long' else '做空'}")
idx += 1
continue
# 2. 检查连续反色K线平仓条件下一根K线开盘价平仓
# 策略:持有多单时,遇到两根连续的阴线 -> 平仓
# 持有空单时,遇到两根连续的阳线 -> 平仓
if pos_dir == 'long' and is_bearish(curr):
consecutive_opposite_count += 1
# 如果已经连续两根阴线下一根K线开盘价平仓
if consecutive_opposite_count >= 2:
logger.debug(f"平仓: 做多遇到连续两根阴线")
exit_price = float(next_bar['open'])
diff = exit_price - current_position['entry_price']
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = None
consecutive_opposite_count = 0
idx += 1
continue
else:
# 只有一根阴线,续持
idx += 1
continue
# 持有空单:检查是否连续两根阳线
elif pos_dir == 'short' and is_bullish(curr):
consecutive_opposite_count += 1
# 如果已经连续两根阳线下一根K线开盘价平仓
if consecutive_opposite_count >= 2:
logger.debug(f"平仓: 做空遇到连续两根阳线")
exit_price = float(next_bar['open'])
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = None
consecutive_opposite_count = 0
idx += 1
continue
else:
# 只有一根阳线,续持
idx += 1
continue
# 3. 同向K线或同向信号 -> 续持,重置连续反色计数
if (pos_dir == 'long' and is_bullish(curr)) or (pos_dir == 'short' and is_bearish(curr)):
consecutive_opposite_count = 0 # 重置连续反色计数
# 同向信号 -> 续持
if direction and direction == pos_dir:
consecutive_opposite_count = 0 # 重置连续反色计数
idx += 1
continue
idx += 1
# 尾仓:最后一根收盘价平仓
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[current_position['signal_key']]['total_profit'] += diff
if diff > 0: stats[current_position['signal_key']]['wins'] += 1
return trades, stats
# ========================= 运行示例(优化版盈利计算) =========================
if __name__ == '__main__':
dates = []
# 获取当前日期
today = datetime.datetime.now()
target_year = 2025
for month in range(1, 13):
# 获取该月的实际天数
days_in_month = calendar.monthrange(target_year, month)[1]
for day in range(1, days_in_month + 1):
# 只添加今天及之前的日期
date_str = f"{target_year}-{month:02d}-{day:02d}"
date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d')
# 如果日期在今天之后,跳过
if date_obj > today:
break
dates.append(date_str)
print(dates)
# dates = [f"2025-09-{i}" for i in range(1, 32)]
trades, stats = backtest_15m_trend_optimized(dates)
logger.info("===== 每笔交易详情 =====")
# === 参数设定 ===
contract_size = 10000 # 合约规模1手对应多少基础货币
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率
total_points_profit = 0 # 累计点差
total_money_profit = 0 # 累计金额盈利
total_fee = 0 # 累计手续费
for t in trades:
entry = t['entry']
exit = t['exit']
direction = t['direction']
# === 1⃣ 原始价差(点差) ===
point_diff = (exit - entry) if direction == '做多' else (entry - exit)
# === 2⃣ 金额盈利(考虑合约规模) ===
money_profit = point_diff / entry * contract_size # 利润以基础货币计例如USD
# === 3⃣ 手续费计算 ===
# 开仓 + 平仓手续费(按比例计算 + 固定)
fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate)
# === 4⃣ 净利润 ===
net_profit = money_profit - fee
# 保存计算结果
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
# if net_profit < -400:
logger.info(
f"{t['entry_time']} {direction}({t['signal']}) "
f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
# === 汇总统计 ===
total_net_profit = total_money_profit - total_fee
print(f"\n一共交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}\n")
print(total_money_profit - total_fee * 0.1)
print("===== 信号统计 =====")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")

Binary file not shown.