Files
lm_code/weex/长期持有信号/30分钟优化实体超过版本.py
2025-12-04 15:11:15 +08:00

300 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
15分钟K线包住形态策略优化版本
- 第二根实体必须比第一根实体大 20% 以上
- 支持续持、反手、单根反色平仓
"""
import datetime
from dataclasses import dataclass
from typing import List, Dict, Optional
from loguru import logger
from models.weex import Weex30 # 你的数据库模型
# ========================= 工具函数 =========================
def is_bullish(c):
"""阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(c):
"""阴线"""
return float(c['close']) < float(c['open'])
def calc_body(c):
"""计算实体大小"""
return abs(float(c['close']) - float(c['open']))
# ========================= 包住信号含20%增强) =========================
def check_signal(prev, curr):
"""
包住信号:
- 多头:前跌后涨包住
- 空头:前涨后跌包住
增强条件:
- 第二根currK线实体大小 >= 第一根(prev) × 1.2
"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
prev_body = calc_body(prev)
curr_body = calc_body(curr)
# ====== 第二根实体必须 ≥ 第一根实体 × 1.2 ======
if curr_body < prev_body * 1.2:
return None, None
# ----------- 多头包住信号bear → bull-----------
if (is_bearish(prev) and is_bullish(curr)
and c_open <= p_close
and c_close >= p_open):
return "long", "bear_bull_engulf"
# ----------- 空头包住信号bull → bear-----------
if (is_bullish(prev) and is_bearish(curr)
and c_open >= p_close
and c_close <= p_open):
return "short", "bull_bear_engulf"
return None, None
# ========================= 数据获取 =========================
def get_data_by_date(model, date_str: str):
"""按天获取 15 分钟数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,需为 YYYY-MM-DD")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
# ========================= 核心回测逻辑 =========================
def backtest_15m_trend_optimized(dates: List[str]):
all_data = []
for d in dates:
all_data.extend(get_data_by_date(Weex30, d))
if not all_data:
return [], {
'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
}
all_data.sort(key=lambda x: x['id'])
stats = {
'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'},
'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'},
}
trades = []
current_position = None
idx = 1
while idx < len(all_data) - 1:
prev = all_data[idx - 1]
curr = all_data[idx]
next_bar = all_data[idx + 1]
direction, sig_key = check_signal(prev, curr)
# ========== 空仓遇到信号 -> 开仓 ==========
if current_position is None and direction:
entry_price = float(next_bar['open'])
current_position = {
'direction': direction,
'signal': stats[sig_key]['name'],
'signal_key': sig_key,
'entry_price': entry_price,
'entry_time': next_bar['id']
}
stats[sig_key]['count'] += 1
idx += 1
continue
# ========== 已持仓 ==========
if current_position:
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
# ======= 反向信号 -> 平仓 + 反手 =======
if direction and direction != pos_dir:
exit_price = float(next_bar['open'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else \
(current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0:
stats[pos_sig_key]['wins'] += 1
# === 反手开新仓 ===
current_position = {
'direction': direction,
'signal': stats[sig_key]['name'],
'signal_key': sig_key,
'entry_price': exit_price,
'entry_time': next_bar['id']
}
stats[sig_key]['count'] += 1
idx += 1
continue
# ======= 同向信号 -> 继续持仓 =======
if direction and direction == pos_dir:
idx += 1
continue
# ======= 单根反色 =======
curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or \
(pos_dir == 'short' and is_bullish(curr))
if curr_is_opposite:
# 先判断下一根是不是会形成信号
if idx + 1 < len(all_data):
look_dir, _ = check_signal(curr, all_data[idx + 1])
if look_dir: # 后面还能形成信号 → 忍耐
idx += 1
continue
# 否则按下一根收盘价平仓
exit_price = float(next_bar['close'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else \
(current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0:
stats[pos_sig_key]['wins'] += 1
current_position = None
idx += 1
# ========== 尾仓强制平仓 ==========
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else \
(current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[current_position['signal_key']]['total_profit'] += diff
if diff > 0:
stats[current_position['signal_key']]['wins'] += 1
return trades, stats
# ================ 主程序(盈利计算) ================
if __name__ == '__main__':
dates = [f"2025-11-{i}" for i in range(1, 32)]
trades, stats = backtest_15m_trend_optimized(dates)
logger.info("===== 每笔交易详情 =====")
contract_size = 10000
open_fee_fixed = 5
close_fee_rate = 0.0005
total_points = 0
total_raw_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit = t['exit']
direction = t['direction']
# 点差
point_diff = exit - entry if direction == '做多' else entry - exit
# 金额盈利
money_profit = point_diff / entry * contract_size
# 手续费
fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate)
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points += point_diff
total_raw_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction}({t['signal']}) "
f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
# 汇总
total_net = total_raw_profit - total_fee
print(f"\n总交易数:{len(trades)}")
print(f"总点差:{total_points:.2f}")
print(f"总原始盈利:{total_raw_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net:.2f}\n")
print("===== 信号统计 =====")
for k, v in stats.items():
name = v['name']
count = v['count']
wins = v['wins']
total_p = v['total_profit']
win_rate = wins / count * 100 if count > 0 else 0
avg_p = total_p / count if count > 0 else 0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")