diff --git a/test.py b/test.py index a1cb125..fcd3603 100644 --- a/test.py +++ b/test.py @@ -1,86 +1,9 @@ -import requests +import json -from models.weex import Weex +# 定义一个 Python 字典 +data = {"relationship":"111","name":"1111","phone":"11111111111","workPlace":"11111111","address":"1111"} -headers = { - 'accept': 'application/json, text/plain, */*', - 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', - 'appversion': '2.0.0', - 'bundleid': '', - 'cache-control': 'no-cache', - 'dnt': '1', - 'language': 'zh_CN', - 'origin': 'https://www.weeaxs.site', - 'pragma': 'no-cache', - 'priority': 'u=1, i', - 'referer': 'https://www.weeaxs.site/', - 'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Microsoft Edge";v="140"', - 'sec-ch-ua-mobile': '?0', - 'sec-ch-ua-platform': '"Windows"', - 'sec-fetch-dest': 'empty', - 'sec-fetch-mode': 'cors', - 'sec-fetch-site': 'cross-site', - 'sidecar': '01bfdfef90d2d6213c86b09f3a00d696d366752996a0b6692a33969a67fcd243df', - 'terminalcode': '89adf61538715df59eb4f6414981484e', - 'terminaltype': '1', - 'traceid': 'mgoh8eoehxp15lxnbv', - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0', - 'vs': 'G5h7sX78yNSykC9xtTmA7O2lSKqAk6Sp', - 'x-sig': '74ec6a65f3886f24a8b062e3b41edb1a', - 'x-timestamp': '1760320261454', -} +# 使用 json.dumps() 方法将字典转换为 JSON 字符串 +json_string = json.dumps(data) -klineId = None -klineTime = None -contractId = None - -for i in range(2): - params = { - 'languageType': '1', - 'sign': 'SIGN', - 'timeZone': 'string', - 'contractId': '10000002', - 'productCode': 'ethusdt', - 'priceType': 'LAST_PRICE', - 'klineType': 'MINUTE_15', - 'limit': '329', - } - - if klineId: - params['nextKey.klineId'] = klineId - params['nextKey.contractId'] = contractId - - if klineTime: - params['nextKey.klineTime'] = klineTime - - response = requests.get('https://http-gateway1.janapw.com/api/v1/public/quote/v1/getKlineV2', params=params, - headers=headers) - - klineId = response.json()["data"]["nextKey"]["klineId"] - klineTime = response.json()["data"]["nextKey"]["klineTime"] - contractId = response.json()["data"]["nextKey"]["contractId"] - - for data in response.json()["data"]["dataList"]: - # print(data) - - # datas.append( - # { - # "open": data[3], - # "high": data[1], - # "low": data[2], - # "close": data[0], - # "id": data[4], - # } - # ) - - print(data) - - Weex.get_or_create( - id=int(data[4]), - defaults={ - 'open': float(data[3]), - 'high': float(data[1]), - 'low': float(data[2]), - 'close': float(data[0]), - } - ) +print(json_string) \ No newline at end of file diff --git a/weex/长期持有信号/读取数据库数据-30分钟版-优化版,开仓的那根线也要去判断.py b/weex/长期持有信号/读取数据库数据-30分钟版-优化版,开仓的那根线也要去判断.py index 2ab4485..cc6c95b 100644 --- a/weex/长期持有信号/读取数据库数据-30分钟版-优化版,开仓的那根线也要去判断.py +++ b/weex/长期持有信号/读取数据库数据-30分钟版-优化版,开仓的那根线也要去判断.py @@ -1,14 +1,9 @@ """ -量化交易回测系统 - 15分钟K线包住信号回测(最终版) -实现: -- prev(curr) -> 信号(在 next_bar.open 开仓) -- 若 (curr, next_bar) 构成反手包住信号,则: - 1) 在 next_bar.open 仍然开仓(原方向) - 2) 在开仓后的下一根 K 线的 open(即 next_bar 后一根的 open)以该 open 平仓并反手开新仓 -- 其它逻辑:常规反手(下一根开盘平仓并反手开新仓)、续持、单根反色按收盘平仓、尾仓按最后收盘平仓 +量化交易回测系统 - 仅15分钟K线 & 信号续持/反手/单根反色平仓逻辑(优化版) """ import datetime +from dataclasses import dataclass from typing import List, Dict, Optional from loguru import logger from models.weex import Weex30 # 替换为你的15分钟K线模型 @@ -16,26 +11,28 @@ from models.weex import Weex30 # 替换为你的15分钟K线模型 # ========================= 工具函数 ========================= -def is_bullish(c): +def is_bullish(c): # 阳线 return float(c['close']) > float(c['open']) -def is_bearish(c): +def is_bearish(c): # 阴线 return float(c['close']) < float(c['open']) def check_signal(prev, curr): """ 包住形态信号判定(仅15分钟K线): - - 前跌后涨包住 -> 做多 ("long", "bear_bull_engulf") - - 前涨后跌包住 -> 做空 ("short", "bull_bear_engulf") + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 """ p_open, p_close = float(prev['open']), float(prev['close']) c_open, c_close = float(curr['open']), float(curr['close']) + # 前跌后涨包住 -> 做多 if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: return "long", "bear_bull_engulf" + # 前涨后跌包住 -> 做空 if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: return "short", "bull_bear_engulf" @@ -77,103 +74,43 @@ def backtest_15m_trend_optimized(dates: List[str]): } trades: List[Dict] = [] - current_position: Optional[Dict] = None # 若有仓,包含 entry_price, direction, signal_key, entry_time - idx = 1 # 从第二根开始(确保 prev 存在) + current_position: Optional[Dict] = None # 开仓信息 + idx = 1 while idx < len(all_data) - 1: - prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] - direction, signal_key = check_signal(prev, curr) + curr, next_bar = all_data[idx], all_data[idx + 1] - # ========================= - # 如果当前持仓且标记为 "开仓后下一根需在开盘平仓并反手"(pending_reverse) - # 规则:该 pending_reverse 在我们“开仓”时设置(表示 curr 与 next_bar 构成反手包住) - # 执行:在当前循环中用 next_bar.open(即开仓后的下一根 open)平仓并反手开新仓 - # ========================= - if current_position and current_position.get('pending_reverse'): - # 确保存在下一根可作为平仓/反手的开盘(当前循环里的 next_bar 是正好那根) - # 因为我们在开仓时已把 pending_reverse 标记在仓里,所以到这里直接使用 next_bar.open - reverse_to = current_position.get('reverse_to') # 反手方向,比如 "short" - reverse_key = current_position.get('reverse_key') - # 平仓价为 next_bar.open(开仓后的下一根开盘) - exit_price = float(next_bar['open']) - pos_dir = current_position['direction'] - pos_sig_key = current_position['signal_key'] - - diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( - current_position['entry_price'] - exit_price) - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), - 'signal': current_position['signal'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff - }) - stats[pos_sig_key]['total_profit'] += diff - if diff > 0: - stats[pos_sig_key]['wins'] += 1 - - # 反手:以相同价格(next_bar.open)开新仓(reverse_to) - current_position = { - 'direction': reverse_to, - 'signal': stats[reverse_key]['name'], - 'signal_key': reverse_key, - 'entry_price': exit_price, - 'entry_time': next_bar['id'] - } - stats[reverse_key]['count'] += 1 - - logger.debug( - f"开仓后被判定为反手模式 -> 在 {next_bar['id']} 的 open {exit_price:.4f} 平旧仓并反手开 {reverse_to}" - ) - - # 清除 pending_reverse(已经应用) - # (注意:我们重新赋值 current_position,上面没有再设置 pending_reverse) + # 空仓 -> 碰到信号则开仓 + if current_position is None: + if idx >= 1: + prev = all_data[idx - 1] + direction, signal_key = check_signal(prev, curr) + if direction: + entry_price = float(next_bar['open']) + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': entry_price, + 'entry_time': next_bar['id'] + } + stats[signal_key]['count'] += 1 + idx += 1 + continue idx += 1 continue - # ========================= - # 空仓:发现包住信号 -> 在 next_bar.open 开仓(并可能设置 pending_reverse) - # ========================= - if current_position is None and direction: - # 检查 (curr, next_bar) 是否构成反手包住信号 - potential_reverse, reverse_key = check_signal(curr, next_bar) + # 如果已有仓位,信号判断从开仓后的下一根K开始 + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] - # 开仓:按照 prev,curr 的方向在 next_bar.open 开仓(用户明确希望 "应该是开涨/开空") - entry_price = float(next_bar['open']) - current_position = { - 'direction': direction, - 'signal': stats[signal_key]['name'], - 'signal_key': signal_key, - 'entry_price': entry_price, - 'entry_time': next_bar['id'] - } - stats[signal_key]['count'] += 1 + lookahead_dir, lookahead_key = None, None + if idx + 1 < len(all_data): + lookahead_dir, lookahead_key = check_signal(curr, all_data[idx + 1]) - # 如果 curr 与 next_bar 构成反手(即 potential_reverse 非空 且 与原 direction 相反) - # 则我们 **标记 pending_reverse**,在下一根的 open 实际执行平仓并反手开仓 - if potential_reverse and potential_reverse != direction: - current_position['pending_reverse'] = True - current_position['reverse_to'] = potential_reverse - current_position['reverse_key'] = reverse_key - logger.debug( - f"在 {next_bar['id']} 开仓({direction})并标记 pending_reverse -> " - f"{curr['id']} 与 {next_bar['id']} 构成反手 {potential_reverse}" - ) - - idx += 1 - continue - - # ========================= - # 有仓:处理常规反向信号(prev,curr) -> 下一根 open 平仓并反手开新仓 - # ========================= - if current_position: - pos_dir = current_position['direction'] - pos_sig_key = current_position['signal_key'] - - # 正常检测到 (prev,curr) 形成反向包住信号 -> 在 next_bar.open 平仓并反手开新仓 - if direction and direction != pos_dir: + if lookahead_dir: + if lookahead_dir != pos_dir: + # 反手 -> 下一根开盘平仓并反手开仓 exit_price = float(next_bar['open']) diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( current_position['entry_price'] - exit_price) @@ -187,59 +124,54 @@ def backtest_15m_trend_optimized(dates: List[str]): 'diff': diff }) stats[pos_sig_key]['total_profit'] += diff - if diff > 0: - stats[pos_sig_key]['wins'] += 1 + if diff > 0: stats[pos_sig_key]['wins'] += 1 - # 反手开仓(新的方向为 direction) + # 开反向仓 current_position = { - 'direction': direction, - 'signal': stats[signal_key]['name'], - 'signal_key': signal_key, + 'direction': lookahead_dir, + 'signal': stats[lookahead_key]['name'], + 'signal_key': lookahead_key, 'entry_price': exit_price, 'entry_time': next_bar['id'] } - stats[signal_key]['count'] += 1 - - logger.debug(f"检测到常规反向信号({signal_key}) -> 在 {next_bar['id']} open {exit_price} 平仓并反手开 {direction}") + stats[lookahead_key]['count'] += 1 idx += 1 continue # 同向信号 -> 续持 - if direction and direction == pos_dir: + if lookahead_dir == pos_dir: idx += 1 continue - # 单根反色 -> 如果后续不能组成信号,在下一根收盘价平仓(保持你原来的行为) - curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr)) - if curr_is_opposite: - can_peek = idx + 1 < len(all_data) - if can_peek: - lookahead_dir, _ = check_signal(curr, all_data[idx + 1]) - if lookahead_dir is not None: - idx += 1 - continue + # 单根反色K线 -> 后续无法组成信号时平仓 + curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr)) + if curr_is_opposite: + can_peek = idx + 1 < len(all_data) + if can_peek: + lookahead_dir2, _ = check_signal(curr, all_data[idx + 1]) + if lookahead_dir2 is not None: + idx += 1 + continue # 后续可组成信号,等待信号处理 - # 否则按 next_bar 的 close 平仓(即 idx+1 的 close) - exit_price = float(next_bar['close']) - diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( - current_position['entry_price'] - exit_price) - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(all_data[idx + 1]['id'] / 1000), - 'signal': current_position['signal'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff - }) - stats[pos_sig_key]['total_profit'] += diff - if diff > 0: - stats[pos_sig_key]['wins'] += 1 - current_position = None + exit_price = float(next_bar['close']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 + current_position = None idx += 1 - # 尾仓:最后一根收盘价平仓 + # 尾仓平仓 if current_position: last = all_data[-1] exit_price = float(last['close']) @@ -256,13 +188,12 @@ def backtest_15m_trend_optimized(dates: List[str]): 'diff': diff }) stats[current_position['signal_key']]['total_profit'] += diff - if diff > 0: - stats[current_position['signal_key']]['wins'] += 1 + if diff > 0: stats[current_position['signal_key']]['wins'] += 1 return trades, stats -# ========================= 运行示例(计算手续费/净利润等) ========================= +# ========================= 运行示例(优化版盈利计算) ========================= if __name__ == '__main__': dates = [f"2025-10-{i}" for i in range(1, 31)] trades, stats = backtest_15m_trend_optimized(dates) @@ -274,19 +205,28 @@ if __name__ == '__main__': open_fee_fixed = 5 # 固定开仓手续费 close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率 - total_points_profit = 0 - total_money_profit = 0 - total_fee = 0 + total_points_profit = 0 # 累计点差 + total_money_profit = 0 # 累计金额盈利 + total_fee = 0 # 累计手续费 for t in trades: entry = t['entry'] exit = t['exit'] direction = t['direction'] + # === 1️⃣ 原始价差(点差) === point_diff = (exit - entry) if direction == '做多' else (entry - exit) - money_profit = point_diff / entry * contract_size + + # === 2️⃣ 金额盈利(考虑合约规模) === + money_profit = point_diff / entry * contract_size # 利润以基础货币计(例如USD) + + # === 3️⃣ 手续费计算 === fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) + + # === 4️⃣ 净利润 === net_profit = money_profit - fee + + # 保存计算结果 t.update({ 'point_diff': point_diff, 'raw_profit': money_profit, @@ -304,6 +244,7 @@ if __name__ == '__main__': f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" ) + # === 汇总统计 === total_net_profit = total_money_profit - total_fee print(f"\n一共交易笔数:{len(trades)}") print(f"总点差:{total_points_profit:.2f}")