diff --git a/telegram/bot_session.session b/telegram/bot_session.session index a720d41..0f6dbf9 100644 Binary files a/telegram/bot_session.session and b/telegram/bot_session.session differ diff --git a/telegram/sign.db b/telegram/sign.db index 53b414a..8375129 100644 Binary files a/telegram/sign.db and b/telegram/sign.db differ diff --git a/weex/长期持有信号/30分钟优化实体超过版本.py b/weex/长期持有信号/30分钟优化实体超过版本.py new file mode 100644 index 0000000..aaaa1ff --- /dev/null +++ b/weex/长期持有信号/30分钟优化实体超过版本.py @@ -0,0 +1,299 @@ +""" +15分钟K线包住形态策略(优化版本) +- 第二根实体必须比第一根实体大 20% 以上 +- 支持续持、反手、单根反色平仓 +""" + +import datetime +from dataclasses import dataclass +from typing import List, Dict, Optional +from loguru import logger +from models.weex import Weex30 # 你的数据库模型 + + +# ========================= 工具函数 ========================= + +def is_bullish(c): + """阳线""" + return float(c['close']) > float(c['open']) + + +def is_bearish(c): + """阴线""" + return float(c['close']) < float(c['open']) + + +def calc_body(c): + """计算实体大小""" + return abs(float(c['close']) - float(c['open'])) + + +# ========================= 包住信号(含20%增强) ========================= + +def check_signal(prev, curr): + """ + 包住信号: + - 多头:前跌后涨包住 + - 空头:前涨后跌包住 + + 增强条件: + - 第二根(curr)K线实体大小 >= 第一根(prev) × 1.2 + """ + + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + prev_body = calc_body(prev) + curr_body = calc_body(curr) + + # ====== 第二根实体必须 ≥ 第一根实体 × 1.2 ====== + if curr_body < prev_body * 1.2: + return None, None + + # ----------- 多头包住信号(bear → bull)----------- + if (is_bearish(prev) and is_bullish(curr) + and c_open <= p_close + and c_close >= p_open): + return "long", "bear_bull_engulf" + + # ----------- 空头包住信号(bull → bear)----------- + if (is_bullish(prev) and is_bearish(curr) + and c_open >= p_close + and c_close <= p_open): + return "short", "bull_bear_engulf" + + return None, None + + +# ========================= 数据获取 ========================= + +def get_data_by_date(model, date_str: str): + """按天获取 15 分钟数据""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,需为 YYYY-MM-DD") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + +# ========================= 核心回测逻辑 ========================= + +def backtest_15m_trend_optimized(dates: List[str]): + all_data = [] + for d in dates: + all_data.extend(get_data_by_date(Weex30, d)) + + if not all_data: + return [], { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + all_data.sort(key=lambda x: x['id']) + + stats = { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + trades = [] + current_position = None + idx = 1 + + while idx < len(all_data) - 1: + prev = all_data[idx - 1] + curr = all_data[idx] + next_bar = all_data[idx + 1] + + direction, sig_key = check_signal(prev, curr) + + # ========== 空仓遇到信号 -> 开仓 ========== + if current_position is None and direction: + entry_price = float(next_bar['open']) + current_position = { + 'direction': direction, + 'signal': stats[sig_key]['name'], + 'signal_key': sig_key, + 'entry_price': entry_price, + 'entry_time': next_bar['id'] + } + stats[sig_key]['count'] += 1 + idx += 1 + continue + + # ========== 已持仓 ========== + if current_position: + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] + + # ======= 反向信号 -> 平仓 + 反手 ======= + if direction and direction != pos_dir: + exit_price = float(next_bar['open']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else \ + (current_position['entry_price'] - exit_price) + + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + + # === 反手开新仓 === + current_position = { + 'direction': direction, + 'signal': stats[sig_key]['name'], + 'signal_key': sig_key, + 'entry_price': exit_price, + 'entry_time': next_bar['id'] + } + stats[sig_key]['count'] += 1 + + idx += 1 + continue + + # ======= 同向信号 -> 继续持仓 ======= + if direction and direction == pos_dir: + idx += 1 + continue + + # ======= 单根反色 ======= + curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or \ + (pos_dir == 'short' and is_bullish(curr)) + + if curr_is_opposite: + # 先判断下一根是不是会形成信号 + if idx + 1 < len(all_data): + look_dir, _ = check_signal(curr, all_data[idx + 1]) + if look_dir: # 后面还能形成信号 → 忍耐 + idx += 1 + continue + + # 否则按下一根收盘价平仓 + exit_price = float(next_bar['close']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else \ + (current_position['entry_price'] - exit_price) + + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + + current_position = None + + idx += 1 + + # ========== 尾仓强制平仓 ========== + if current_position: + last = all_data[-1] + exit_price = float(last['close']) + pos_dir = current_position['direction'] + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else \ + (current_position['entry_price'] - exit_price) + + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + + stats[current_position['signal_key']]['total_profit'] += diff + if diff > 0: + stats[current_position['signal_key']]['wins'] += 1 + + return trades, stats + + +# ================ 主程序(盈利计算) ================ + +if __name__ == '__main__': + + dates = [f"2025-11-{i}" for i in range(1, 32)] + trades, stats = backtest_15m_trend_optimized(dates) + + logger.info("===== 每笔交易详情 =====") + + contract_size = 10000 + open_fee_fixed = 5 + close_fee_rate = 0.0005 + + total_points = 0 + total_raw_profit = 0 + total_fee = 0 + + for t in trades: + entry = t['entry'] + exit = t['exit'] + direction = t['direction'] + + # 点差 + point_diff = exit - entry if direction == '做多' else entry - exit + + # 金额盈利 + money_profit = point_diff / entry * contract_size + + # 手续费 + fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) + + net_profit = money_profit - fee + + t.update({ + 'point_diff': point_diff, + 'raw_profit': money_profit, + 'fee': fee, + 'net_profit': net_profit + }) + + total_points += point_diff + total_raw_profit += money_profit + total_fee += fee + + logger.info( + f"{t['entry_time']} {direction}({t['signal']}) " + f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} " + f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" + ) + + # 汇总 + total_net = total_raw_profit - total_fee + print(f"\n总交易数:{len(trades)}") + print(f"总点差:{total_points:.2f}") + print(f"总原始盈利:{total_raw_profit:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net:.2f}\n") + + print("===== 信号统计 =====") + for k, v in stats.items(): + name = v['name'] + count = v['count'] + wins = v['wins'] + total_p = v['total_profit'] + win_rate = wins / count * 100 if count > 0 else 0 + avg_p = total_p / count if count > 0 else 0 + print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")