Files
lm_code/交易/bitmart-AI快速优化.py
Your Name b5af5b07f3 哈哈
2026-02-15 02:16:45 +08:00

317 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
AI策略快速优化 — 只用LightGBM多时间框架特征扫描参数
优化点 vs v1:
1. 63个特征(加5m/15m多时间框架)
2. 更强LightGBM参数(更多树+更深)
3. 扫描: 概率阈值/止损/止盈/前瞻期/持仓时间
4. 要求多空概率差距>3%才开仓(减少弱信号)
5. 动态ATR止盈选项
固定: 100U保证金, 100x杠杆, 10,000U名义, 90%返佣
"""
import datetime, sqlite3, time as _time
import numpy as np
import pandas as pd
import lightgbm as lgb
import warnings
from pathlib import Path
from collections import defaultdict
warnings.filterwarnings('ignore')
def load_data():
    """Load 1-minute ETH OHLC bars for calendar year 2025 from SQLite.

    Reads the ``bitmart_eth_1m`` table in ``../models/database.db``
    (path resolved relative to this script).

    Returns:
        pd.DataFrame with columns ['ts', 'open', 'high', 'low', 'close']
        and a DatetimeIndex named 'datetime' derived from the millisecond
        epoch ``id`` column.
    """
    db = Path(__file__).parent.parent / 'models' / 'database.db'
    start_ms = int(datetime.datetime(2025, 1, 1).timestamp()) * 1000
    end_ms = int(datetime.datetime(2026, 1, 1).timestamp()) * 1000
    conn = sqlite3.connect(str(db))
    try:
        # Parameterized query instead of an f-string; try/finally guarantees
        # the connection is closed even if the read raises (the original
        # leaked the handle on error).
        df = pd.read_sql_query(
            "SELECT id as ts,open,high,low,close FROM bitmart_eth_1m"
            " WHERE id>=? AND id<? ORDER BY id",
            conn, params=(start_ms, end_ms))
    finally:
        conn.close()
    df['datetime'] = pd.to_datetime(df['ts'], unit='ms')
    df.set_index('datetime', inplace=True)
    return df
def add_features(df):
    """Append the engineered feature columns to the 1-minute OHLC frame.

    Mutates ``df`` in place and returns it. Feature groups: EMA levels,
    spreads and slopes; RSI; Bollinger bands; MACD; ATR; stochastics;
    multi-horizon returns; realized volatility; candle anatomy; rolling
    range position; 5m/15m multi-timeframe aggregates; time-of-day
    encodings.
    """
    c=df['close']; h=df['high']; l=df['low']; o=df['open']
    # --- EMA family: raw levels (excluded from features later), spreads
    # normalized by price, and slopes as pct_change of the EMA itself ---
    for p in [5,8,13,21,50,120]:
        df[f'ema_{p}'] = c.ewm(span=p, adjust=False).mean()
    df['ema_fast_slow'] = (df['ema_8']-df['ema_21'])/c
    df['ema_slow_big'] = (df['ema_21']-df['ema_120'])/c
    df['price_vs_ema120'] = (c-df['ema_120'])/c
    df['price_vs_ema50'] = (c-df['ema_50'])/c
    df['ema8_slope'] = df['ema_8'].pct_change(5)
    df['ema21_slope'] = df['ema_21'].pct_change(5)
    df['ema120_slope'] = df['ema_120'].pct_change(20)
    # Full EMA stack alignment flags (8 > 21 > 120 and the inverse).
    df['triple_bull'] = ((df['ema_8']>df['ema_21'])&(df['ema_21']>df['ema_120'])).astype(float)
    df['triple_bear'] = ((df['ema_8']<df['ema_21'])&(df['ema_21']<df['ema_120'])).astype(float)
    # --- RSI via simple rolling means of gains/losses (Cutler's variant,
    # not Wilder smoothing); zero-loss windows become NaN, not 100 ---
    delta = c.diff(); gain = delta.clip(lower=0); loss = (-delta).clip(lower=0)
    for p in [7,14,21]:
        ag=gain.rolling(p).mean(); al=loss.rolling(p).mean()
        df[f'rsi_{p}'] = 100 - 100/(1+ag/al.replace(0,np.nan))
    df['rsi_14_slope'] = df['rsi_14'].diff(5)
    # --- Bollinger(20, 2): %B position and band width (4*std/mid) ---
    mid=c.rolling(20).mean(); std=c.rolling(20).std()
    df['bb_pct'] = (c-(mid-2*std))/((mid+2*std)-(mid-2*std)).replace(0,np.nan)
    df['bb_width'] = 4*std/mid
    df['bb_width_chg'] = df['bb_width'].pct_change(10)
    # --- MACD(12, 26, 9), normalized by price ---
    ema12=c.ewm(span=12,adjust=False).mean(); ema26=c.ewm(span=26,adjust=False).mean()
    df['macd'] = (ema12-ema26)/c
    df['macd_signal'] = df['macd'].ewm(span=9,adjust=False).mean()
    df['macd_hist'] = df['macd']-df['macd_signal']
    df['macd_hist_slope'] = df['macd_hist'].diff(3)
    # --- ATR as a fraction of price; ratio of fast (7) to slow (14) ATR ---
    tr = pd.concat([h-l,(h-c.shift(1)).abs(),(l-c.shift(1)).abs()],axis=1).max(axis=1)
    df['atr_pct'] = tr.rolling(14).mean()/c
    df['atr_7'] = tr.rolling(7).mean()/c
    df['atr_ratio'] = df['atr_7']/df['atr_pct'].replace(0,np.nan)
    # --- Stochastic %K over 14/28 bars, %D as 3-bar smoothing of K14 ---
    for p in [14,28]:
        low_p=l.rolling(p).min(); high_p=h.rolling(p).max()
        df[f'stoch_k_{p}'] = (c-low_p)/(high_p-low_p).replace(0,np.nan)*100
    df['stoch_d_14'] = df['stoch_k_14'].rolling(3).mean()
    # --- Backward-looking returns over multiple horizons ---
    for p in [1,3,5,10,20,60,120]:
        df[f'ret_{p}'] = c.pct_change(p)
    # --- Realized volatility (std of 1-bar returns) and regime ratios ---
    df['vol_5'] = c.pct_change().rolling(5).std()
    df['vol_20'] = c.pct_change().rolling(20).std()
    df['vol_60'] = c.pct_change().rolling(60).std()
    df['vol_ratio'] = df['vol_5']/df['vol_20'].replace(0,np.nan)
    df['vol_trend'] = df['vol_20'].pct_change(20)
    # --- Candle anatomy: body, shadows, body/range, streaks ---
    body = (c-o).abs()
    df['body_pct'] = body/c
    df['upper_shadow'] = (h-pd.concat([o,c],axis=1).max(axis=1))/c
    df['lower_shadow'] = (pd.concat([o,c],axis=1).min(axis=1)-l)/c
    df['body_vs_range'] = body/(h-l).replace(0,np.nan)
    df['range_pct'] = (h-l)/c
    # Signed run length: +n after n consecutive up candles, -n for down.
    bullish = (c>o).astype(int)
    df['streak'] = bullish.groupby((bullish!=bullish.shift()).cumsum()).cumcount()+1
    df['streak'] = df['streak'] * bullish - df['streak'] * (1-bullish)
    df['engulf_ratio'] = body/body.shift(1).replace(0,np.nan)
    # --- Position within the rolling 20/60-bar high-low channel ---
    for p in [20,60]:
        df[f'high_{p}'] = h.rolling(p).max()
        df[f'low_{p}'] = l.rolling(p).min()
        df[f'pos_{p}'] = (c-df[f'low_{p}'])/(df[f'high_{p}']-df[f'low_{p}']).replace(0,np.nan)
    # --- 5-minute timeframe features ---
    # NOTE(review): resample() labels each 5m bar at its period START and the
    # ffill reindex spreads that value over all five 1m bars, so minutes 0-3
    # see a close that includes later minutes — looks like lookahead bias in
    # the backtest; confirm whether this is intended.
    c5=c.resample('5min').last(); h5=h.resample('5min').max()
    l5=l.resample('5min').min(); o5=o.resample('5min').first()  # o5 is unused
    e5_8=c5.ewm(span=8,adjust=False).mean(); e5_21=c5.ewm(span=21,adjust=False).mean()
    df['ema5m_fs'] = ((e5_8-e5_21)/c5).reindex(df.index, method='ffill')
    d5=c5.diff(); g5=d5.clip(lower=0).rolling(14).mean(); l5r=(-d5).clip(lower=0).rolling(14).mean()
    df['rsi5m'] = (100-100/(1+g5/l5r.replace(0,np.nan))).reindex(df.index, method='ffill')
    tr5=pd.concat([h5-l5,(h5-c5.shift(1)).abs(),(l5-c5.shift(1)).abs()],axis=1).max(axis=1)
    df['atr5m'] = (tr5.rolling(14).mean()/c5).reindex(df.index, method='ffill')
    for p in [1,5,20]:
        df[f'ret5m_{p}'] = c5.pct_change(p).reindex(df.index, method='ffill')
    # --- 15-minute timeframe features (same labeling caveat as above) ---
    c15=c.resample('15min').last()
    e15=c15.ewm(span=21,adjust=False).mean()
    df['ema15m_trend'] = ((c15-e15)/c15).reindex(df.index, method='ffill')
    df['ret15m_5'] = c15.pct_change(5).reindex(df.index, method='ffill')
    # --- Time-of-day / day-of-week encodings (hour also as sin/cos) ---
    df['hour'] = df.index.hour; df['minute'] = df.index.minute
    df['hour_sin'] = np.sin(2*np.pi*df['hour']/24)
    df['hour_cos'] = np.cos(2*np.pi*df['hour']/24)
    df['weekday'] = df.index.weekday
    return df
def get_fcols(df):
    """Return the model feature columns in frame order.

    Keeps every numeric column except raw OHLC/bookkeeping fields and the
    intermediate EMA / rolling-extreme level columns (their derived spreads
    and positions are kept instead).
    """
    skip = {'ts','open','high','low','close','label','month',
            'ema_5','ema_8','ema_13','ema_21','ema_50','ema_120',
            'high_20','low_20','high_60','low_60'}
    numeric = ('float64','float32','int64','int32')
    cols = []
    for name in df.columns:
        if name in skip:
            continue
        if df[name].dtype in numeric:
            cols.append(name)
    return cols
def train_predict(df, fcols, fb, thresh, train_m=3):
    """Walk-forward LightGBM classification by calendar month.

    Labels each bar +1/-1 when the `fb`-bar forward return exceeds
    +/-`thresh`, else 0. For every month, trains a 3-class model on the
    preceding `train_m` months and predicts that month out-of-sample.
    Mutates `df` (adds 'label' and 'month' columns).

    Returns:
        (pl, ps): Series aligned to df.index holding P(up) and P(down);
        months with no trained model keep the default 0.0.
    """
    fwd = df['close'].shift(-fb) / df['close'] - 1
    df['label'] = 0
    df.loc[fwd > thresh, 'label'] = 1
    df.loc[fwd < -thresh, 'label'] = -1
    df['month'] = df.index.to_period('M')
    months = sorted(df['month'].unique())
    prob_long = pd.Series(0.0, index=df.index, dtype=float)
    prob_short = pd.Series(0.0, index=df.index, dtype=float)
    lgb_params = {
        'objective':'multiclass','num_class':3,'metric':'multi_logloss',
        'learning_rate':0.03,'num_leaves':63,'max_depth':8,
        'min_child_samples':80,'subsample':0.7,'colsample_bytree':0.7,
        'reg_alpha':0.3,'reg_lambda':0.3,'verbose':-1,'n_jobs':-1,'seed':42
    }
    for idx in range(train_m, len(months)):
        test_month = months[idx]
        first_train = months[idx - train_m]
        in_window = (df['month'] >= first_train) & (df['month'] < test_month)
        train_rows = df[in_window].dropna(subset=fcols + ['label'])
        test_rows = df[df['month'] == test_month].dropna(subset=fcols)
        # Skip months without enough clean data on either side of the split.
        if len(train_rows) < 1000 or len(test_rows) < 100:
            continue
        # Labels shifted from {-1,0,1} to {0,1,2} for the multiclass API.
        dset = lgb.Dataset(train_rows[fcols].values,
                           label=train_rows['label'].values + 1)
        booster = lgb.train(lgb_params, dset, num_boost_round=300)
        proba = booster.predict(test_rows[fcols].values)
        prob_long.loc[test_rows.index] = proba[:, 2]
        prob_short.loc[test_rows.index] = proba[:, 0]
    return prob_long, prob_short
def backtest(df, pl, ps, prob_th, sl, tp, mh, gap=0.03):
    """Simulate the long/short strategy bar by bar on 1-minute closes.

    Entry (only when flat): long when P(up) > prob_th and beats P(down) by
    `gap`; short mirrored. Exits, checked in priority order each bar:
      'hsl' hard stop at max(1.5*sl, 0.6%), active immediately;
      after 200s of holding only: 'sl' stop-loss, 'tp' take-profit,
      'to' max-hold timeout;
      'ai'  opposite-side probability exceeds prob_th + 0.08 — this exit
            does not `continue`, so the same bar may immediately open the
            reverse position (position flip);
      'end' force-close on the final bar.

    Args:
        df: frame with a 'close' column and a DatetimeIndex.
        pl, ps: per-bar long/short probabilities aligned with df.
        prob_th: entry probability threshold.
        sl, tp: stop-loss / take-profit as fractional price moves.
        mh: maximum holding time in seconds.
        gap: required probability edge between sides to enter.

    Returns:
        List of trade tuples
        (side, entry_px, exit_px, gross_pnl_usdt, hold_sec, tag, open_ts, close_ts).
        PnL is gross on the fixed 10,000 USDT notional; the flat per-trade
        net fee is deducted in score(). (The original computed FEE/REB/NFEE
        here but never used them — removed as dead code.)
    """
    NOTIONAL = 10000.0  # fixed notional per trade; fee netting lives in score()
    pos = 0; op = 0.0; ot = None; trades = []
    for i in range(len(df)):
        dt = df.index[i]; p = df['close'].iloc[i]
        p_l = pl.iloc[i]; p_s = ps.iloc[i]
        if pos != 0 and ot is not None:
            pp = (p - op) / op if pos == 1 else (op - p) / op
            hsec = (dt - ot).total_seconds()
            hard_sl = max(sl * 1.5, 0.006)
            # Hard stop is never gated by the 200s grace period.
            if -pp >= hard_sl:
                trades.append((pos, op, p, NOTIONAL * pp, hsec, 'hsl', ot, dt)); pos = 0; continue
            if hsec >= 200:  # regular exits suppressed during first 200s
                if -pp >= sl:
                    trades.append((pos, op, p, NOTIONAL * pp, hsec, 'sl', ot, dt)); pos = 0; continue
                if pp >= tp:
                    trades.append((pos, op, p, NOTIONAL * pp, hsec, 'tp', ot, dt)); pos = 0; continue
                if hsec >= mh:
                    trades.append((pos, op, p, NOTIONAL * pp, hsec, 'to', ot, dt)); pos = 0; continue
            # AI reversal exit: strong opposite-side signal closes the trade.
            if pos == 1 and p_s > prob_th + 0.08:
                trades.append((pos, op, p, NOTIONAL * pp, hsec, 'ai', ot, dt)); pos = 0
            elif pos == -1 and p_l > prob_th + 0.08:
                trades.append((pos, op, p, NOTIONAL * pp, hsec, 'ai', ot, dt)); pos = 0
        if pos == 0:
            if p_l > prob_th and p_l > p_s + gap:
                pos = 1; op = p; ot = dt
            elif p_s > prob_th and p_s > p_l + gap:
                pos = -1; op = p; ot = dt
    # Force-close any position still open on the last bar.
    if pos != 0:
        p = df['close'].iloc[-1]; dt = df.index[-1]
        pp = (p - op) / op if pos == 1 else (op - p) / op
        trades.append((pos, op, p, NOTIONAL * pp, (dt - ot).total_seconds(), 'end', ot, dt))
    return trades
def score(trades):
    """Aggregate a backtest trade list into summary statistics.

    Charges a flat 1.2 USDT net fee per trade (matches the 0.06% taker fee
    on 10,000 USDT notional, round-trip, after 90% rebate).

    Returns:
        (net_pnl, win_rate_pct, profitable_month_count, max_drawdown,
         monthly) where monthly maps 'YYYY-MM' (keyed on the trade's close
         timestamp) to {'n', 'net', 'w'}.
    """
    if not trades:
        return 0, 0, 0, 0, {}
    fee = 1.2
    n = len(trades)
    gross = sum(t[3] for t in trades)
    net = gross - fee * n
    wins = sum(1 for t in trades if t[3] > 0)
    wr = wins / n * 100
    # Running-equity drawdown and per-month breakdown in one pass.
    equity = 0; high_water = 0; max_dd = 0
    monthly = defaultdict(lambda: {'n': 0, 'net': 0, 'w': 0})
    for t in trades:
        equity += t[3] - fee
        if equity > high_water:
            high_water = equity
        if high_water - equity > max_dd:
            max_dd = high_water - equity
        bucket = monthly[t[7].strftime('%Y-%m')]
        bucket['n'] += 1
        bucket['net'] += t[3] - fee
        if t[3] > 0:
            bucket['w'] += 1
    profitable_months = sum(1 for m in monthly.values() if m['net'] > 0)
    return net, wr, profitable_months, max_dd, monthly
def main():
    """Run the full sweep: load data, build features, train every model
    config, grid-scan the backtest parameters, and print a ranked report.
    """
    t0=_time.time()
    print("="*70, flush=True)
    print(" AI快速优化 | 100U x 100倍 | LightGBM + 63特征", flush=True)
    print("="*70, flush=True)
    df = load_data()
    df = add_features(df)
    fcols = get_fcols(df)
    print(f" {len(df):,} bars, {len(fcols)} features\n", flush=True)
    # Pre-train a model per lookahead/threshold/train-window config
    # (the most expensive part). Tuples: (fb bars, thresh, train months, label).
    model_configs = [
        (10, 0.003, 3, "10bar/0.3%/3m"),
        (10, 0.003, 4, "10bar/0.3%/4m"),
        (10, 0.004, 3, "10bar/0.4%/3m"),
        (15, 0.004, 3, "15bar/0.4%/3m"),
        (20, 0.005, 3, "20bar/0.5%/3m"),
    ]
    predictions = {}
    for fb, thresh, tm, lbl in model_configs:
        print(f" Training: {lbl}...", flush=True)
        # Copy so train_predict's in-place label/month columns don't
        # contaminate the shared feature frame across configs.
        dfc = df.copy()
        pl, ps = train_predict(dfc, fcols, fb, thresh, tm)
        predictions[lbl] = (pl, ps)
        # Quick sanity backtest with mid-range parameters.
        t_ = backtest(dfc, pl, ps, 0.45, 0.005, 0.008, 1800)
        n_, _, _, _, _ = score(t_)
        print(f" quick check: {len(t_)} trades, net={n_:+.0f}", flush=True)
    # Grid-scan backtest parameters over every trained model.
    print(f"\n Scanning backtest params...\n", flush=True)
    bt_configs = [
        # prob_th, sl, tp, max_hold, gap
        (0.42, 0.005, 0.008, 1800, 0.02),
        (0.45, 0.005, 0.008, 1800, 0.03),
        (0.48, 0.005, 0.008, 1800, 0.03),
        (0.45, 0.005, 0.010, 2400, 0.03),
        (0.48, 0.005, 0.010, 2400, 0.03),
        (0.50, 0.005, 0.010, 2400, 0.03),
        (0.45, 0.006, 0.012, 2400, 0.03),
        (0.48, 0.006, 0.012, 3600, 0.03),
        (0.50, 0.006, 0.015, 3600, 0.03),
        (0.45, 0.004, 0.008, 1800, 0.03),
        (0.42, 0.004, 0.006, 1200, 0.02),
    ]
    results = []
    for mlbl, (pl, ps) in predictions.items():
        for prob_th, sl, tp, mh, gap in bt_configs:
            trades = backtest(df, pl, ps, prob_th, sl, tp, mh, gap)
            net, wr, pm, dd, monthly = score(trades)
            n = len(trades)
            if n > 0:
                results.append({
                    'model': mlbl, 'prob': prob_th, 'sl': sl, 'tp': tp,
                    'mh': mh, 'gap': gap, 'n': n, 'net': net,
                    'wr': wr, 'pm': pm, 'dd': dd, 'monthly': monthly
                })
    # Sort by net profit, best first.
    results.sort(key=lambda x: x['net'], reverse=True)
    # Print the top 15 configurations.
    print(f"{'='*100}", flush=True)
    print(f" TOP 15 配置 (100U保证金 x 100倍)", flush=True)
    print(f"{'='*100}", flush=True)
    print(f" {'#':>2} {'模型':<18} {'概率':>4} {'SL':>4} {'TP':>4} {'MH':>5} {'gap':>4} {'交易':>5} {'年净利':>8} {'月均':>6} {'胜率':>5} {'盈月':>4} {'回撤':>6}", flush=True)
    print(f" {'-'*95}", flush=True)
    for i, r in enumerate(results[:15]):
        mavg = r['net']/12
        print(f" {i+1:>2} {r['model']:<18} {r['prob']:.2f} {r['sl']*100:.1f}% {r['tp']*100:.1f}% {r['mh']:>5} {r['gap']:.2f} {r['n']:>5} {r['net']:>+8.0f} {mavg:>+6.0f} {r['wr']:>4.1f}% {r['pm']:>2}/12 {r['dd']:>6.0f}", flush=True)
    # Best configuration details.
    # NOTE(review): raises IndexError if every config produced zero trades.
    best = results[0]
    print(f"\n{'='*70}", flush=True)
    print(f" 最佳方案: {best['model']}", flush=True)
    print(f" 概率>{best['prob']} SL={best['sl']*100:.1f}% TP={best['tp']*100:.1f}% MH={best['mh']}s gap={best['gap']}", flush=True)
    print(f" 年净利: {best['net']:+.0f} USDT = 月均 {best['net']/12:+.0f} USDT", flush=True)
    print(f" 交易: {best['n']}笔 | 胜率: {best['wr']:.1f}% | 盈利月: {best['pm']}/12 | 回撤: {best['dd']:.0f}", flush=True)
    print(f"{'='*70}", flush=True)
    # Monthly breakdown for the best configuration.
    print(f"\n 月度明细:", flush=True)
    for m in sorted(best['monthly'].keys()):
        d = best['monthly'][m]
        wr_m = d['w']/d['n']*100 if d['n']>0 else 0
        s = "盈利" if d['net']>0 else "亏损"
        print(f" {m}: {d['n']:>4}{d['net']:>+8.0f}U 胜率{wr_m:.0f}% [{s}]", flush=True)
    # Comparison against hard-coded baseline numbers from earlier runs.
    print(f"\n 对比:", flush=True)
    print(f" 纯EMA策略: +1196/年 = +100/月 (227笔)", flush=True)
    print(f" AI v1基线: +4801/年 = +400/月 (923笔)", flush=True)
    print(f" AI v2优化: {best['net']:+.0f}/年 = {best['net']/12:+.0f}/月 ({best['n']}笔)", flush=True)
    if best['net'] > 4801:
        print(f" v2 vs v1提升: {(best['net']/4801-1)*100:+.0f}%", flush=True)
    elapsed = _time.time()-t0
    print(f"\n 耗时: {elapsed:.0f}s", flush=True)
    print(f"{'='*70}", flush=True)
# Script entry point.
if __name__=='__main__':
    main()