Files
lm_code/weex/长期持有信号/读取数据库数据-30分钟版.py
2025-10-22 16:22:36 +08:00

260 lines
10 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
量化交易回测系统 - 仅15分钟K线 & 信号续持/反手/单根反色平仓逻辑(完整版)
"""
import datetime
from dataclasses import dataclass
from typing import List, Dict, Optional
from loguru import logger
from models.weex import Weex30 # 替换为你的15分钟K线模型
# ========================= 工具函数 =========================
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
def check_signal(prev, curr):
"""
包住形态信号判定仅15分钟K线
- 前跌后涨包住 -> 做多
- 前涨后跌包住 -> 做空
"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
# 前跌后涨包住 -> 做多
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
return "long", "bear_bull_engulf"
# 前涨后跌包住 -> 做空
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
return "short", "bull_bear_engulf"
return None, None
def get_data_by_date(model, date_str: str):
"""按天获取指定表的数据15分钟"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
# ========================= 回测逻辑 =========================
def backtest_15m_trend_optimized(dates: List[str]):
all_data: List[Dict] = []
for d in dates:
all_data.extend(get_data_by_date(Weex30, d))
if not all_data:
return [], {
'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
}
all_data.sort(key=lambda x: x['id'])
stats = {
'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
}
trades: List[Dict] = []
current_position: Optional[Dict] = None # 开仓信息
idx = 1
while idx < len(all_data) - 1:
prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1]
direction, signal_key = check_signal(prev, curr)
# 空仓 -> 碰到信号则开仓
if current_position is None and direction:
entry_price = float(next_bar['open'])
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': entry_price,
'entry_time': next_bar['id']
}
stats[signal_key]['count'] += 1
idx += 1
continue
if current_position:
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
# 反向信号 -> 下一根开盘平仓 + 同价反手
if direction and direction != pos_dir:
exit_price = float(next_bar['open'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': exit_price,
'entry_time': next_bar['id']
}
stats[signal_key]['count'] += 1
idx += 1
continue
# 同向信号 -> 续持
if direction and direction == pos_dir:
idx += 1
continue
# 单根反色K线 -> 判断后续是否能组成信号
curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr))
if curr_is_opposite:
can_peek = idx + 1 < len(all_data)
if can_peek:
lookahead_dir, _ = check_signal(curr, all_data[idx + 1])
if lookahead_dir is not None:
idx += 1
continue # 后续可组成信号,等待信号处理
# 否则按收盘价平仓
exit_price = float(next_bar['close'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(all_data[idx + 1]['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = None
idx += 1
# 尾仓:最后一根收盘价平仓
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[current_position['signal_key']]['total_profit'] += diff
if diff > 0: stats[current_position['signal_key']]['wins'] += 1
return trades, stats
# ========================= 运行示例(优化版盈利计算) =========================
if __name__ == '__main__':
# dates = []
# for i in range(1, 11):
# for i1 in range(1, 31):
# dates.append(f"2025-{f'0{i}' if len(str(i)) < 2 else i}-{i1}")
#
# print(dates)
dates = [f"2025-07-{i}" for i in range(1, 31)]
trades, stats = backtest_15m_trend_optimized(dates)
logger.info("===== 每笔交易详情 =====")
# === 参数设定 ===
contract_size = 10000 # 合约规模1手对应多少基础货币
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率
total_points_profit = 0 # 累计点差
total_money_profit = 0 # 累计金额盈利
total_fee = 0 # 累计手续费
for t in trades:
entry = t['entry']
exit = t['exit']
direction = t['direction']
# === 1⃣ 原始价差(点差) ===
point_diff = (exit - entry) if direction == '做多' else (entry - exit)
# === 2⃣ 金额盈利(考虑合约规模) ===
money_profit = point_diff / entry * contract_size # 利润以基础货币计例如USD
# === 3⃣ 手续费计算 ===
# 开仓 + 平仓手续费(按比例计算 + 固定)
fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate)
# === 4⃣ 净利润 ===
net_profit = money_profit - fee
# 保存计算结果
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
# if net_profit < -400:
logger.info(
f"{t['entry_time']} {direction}({t['signal']}) "
f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
# === 汇总统计 ===
total_net_profit = total_money_profit - total_fee
print(f"\n一共交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}\n")
print("===== 信号统计 =====")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")