# File: jyx_code4/strategy/ai_strategy.py
# (222 lines, 9.1 KiB, Python)

# History timestamp: 2026-02-20 20:57:25 +08:00
"""
Scheme B: AI model training + signal generation.
Uses LightGBM / XGBoost with walk-forward rolling training.
"""
import json
import joblib
import pandas as pd
import numpy as np
from pathlib import Path
from loguru import logger
from .config import MODEL_CONFIG as MC, PRIMARY_PERIOD, PROJECT_ROOT
from .feature_engine import prepare_dataset, get_latest_feature_row
from .backtest import BacktestEngine, print_metrics
# Artifacts persisted by AIStrategy.run() and loaded back by get_live_signal().
# Directory that holds all scheme-B artifacts (created on demand by run()).
SCHEME_B_MODEL_DIR = PROJECT_ROOT / 'models'
# Model fitted on the last walk-forward window.
SCHEME_B_MODEL_FILE = SCHEME_B_MODEL_DIR / 'scheme_b_last_model.joblib'
# Scaler object returned by prepare_dataset().
SCHEME_B_SCALER_FILE = SCHEME_B_MODEL_DIR / 'scheme_b_scaler.joblib'
# JSON dump of the feature names returned by prepare_dataset().
SCHEME_B_FEATURES_FILE = SCHEME_B_MODEL_DIR / 'scheme_b_features.json'
class AIStrategy:
    """AI model strategy: LightGBM / XGBoost classifier trained walk-forward.

    Typical use is ``AIStrategy(model_type).run(...)``, which trains one model
    per rolling window, backtests the stitched out-of-sample predictions and
    persists the last window's model for live use.
    """

    def __init__(self, model_type: str = 'lightgbm'):
        """
        :param model_type: 'lightgbm' or 'xgboost'
        """
        self.model_type = model_type
        self.models = []  # one fitted model per walk-forward window
        self.feature_importance = None  # pd.Series, set by walk_forward_train

    def _create_model(self):
        """Create an unfitted classifier for ``self.model_type``.

        :return: a new ``LGBMClassifier`` or ``XGBClassifier`` built from MC
        :raises ValueError: if ``self.model_type`` is not supported
        """
        # Backend imports are deferred so only the selected library is needed.
        if self.model_type == 'lightgbm':
            import lightgbm as lgb
            return lgb.LGBMClassifier(**MC['lightgbm'].copy())
        if self.model_type == 'xgboost':
            import xgboost as xgb
            return xgb.XGBClassifier(**MC['xgboost'].copy())
        raise ValueError(f"不支持的模型类型: {self.model_type}")

    def walk_forward_train(self, X: pd.DataFrame, y: pd.Series,
                           confidence_threshold: float = 0.45) -> pd.Series:
        """Train and predict over rolling walk-forward windows.

        Each window fits a fresh model on ``walk_forward_train_size`` rows and
        predicts the following ``walk_forward_test_size`` rows; the window then
        advances by ``walk_forward_step`` rows.

        :param X: feature matrix, row-aligned with ``y``
        :param y: class labels
        :param confidence_threshold: predictions whose highest class
            probability is below this value are replaced by 0 (stay flat)
        :return: predictions of all test windows concatenated in window order,
            indexed like the corresponding rows of ``X``; an empty Series if
            the data is too short for a single window
        :raises ValueError: if ``walk_forward_step`` is not positive
        """
        train_size = MC['walk_forward_train_size']
        test_size = MC['walk_forward_test_size']
        step = MC['walk_forward_step']
        n = len(X)

        # A non-positive step would never advance the window (endless loop).
        if step <= 0:
            raise ValueError(f"walk_forward_step 必须为正整数, 当前值: {step}")

        # Start from a clean state so a repeated call cannot mix in models
        # (or feature importances) from a previous data set.
        self.models = []
        self.feature_importance = None

        logger.info(f"Walk-Forward: 数据量={n}, 训练窗口={train_size}, "
                    f"测试窗口={test_size}, 步长={step}, 置信阈值={confidence_threshold}")

        window_preds = []  # per-window Series, concatenated once after the loop
        window_count = 0
        start = 0
        # NOTE(review): only full test windows are evaluated, so a trailing
        # partial window at the end of the data never receives predictions.
        while start + train_size + test_size <= n:
            train_end = start + train_size
            test_end = train_end + test_size

            X_train = X.iloc[start:train_end]
            y_train = y.iloc[start:train_end]
            X_test = X.iloc[train_end:test_end]
            y_test = y.iloc[train_end:test_end]

            # Fit a fresh model on this window.
            model = self._create_model()
            model.fit(X_train, y_train)
            self.models.append(model)

            # Predict, then blank out low-confidence rows.
            proba = model.predict_proba(X_test)
            raw_preds = model.predict(X_test)
            low_confidence = proba.max(axis=1) < confidence_threshold
            filtered_preds = raw_preds.copy()
            filtered_preds[low_confidence] = 0
            window_preds.append(pd.Series(filtered_preds, index=X_test.index))

            # Accuracy is reported on the unfiltered predictions.
            acc = (raw_preds == y_test.to_numpy()).mean()
            n_filtered = low_confidence.sum()
            window_count += 1
            logger.info(f" 窗口 {window_count}: 训练[{start}:{train_end}] "
                        f"测试[{train_end}:{test_end}] 准确率={acc:.2%} "
                        f"过滤={n_filtered}/{len(X_test)}")
            start += step

        if window_preds:
            # Single concat instead of growing a Series inside the loop.
            all_preds = pd.concat(window_preds)
        else:
            all_preds = pd.Series(dtype=float)
            logger.warning(f"Walk-Forward 未产生任何窗口: 数据量 {n} 小于 "
                           f"训练窗口+测试窗口 {train_size + test_size}")

        # Feature importance is taken from the last window's model.
        if self.models:
            last_model = self.models[-1]
            if hasattr(last_model, 'feature_importances_'):
                self.feature_importance = pd.Series(
                    last_model.feature_importances_, index=X.columns
                ).sort_values(ascending=False)

        logger.info(f"Walk-Forward 完成: {window_count} 个窗口, "
                    f"{len(all_preds)} 条预测")
        return all_preds

    def get_top_features(self, n: int = 20) -> pd.Series:
        """Return the ``n`` most important features.

        :param n: number of features to return
        :return: head of ``self.feature_importance``, or an empty Series when
            no importance has been computed yet
        """
        if self.feature_importance is None:
            return pd.Series(dtype=float)
        return self.feature_importance.head(n)

    def run(self, period: int = None, start_date: str = None, end_date: str = None) -> dict:
        """Run scheme B end to end: train, backtest, persist, report.

        When both ``start_date`` and ``end_date`` are given, an extra
        ``warm_up_months`` months of data before ``start_date`` are loaded for
        training so that the backtest range has predictions early on.

        :param period: K-line period; defaults to ``PRIMARY_PERIOD``
        :param start_date: backtest range start (optional)
        :param end_date: backtest range end (optional)
        :return: backtest result dict, with ``'feature_importance'`` added
        """
        if period is None:
            period = PRIMARY_PERIOD

        logger.info("=" * 60)
        logger.info(f"方案BAI模型策略 ({self.model_type})")
        logger.info("=" * 60)

        from .data_loader import load_kline

        # 1. Prepare data, extending the load range backwards for warm-up.
        load_start, load_end = start_date, end_date
        if start_date and end_date:
            warm_months = MC.get('warm_up_months', 12)
            load_start_ts = pd.Timestamp(start_date) - pd.DateOffset(months=warm_months)
            load_start = load_start_ts.strftime('%Y-%m-%d')
            logger.info(f"回测区间 [{start_date} ~ {end_date}],向前加载 {warm_months} 月至 {load_start} 用于训练")
        X, y, feature_names, scaler = prepare_dataset(period, load_start, load_end)

        # 2. Walk-forward training. The threshold comes from the same config
        #    key get_live_signal() reads, so backtest and live filtering agree.
        predictions = self.walk_forward_train(
            X, y, confidence_threshold=MC.get('confidence_threshold', 0.45)
        )

        # 3. Backtest only on the user-specified range.
        df = load_kline(period, start_date, end_date)
        if df.empty:
            logger.warning("回测区间内无K线数据")
            return BacktestEngine()._empty_result()

        # Overlapping test windows (step < test size) yield duplicate index
        # labels, which reindex() rejects; keep the most recent window's value.
        if predictions.index.has_duplicates:
            predictions = predictions[~predictions.index.duplicated(keep='last')]

        # Bars without a prediction get 0 (stay flat).
        signals = predictions.reindex(df.index, fill_value=0).astype(int)
        prices = df['close']

        # 4. Backtest.
        engine = BacktestEngine()
        result = engine.run(prices, signals)
        print_metrics(result['metrics'], f"方案B: {self.model_type} AI策略")

        # 5. Persist last-window model, scaler and feature list for
        #    get_live_signal().
        if self.models and scaler is not None:
            SCHEME_B_MODEL_DIR.mkdir(parents=True, exist_ok=True)
            joblib.dump(self.models[-1], SCHEME_B_MODEL_FILE)
            joblib.dump(scaler, SCHEME_B_SCALER_FILE)
            with open(SCHEME_B_FEATURES_FILE, 'w', encoding='utf-8') as f:
                json.dump(feature_names, f, ensure_ascii=False)
            logger.info(f"已保存方案B模型: {SCHEME_B_MODEL_FILE}, scaler, {len(feature_names)} 个特征")

        # 6. Report feature importance.
        top_feat = self.get_top_features(15)
        if not top_feat.empty:
            logger.info("\nTop 15 重要特征:")
            for rank, (feat, imp) in enumerate(top_feat.items(), start=1):
                logger.info(f" {rank}. {feat}: {imp:.4f}")

        result['feature_importance'] = self.feature_importance
        return result
def run_ai_strategy(model_type: str = 'lightgbm', period: int = None,
                    start_date: str = None, end_date: str = None) -> dict:
    """Convenience entry point for scheme B.

    Builds an ``AIStrategy`` for ``model_type`` and returns the result of its
    ``run`` method for the given period and date range.
    """
    return AIStrategy(model_type=model_type).run(
        period=period,
        start_date=start_date,
        end_date=end_date,
    )
def get_live_signal(period: int = None, model_type: str = 'lightgbm',
                    start_date: str = None, end_date: str = None) -> int:
    """Generate a signal for the latest K-line from the saved scheme-B model.

    Intended for live / paper trading. ``run_ai_strategy`` or
    ``AIStrategy().run()`` must have been run beforehand so that the model,
    scaler and feature-list files exist under ``SCHEME_B_MODEL_DIR``.

    :param period: K-line period; defaults to ``PRIMARY_PERIOD``
    :param model_type: unused; the model is whatever was saved to
        ``SCHEME_B_MODEL_FILE``
    :param start_date: optional range start passed to get_latest_feature_row
    :param end_date: optional range end passed to get_latest_feature_row
    :return: 0 = stay flat, 1 = long, 2 = short; 0 is also returned when the
        artifacts are missing, no feature row is available, or the highest
        class probability is below the configured confidence threshold
    """
    if period is None:
        period = PRIMARY_PERIOD

    artifacts = (SCHEME_B_MODEL_FILE, SCHEME_B_SCALER_FILE, SCHEME_B_FEATURES_FILE)
    if not all(path.exists() for path in artifacts):
        logger.warning("方案B模型未找到请先运行 AI 策略训练保存模型")
        return 0

    # NOTE: joblib.load unpickles arbitrary objects; only load artifact files
    # written by this project, never files from an untrusted source.
    model = joblib.load(SCHEME_B_MODEL_FILE)
    scaler = joblib.load(SCHEME_B_SCALER_FILE)
    with open(SCHEME_B_FEATURES_FILE, 'r', encoding='utf-8') as f:
        feature_cols = json.load(f)

    X_last = get_latest_feature_row(period, feature_cols, start_date, end_date)
    if X_last.empty:
        return 0

    # Re-wrap the scaled array in a DataFrame so the model receives the same
    # feature names it was saved with.
    X_scaled_df = pd.DataFrame(
        scaler.transform(X_last), columns=feature_cols, index=X_last.index
    )
    proba = model.predict_proba(X_scaled_df)[0]
    pred = int(model.predict(X_scaled_df)[0])
    confidence_threshold = MC.get('confidence_threshold', 0.45)

    # Map probabilities through the model's own class labels: a model fitted
    # on a window that lacked one of the classes has fewer than three columns,
    # so positional proba[0..2] access could raise IndexError or mislabel.
    classes = getattr(model, 'classes_', range(len(proba)))
    prob_by_class = {int(c): float(p) for c, p in zip(classes, proba)}
    logger.info(
        f"方案B 预测概率: 观望={prob_by_class.get(0, 0.0):.2f} "
        f"做多={prob_by_class.get(1, 0.0):.2f} "
        f"做空={prob_by_class.get(2, 0.0):.2f} -> {pred}"
    )

    confidence = proba.max()
    if confidence < confidence_threshold:
        logger.info(f"置信度 {confidence:.2f} < {confidence_threshold},视为观望")
        return 0
    return pred