diff --git a/models/database.db b/models/database.db index d07c4c5..f1f8945 100644 Binary files a/models/database.db and b/models/database.db differ diff --git a/models/weex.py b/models/weex.py index de83bb6..52d161e 100644 --- a/models/weex.py +++ b/models/weex.py @@ -38,6 +38,17 @@ class Weex1Hour(Model): database = db table_name = 'weex_1_hour' +class Weex30(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'weex_30' + # 连接到数据库 db.connect() diff --git a/weex/自动化抓取数据.py b/weex/自动化抓取数据.py index cacc96f..1b0103a 100644 --- a/weex/自动化抓取数据.py +++ b/weex/自动化抓取数据.py @@ -1,7 +1,7 @@ from DrissionPage import ChromiumPage, ChromiumOptions from bit_tools import openBrowser -from models.weex import Weex1, Weex1Hour +from models.weex import Weex1, Weex1Hour, Weex15, Weex30 if __name__ == '__main__': @@ -17,7 +17,7 @@ if __name__ == '__main__': page.set.window.max() - page.listen.start("https://http-gateway1.janapw.com/api/v1/public/quote/v1/getKlineV2") + page.listen.start("https://http-gateway1.ngsvsfx.cn/api/v1/public/quote/v1/getKlineV2") page.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") @@ -28,7 +28,7 @@ if __name__ == '__main__': print(res.response.url) for data in res.response.body['data']["dataList"]: - Weex1Hour.get_or_create( + Weex30.get_or_create( id=int(data[4]), defaults={ 'open': float(data[3]), diff --git a/weex/长期持有信号/读取数据库数据-15分钟版.py b/weex/长期持有信号/读取数据库数据-15分钟版.py new file mode 100644 index 0000000..06f62b6 --- /dev/null +++ b/weex/长期持有信号/读取数据库数据-15分钟版.py @@ -0,0 +1,259 @@ +""" +量化交易回测系统 - 仅15分钟K线 & 信号续持/反手/单根反色平仓逻辑(完整版) +""" + +import datetime +from dataclasses import dataclass +from typing import List, Dict, Optional +from loguru import logger +from models.weex import Weex15 # 替换为你的15分钟K线模型 + + +# ========================= 工具函数 ========================= + +def is_bullish(c): # 阳线 + return float(c['close']) > float(c['open']) + + +def is_bearish(c): # 阴线 + return float(c['close']) < float(c['open']) + + +def check_signal(prev, curr): + """ + 包住形态信号判定(仅15分钟K线): + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + """ + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + # 前跌后涨包住 -> 做多 + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" + + # 前涨后跌包住 -> 做空 + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" + + return None, None + + +def get_data_by_date(model, date_str: str): + """按天获取指定表的数据(15分钟)""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + +# ========================= 回测逻辑 ========================= + +def backtest_15m_trend_optimized(dates: List[str]): + all_data: List[Dict] = [] + for d in dates: + all_data.extend(get_data_by_date(Weex15, d)) + if not all_data: + return [], { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + all_data.sort(key=lambda x: x['id']) + + stats = { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + trades: List[Dict] = [] + current_position: Optional[Dict] = None + idx = 1 + + while idx < len(all_data) - 1: + prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] + direction, signal_key = check_signal(prev, curr) + + # 空仓 -> 碰到信号则开仓 + if current_position is None and direction: + entry_price = float(next_bar['open']) + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': entry_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + if current_position: + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] + + # 反向信号 -> 下一根开盘平仓 + 同价反手 + if direction and direction != pos_dir: + exit_price = float(next_bar['open']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 + + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': exit_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + # 同向信号 -> 续持 + if direction and direction == pos_dir: + idx += 1 + continue + + # 单根反色K线 -> 判断后续是否能组成信号 + curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr)) + if curr_is_opposite: + can_peek = idx + 1 < len(all_data) + if can_peek: + lookahead_dir, _ = check_signal(curr, all_data[idx + 1]) + if lookahead_dir is not None: + idx += 1 + continue # 后续可组成信号,等待信号处理 + + # 否则按收盘价平仓 + exit_price = float(next_bar['close']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 + current_position = None + + idx += 1 + + # 尾仓:最后一根收盘价平仓 + if current_position: + last = all_data[-1] + exit_price = float(last['close']) + pos_dir = current_position['direction'] + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[current_position['signal_key']]['total_profit'] += diff + if diff > 0: stats[current_position['signal_key']]['wins'] += 1 + + return trades, stats + + +# ========================= 运行示例(优化版盈利计算) ========================= +if __name__ == '__main__': + dates = [] + for i in range(1, 11): + for i1 in range(1, 31): + dates.append(f"2025-{f'0{i}' if len(str(i)) < 2 else i}-{i1}") + # + # print(dates) + + # dates = [f"2025-10-{i}" for i in range(1, 31)] + trades, stats = backtest_15m_trend_optimized(dates) + + logger.info("===== 每笔交易详情 =====") + + # === 参数设定 === + contract_size = 10000 # 合约规模(1手对应多少基础货币) + open_fee_fixed = 5 # 固定开仓手续费 + close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率 + + total_points_profit = 0 # 累计点差 + total_money_profit = 0 # 累计金额盈利 + total_fee = 0 # 累计手续费 + + for t in trades: + entry = t['entry'] + exit = t['exit'] + direction = t['direction'] + + # === 1️⃣ 原始价差(点差) === + point_diff = (exit - entry) if direction == '做多' else (entry - exit) + + # === 2️⃣ 金额盈利(考虑合约规模) === + money_profit = point_diff / entry * contract_size # 利润以基础货币计(例如USD) + + # === 3️⃣ 手续费计算 === + # 开仓 + 平仓手续费(按比例计算 + 固定) + fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) + + # === 4️⃣ 净利润 === + net_profit = money_profit - fee + + # 保存计算结果 + t.update({ + 'point_diff': point_diff, + 'raw_profit': money_profit, + 'fee': fee, + 'net_profit': net_profit + }) + + total_points_profit += point_diff + total_money_profit += money_profit + total_fee += fee + + # if net_profit > 500 or net_profit < -500: + logger.info( + f"{t['entry_time']} {direction}({t['signal']}) " + f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} " + f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" + ) + + # === 汇总统计 === + total_net_profit = total_money_profit - total_fee + print(f"\n一共交易笔数:{len(trades)}") + print(f"总点差:{total_points_profit:.2f}") + print(f"总原始盈利(未扣费):{total_money_profit:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net_profit:.2f}\n") + + print("===== 信号统计 =====") + for k, v in stats.items(): + name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] + win_rate = (wins / count * 100) if count > 0 else 0.0 + avg_p = (total_p / count) if count > 0 else 0.0 + print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") diff --git a/weex/长期持有信号/读取数据库数据-15分钟版,不判断信号.py b/weex/长期持有信号/读取数据库数据-15分钟版,不判断信号.py new file mode 100644 index 0000000..c58336b --- /dev/null +++ b/weex/长期持有信号/读取数据库数据-15分钟版,不判断信号.py @@ -0,0 +1,265 @@ +""" +量化交易回测系统 - 仅15分钟K线 & 反向K线即时平仓逻辑(完整版) +""" + +import datetime +from dataclasses import dataclass +from typing import List, Dict, Optional +from loguru import logger +from models.weex import Weex15 # 替换为你的15分钟K线模型 + + +# ========================= 工具函数 ========================= + +def is_bullish(c): # 阳线 + return float(c['close']) > float(c['open']) + + +def is_bearish(c): # 阴线 + return float(c['close']) < float(c['open']) + + +def check_signal(prev, curr): + """ + 包住形态信号判定: + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + """ + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" + + return None, None + + +def get_data_by_date(model, date_str: str): + """按天获取指定表的数据(15分钟K线)""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + +# ========================= 回测逻辑 ========================= + +def backtest_15m_trend_simplified(dates: List[str]): + all_data: List[Dict] = [] + for d in dates: + all_data.extend(get_data_by_date(Weex15, d)) + if not all_data: + return [], { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + all_data.sort(key=lambda x: x['id']) + + stats = { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + trades: List[Dict] = [] + current_position: Optional[Dict] = None + idx = 1 + + while idx < len(all_data) - 1: + prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] + direction, signal_key = check_signal(prev, curr) + + # 空仓 -> 碰到信号则开仓 + if current_position is None and direction: + entry_price = float(next_bar['open']) + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': entry_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + if current_position: + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] + + # 反向信号 -> 平仓 + 反手 + if direction and direction != pos_dir: + exit_price = float(next_bar['open']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + + # 反手开仓 + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': exit_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + # === 新逻辑:遇到反向K线立即平仓 === + if pos_dir == 'long' and is_bearish(curr): + exit_price = float(curr['close']) + diff = exit_price - current_position['entry_price'] + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + current_position = None + idx += 1 + continue + + if pos_dir == 'short' and is_bullish(curr): + exit_price = float(curr['close']) + diff = current_position['entry_price'] - exit_price + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + current_position = None + idx += 1 + continue + + idx += 1 + + # 尾仓平仓 + if current_position: + last = all_data[-1] + exit_price = float(last['close']) + pos_dir = current_position['direction'] + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[current_position['signal_key']]['total_profit'] += diff + if diff > 0: + stats[current_position['signal_key']]['wins'] += 1 + + return trades, stats + + +# ========================= 运行示例 ========================= +if __name__ == '__main__': + # dates = [f"2025-06-{i}" for i in range(1, 31)] + + dates = [] + for i in range(1, 11): + for i1 in range(1, 31): + dates.append(f"2025-{f'0{i}' if len(str(i)) < 2 else i}-{f'0{i1}' if len(str(i1)) < 2 else i1}") + + + + trades, stats = backtest_15m_trend_simplified(dates) + + logger.info("===== 每笔交易详情 =====") + + # === 参数设定 === + contract_size = 10000 # 合约规模(1手对应多少基础货币) + open_fee_fixed = 5 # 固定开仓手续费 + close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率 + + total_points_profit = 0 # 累计点差 + total_money_profit = 0 # 累计金额盈利 + total_fee = 0 # 累计手续费 + + for t in trades: + entry = t['entry'] + exit = t['exit'] + direction = t['direction'] + + # === 1️⃣ 原始价差(点差) === + point_diff = (exit - entry) if direction == '做多' else (entry - exit) + + # === 2️⃣ 金额盈利(考虑合约规模) === + money_profit = point_diff / entry * contract_size # 利润以基础货币计(例如USD) + + # === 3️⃣ 手续费计算 === + fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) + + # === 4️⃣ 净利润 === + net_profit = money_profit - fee + + # 保存计算结果 + t.update({ + 'point_diff': point_diff, + 'raw_profit': money_profit, + 'fee': fee, + 'net_profit': net_profit + }) + + total_points_profit += point_diff + total_money_profit += money_profit + total_fee += fee + + logger.info( + f"{t['entry_time']} {direction}({t['signal']}) " + f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} " + f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" + ) + + # === 汇总统计 === + total_net_profit = total_money_profit - total_fee + print(f"\n一共交易笔数:{len(trades)}") + print(f"总点差:{total_points_profit:.2f}") + print(f"总原始盈利(未扣费):{total_money_profit:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net_profit:.2f}\n") + + print("===== 信号统计 =====") + for k, v in stats.items(): + name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] + win_rate = (wins / count * 100) if count > 0 else 0.0 + avg_p = (total_p / count) if count > 0 else 0.0 + print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") diff --git a/weex/长期持有信号/读取数据库数据-1小时版.py b/weex/长期持有信号/读取数据库数据-1小时版.py new file mode 100644 index 0000000..3a59a3c --- /dev/null +++ b/weex/长期持有信号/读取数据库数据-1小时版.py @@ -0,0 +1,259 @@ +""" +量化交易回测系统 - 仅15分钟K线 & 信号续持/反手/单根反色平仓逻辑(完整版) +""" + +import datetime +from dataclasses import dataclass +from typing import List, Dict, Optional +from loguru import logger +from models.weex import Weex1Hour # 替换为你的15分钟K线模型 + + +# ========================= 工具函数 ========================= + +def is_bullish(c): # 阳线 + return float(c['close']) > float(c['open']) + + +def is_bearish(c): # 阴线 + return float(c['close']) < float(c['open']) + + +def check_signal(prev, curr): + """ + 包住形态信号判定(仅15分钟K线): + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + """ + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + # 前跌后涨包住 -> 做多 + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" + + # 前涨后跌包住 -> 做空 + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" + + return None, None + + +def get_data_by_date(model, date_str: str): + """按天获取指定表的数据(15分钟)""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + +# ========================= 回测逻辑 ========================= + +def backtest_15m_trend_optimized(dates: List[str]): + all_data: List[Dict] = [] + for d in dates: + all_data.extend(get_data_by_date(Weex1Hour, d)) + if not all_data: + return [], { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + all_data.sort(key=lambda x: x['id']) + + stats = { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + trades: List[Dict] = [] + current_position: Optional[Dict] = None + idx = 1 + + while idx < len(all_data) - 1: + prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] + direction, signal_key = check_signal(prev, curr) + + # 空仓 -> 碰到信号则开仓 + if current_position is None and direction: + entry_price = float(next_bar['open']) + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': entry_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + if current_position: + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] + + # 反向信号 -> 下一根开盘平仓 + 同价反手 + if direction and direction != pos_dir: + exit_price = float(next_bar['open']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 + + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': exit_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + # 同向信号 -> 续持 + if direction and direction == pos_dir: + idx += 1 + continue + + # 单根反色K线 -> 判断后续是否能组成信号 + curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr)) + if curr_is_opposite: + can_peek = idx + 1 < len(all_data) + if can_peek: + lookahead_dir, _ = check_signal(curr, all_data[idx + 1]) + if lookahead_dir is not None: + idx += 1 + continue # 后续可组成信号,等待信号处理 + + # 否则按收盘价平仓 + exit_price = float(next_bar['close']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 + current_position = None + + idx += 1 + + # 尾仓:最后一根收盘价平仓 + if current_position: + last = all_data[-1] + exit_price = float(last['close']) + pos_dir = current_position['direction'] + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[current_position['signal_key']]['total_profit'] += diff + if diff > 0: stats[current_position['signal_key']]['wins'] += 1 + + return trades, stats + + +# ========================= 运行示例(优化版盈利计算) ========================= +if __name__ == '__main__': + dates = [] + for i in range(1, 11): + for i1 in range(1, 31): + dates.append(f"2025-{f'0{i}' if len(str(i)) < 2 else i}-{i1}") + # + # print(dates) + + # dates = [f"2025-10-{i}" for i in range(1, 31)] + trades, stats = backtest_15m_trend_optimized(dates) + + logger.info("===== 每笔交易详情 =====") + + # === 参数设定 === + contract_size = 10000 # 合约规模(1手对应多少基础货币) + open_fee_fixed = 5 # 固定开仓手续费 + close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率 + + total_points_profit = 0 # 累计点差 + total_money_profit = 0 # 累计金额盈利 + total_fee = 0 # 累计手续费 + + for t in trades: + entry = t['entry'] + exit = t['exit'] + direction = t['direction'] + + # === 1️⃣ 原始价差(点差) === + point_diff = (exit - entry) if direction == '做多' else (entry - exit) + + # === 2️⃣ 金额盈利(考虑合约规模) === + money_profit = point_diff / entry * contract_size # 利润以基础货币计(例如USD) + + # === 3️⃣ 手续费计算 === + # 开仓 + 平仓手续费(按比例计算 + 固定) + fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) + + # === 4️⃣ 净利润 === + net_profit = money_profit - fee + + # 保存计算结果 + t.update({ + 'point_diff': point_diff, + 'raw_profit': money_profit, + 'fee': fee, + 'net_profit': net_profit + }) + + total_points_profit += point_diff + total_money_profit += money_profit + total_fee += fee + + # if net_profit > 500 or net_profit < -500: + logger.info( + f"{t['entry_time']} {direction}({t['signal']}) " + f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} " + f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" + ) + + # === 汇总统计 === + total_net_profit = total_money_profit - total_fee + print(f"\n一共交易笔数:{len(trades)}") + print(f"总点差:{total_points_profit:.2f}") + print(f"总原始盈利(未扣费):{total_money_profit:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net_profit:.2f}\n") + + print("===== 信号统计 =====") + for k, v in stats.items(): + name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] + win_rate = (wins / count * 100) if count > 0 else 0.0 + avg_p = (total_p / count) if count > 0 else 0.0 + print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") diff --git a/weex/长期持有信号/读取数据库数据-1小时版,不判断信号.py b/weex/长期持有信号/读取数据库数据-1小时版,不判断信号.py new file mode 100644 index 0000000..7cff7d5 --- /dev/null +++ b/weex/长期持有信号/读取数据库数据-1小时版,不判断信号.py @@ -0,0 +1,265 @@ +""" +量化交易回测系统 - 仅15分钟K线 & 反向K线即时平仓逻辑(完整版) +""" + +import datetime +from dataclasses import dataclass +from typing import List, Dict, Optional +from loguru import logger +from models.weex import Weex1Hour # 替换为你的15分钟K线模型 + + +# ========================= 工具函数 ========================= + +def is_bullish(c): # 阳线 + return float(c['close']) > float(c['open']) + + +def is_bearish(c): # 阴线 + return float(c['close']) < float(c['open']) + + +def check_signal(prev, curr): + """ + 包住形态信号判定: + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + """ + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" + + return None, None + + +def get_data_by_date(model, date_str: str): + """按天获取指定表的数据(15分钟K线)""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + +# ========================= 回测逻辑 ========================= + +def backtest_15m_trend_simplified(dates: List[str]): + all_data: List[Dict] = [] + for d in dates: + all_data.extend(get_data_by_date(Weex1Hour, d)) + if not all_data: + return [], { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + all_data.sort(key=lambda x: x['id']) + + stats = { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + trades: List[Dict] = [] + current_position: Optional[Dict] = None + idx = 1 + + while idx < len(all_data) - 1: + prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] + direction, signal_key = check_signal(prev, curr) + + # 空仓 -> 碰到信号则开仓 + if current_position is None and direction: + entry_price = float(next_bar['open']) + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': entry_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + if current_position: + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] + + # 反向信号 -> 平仓 + 反手 + if direction and direction != pos_dir: + exit_price = float(next_bar['open']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + + # 反手开仓 + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': exit_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + # === 新逻辑:遇到反向K线立即平仓 === + if pos_dir == 'long' and is_bearish(curr): + exit_price = float(curr['close']) + diff = exit_price - current_position['entry_price'] + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + current_position = None + idx += 1 + continue + + if pos_dir == 'short' and is_bullish(curr): + exit_price = float(curr['close']) + diff = current_position['entry_price'] - exit_price + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + current_position = None + idx += 1 + continue + + idx += 1 + + # 尾仓平仓 + if current_position: + last = all_data[-1] + exit_price = float(last['close']) + pos_dir = current_position['direction'] + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[current_position['signal_key']]['total_profit'] += diff + if diff > 0: + stats[current_position['signal_key']]['wins'] += 1 + + return trades, stats + + +# ========================= 运行示例 ========================= +if __name__ == '__main__': + # dates = [f"2025-06-{i}" for i in range(1, 31)] + + dates = [] + for i in range(1, 11): + for i1 in range(1, 31): + dates.append(f"2025-{f'0{i}' if len(str(i)) < 2 else i}-{f'0{i1}' if len(str(i1)) < 2 else i1}") + + + + trades, stats = backtest_15m_trend_simplified(dates) + + logger.info("===== 每笔交易详情 =====") + + # === 参数设定 === + contract_size = 10000 # 合约规模(1手对应多少基础货币) + open_fee_fixed = 5 # 固定开仓手续费 + close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率 + + total_points_profit = 0 # 累计点差 + total_money_profit = 0 # 累计金额盈利 + total_fee = 0 # 累计手续费 + + for t in trades: + entry = t['entry'] + exit = t['exit'] + direction = t['direction'] + + # === 1️⃣ 原始价差(点差) === + point_diff = (exit - entry) if direction == '做多' else (entry - exit) + + # === 2️⃣ 金额盈利(考虑合约规模) === + money_profit = point_diff / entry * contract_size # 利润以基础货币计(例如USD) + + # === 3️⃣ 手续费计算 === + fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) + + # === 4️⃣ 净利润 === + net_profit = money_profit - fee + + # 保存计算结果 + t.update({ + 'point_diff': point_diff, + 'raw_profit': money_profit, + 'fee': fee, + 'net_profit': net_profit + }) + + total_points_profit += point_diff + total_money_profit += money_profit + total_fee += fee + + logger.info( + f"{t['entry_time']} {direction}({t['signal']}) " + f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} " + f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" + ) + + # === 汇总统计 === + total_net_profit = total_money_profit - total_fee + print(f"\n一共交易笔数:{len(trades)}") + print(f"总点差:{total_points_profit:.2f}") + print(f"总原始盈利(未扣费):{total_money_profit:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net_profit:.2f}\n") + + print("===== 信号统计 =====") + for k, v in stats.items(): + name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] + win_rate = (wins / count * 100) if count > 0 else 0.0 + avg_p = (total_p / count) if count > 0 else 0.0 + print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") diff --git a/weex/长期持有信号/读取数据库数据-30分钟版.py b/weex/长期持有信号/读取数据库数据-30分钟版.py new file mode 100644 index 0000000..8ecc0ee --- /dev/null +++ b/weex/长期持有信号/读取数据库数据-30分钟版.py @@ -0,0 +1,259 @@ +""" +量化交易回测系统 - 仅15分钟K线 & 信号续持/反手/单根反色平仓逻辑(完整版) +""" + +import datetime +from dataclasses import dataclass +from typing import List, Dict, Optional +from loguru import logger +from models.weex import Weex30 # 替换为你的15分钟K线模型 + + +# ========================= 工具函数 ========================= + +def is_bullish(c): # 阳线 + return float(c['close']) > float(c['open']) + + +def is_bearish(c): # 阴线 + return float(c['close']) < float(c['open']) + + +def check_signal(prev, curr): + """ + 包住形态信号判定(仅15分钟K线): + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + """ + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + # 前跌后涨包住 -> 做多 + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" + + # 前涨后跌包住 -> 做空 + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" + + return None, None + + +def get_data_by_date(model, date_str: str): + """按天获取指定表的数据(15分钟)""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + +# ========================= 回测逻辑 ========================= + +def backtest_15m_trend_optimized(dates: List[str]): + all_data: List[Dict] = [] + for d in dates: + all_data.extend(get_data_by_date(Weex30, d)) + if not all_data: + return [], { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + all_data.sort(key=lambda x: x['id']) + + stats = { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + trades: List[Dict] = [] + current_position: Optional[Dict] = None + idx = 1 + + while idx < len(all_data) - 1: + prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] + direction, signal_key = check_signal(prev, curr) + + # 空仓 -> 碰到信号则开仓 + if current_position is None and direction: + entry_price = float(next_bar['open']) + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': entry_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + if current_position: + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] + + # 反向信号 -> 下一根开盘平仓 + 同价反手 + if direction and direction != pos_dir: + exit_price = float(next_bar['open']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 + + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': exit_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + # 同向信号 -> 续持 + if direction and direction == pos_dir: + idx += 1 + continue + + # 单根反色K线 -> 判断后续是否能组成信号 + curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr)) + if curr_is_opposite: + can_peek = idx + 1 < len(all_data) + if can_peek: + lookahead_dir, _ = check_signal(curr, all_data[idx + 1]) + if lookahead_dir is not None: + idx += 1 + continue # 后续可组成信号,等待信号处理 + + # 否则按收盘价平仓 + exit_price = float(next_bar['close']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 + current_position = None + + idx += 1 + + # 尾仓:最后一根收盘价平仓 + if current_position: + last = all_data[-1] + exit_price = float(last['close']) + pos_dir = current_position['direction'] + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[current_position['signal_key']]['total_profit'] += diff + if diff > 0: stats[current_position['signal_key']]['wins'] += 1 + + return trades, stats + + +# ========================= 运行示例(优化版盈利计算) ========================= +if __name__ == '__main__': + dates = [] + for i in range(1, 11): + for i1 in range(1, 31): + dates.append(f"2025-{f'0{i}' if len(str(i)) < 2 else i}-{i1}") + # + # print(dates) + + # dates = [f"2025-10-{i}" for i in range(1, 31)] + trades, stats = backtest_15m_trend_optimized(dates) + + logger.info("===== 每笔交易详情 =====") + + # === 参数设定 === + contract_size = 10000 # 合约规模(1手对应多少基础货币) + open_fee_fixed = 5 # 固定开仓手续费 + close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率 + + total_points_profit = 0 # 累计点差 + total_money_profit = 0 # 累计金额盈利 + total_fee = 0 # 累计手续费 + + for t in trades: + entry = t['entry'] + exit = t['exit'] + direction = t['direction'] + + # === 1️⃣ 原始价差(点差) === + point_diff = (exit - entry) if direction == '做多' else (entry - exit) + + # === 2️⃣ 金额盈利(考虑合约规模) === + money_profit = point_diff / entry * contract_size # 利润以基础货币计(例如USD) + + # === 3️⃣ 手续费计算 === + # 开仓 + 平仓手续费(按比例计算 + 固定) + fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) + + # === 4️⃣ 净利润 === + net_profit = money_profit - fee + + # 保存计算结果 + t.update({ + 'point_diff': point_diff, + 'raw_profit': money_profit, + 'fee': fee, + 'net_profit': net_profit + }) + + total_points_profit += point_diff + total_money_profit += money_profit + total_fee += fee + + # if net_profit > 500 or net_profit < -500: + logger.info( + f"{t['entry_time']} {direction}({t['signal']}) " + f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} " + f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" + ) + + # === 汇总统计 === + total_net_profit = total_money_profit - total_fee + print(f"\n一共交易笔数:{len(trades)}") + print(f"总点差:{total_points_profit:.2f}") + print(f"总原始盈利(未扣费):{total_money_profit:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net_profit:.2f}\n") + + print("===== 信号统计 =====") + for k, v in stats.items(): + name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] + win_rate = (wins / count * 100) if count > 0 else 0.0 + avg_p = (total_p / count) if count > 0 else 0.0 + print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") diff --git a/weex/长期持有信号/读取数据库数据-30分钟版,不判断信号.py b/weex/长期持有信号/读取数据库数据-30分钟版,不判断信号.py new file mode 100644 index 0000000..c52e8c7 --- /dev/null +++ b/weex/长期持有信号/读取数据库数据-30分钟版,不判断信号.py @@ -0,0 +1,265 @@ +""" +量化交易回测系统 - 仅15分钟K线 & 反向K线即时平仓逻辑(完整版) +""" + +import datetime +from dataclasses import dataclass +from typing import List, Dict, Optional +from loguru import logger +from models.weex import Weex30 # 替换为你的15分钟K线模型 + + +# ========================= 工具函数 ========================= + +def is_bullish(c): # 阳线 + return float(c['close']) > float(c['open']) + + +def is_bearish(c): # 阴线 + return float(c['close']) < float(c['open']) + + +def check_signal(prev, curr): + """ + 包住形态信号判定: + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + """ + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" + + return None, None + + +def get_data_by_date(model, date_str: str): + """按天获取指定表的数据(15分钟K线)""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + +# ========================= 回测逻辑 ========================= + +def backtest_15m_trend_simplified(dates: List[str]): + all_data: List[Dict] = [] + for d in dates: + all_data.extend(get_data_by_date(Weex30, d)) + if not all_data: + return [], { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + all_data.sort(key=lambda x: x['id']) + + stats = { + 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, + 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, + } + + trades: List[Dict] = [] + current_position: Optional[Dict] = None + idx = 1 + + while idx < len(all_data) - 1: + prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] + direction, signal_key = check_signal(prev, curr) + + # 空仓 -> 碰到信号则开仓 + if current_position is None and direction: + entry_price = float(next_bar['open']) + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': entry_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + if current_position: + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] + + # 反向信号 -> 平仓 + 反手 + if direction and direction != pos_dir: + exit_price = float(next_bar['open']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + + # 反手开仓 + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': exit_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue + + # === 新逻辑:遇到反向K线立即平仓 === + if pos_dir == 'long' and is_bearish(curr): + exit_price = float(curr['close']) + diff = exit_price - current_position['entry_price'] + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + current_position = None + idx += 1 + continue + + if pos_dir == 'short' and is_bullish(curr): + exit_price = float(curr['close']) + diff = current_position['entry_price'] - exit_price + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: + stats[pos_sig_key]['wins'] += 1 + current_position = None + idx += 1 + continue + + idx += 1 + + # 尾仓平仓 + if current_position: + last = all_data[-1] + exit_price = float(last['close']) + pos_dir = current_position['direction'] + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[current_position['signal_key']]['total_profit'] += diff + if diff > 0: + stats[current_position['signal_key']]['wins'] += 1 + + return trades, stats + + +# ========================= 运行示例 ========================= +if __name__ == '__main__': + # dates = [f"2025-06-{i}" for i in range(1, 31)] + + dates = [] + for i in range(1, 11): + for i1 in range(1, 31): + dates.append(f"2025-{f'0{i}' if len(str(i)) < 2 else i}-{f'0{i1}' if len(str(i1)) < 2 else i1}") + + + + trades, stats = backtest_15m_trend_simplified(dates) + + logger.info("===== 每笔交易详情 =====") + + # === 参数设定 === + contract_size = 10000 # 合约规模(1手对应多少基础货币) + open_fee_fixed = 5 # 固定开仓手续费 + close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率 + + total_points_profit = 0 # 累计点差 + total_money_profit = 0 # 累计金额盈利 + total_fee = 0 # 累计手续费 + + for t in trades: + entry = t['entry'] + exit = t['exit'] + direction = t['direction'] + + # === 1️⃣ 原始价差(点差) === + point_diff = (exit - entry) if direction == '做多' else (entry - exit) + + # === 2️⃣ 金额盈利(考虑合约规模) === + money_profit = point_diff / entry * contract_size # 利润以基础货币计(例如USD) + + # === 3️⃣ 手续费计算 === + fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) + + # === 4️⃣ 净利润 === + net_profit = money_profit - fee + + # 保存计算结果 + t.update({ + 'point_diff': point_diff, + 'raw_profit': money_profit, + 'fee': fee, + 'net_profit': net_profit + }) + + total_points_profit += point_diff + total_money_profit += money_profit + total_fee += fee + + logger.info( + f"{t['entry_time']} {direction}({t['signal']}) " + f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} " + f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" + ) + + # === 汇总统计 === + total_net_profit = total_money_profit - total_fee + print(f"\n一共交易笔数:{len(trades)}") + print(f"总点差:{total_points_profit:.2f}") + print(f"总原始盈利(未扣费):{total_money_profit:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net_profit:.2f}\n") + + print("===== 信号统计 =====") + for k, v in stats.items(): + name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] + win_rate = (wins / count * 100) if count > 0 else 0.0 + avg_p = (total_p / count) if count > 0 else 0.0 + print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")