Files
lm_code/weex/长期持有信号/30分钟优化实体超过版本.py

308 lines
10 KiB
Python
Raw Normal View History

2025-12-04 15:11:15 +08:00
"""
15分钟K线包住形态策略优化版本
- 第二根实体必须满足 curr_body >= prev_body + 0.4 × curr_body(约为第一根实体的 1.67 倍以上)
- 支持续持反手单根反色平仓
"""
import datetime
from dataclasses import dataclass
from typing import List, Dict, Optional
from loguru import logger
from models.weex import Weex30 # 你的数据库模型
# ========================= 工具函数 =========================
def is_bullish(c):
    """Return True when the candle's close is strictly above its open.

    ``c`` is a mapping with ``'open'`` and ``'close'`` entries; both are
    converted with ``float()`` before comparing.
    """
    close_price = float(c['close'])
    open_price = float(c['open'])
    return open_price < close_price
def is_bearish(c):
    """Return True when the candle's close is strictly below its open.

    ``c`` is a mapping with ``'open'`` and ``'close'`` entries; both are
    converted with ``float()`` before comparing.
    """
    close_price = float(c['close'])
    open_price = float(c['open'])
    return open_price > close_price
def calc_body(c):
    """Return the candle body size: the absolute open-to-close distance.

    ``c`` is a mapping with ``'open'`` and ``'close'`` entries; both are
    converted with ``float()`` first, so the result is always a float.
    """
    close_price = float(c['close'])
    open_price = float(c['open'])
    signed_body = close_price - open_price
    return abs(signed_body)
# ========================= 包住信号含20%增强) =========================
def check_signal(prev, curr, body_margin: float = 0.4):
    """Detect an engulfing signal formed by two consecutive candles.

    A signal needs both a body-size filter and an engulfing shape:

    * Size filter: ``curr_body >= prev_body + body_margin * curr_body``.
      For ``body_margin < 1`` this is the same as
      ``curr_body >= prev_body / (1 - body_margin)``; with the default 0.4
      the second body must be roughly 1.667x the first (not 1.2x, as an
      older version of this docstring stated).
    * Long: ``prev`` is bearish, ``curr`` is bullish, ``curr`` opens at or
      below ``prev``'s close and closes at or above ``prev``'s open.
    * Short: the mirror image (bullish ``prev``, bearish ``curr``).

    Args:
        prev: First candle, a mapping with ``'open'`` and ``'close'``.
        curr: Second candle, same shape as ``prev``.
        body_margin: Fraction of ``curr``'s own body that is added to
            ``prev``'s body to form the minimum size ``curr`` must reach.
            The default reproduces the previously hard-coded 0.4.

    Returns:
        ``("long", "bear_bull_engulf")``, ``("short", "bull_bear_engulf")``
        or ``(None, None)`` when no signal is present.
    """
    p_open, p_close = float(prev['open']), float(prev['close'])
    c_open, c_close = float(curr['open']), float(curr['close'])

    # Bodies and colours are derived from the floats parsed above instead of
    # re-parsing the candle dicts through the module helpers.
    prev_body = abs(p_close - p_open)
    curr_body = abs(c_close - c_open)

    # Size filter: the second body must clear the first by the margin.
    if curr_body < prev_body + (curr_body * body_margin):
        return None, None

    prev_bearish = p_close < p_open
    prev_bullish = p_close > p_open
    curr_bearish = c_close < c_open
    curr_bullish = c_close > c_open

    # Bullish engulfing: bear -> bull.
    if prev_bearish and curr_bullish and c_open <= p_close and c_close >= p_open:
        return "long", "bear_bull_engulf"

    # Bearish engulfing: bull -> bear.
    if prev_bullish and curr_bearish and c_open >= p_close and c_close <= p_open:
        return "short", "bull_bear_engulf"

    return None, None
# ========================= 数据获取 =========================
def get_data_by_date(model, date_str: str):
    """Load the candles of one calendar day from ``model`` as plain dicts.

    ``date_str`` must be in ``YYYY-MM-DD`` form. The day is turned into a
    millisecond timestamp window (via ``datetime.timestamp()`` on a naive
    datetime) and rows whose ``id`` lies inside it are returned in
    ascending ``id`` order, each as a dict with the keys ``id``, ``open``,
    ``high``, ``low`` and ``close``.

    If ``date_str`` cannot be parsed, an error is logged and an empty list
    is returned.
    """
    try:
        day_start = datetime.datetime.strptime(date_str, '%Y-%m-%d')
    except ValueError:
        logger.error("日期格式不正确,需为 YYYY-MM-DD")
        return []

    first_ms = int(day_start.timestamp() * 1000)
    # The window is inclusive, so stop one millisecond before the next day.
    day_end = day_start + datetime.timedelta(days=1)
    last_ms = int(day_end.timestamp() * 1000) - 1

    rows = (
        model.select()
        .where(model.id.between(first_ms, last_ms))
        .order_by(model.id.asc())
    )

    candles = []
    for row in rows:
        candles.append({
            'id': row.id,
            'open': row.open,
            'high': row.high,
            'low': row.low,
            'close': row.close,
        })
    return candles
# ========================= 核心回测逻辑 =========================
def _empty_signal_stats() -> Dict[str, dict]:
    """Return a fresh per-signal statistics table with all counters at zero."""
    return {
        'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
        'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
    }


def _open_position(direction, sig_key, entry_bar, stats):
    """Count the signal in ``stats`` and return a position opened at ``entry_bar``'s open."""
    stats[sig_key]['count'] += 1
    return {
        'direction': direction,
        'signal': stats[sig_key]['name'],
        'signal_key': sig_key,
        'entry_price': float(entry_bar['open']),
        'entry_time': entry_bar['id'],
    }


def _close_position(position, exit_price, exit_bar_id, trades, stats):
    """Append the finished trade to ``trades`` and book its result into ``stats``."""
    pos_dir = position['direction']
    entry_price = position['entry_price']
    diff = (exit_price - entry_price) if pos_dir == 'long' else (entry_price - exit_price)
    trades.append({
        # Bar ids are divided by 1000 before conversion, i.e. treated as
        # millisecond timestamps.
        'entry_time': datetime.datetime.fromtimestamp(position['entry_time'] / 1000),
        'exit_time': datetime.datetime.fromtimestamp(exit_bar_id / 1000),
        'signal': position['signal'],
        'direction': '做多' if pos_dir == 'long' else '做空',
        'entry': entry_price,
        'exit': exit_price,
        'diff': diff,
    })
    bucket = stats[position['signal_key']]
    bucket['total_profit'] += diff
    if diff > 0:
        bucket['wins'] += 1


def backtest_15m_trend_optimized(dates: List[str]):
    """Backtest the engulfing strategy over the candles of the given days.

    Candles are loaded per day through ``get_data_by_date(Weex30, d)``
    (despite the ``15m`` in the function name, the model used is
    ``Weex30``), merged and sorted by ``id``. For each bar ``idx`` the pair
    ``(prev, curr)`` is tested with ``check_signal`` and any order is
    executed on the following bar:

    * Flat + signal: open a position at the next bar's open.
    * Holding + opposite signal: close at the next bar's open and
      immediately open the reverse position at that same price.
    * Holding + same-direction signal: keep holding.
    * Holding + no signal, but ``curr`` is coloured against the position:
      keep holding if ``(curr, next_bar)`` itself forms a signal, otherwise
      close at the next bar's close.
    * A position still open after the loop is closed at the last bar's
      close.

    Args:
        dates: Day strings in ``YYYY-MM-DD`` form.

    Returns:
        ``(trades, stats)``. ``trades`` is a list of dicts with the keys
        ``entry_time``, ``exit_time``, ``signal``, ``direction``, ``entry``,
        ``exit`` and ``diff``. ``stats`` maps each signal key to its
        ``count``, ``wins``, ``total_profit`` and display ``name``.
    """
    all_data = []
    for d in dates:
        all_data.extend(get_data_by_date(Weex30, d))

    stats = _empty_signal_stats()
    if not all_data:
        return [], stats

    all_data.sort(key=lambda x: x['id'])

    trades = []
    current_position = None

    # Stop one bar before the end: every action needs a following bar.
    for idx in range(1, len(all_data) - 1):
        prev = all_data[idx - 1]
        curr = all_data[idx]
        next_bar = all_data[idx + 1]

        direction, sig_key = check_signal(prev, curr)

        # ---- flat: only a signal can do anything ----
        if current_position is None:
            if direction:
                current_position = _open_position(direction, sig_key, next_bar, stats)
            continue

        pos_dir = current_position['direction']

        # ---- holding and a signal fired ----
        if direction:
            if direction != pos_dir:
                # Opposite signal: close and reverse at the same price.
                _close_position(current_position, float(next_bar['open']),
                                next_bar['id'], trades, stats)
                current_position = _open_position(direction, sig_key, next_bar, stats)
            # Same-direction signal: nothing to do, keep holding.
            continue

        # ---- holding, no signal: single candle against the position ----
        curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or \
                           (pos_dir == 'short' and is_bullish(curr))
        if not curr_is_opposite:
            continue

        # If curr and the next bar form a signal of their own, hold on and
        # let the next iteration handle that signal.
        look_dir, _ = check_signal(curr, next_bar)
        if look_dir:
            continue

        # Otherwise close at the next bar's close.
        _close_position(current_position, float(next_bar['close']),
                        next_bar['id'], trades, stats)
        current_position = None

    # ---- force-close whatever is still open on the last bar ----
    if current_position:
        last = all_data[-1]
        _close_position(current_position, float(last['close']), last['id'], trades, stats)

    return trades, stats
# ================ 主程序(盈利计算) ================
if __name__ == '__main__':
    # Backtest every calendar day from 2025-01-01 through 2025-10-31.
    # Walking real dates avoids the fixed "days 1..30" loop, which skipped
    # the 31st of long months and produced the invalid 2025-02-29 and
    # 2025-02-30.
    first_day = datetime.date(2025, 1, 1)
    last_day = datetime.date(2025, 10, 31)
    dates = [
        (first_day + datetime.timedelta(days=offset)).isoformat()
        for offset in range((last_day - first_day).days + 1)
    ]
    print(dates)

    trades, stats = backtest_15m_trend_optimized(dates)

    logger.info("===== 每笔交易详情 =====")

    contract_size = 10000    # notional used to turn a price move into money
    open_fee_fixed = 5       # flat amount charged per trade
    close_fee_rate = 0.0005  # rate applied to the notional valued at the exit price

    total_points = 0
    total_raw_profit = 0
    total_fee = 0

    for t in trades:
        entry = t['entry']
        exit_price = t['exit']  # named exit_price to avoid shadowing the builtin exit
        direction = t['direction']

        # Price difference in the direction of the trade.
        point_diff = exit_price - entry if direction == '做多' else entry - exit_price

        # Gross profit in money terms.
        money_profit = point_diff / entry * contract_size

        # Fees: the fixed part plus the rate-based part.
        fee = open_fee_fixed + (contract_size / entry * exit_price * close_fee_rate)

        net_profit = money_profit - fee

        t.update({
            'point_diff': point_diff,
            'raw_profit': money_profit,
            'fee': fee,
            'net_profit': net_profit,
        })

        total_points += point_diff
        total_raw_profit += money_profit
        total_fee += fee

        logger.info(
            f"{t['entry_time']} {direction}({t['signal']}) "
            f"入={entry:.2f} 出={exit_price:.2f} 差价={point_diff:.2f} "
            f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
        )

    # Summary.
    total_net = total_raw_profit - total_fee

    print(f"\n总交易数:{len(trades)}")
    print(f"总点差:{total_points:.2f}")
    print(f"总原始盈利:{total_raw_profit:.2f}")
    print(f"总手续费:{total_fee:.2f}")
    print(f"总净利润:{total_net:.2f}\n")

    # NOTE(review): net result if only 10% of the fees were paid — presumably
    # a fee-rebate scenario; confirm the intent of the 0.1 factor.
    print(total_raw_profit - total_fee * 0.1)

    print("===== 信号统计 =====")
    for k, v in stats.items():
        name = v['name']
        count = v['count']
        wins = v['wins']
        total_p = v['total_profit']
        win_rate = wins / count * 100 if count > 0 else 0
        avg_p = total_p / count if count > 0 else 0
        print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")