""" 集成学习 + Meta 模型 + 动态风控模块 """ import numpy as np import pandas as pd import lightgbm as lgb import datetime ENSEMBLE_CONFIGS = [ {'num_leaves': 31, 'max_depth': 6, 'learning_rate': 0.05, 'subsample': 0.8, 'colsample_bytree': 0.8, 'reg_alpha': 0.1, 'reg_lambda': 0.1, 'seed': 42}, {'num_leaves': 63, 'max_depth': 8, 'learning_rate': 0.03, 'subsample': 0.7, 'colsample_bytree': 0.7, 'reg_alpha': 0.05, 'reg_lambda': 0.05, 'seed': 123}, {'num_leaves': 15, 'max_depth': 5, 'learning_rate': 0.08, 'subsample': 0.9, 'colsample_bytree': 0.9, 'reg_alpha': 0.5, 'reg_lambda': 0.5, 'seed': 456}, {'num_leaves': 31, 'max_depth': 7, 'learning_rate': 0.04, 'subsample': 0.75, 'colsample_bytree': 0.6, 'reg_alpha': 1.0, 'reg_lambda': 1.0, 'seed': 789}, {'num_leaves': 47, 'max_depth': 6, 'learning_rate': 0.06, 'subsample': 0.85, 'colsample_bytree': 0.85, 'reg_alpha': 0.2, 'reg_lambda': 0.3, 'seed': 2024}, ] META_EXTRA = ['bb_pct', 'rsi', 'zscore', 'atr_pct', 'vol_20', 'stoch_k', 'cci'] def _train_one(X, y, cfg, rounds=250): params = {'objective': 'multiclass', 'num_class': 3, 'metric': 'multi_logloss', 'min_child_samples': 50, 'verbose': -1, 'n_jobs': -1, **cfg} ds = lgb.Dataset(X, label=y) return lgb.train(params, ds, num_boost_round=rounds) def train_ensemble_with_meta( df_train, df_test, feature_cols, forward_bars, label_threshold, lgb_rounds=250, n_models=5, ): """集成学习+Meta模型: 5个多样化LightGBM基模型投票 + Meta二次过滤""" train_df = df_train.dropna(subset=feature_cols + ['label']) if len(train_df) < 2000: return None, None, None, None, None test_df = df_test.dropna(subset=feature_cols) if len(test_df) < 100: return None, None, None, None, None X_full = train_df[feature_cols].values y_full = (train_df['label'].values + 1).astype(int) X_test = test_df[feature_cols].values cfgs = ENSEMBLE_CONFIGS[:n_models] # Step1: 训练N个多样化基模型,在测试集上集成预测 models = [] test_probas = [] for cfg in cfgs: m = _train_one(X_full, y_full, cfg, lgb_rounds) models.append(m) test_probas.append(m.predict(X_test)) avg_proba = np.mean(test_probas, axis=0) std_proba = np.std(test_probas, axis=0) # Step2: Meta模型 - 训练集内时间序列划分 split_idx = int(len(train_df) * 0.7) base_part = train_df.iloc[:split_idx] meta_part = train_df.iloc[split_idx:] X_base = base_part[feature_cols].values y_base = (base_part['label'].values + 1).astype(int) X_meta_f = meta_part[feature_cols].values meta_probas = [] for cfg in cfgs: tmp = _train_one(X_base, y_base, cfg, lgb_rounds) meta_probas.append(tmp.predict(X_meta_f)) m_avg = np.mean(meta_probas, axis=0) m_std = np.std(meta_probas, axis=0) me_cols = [c for c in META_EXTRA if c in feature_cols] X_me = meta_part[me_cols].values if me_cols else np.zeros((len(meta_part), 1)) X_mi = np.hstack([m_avg, m_std, X_me]) # Meta标签: 模型预测方向与真实方向一致=1(好信号), 否则=0 pred_dir = np.argmax(m_avg, axis=1) true_dir = (meta_part['label'].values + 1).astype(int) ml = (pred_dir == true_dir).astype(int) has_sig = pred_dir != 1 meta_model = None if has_sig.sum() > 500: mp = {'objective': 'binary', 'metric': 'binary_logloss', 'num_leaves': 15, 'max_depth': 4, 'learning_rate': 0.05, 'min_child_samples': 100, 'subsample': 0.8, 'colsample_bytree': 0.8, 'reg_alpha': 1.0, 'reg_lambda': 1.0, 'verbose': -1, 'n_jobs': -1, 'seed': 42} dm = lgb.Dataset(X_mi[has_sig], label=ml[has_sig]) meta_model = lgb.train(mp, dm, num_boost_round=150) # Step3: 测试集Meta置信度 X_te = test_df[me_cols].values if me_cols else np.zeros((len(test_df), 1)) X_tm = np.hstack([avg_proba, std_proba, X_te]) if meta_model is not None: mc_arr = meta_model.predict(X_tm) else: mc_arr = 1.0 - std_proba.mean(axis=1) pl = pd.Series(0.0, index=df_test.index) ps = pd.Series(0.0, index=df_test.index) mc = pd.Series(0.5, index=df_test.index) ps.loc[test_df.index] = avg_proba[:, 0] pl.loc[test_df.index] = avg_proba[:, 2] mc.loc[test_df.index] = mc_arr return pl, ps, mc, models, meta_model def backtest_with_risk_control( df, proba_long, proba_short, notional=10000.0, prob_threshold=0.45, min_hold_seconds=180, max_hold_seconds=1800, sl_pct=0.004, tp_pct=0.006, meta_conf=None, meta_threshold=0.55, daily_loss_limit=80.0, dd_circuit_breaker=400.0, cooldown_bars=30, max_consec_losses=5, ): """带动态风控的回测: 日亏损限制+回撤熔断+连损冷却+Meta过滤""" pos = 0 open_price = 0.0 open_time = None trades = [] # 风控状态 daily_pnl = 0.0 current_day = None cum_pnl = 0.0 peak_pnl = 0.0 consec_losses = 0 cooldown_until = 0 net_fee = notional * 0.0006 * 2 * 0.10 # 净手续费 for i in range(len(df)): dt = df.index[i] price = df['close'].iloc[i] pl = proba_long.iloc[i] ps = proba_short.iloc[i] mc = meta_conf.iloc[i] if meta_conf is not None else 1.0 day = dt.date() # 日切重置 if current_day != day: current_day = day daily_pnl = 0.0 # 持仓中: 检查平仓条件 if pos != 0 and open_time is not None: pnl_pct = (price - open_price) / open_price if pos == 1 else (open_price - price) / open_price hold_sec = (dt - open_time).total_seconds() # 硬止损 if -pnl_pct >= sl_pct * 1.5: pnl_usdt = notional * pnl_pct trades.append((pos, open_price, price, pnl_usdt, hold_sec, '硬止损', open_time, dt)) cum_pnl += pnl_usdt - net_fee daily_pnl += pnl_usdt - net_fee if pnl_usdt < 0: consec_losses += 1 else: consec_losses = 0 if cum_pnl > peak_pnl: peak_pnl = cum_pnl pos = 0 continue if hold_sec >= min_hold_seconds: closed = False reason = '' if -pnl_pct >= sl_pct: reason = '止损' closed = True elif pnl_pct >= tp_pct: reason = '止盈' closed = True elif hold_sec >= max_hold_seconds: reason = '超时' closed = True elif pos == 1 and ps > prob_threshold + 0.05: reason = 'AI反转' closed = True elif pos == -1 and pl > prob_threshold + 0.05: reason = 'AI反转' closed = True if closed: pnl_usdt = notional * pnl_pct trades.append((pos, open_price, price, pnl_usdt, hold_sec, reason, open_time, dt)) cum_pnl += pnl_usdt - net_fee daily_pnl += pnl_usdt - net_fee if pnl_usdt < 0: consec_losses += 1 else: consec_losses = 0 if cum_pnl > peak_pnl: peak_pnl = cum_pnl if consec_losses >= max_consec_losses: cooldown_until = i + cooldown_bars pos = 0 # 开仓: 检查风控条件 if pos == 0: # 风控检查: 冷却期、日亏损限制、回撤熔断 if i < cooldown_until: continue if daily_pnl <= -daily_loss_limit: continue dd_now = peak_pnl - cum_pnl if dd_now >= dd_circuit_breaker: continue # Meta过滤: 置信度不够则跳过 if mc < meta_threshold: continue if pl > prob_threshold and pl > ps: pos = 1 open_price = price open_time = dt elif ps > prob_threshold and ps > pl: pos = -1 open_price = price open_time = dt if pos != 0: price = df['close'].iloc[-1] dt = df.index[-1] pnl_pct = (price - open_price) / open_price if pos == 1 else (open_price - price) / open_price hold_sec = (dt - open_time).total_seconds() trades.append((pos, open_price, price, notional * pnl_pct, hold_sec, '结束', open_time, dt)) return trades