"""
Signal generation module - weighted multi-indicator voting plus a
higher-time-frame trend filter.
"""
import numpy as np
import pandas as pd


def _assign_vote(frame: pd.DataFrame, column: str, long_mask, short_mask) -> None:
    """Write a +1 / -1 / 0 vote column into *frame* in place.

    Rows default to 0, rows where *long_mask* holds become +1 and rows where
    *short_mask* holds become -1.  The short assignment runs last, so it wins
    if both masks are true for the same row.  Comparisons against NaN are
    False, so rows with a missing indicator value stay neutral.
    """
    frame[column] = 0
    frame.loc[long_mask, column] = 1
    frame.loc[short_mask, column] = -1


def generate_indicator_signals(df: pd.DataFrame, params: dict) -> pd.DataFrame:
    """Derive one independent vote per indicator (+1 long / -1 short / 0 neutral).

    Args:
        df: Frame that already holds the indicator columns read below:
            ``bb_pct``, ``kc_pct``, ``dc_pct``, ``ema_diff``, ``macd_hist``,
            ``adx``, ``di_diff``, ``st_dir``, ``rsi``, ``stoch_k``,
            ``stoch_d``, ``cci``, ``wr`` and ``wma_diff`` (the original
            docstring says they come from ``compute_all_indicators``).
        params: Optional threshold overrides.  Keys and defaults:
            ``bb_oversold`` 0.0 / ``bb_overbought`` 1.0,
            ``kc_oversold`` 0.0 / ``kc_overbought`` 1.0,
            ``dc_oversold`` 0.2 / ``dc_overbought`` 0.8,
            ``adx_threshold`` 25,
            ``rsi_oversold`` 30 / ``rsi_overbought`` 70,
            ``stoch_oversold`` 20 / ``stoch_overbought`` 80,
            ``cci_oversold`` -100 / ``cci_overbought`` 100,
            ``wr_oversold`` -80 / ``wr_overbought`` -20.

    Returns:
        A copy of *df* with the twelve ``sig_*`` columns listed in
        ``SIGNAL_COLS`` added.  The input frame is not modified.

    Raises:
        KeyError: If a required indicator column is missing from *df*.
    """
    out = df.copy()

    # --- Bollinger %B: below the lower threshold -> long, above the upper -> short ---
    bb_pct = out['bb_pct']
    _assign_vote(
        out, 'sig_bb',
        bb_pct < params.get('bb_oversold', 0.0),
        bb_pct > params.get('bb_overbought', 1.0),
    )

    # --- Keltner channel position ---
    kc_pct = out['kc_pct']
    _assign_vote(
        out, 'sig_kc',
        kc_pct < params.get('kc_oversold', 0.0),
        kc_pct > params.get('kc_overbought', 1.0),
    )

    # --- Donchian channel position ---
    dc_pct = out['dc_pct']
    _assign_vote(
        out, 'sig_dc',
        dc_pct < params.get('dc_oversold', 0.2),
        dc_pct > params.get('dc_overbought', 0.8),
    )

    # --- EMA crossover: sign of the EMA difference ---
    ema_diff = out['ema_diff']
    _assign_vote(out, 'sig_ema', ema_diff > 0, ema_diff < 0)

    # --- MACD: sign of the histogram ---
    macd_hist = out['macd_hist']
    _assign_vote(out, 'sig_macd', macd_hist > 0, macd_hist < 0)

    # --- ADX + DI: the DI direction only counts while ADX exceeds the threshold ---
    trending = out['adx'] > params.get('adx_threshold', 25)
    di_diff = out['di_diff']
    _assign_vote(out, 'sig_adx', trending & (di_diff > 0), trending & (di_diff < 0))

    # --- Supertrend: the direction column is used directly as the vote.
    # Every other signal turns a missing indicator value into 0; do the same
    # here so one NaN in st_dir cannot turn the whole composite score into NaN.
    # NOTE(review): assumes st_dir is already encoded as +1 / -1 - confirm
    # against the indicator module.
    out['sig_st'] = out['st_dir'].fillna(0)

    # --- RSI: oversold -> long, overbought -> short ---
    rsi = out['rsi']
    _assign_vote(
        out, 'sig_rsi',
        rsi < params.get('rsi_oversold', 30),
        rsi > params.get('rsi_overbought', 70),
    )

    # --- Stochastic: %K must be in the extreme zone AND on the turning side of %D ---
    stoch_k = out['stoch_k']
    stoch_d = out['stoch_d']
    _assign_vote(
        out, 'sig_stoch',
        (stoch_k < params.get('stoch_oversold', 20)) & (stoch_k > stoch_d),
        (stoch_k > params.get('stoch_overbought', 80)) & (stoch_k < stoch_d),
    )

    # --- CCI ---
    cci = out['cci']
    _assign_vote(
        out, 'sig_cci',
        cci < params.get('cci_oversold', -100),
        cci > params.get('cci_overbought', 100),
    )

    # --- Williams %R ---
    wr = out['wr']
    _assign_vote(
        out, 'sig_wr',
        wr < params.get('wr_oversold', -80),
        wr > params.get('wr_overbought', -20),
    )

    # --- WMA: sign of the WMA difference ---
    wma_diff = out['wma_diff']
    _assign_vote(out, 'sig_wma', wma_diff > 0, wma_diff < 0)

    return out


# Vote columns produced by generate_indicator_signals, in voting order.
SIGNAL_COLS = [
    'sig_bb', 'sig_kc', 'sig_dc', 'sig_ema', 'sig_macd', 'sig_adx',
    'sig_st', 'sig_rsi', 'sig_stoch', 'sig_cci', 'sig_wr', 'sig_wma',
]

# Weight keys looked up in ``params``; position i weights SIGNAL_COLS[i].
WEIGHT_KEYS = [
    'w_bb', 'w_kc', 'w_dc', 'w_ema', 'w_macd', 'w_adx',
    'w_st', 'w_rsi', 'w_stoch', 'w_cci', 'w_wr', 'w_wma',
]


def compute_composite_score(df: pd.DataFrame, params: dict) -> pd.Series:
    """Combine the individual votes into one weighted score in [-1, +1].

    Args:
        df: Frame containing every column in ``SIGNAL_COLS``.
        params: Optional per-indicator weights under the keys in
            ``WEIGHT_KEYS``; a missing weight defaults to 1.0.

    Returns:
        Series named ``'score'`` on ``df.index`` holding the weighted sum of
        the votes divided by the sum of the absolute weights.  If every
        weight is zero the score is 0 for all rows.
    """
    weights = np.array([params.get(key, 1.0) for key in WEIGHT_KEYS], dtype=float)

    # Normalise by the total absolute weight.  Dividing by the signed sum
    # would let a negative weight push the score outside [-1, +1] and, when
    # the sum itself is negative, flip the sign of the result.  For
    # non-negative weights both normalisations are identical.
    norm = np.abs(weights).sum()
    if norm == 0:
        norm = 1.0  # all weights are zero: avoid 0/0, the score is simply 0

    votes = df[SIGNAL_COLS].to_numpy()  # shape (N, 12)
    score = (votes * weights).sum(axis=1) / norm
    return pd.Series(score, index=df.index, name='score')


def apply_htf_filter(score: pd.Series, htf_df: pd.DataFrame, params: dict) -> pd.Series:
    """Suppress scores that trade against the higher-time-frame trend.

    Args:
        score: Composite score on the main time frame's index.
        htf_df: Higher-time-frame data (e.g. 1h); must contain an
            ``'ema_diff'`` column whose sign defines the trend.
        params: Currently unused; kept for a uniform call signature.

    Returns:
        A copy of *score* where negative values are zeroed while the
        higher-time-frame ``ema_diff`` is positive and positive values are
        zeroed while it is negative.  Rows with no trend information
        (``ema_diff`` is NaN or exactly 0) pass through unchanged.
    """
    # Bring the higher-time-frame trend onto the main index, carrying the last
    # known value forward.
    # NOTE(review): if higher-time-frame bars are stamped with their open time
    # while ema_diff is computed from their close, this forward-fill exposes
    # information that was not yet available - confirm the timestamp convention.
    htf_trend = htf_df['ema_diff'].reindex(score.index, method='ffill')

    filtered = score.copy()
    counter_trend = (
        ((htf_trend > 0) & (filtered < 0))    # uptrend: block shorts
        | ((htf_trend < 0) & (filtered > 0))  # downtrend: block longs
    )
    filtered.loc[counter_trend] = 0
    return filtered