Files
lm_code/交易/bitmart-终极策略回测.py
Your Name b5af5b07f3 哈哈
2026-02-15 02:16:45 +08:00

339 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
终极组合策略回测 — 多时间框架 + 波动率自适应
核心改进:
1. 吞没形态用5分钟K线减少噪音提高质量
2. 加入突破策略N根K线新高新低突破
3. 波动率自适应:高波动用趋势跟踪,低波动用均值回归
4. 动态止盈使用ATR倍数作为止盈目标而非固定比例
5. 趋势强度过滤EMA斜率+ADX概念
信号:
A) EMA(8/21) 金叉死叉 + ATR>0.3% + EMA(120)趋势方向
B) 5分钟吞没形态 + EMA趋势方向 + ATR确认
C) 20根K线高低突破 + 趋势方向
D) BB反弹 + RSI极值 + 趋势方向(仅逆趋势小单)
条件: 同时只持1个仓, 90%返佣, >3分钟持仓
"""
import sys, time, datetime, sqlite3
from pathlib import Path
from collections import defaultdict
class EMA:
__slots__ = ('k', 'v')
def __init__(self, p):
self.k = 2.0 / (p + 1); self.v = None
def update(self, x):
self.v = x if self.v is None else x * self.k + self.v * (1 - self.k)
return self.v
def load():
db = Path(__file__).parent.parent / 'models' / 'database.db'
s = int(datetime.datetime(2025,1,1).timestamp()) * 1000
e = int(datetime.datetime(2026,1,1).timestamp()) * 1000
conn = sqlite3.connect(str(db))
rows = conn.cursor().execute(
"SELECT id,open,high,low,close FROM bitmart_eth_1m WHERE id>=? AND id<? ORDER BY id", (s,e)
).fetchall()
conn.close()
return [(datetime.datetime.fromtimestamp(r[0]/1000.0), r[1], r[2], r[3], r[4]) for r in rows]
def run(data, notional):
N = len(data)
FEE = notional * 0.0006 * 2
REB = FEE * 0.9
NFEE = FEE - REB
MIN_HOLD = 200; MAX_HOLD = 1800
# EMA
ef = EMA(8); es = EMA(21); eb = EMA(120); e50 = EMA(50)
pf_ = None; ps_ = None
CB = []; HB = []; LB = []
# 5分钟聚合
b5_o=None; b5_h=None; b5_l=None; b5_c=None; b5_cnt=0
bars5 = []
# 突破追踪
high_20 = []; low_20 = []
pos=0; op=0.0; ot=None; st=""; sl=0; tp_atr=0
trades = []
def close_(price, dt_, reason):
nonlocal pos, op, ot, st
pp = (price-op)/op if pos==1 else (op-price)/op
trades.append((st, 'L' if pos==1 else 'S', op, price, notional*pp,
(dt_-ot).total_seconds(), reason, ot, dt_))
pos=0; op=0; ot=None; st=""
for i in range(N):
dt, o_, h_, l_, c_ = data[i]
p = c_
CB.append(p); HB.append(h_); LB.append(l_)
if len(CB)>300: CB=CB[-300:]; HB=HB[-300:]; LB=LB[-300:]
fast=ef.update(p); slow=es.update(p); big=eb.update(p); mid50=e50.update(p)
# ATR
atr=0.0; atr_val=0.0
if len(HB)>15:
s=0
for j in range(-14,0):
tr=HB[j]-LB[j]; d1=abs(HB[j]-CB[j-1]); d2=abs(LB[j]-CB[j-1])
if d1>tr: tr=d1
if d2>tr: tr=d2
s+=tr
atr_val = s/14
atr = atr_val/p if p>0 else 0
# EMA交叉
cu = pf_ is not None and pf_<=ps_ and fast>slow
cd = pf_ is not None and pf_>=ps_ and fast<slow
pf_=fast; ps_=slow
# EMA趋势强度斜率
trend_up = fast > slow and slow > big # 三线多头
trend_dn = fast < slow and slow < big # 三线空头
# BB & RSI
bb_u=None; bb_m=None; bb_l=None; rsi=None
if len(CB)>=20:
rec=CB[-20:]; mid=sum(rec)/20
var=sum((x-mid)**2 for x in rec)/20; std=var**0.5
bb_u=mid+2*std; bb_m=mid; bb_l=mid-2*std
if len(CB)>=15:
g=0; l=0
for j in range(-14,0):
d=CB[j]-CB[j-1]
if d>0: g+=d
else: l-=d
rsi = 100-100/(1+(g/14)/(l/14)) if l>0 else 100
# 5分钟聚合
if b5_o is None:
b5_o=o_; b5_h=h_; b5_l=l_; b5_c=c_; b5_cnt=1
else:
b5_h=max(b5_h,h_); b5_l=min(b5_l,l_); b5_c=c_; b5_cnt+=1
new_b5 = False
if b5_cnt>=5:
bars5.append({'o':b5_o,'h':b5_h,'l':b5_l,'c':b5_c})
if len(bars5)>30: bars5=bars5[-30:]
b5_o=None; b5_cnt=0; new_b5=True
# 20根K线高低用于突破
high_20.append(h_); low_20.append(l_)
if len(high_20)>21: high_20=high_20[-21:]; low_20=low_20[-21:]
breakout_high = max(high_20[:-1]) if len(high_20)>1 else None
breakout_low = min(low_20[:-1]) if len(low_20)>1 else None
# ===== 持仓管理 =====
if pos!=0 and ot:
pp=(p-op)/op if pos==1 else (op-p)/op
hsec=(dt-ot).total_seconds()
# 动态止损/止盈基于ATR
hard_sl = max(sl*1.5, 0.006)
if -pp>=hard_sl: close_(p,dt,"硬止损"); continue
if hsec>=MIN_HOLD:
if -pp>=sl: close_(p,dt,"止损"); continue
# 动态止盈:盈利超过 tp_atr 倍ATR值
if tp_atr>0 and atr_val>0:
tp_target = tp_atr * atr_val / op # 转为百分比
if pp>=tp_target: close_(p,dt,f"ATR止盈({pp*100:.2f}%)"); continue
if hsec>=MAX_HOLD: close_(p,dt,"超时"); continue
# 趋势跟踪出场
if st in ("EMA","突破"):
if pos==1 and cd: close_(p,dt,"反向交叉"); continue
if pos==-1 and cu: close_(p,dt,"反向交叉"); continue
# 价格跌破慢线止盈
if pos==1 and p<slow and pp>0.001: close_(p,dt,"破慢线止盈"); continue
if pos==-1 and p>slow and pp>0.001: close_(p,dt,"破慢线止盈"); continue
if st=="BB" and bb_m:
if pos==1 and p>=bb_m: close_(p,dt,"BB回中轨"); continue
if pos==-1 and p<=bb_m: close_(p,dt,"BB回中轨"); continue
if st=="5m吞没":
if pos==1 and cd: close_(p,dt,"反向交叉"); continue
if pos==-1 and cu: close_(p,dt,"反向交叉"); continue
# ===== 开仓 =====
if pos==0 and i>120:
sig=None; s_sl=0; s_tp_atr=0; s_type=""
# 信号A: EMA交叉最高质量
if atr>=0.003:
if cu and p>big:
sig='L'; s_type="EMA"; s_sl=0.004; s_tp_atr=3.0
elif cd and p<big:
sig='S'; s_type="EMA"; s_sl=0.004; s_tp_atr=3.0
# 信号B: 5分钟吞没仅在新5分钟K线完成时检查
if sig is None and new_b5 and len(bars5)>=3:
prev5 = bars5[-2]; cur5 = bars5[-1]
pb = abs(prev5['c']-prev5['o'])
cb = abs(cur5['c']-cur5['o'])
cb_pct = cb/p if p>0 else 0
if cb_pct>=0.0015 and cb>pb*1.5 and atr>=0.0015:
# 看涨吞没
if prev5['c']<prev5['o'] and cur5['c']>cur5['o']:
if cur5['c']>prev5['o'] and cur5['o']<=prev5['c']:
if p>big:
sig='L'; s_type="5m吞没"; s_sl=0.005; s_tp_atr=2.5
# 看跌吞没
elif prev5['c']>prev5['o'] and cur5['c']<cur5['o']:
if cur5['c']<prev5['o'] and cur5['o']>=prev5['c']:
if p<big:
sig='S'; s_type="5m吞没"; s_sl=0.005; s_tp_atr=2.5
# 信号C: 突破20根K线新高新低
if sig is None and breakout_high and atr>=0.002:
if h_>breakout_high and trend_up:
sig='L'; s_type="突破"; s_sl=0.005; s_tp_atr=2.5
elif l_<breakout_low and trend_dn:
sig='S'; s_type="突破"; s_sl=0.005; s_tp_atr=2.5
# 信号D: BB反弹要求RSI极值 + ATR适中
if sig is None and bb_u and rsi is not None and 0.001<=atr<=0.004:
if p<=bb_l and rsi<25 and p>big:
sig='L'; s_type="BB"; s_sl=0.003; s_tp_atr=0
elif p>=bb_u and rsi>75 and p<big:
sig='S'; s_type="BB"; s_sl=0.003; s_tp_atr=0
if sig:
pos=1 if sig=='L' else -1; op=p; ot=dt
st=s_type; sl=s_sl; tp_atr=s_tp_atr
if pos!=0: close_(data[-1][4], data[-1][0], "结束")
return trades
def report(trades, notional, label):
if not trades: print(f" [{label}] No trades"); return 0,{}
n=len(trades)
FEE=notional*0.0006*2; REB=FEE*0.9; NFEE=FEE-REB
total_pnl=sum(t[4] for t in trades)
net=total_pnl-NFEE*n; total_reb=REB*n
wins=len([t for t in trades if t[4]>0]); wr=wins/n*100
by_type=defaultdict(lambda:{'n':0,'pnl':0,'w':0})
for t in trades:
by_type[t[0]]['n']+=1; by_type[t[0]]['pnl']+=t[4]
if t[4]>0: by_type[t[0]]['w']+=1
monthly=defaultdict(lambda:{'n':0,'net':0,'w':0})
for t in trades:
k=t[8].strftime('%Y-%m')
monthly[k]['n']+=1; monthly[k]['net']+=t[4]-NFEE
if t[4]>0: monthly[k]['w']+=1
cum=0;peak=0;dd=0
for t in trades:
cum+=t[4]-NFEE
if cum>peak:peak=cum
if peak-cum>dd:dd=peak-cum
pm=len([m for m in monthly.values() if m['net']>0])
min_m=min(monthly.values(),key=lambda x:x['net'])['net']
max_m=max(monthly.values(),key=lambda x:x['net'])['net']
print(f"\n{'='*75}", flush=True)
print(f" {label} | 名义值={notional:,.0f}U", flush=True)
print(f"{'='*75}", flush=True)
print(f" 年净利: {net:>+10.0f} | 月均: {net/12:>+8.0f} | 交易: {n}笔 | 胜率: {wr:.1f}%", flush=True)
print(f" 返佣: {total_reb:>.0f} | 回撤: {dd:>.0f} | 盈利月: {pm}/12", flush=True)
print(f" 最佳月: {max_m:>+.0f} | 最差月: {min_m:>+.0f}", flush=True)
print(f"\n 信号:", flush=True)
for st in sorted(by_type.keys()):
d=by_type[st]; nt=d['pnl']-NFEE*d['n']
wt=d['w']/d['n']*100 if d['n']>0 else 0
avg=nt/d['n'] if d['n']>0 else 0
mk="+" if nt>0 else "-"
print(f" {st:<8} {d['n']:>5}笔 净{nt:>+8.0f}{wt:.0f}% 均{avg:>+.1f}/笔 {mk}", flush=True)
print(f"\n 月度:", flush=True)
for m in sorted(monthly.keys()):
d=monthly[m]; wr_m=d['w']/d['n']*100 if d['n']>0 else 0
bar = "+" * max(0, int(d['net']/200)) + "-" * max(0, int(-d['net']/200))
print(f" {m} {d['n']:>4}{d['net']:>+8.0f} {wr_m:>4.0f}% {bar}", flush=True)
print(f" {'合计':>7} {n:>4}{net:>+8.0f}", flush=True)
print(f"{'='*75}", flush=True)
return net, monthly
def main():
print("Loading...", flush=True)
data = load()
print(f"{len(data)} bars\n", flush=True)
# 测试不同仓位大小
margins = [100, 200, 300, 500, 800, 1000]
print("="*80, flush=True)
print(" 不同保证金下的收益 (100x杠杆)", flush=True)
print("="*80, flush=True)
all_results = []
for margin in margins:
notional = margin * 100
trades = run(data, notional)
net, monthly = report(trades, notional, f"{margin}U保证金")
all_results.append((margin, notional, len(trades), net, monthly))
# 总览
print(f"\n\n{'='*80}", flush=True)
print(f" 总览 — 目标: 每月 1000 USDT", flush=True)
print(f"{'='*80}", flush=True)
print(f" {'保证金':>6} {'杠杆':>4} {'名义值':>10} {'交易':>5} {'年净利':>10} {'月均':>8} {'达标':>4}", flush=True)
print(f" {'-'*52}", flush=True)
for margin, notional, n, net, monthly in all_results:
mavg = net/12
ok = "YES" if mavg>=1000 else "no"
print(f" {margin:>5}U {100:>3}x {notional:>9,}U {n:>5} {net:>+10.0f} {mavg:>+8.0f} {ok:>4}", flush=True)
# 找到达标的最小保证金
print(f"\n 结论:", flush=True)
for margin, notional, n, net, monthly in all_results:
if net/12 >= 1000:
print(f" >>> {margin}U 保证金即可达到月均 {net/12:.0f} USDT <<<", flush=True)
# 打印该配置的月度
print(f"\n {margin}U配置月度明细:", flush=True)
pm = 0
for m in sorted(monthly.keys()):
d = monthly[m]
status = "" if d['net']>0 else ""
print(f" {m}: {d['net']:>+8.0f} USDT ({d['n']}笔) [{status}]", flush=True)
if d['net'] > 0: pm += 1
print(f" 盈利月份: {pm}/12", flush=True)
break
else:
# 没有达标的,计算需要多少
base_net = all_results[0][3] # 100U的净利
if base_net > 0:
needed = int(12000 / base_net * 100) + 1
print(f" 100U净利={base_net:.0f}/年 → 达标需约 {needed}U 保证金", flush=True)
else:
print(f" 策略本身不盈利,需要继续优化信号质量", flush=True)
print(f"{'='*80}", flush=True)
# 保存最佳配置的交易记录
best = max(all_results, key=lambda x: x[3])
margin, notional = best[0], best[1]
trades = run(data, notional)
csv = Path(__file__).parent.parent / 'final_trades.csv'
with open(csv, 'w', encoding='utf-8-sig') as f:
f.write("信号,方向,开仓价,平仓价,盈亏,持仓秒,原因,开仓时间,平仓时间\n")
for t in trades:
f.write(f"{t[0]},{t[1]},{t[2]:.2f},{t[3]:.2f},{t[4]:.2f},{t[5]:.0f},{t[6]},{t[7]},{t[8]}\n")
print(f"\n 交易记录: {csv}", flush=True)
if __name__=='__main__':
main()