Files
lm_code/交易/bitmart-组合策略回测.py
Your Name b5af5b07f3 哈哈
2026-02-15 02:16:45 +08:00

400 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
多信号组合策略回测 — 目标 1000 USDT/月
5种信号源同一时间只持一个仓位每笔100U保证金100x杠杆
信号1: EMA金叉死叉 + ATR过滤 + 大趋势方向(已验证盈利)
信号2: 三分之一策略 — 前K线实体的1/3作为触发价动量突破
信号3: 布林带反弹 — 价格触及上下轨 + RSI确认均值回归
信号4: 吞没形态 — 当前K线完全包裹前K线反转信号
信号5: Pin Bar — 长影线蜡烛(拒绝信号)
所有信号共用:
- 大趋势过滤 EMA(120)
- 最低持仓 200秒 (>3分钟)
- 各自独立的止盈止损参数
- 90% 手续费返佣
"""
import sys, time, datetime, sqlite3, statistics
from pathlib import Path
from collections import defaultdict
class EMA:
__slots__ = ('k', 'v')
def __init__(self, p):
self.k = 2.0 / (p + 1); self.v = None
def update(self, x):
self.v = x if self.v is None else x * self.k + self.v * (1 - self.k)
return self.v
def load():
db = Path(__file__).parent.parent / 'models' / 'database.db'
s = int(datetime.datetime(2025,1,1).timestamp()) * 1000
e = int(datetime.datetime(2026,1,1).timestamp()) * 1000
conn = sqlite3.connect(str(db))
rows = conn.cursor().execute(
"SELECT id,open,high,low,close FROM bitmart_eth_1m WHERE id>=? AND id<? ORDER BY id", (s,e)
).fetchall()
conn.close()
return [(datetime.datetime.fromtimestamp(r[0]/1000.0), r[1], r[2], r[3], r[4]) for r in rows]
def calc_bb(closes, period=20, nstd=2.0):
if len(closes) < period:
return None, None, None
rec = closes[-period:]
mid = sum(rec) / period
var = sum((x - mid)**2 for x in rec) / period
std = var ** 0.5
return mid + nstd * std, mid, mid - nstd * std
def calc_rsi(closes, period=14):
if len(closes) < period + 1:
return None
gains = 0; losses = 0
for i in range(-period, 0):
d = closes[i] - closes[i-1]
if d > 0: gains += d
else: losses -= d
if losses == 0: return 100.0
rs = (gains/period) / (losses/period)
return 100 - 100 / (1 + rs)
def main():
print("Loading...", flush=True)
data = load()
N = len(data)
print(f"{N} bars loaded\n", flush=True)
# ===== 参数 =====
NOTIONAL = 10000.0 # 100U * 100x
FEE_RATE = 0.0006
REB_RATE = 0.90
MIN_HOLD = 200 # 秒
FEE_PER_TRADE = NOTIONAL * FEE_RATE * 2 # 12 USDT
REB_PER_TRADE = FEE_PER_TRADE * REB_RATE # 10.8 USDT
NET_FEE = FEE_PER_TRADE - REB_PER_TRADE # 1.2 USDT
# 信号参数
# 信号1: EMA交叉
EMA_FAST = 8; EMA_SLOW = 21; EMA_BIG = 120
ATR_MIN = 0.003; ATR_P = 14
SL1 = 0.004; HSL1 = 0.006
# 信号2: 三分之一 (用5分钟聚合K线)
BODY_MIN = 0.0008 # 最小实体占价格比例 0.08%
SL2 = 0.003; TP2 = 0.004 # 三分之一策略:小止损高胜率
# 信号3: 布林带反弹
BB_P = 20; BB_STD = 2.0; RSI_P = 14
RSI_LONG = 30; RSI_SHORT = 70
SL3 = 0.003; TP3 = 0.002 # 均值回归:回到中轨附近
# 信号4: 吞没形态
ENGULF_MIN_BODY = 0.001 # 最小吞没实体占比
SL4 = 0.004; TP4 = 0.005
# 信号5: Pin Bar
PIN_SHADOW_RATIO = 2.0 # 影线 >= 2倍实体
PIN_MIN_SHADOW = 0.001 # 最小影线占价格比例
SL5 = 0.003; TP5 = 0.004
MAX_HOLD = 1800 # 所有信号共用最大持仓
# ===== 状态 =====
ema_f = EMA(EMA_FAST); ema_s = EMA(EMA_SLOW); ema_b = EMA(EMA_BIG)
prev_fast = None; prev_slow = None
closes_buf = []; highs_buf = []; lows_buf = []
# 5分钟K线聚合
bar5_open = None; bar5_high = None; bar5_low = None; bar5_close = None
bar5_count = 0; bars5 = [] # 完成的5分钟K线
pos = 0; op = 0.0; ot = None; sig_type = ""; sl_pct = 0; tp_pct = 0
trades = []
def do_close(price, dt_, reason):
nonlocal pos, op, ot, sig_type
pp = (price - op) / op if pos == 1 else (op - price) / op
pnl = NOTIONAL * pp
fee = FEE_PER_TRADE; reb = REB_PER_TRADE
hsec = (dt_ - ot).total_seconds()
trades.append((sig_type, 'long' if pos==1 else 'short', op, price,
pnl, fee, reb, hsec, reason, ot, dt_))
pos = 0; op = 0; ot = None; sig_type = ""
def do_open(direction, price, dt_, stype, sl, tp):
nonlocal pos, op, ot, sig_type, sl_pct, tp_pct
pos = 1 if direction == 'long' else -1
op = price; ot = dt_; sig_type = stype; sl_pct = sl; tp_pct = tp
for i in range(N):
dt, o_, h_, l_, c_ = data[i]
p = c_
# 更新缓存
closes_buf.append(p); highs_buf.append(h_); lows_buf.append(l_)
if len(closes_buf) > 300:
closes_buf = closes_buf[-300:]
highs_buf = highs_buf[-300:]
lows_buf = lows_buf[-300:]
# EMA更新
fast = ema_f.update(p); slow = ema_s.update(p); big = ema_b.update(p)
# ATR
atr_pct = 0.0
if len(highs_buf) > ATR_P + 1:
s = 0.0
for j in range(-ATR_P, 0):
tr = highs_buf[j] - lows_buf[j]
d1 = abs(highs_buf[j] - closes_buf[j-1])
d2 = abs(lows_buf[j] - closes_buf[j-1])
if d1 > tr: tr = d1
if d2 > tr: tr = d2
s += tr
atr_pct = s / (ATR_P * p) if p > 0 else 0
# EMA交叉
ema_cross_up = prev_fast is not None and prev_fast <= prev_slow and fast > slow
ema_cross_dn = prev_fast is not None and prev_fast >= prev_slow and fast < slow
prev_fast = fast; prev_slow = slow
# 布林带 & RSI
bb_upper, bb_mid, bb_lower = calc_bb(closes_buf, BB_P, BB_STD)
rsi = calc_rsi(closes_buf, RSI_P)
# 5分钟K线聚合
if bar5_open is None:
bar5_open = o_; bar5_high = h_; bar5_low = l_; bar5_close = c_; bar5_count = 1
else:
bar5_high = max(bar5_high, h_)
bar5_low = min(bar5_low, l_)
bar5_close = c_
bar5_count += 1
new_bar5 = None
if bar5_count >= 5:
new_bar5 = {'open': bar5_open, 'high': bar5_high, 'low': bar5_low, 'close': bar5_close}
bars5.append(new_bar5)
if len(bars5) > 50: bars5 = bars5[-50:]
bar5_open = None; bar5_count = 0
# K线形态用1分钟线
prev_bar = data[i-1] if i > 0 else None
prev2_bar = data[i-2] if i > 1 else None
# ===== 有持仓:检查平仓 =====
if pos != 0 and ot is not None:
pp = (p - op) / op if pos == 1 else (op - p) / op
hsec = (dt - ot).total_seconds()
# 硬止损
hard_sl = max(sl_pct * 1.5, 0.006)
if -pp >= hard_sl:
do_close(p, dt, f"硬止损({pp*100:+.2f}%)"); continue
if hsec >= MIN_HOLD:
# 止损
if -pp >= sl_pct:
do_close(p, dt, f"止损({pp*100:+.2f}%)"); continue
# 止盈
if tp_pct > 0 and pp >= tp_pct:
do_close(p, dt, f"止盈({pp*100:+.2f}%)"); continue
# 超时
if hsec >= MAX_HOLD:
do_close(p, dt, f"超时({hsec:.0f}s)"); continue
# EMA信号反转平仓仅EMA信号开的仓
if sig_type == "EMA":
if pos == 1 and ema_cross_dn:
do_close(p, dt, "EMA反转"); continue
if pos == -1 and ema_cross_up:
do_close(p, dt, "EMA反转"); continue
# BB信号回到中轨平仓
if sig_type == "BB" and bb_mid is not None:
if pos == 1 and p >= bb_mid:
do_close(p, dt, "BB回中轨"); continue
if pos == -1 and p <= bb_mid:
do_close(p, dt, "BB回中轨"); continue
# ===== 无持仓:检查开仓(按优先级) =====
if pos == 0 and i > 20:
signal = None; s_sl = 0; s_tp = 0; s_type = ""
# 优先级1: EMA交叉最高质量
if atr_pct >= ATR_MIN:
if ema_cross_up and p > big:
signal = 'long'; s_type = "EMA"; s_sl = SL1; s_tp = 0
elif ema_cross_dn and p < big:
signal = 'short'; s_type = "EMA"; s_sl = SL1; s_tp = 0
# 优先级2: 三分之一策略5分钟K线动量
if signal is None and len(bars5) >= 2 and new_bar5 is not None:
prev5 = bars5[-2]
body5 = abs(prev5['close'] - prev5['open'])
body5_pct = body5 / prev5['close'] if prev5['close'] > 0 else 0
if body5_pct >= BODY_MIN:
trigger_up = prev5['close'] + body5 / 3
trigger_dn = prev5['close'] - body5 / 3
cur5 = bars5[-1]
if cur5['high'] >= trigger_up and p > big:
signal = 'long'; s_type = "1/3"; s_sl = SL2; s_tp = TP2
elif cur5['low'] <= trigger_dn and p < big:
signal = 'short'; s_type = "1/3"; s_sl = SL2; s_tp = TP2
# 优先级3: 吞没形态
if signal is None and prev_bar is not None:
pb_o, pb_c = prev_bar[1], prev_bar[4]
cb_o, cb_c = o_, c_
pb_body = abs(pb_c - pb_o)
cb_body = abs(cb_c - cb_o)
pb_body_pct = pb_body / p if p > 0 else 0
cb_body_pct = cb_body / p if p > 0 else 0
if cb_body_pct >= ENGULF_MIN_BODY and cb_body > pb_body * 1.5:
# 看涨吞没:前阴后阳,当前完全包裹
if pb_c < pb_o and cb_c > cb_o and cb_c > pb_o and cb_o <= pb_c:
if p > big and atr_pct >= 0.001:
signal = 'long'; s_type = "吞没"; s_sl = SL4; s_tp = TP4
# 看跌吞没:前阳后阴
elif pb_c > pb_o and cb_c < cb_o and cb_c < pb_o and cb_o >= pb_c:
if p < big and atr_pct >= 0.001:
signal = 'short'; s_type = "吞没"; s_sl = SL4; s_tp = TP4
# 优先级4: Pin Bar长影线反转
if signal is None and prev_bar is not None:
pb_o, pb_h, pb_l, pb_c = prev_bar[1], prev_bar[2], prev_bar[3], prev_bar[4]
pb_body = abs(pb_c - pb_o)
upper_shadow = pb_h - max(pb_o, pb_c)
lower_shadow = min(pb_o, pb_c) - pb_l
if pb_body > 0:
# 看涨Pin Bar长下影线
if lower_shadow >= PIN_SHADOW_RATIO * pb_body:
ls_pct = lower_shadow / p if p > 0 else 0
if ls_pct >= PIN_MIN_SHADOW and p > big and atr_pct >= 0.001:
signal = 'long'; s_type = "PinBar"; s_sl = SL5; s_tp = TP5
# 看跌Pin Bar长上影线
if upper_shadow >= PIN_SHADOW_RATIO * pb_body:
us_pct = upper_shadow / p if p > 0 else 0
if us_pct >= PIN_MIN_SHADOW and p < big and atr_pct >= 0.001:
signal = 'short'; s_type = "PinBar"; s_sl = SL5; s_tp = TP5
# 优先级5: 布林带反弹
if signal is None and bb_upper is not None and rsi is not None:
if p <= bb_lower and rsi < RSI_LONG and p > big and atr_pct >= 0.0008:
signal = 'long'; s_type = "BB"; s_sl = SL3; s_tp = TP3
elif p >= bb_upper and rsi > RSI_SHORT and p < big and atr_pct >= 0.0008:
signal = 'short'; s_type = "BB"; s_sl = SL3; s_tp = TP3
if signal:
do_open(signal, p, dt, s_type, s_sl, s_tp)
# 强制平仓
if pos != 0:
p = data[-1][4]; dt = data[-1][0]
do_close(p, dt, "回测结束")
# ===== 分析结果 =====
if not trades:
print("No trades!", flush=True); return
n = len(trades)
total_pnl = sum(t[4] for t in trades)
total_fee = FEE_PER_TRADE * n
total_reb = REB_PER_TRADE * n
net = total_pnl - (total_fee - total_reb)
wins = [t for t in trades if t[4] > 0]
wr = len(wins) / n * 100
# 按信号类型统计
by_type = defaultdict(lambda: {'n':0, 'pnl':0, 'w':0})
for t in trades:
by_type[t[0]]['n'] += 1
by_type[t[0]]['pnl'] += t[4]
if t[4] > 0: by_type[t[0]]['w'] += 1
# 月度
monthly = defaultdict(lambda: {'n':0, 'pnl':0, 'reb':0, 'fee':0, 'w':0})
for t in trades:
k = t[10].strftime('%Y-%m')
monthly[k]['n'] += 1
monthly[k]['pnl'] += t[4]
monthly[k]['reb'] += REB_PER_TRADE
monthly[k]['fee'] += FEE_PER_TRADE
if t[4] > 0: monthly[k]['w'] += 1
# 最大回撤
cum=0; peak=0; dd=0
for t in trades:
cum += t[4] - NET_FEE
if cum > peak: peak = cum
if peak - cum > dd: dd = peak - cum
print("=" * 75, flush=True)
print(" 多信号组合策略回测 | 100U x 100倍 | 目标1000U/月", flush=True)
print("=" * 75, flush=True)
print(f"\n --- 核心收益 ---", flush=True)
print(f" 方向盈亏: {total_pnl:>+12.2f} USDT", flush=True)
print(f" 返佣(90%): {total_reb:>+12.2f} USDT", flush=True)
print(f" 净手续费(10%): {total_fee-total_reb:>12.2f} USDT", flush=True)
print(f" ================================", flush=True)
print(f" 年净利润: {net:>+12.2f} USDT", flush=True)
print(f" 月均净利: {net/12:>+12.2f} USDT", flush=True)
print(f" 最大回撤: {dd:>12.2f} USDT", flush=True)
print(f"\n --- 交易统计 ---", flush=True)
print(f" 总交易: {n} 笔 | 胜率: {wr:.1f}% | 月均: {n/12:.0f}", flush=True)
print(f"\n --- 按信号类型 ---", flush=True)
print(f" {'类型':<10} {'笔数':>6} {'方向盈亏':>10} {'净利':>10} {'胜率':>6} {'每笔均利':>10}", flush=True)
print(f" {'-'*56}", flush=True)
for stype in sorted(by_type.keys()):
d = by_type[stype]
net_t = d['pnl'] - NET_FEE * d['n']
avg = net_t / d['n'] if d['n'] > 0 else 0
wr_t = d['w'] / d['n'] * 100 if d['n'] > 0 else 0
mark = " ++" if net_t > 0 else " --"
print(f" {stype:<10} {d['n']:>6} {d['pnl']:>+10.2f} {net_t:>+10.2f} {wr_t:>5.1f}% {avg:>+10.2f}{mark}", flush=True)
print(f"\n --- 月度明细 ---", flush=True)
print(f" {'月份':<8} {'笔数':>5} {'方向盈亏':>10} {'返佣':>8} {'净利润':>10} {'胜率':>6}", flush=True)
print(f" {'-'*52}", flush=True)
for m in sorted(monthly.keys()):
d = monthly[m]
net_m = d['pnl'] - (d['fee'] - d['reb'])
wr_m = d['w'] / d['n'] * 100 if d['n'] > 0 else 0
print(f" {m:<8} {d['n']:>5} {d['pnl']:>+10.2f} {d['reb']:>8.2f} {net_m:>+10.2f} {wr_m:>5.1f}%", flush=True)
print(f" {'-'*52}", flush=True)
print(f" {'合计':<8} {n:>5} {total_pnl:>+10.2f} {total_reb:>8.2f} {net:>+10.2f} {wr:>5.1f}%", flush=True)
# ===== 测试不同仓位大小达到1000U/月需要多少 =====
print(f"\n --- 仓位放大测试 ---", flush=True)
print(f" {'保证金':>8} {'杠杆':>4} {'名义价值':>12} {'年净利':>10} {'月均':>8} {'达标':>4}", flush=True)
print(f" {'-'*52}", flush=True)
for margin in [100, 200, 300, 500, 800, 1000]:
lev = 100
notional_test = margin * lev
scale = notional_test / NOTIONAL
net_scaled = net * scale
monthly_avg = net_scaled / 12
ok = "Yes" if monthly_avg >= 1000 else "No"
print(f" {margin:>7}U {lev:>3}x {notional_test:>11,}U {net_scaled:>+10.0f} {monthly_avg:>+8.0f} {ok:>4}", flush=True)
print(f"\n{'='*75}", flush=True)
# 保存CSV
csv = Path(__file__).parent.parent / 'combo_trades.csv'
with open(csv, 'w', encoding='utf-8-sig') as f:
f.write("信号,方向,开仓价,平仓价,方向盈亏,手续费,返佣,持仓秒,原因,开仓时间,平仓时间\n")
for t in trades:
f.write(f"{t[0]},{t[1]},{t[2]:.2f},{t[3]:.2f},{t[4]:.2f},"
f"{t[5]:.2f},{t[6]:.2f},{t[7]:.0f},{t[8]},{t[9]},{t[10]}\n")
print(f"\n Saved: {csv}", flush=True)
if __name__ == '__main__':
main()