This commit is contained in:
27942
2025-10-21 20:27:17 +08:00
parent bc0d609ee2
commit 71de951a2f
2 changed files with 91 additions and 227 deletions

89
test.py
View File

@@ -1,86 +1,9 @@
import requests
import json
from models.weex import Weex
# 定义一个 Python 字典
data = {"relationship":"111","name":"1111","phone":"11111111111","workPlace":"11111111","address":"1111"}
headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'appversion': '2.0.0',
'bundleid': '',
'cache-control': 'no-cache',
'dnt': '1',
'language': 'zh_CN',
'origin': 'https://www.weeaxs.site',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.weeaxs.site/',
'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Microsoft Edge";v="140"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'sidecar': '01bfdfef90d2d6213c86b09f3a00d696d366752996a0b6692a33969a67fcd243df',
'terminalcode': '89adf61538715df59eb4f6414981484e',
'terminaltype': '1',
'traceid': 'mgoh8eoehxp15lxnbv',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0',
'vs': 'G5h7sX78yNSykC9xtTmA7O2lSKqAk6Sp',
'x-sig': '74ec6a65f3886f24a8b062e3b41edb1a',
'x-timestamp': '1760320261454',
}
# 使用 json.dumps() 方法将字典转换为 JSON 字符串
json_string = json.dumps(data)
klineId = None
klineTime = None
contractId = None
for i in range(2):
params = {
'languageType': '1',
'sign': 'SIGN',
'timeZone': 'string',
'contractId': '10000002',
'productCode': 'ethusdt',
'priceType': 'LAST_PRICE',
'klineType': 'MINUTE_15',
'limit': '329',
}
if klineId:
params['nextKey.klineId'] = klineId
params['nextKey.contractId'] = contractId
if klineTime:
params['nextKey.klineTime'] = klineTime
response = requests.get('https://http-gateway1.janapw.com/api/v1/public/quote/v1/getKlineV2', params=params,
headers=headers)
klineId = response.json()["data"]["nextKey"]["klineId"]
klineTime = response.json()["data"]["nextKey"]["klineTime"]
contractId = response.json()["data"]["nextKey"]["contractId"]
for data in response.json()["data"]["dataList"]:
# print(data)
# datas.append(
# {
# "open": data[3],
# "high": data[1],
# "low": data[2],
# "close": data[0],
# "id": data[4],
# }
# )
print(data)
Weex.get_or_create(
id=int(data[4]),
defaults={
'open': float(data[3]),
'high': float(data[1]),
'low': float(data[2]),
'close': float(data[0]),
}
)
print(json_string)

View File

@@ -1,14 +1,9 @@
"""
量化交易回测系统 - 15分钟K线包住信号回测(最终版)
实现:
- prev(curr) -> 信号(在 next_bar.open 开仓)
- 若 (curr, next_bar) 构成反手包住信号,则:
1) 在 next_bar.open 仍然开仓(原方向)
2) 在开仓后的下一根 K 线的 open即 next_bar 后一根的 open以该 open 平仓并反手开新仓
- 其它逻辑:常规反手(下一根开盘平仓并反手开新仓)、续持、单根反色按收盘平仓、尾仓按最后收盘平仓
量化交易回测系统 - 15分钟K线 & 信号续持/反手/单根反色平仓逻辑(优化版)
"""
import datetime
from dataclasses import dataclass
from typing import List, Dict, Optional
from loguru import logger
from models.weex import Weex30 # 替换为你的15分钟K线模型
@@ -16,26 +11,28 @@ from models.weex import Weex30 # 替换为你的15分钟K线模型
# ========================= 工具函数 =========================
def is_bullish(c):
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c):
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
def check_signal(prev, curr):
"""
包住形态信号判定仅15分钟K线
- 前跌后涨包住 -> 做多 ("long", "bear_bull_engulf")
- 前涨后跌包住 -> 做空 ("short", "bull_bear_engulf")
- 前跌后涨包住 -> 做多
- 前涨后跌包住 -> 做空
"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
# 前跌后涨包住 -> 做多
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
return "long", "bear_bull_engulf"
# 前涨后跌包住 -> 做空
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
return "short", "bull_bear_engulf"
@@ -77,103 +74,43 @@ def backtest_15m_trend_optimized(dates: List[str]):
}
trades: List[Dict] = []
current_position: Optional[Dict] = None # 若有仓,包含 entry_price, direction, signal_key, entry_time
idx = 1 # 从第二根开始(确保 prev 存在)
current_position: Optional[Dict] = None # 开仓信息
idx = 1
while idx < len(all_data) - 1:
prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1]
direction, signal_key = check_signal(prev, curr)
curr, next_bar = all_data[idx], all_data[idx + 1]
# =========================
# 如果当前持仓且标记为 "开仓后下一根需在开盘平仓并反手"pending_reverse
# 规则:该 pending_reverse 在我们“开仓”时设置(表示 curr 与 next_bar 构成反手包住)
# 执行:在当前循环中用 next_bar.open即开仓后的下一根 open平仓并反手开新仓
# =========================
if current_position and current_position.get('pending_reverse'):
# 确保存在下一根可作为平仓/反手的开盘(当前循环里的 next_bar 是正好那根)
# 因为我们在开仓时已把 pending_reverse 标记在仓里,所以到这里直接使用 next_bar.open
reverse_to = current_position.get('reverse_to') # 反手方向,比如 "short"
reverse_key = current_position.get('reverse_key')
# 平仓价为 next_bar.open开仓后的下一根开盘
exit_price = float(next_bar['open'])
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0:
stats[pos_sig_key]['wins'] += 1
# 反手以相同价格next_bar.open开新仓reverse_to
current_position = {
'direction': reverse_to,
'signal': stats[reverse_key]['name'],
'signal_key': reverse_key,
'entry_price': exit_price,
'entry_time': next_bar['id']
}
stats[reverse_key]['count'] += 1
logger.debug(
f"开仓后被判定为反手模式 -> 在 {next_bar['id']} 的 open {exit_price:.4f} 平旧仓并反手开 {reverse_to}"
)
# 清除 pending_reverse已经应用
# (注意:我们重新赋值 current_position上面没有再设置 pending_reverse
# 空仓 -> 碰到信号则开仓
if current_position is None:
if idx >= 1:
prev = all_data[idx - 1]
direction, signal_key = check_signal(prev, curr)
if direction:
entry_price = float(next_bar['open'])
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': entry_price,
'entry_time': next_bar['id']
}
stats[signal_key]['count'] += 1
idx += 1
continue
idx += 1
continue
# =========================
# 空仓:发现包住信号 -> 在 next_bar.open 开仓(并可能设置 pending_reverse
# =========================
if current_position is None and direction:
# 检查 (curr, next_bar) 是否构成反手包住信号
potential_reverse, reverse_key = check_signal(curr, next_bar)
# 如果已有仓位信号判断从开仓后的下一根K开始
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
# 开仓:按照 prev,curr 的方向在 next_bar.open 开仓(用户明确希望 "应该是开涨/开空"
entry_price = float(next_bar['open'])
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'entry_price': entry_price,
'entry_time': next_bar['id']
}
stats[signal_key]['count'] += 1
lookahead_dir, lookahead_key = None, None
if idx + 1 < len(all_data):
lookahead_dir, lookahead_key = check_signal(curr, all_data[idx + 1])
# 如果 curr 与 next_bar 构成反手(即 potential_reverse 非空 且 与原 direction 相反)
# 则我们 **标记 pending_reverse**,在下一根的 open 实际执行平仓并反手开仓
if potential_reverse and potential_reverse != direction:
current_position['pending_reverse'] = True
current_position['reverse_to'] = potential_reverse
current_position['reverse_key'] = reverse_key
logger.debug(
f"{next_bar['id']} 开仓({direction})并标记 pending_reverse -> "
f"{curr['id']}{next_bar['id']} 构成反手 {potential_reverse}"
)
idx += 1
continue
# =========================
# 有仓处理常规反向信号prev,curr -> 下一根 open 平仓并反手开新仓
# =========================
if current_position:
pos_dir = current_position['direction']
pos_sig_key = current_position['signal_key']
# 正常检测到 (prev,curr) 形成反向包住信号 -> 在 next_bar.open 平仓并反手开新仓
if direction and direction != pos_dir:
if lookahead_dir:
if lookahead_dir != pos_dir:
# 反手 -> 下一根开盘平仓并反手开仓
exit_price = float(next_bar['open'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
@@ -187,59 +124,54 @@ def backtest_15m_trend_optimized(dates: List[str]):
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0:
stats[pos_sig_key]['wins'] += 1
if diff > 0: stats[pos_sig_key]['wins'] += 1
# 反手开仓(新的方向为 direction
# 开反向仓
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
'direction': lookahead_dir,
'signal': stats[lookahead_key]['name'],
'signal_key': lookahead_key,
'entry_price': exit_price,
'entry_time': next_bar['id']
}
stats[signal_key]['count'] += 1
logger.debug(f"检测到常规反向信号({signal_key}) -> 在 {next_bar['id']} open {exit_price} 平仓并反手开 {direction}")
stats[lookahead_key]['count'] += 1
idx += 1
continue
# 同向信号 -> 续持
if direction and direction == pos_dir:
if lookahead_dir == pos_dir:
idx += 1
continue
# 单根反色 -> 如果后续不能组成信号,在下一根收盘价平仓(保持你原来的行为)
curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr))
if curr_is_opposite:
can_peek = idx + 1 < len(all_data)
if can_peek:
lookahead_dir, _ = check_signal(curr, all_data[idx + 1])
if lookahead_dir is not None:
idx += 1
continue
# 单根反色K线 -> 后续无法组成信号时平仓
curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr))
if curr_is_opposite:
can_peek = idx + 1 < len(all_data)
if can_peek:
lookahead_dir2, _ = check_signal(curr, all_data[idx + 1])
if lookahead_dir2 is not None:
idx += 1
continue # 后续可组成信号,等待信号处理
# 否则按 next_bar 的 close 平仓(即 idx+1 的 close
exit_price = float(next_bar['close'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(all_data[idx + 1]['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0:
stats[pos_sig_key]['wins'] += 1
current_position = None
exit_price = float(next_bar['close'])
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000),
'signal': current_position['signal'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
current_position = None
idx += 1
# 尾仓:最后一根收盘价平仓
# 尾仓平仓
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
@@ -256,13 +188,12 @@ def backtest_15m_trend_optimized(dates: List[str]):
'diff': diff
})
stats[current_position['signal_key']]['total_profit'] += diff
if diff > 0:
stats[current_position['signal_key']]['wins'] += 1
if diff > 0: stats[current_position['signal_key']]['wins'] += 1
return trades, stats
# ========================= 运行示例(计算手续费/净利润等 =========================
# ========================= 运行示例(优化版盈利计算) =========================
if __name__ == '__main__':
dates = [f"2025-10-{i}" for i in range(1, 31)]
trades, stats = backtest_15m_trend_optimized(dates)
@@ -274,19 +205,28 @@ if __name__ == '__main__':
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率
total_points_profit = 0
total_money_profit = 0
total_fee = 0
total_points_profit = 0 # 累计点差
total_money_profit = 0 # 累计金额盈利
total_fee = 0 # 累计手续费
for t in trades:
entry = t['entry']
exit = t['exit']
direction = t['direction']
# === 1⃣ 原始价差(点差) ===
point_diff = (exit - entry) if direction == '做多' else (entry - exit)
money_profit = point_diff / entry * contract_size
# === 2⃣ 金额盈利(考虑合约规模) ===
money_profit = point_diff / entry * contract_size # 利润以基础货币计例如USD
# === 3⃣ 手续费计算 ===
fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate)
# === 4⃣ 净利润 ===
net_profit = money_profit - fee
# 保存计算结果
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
@@ -304,6 +244,7 @@ if __name__ == '__main__':
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
# === 汇总统计 ===
total_net_profit = total_money_profit - total_fee
print(f"\n一共交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")