This commit is contained in:
27942
2025-10-13 17:49:07 +08:00
parent 9ce23b29fb
commit 90dcfb6dee

View File

@@ -1,227 +1,205 @@
import datetime
from loguru import logger
from models.weex import Weex15
from peewee import fn
from models.weex import Weex15, Weex1
def is_bullish(candle):
"""判断是否是阳线(开盘价 < 收盘价,即涨)"""
return float(candle['open']) < float(candle['close'])
# ===============================================================
# 📊 数据获取模块
# ===============================================================
def get_data_by_date(model, date_str):
"""按天获取指定表的数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = (model
.select()
.where(model.id.between(start_ts, end_ts))
.order_by(model.id.asc()))
return [
{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close}
for i in query
]
def is_bearish(candle):
"""判断是否是阴线(开盘价 > 收盘价,即跌)"""
return float(candle['open']) > float(candle['close'])
def get_future_data_1min(start_ts, end_ts):
"""获取指定时间范围内的 1 分钟数据"""
query = (Weex1
.select()
.where(Weex1.id.between(start_ts, end_ts))
.order_by(Weex1.id.asc()))
return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
# ===============================================================
# 📈 信号判定模块
# ===============================================================
def is_bullish(c): return float(c['open']) < float(c['close'])
def is_bearish(c): return float(c['open']) > float(c['close'])
def check_signal(prev, curr):
"""
判断是否出现包住形态,返回信号类型和方向:
1. 前跌后涨包住 -> 做多信号 (bear_bull_engulf)
2. 前涨后跌包住 -> 做空信号 (bull_bear_engulf)
3. 前涨后涨包住 -> 做多信号 (bull_bull_engulf)
4. 前跌后跌包住 -> 做空信号 (bear_bear_engulf)
"""
p_open, p_close = float(prev['open']), float(prev['close']) # 前一笔
c_open, c_close = float(curr['open']), float(curr['close']) # 当前一笔
"""判断是否出现包住形态"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
# 情况1前跌后涨且涨线包住前跌线 -> 做多信号
if is_bullish(curr) and is_bearish(prev):
if c_open <= p_close and c_close >= p_open:
return "long", "bear_bull_engulf"
# 前跌后涨包住 -> 做多
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
return "long", "bear_bull_engulf"
# 情况2前涨后跌且跌线包住前涨线 -> 做空信号
if is_bearish(curr) and is_bullish(prev):
if c_open >= p_close and c_close <= p_open:
return "short", "bull_bear_engulf"
# # 情况3前涨后涨且后涨线包住前涨线 -> 做多信号
# if is_bullish(curr) and is_bullish(prev):
# if c_open <= p_open and c_close >= p_close:
# return "long", "bull_bull_engulf"
#
# # 情况4前跌后跌且后跌线包住前跌线 -> 做空信号
# if is_bearish(curr) and is_bearish(prev):
# if c_open >= p_open and c_close <= p_close:
# return "short", "bear_bear_engulf"
# 前涨后跌包住 -> 做空
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
return "short", "bull_bear_engulf"
return None, None
def simulate_trade(direction, entry_price, future_candles, take_profit_diff=30, stop_loss_diff=-10):
# ===============================================================
# 💹 回测模拟模块(使用 1 分钟数据)
# ===============================================================
def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl=-1):
"""
模拟交易逐根K线回测
改进版:考虑开盘跳空触发止盈/止损的情况
用 1 分钟数据进行精细化止盈止损模拟
entry_time: 当前信号的 entry candle id毫秒时间戳
next_15min_time: 下一个15min时间戳用于界定止盈止损分析范围
"""
# 查 15 分钟之间的 1 分钟数据
future_candles = get_future_data_1min(entry_time, next_15min_time)
if not future_candles:
return None, 0, None
tp_price = entry_price + tp if direction == "long" else entry_price - tp
sl_price = entry_price + sl if direction == "long" else entry_price - sl
for candle in future_candles:
open_p = float(candle['open'])
high = float(candle['high'])
low = float(candle['low'])
close = float(candle['close'])
open_p, high, low = map(float, (candle['open'], candle['high'], candle['low']))
if direction == "long":
tp_price = entry_price + take_profit_diff
sl_price = entry_price + stop_loss_diff
# 🧩 开盘就跳空止盈
if open_p >= tp_price:
if open_p >= tp_price: # 开盘跳空止盈
return open_p, open_p - entry_price, candle['id']
# 🧩 开盘就跳空止损
if open_p <= sl_price:
if open_p <= sl_price: # 开盘跳空止损
return open_p, open_p - entry_price, candle['id']
# 正常区间内触发止盈/止损
if high >= tp_price:
return tp_price, take_profit_diff, candle['id']
return tp_price, tp, candle['id']
if low <= sl_price:
return sl_price, stop_loss_diff, candle['id']
return sl_price, sl, candle['id']
elif direction == "short":
tp_price = entry_price - take_profit_diff
sl_price = entry_price - stop_loss_diff
# 🧩 开盘就跳空止盈
else: # short
if open_p <= tp_price:
return open_p, entry_price - open_p, candle['id']
# 🧩 开盘就跳空止损
if open_p >= sl_price:
return open_p, entry_price - open_p, candle['id']
# 正常区间内触发止盈/止损
if low <= tp_price:
return tp_price, take_profit_diff, candle['id']
return tp_price, tp, candle['id']
if high >= sl_price:
return sl_price, stop_loss_diff, candle['id']
return sl_price, sl, candle['id']
# 未触发止盈止损,最后收盘价平仓
final_price = float(future_candles[-1]['close'])
if direction == "long":
diff_money = final_price - entry_price
else:
diff_money = entry_price - final_price
return final_price, diff_money, future_candles[-1]['id']
# 未触发止盈止损,最后一根收盘价平仓
final = future_candles[-1]
final_price = float(final['close'])
diff = (final_price - entry_price) if direction == "long" else (entry_price - final_price)
return final_price, diff, final['id']
def get_data_by_date(date_str):
# 将日期字符串转换为 datetime 对象
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
print("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
# ===============================================================
# 📊 主回测流程
# ===============================================================
# 计算该天的开始时间戳(毫秒级)
start_timestamp = int(target_date.timestamp() * 1000)
# 计算该天的结束时间戳(毫秒级),即下一天 00:00:00 的时间戳减去 1 毫秒
end_timestamp = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
def backtest(dates, tp=8, sl=-1):
all_data = []
for date_str in dates:
all_data.extend(get_data_by_date(Weex15, date_str))
# 查询该天的数据,并按照 id 字段从小到大排序
query = Weex15.select().where(Weex15.id.between(start_timestamp, end_timestamp)).order_by(Weex15.id.asc())
results = list(query)
all_data.sort(key=lambda x: x['id'])
# 将结果转换为列表嵌套字典的形式
data_list = []
for item in results:
item_dict = {
'id': item.id,
'open': item.open,
'high': item.high,
'low': item.low,
'close': item.close
}
data_list.append(item_dict)
return data_list
if __name__ == '__main__':
# 示例调用
datas = []
for i in range(1, 31):
date_str = f'2025-8-{i}'
data = get_data_by_date(date_str)
datas.extend(data)
datas = sorted(datas, key=lambda x: x["id"])
zh_project = 0 # 累计盈亏
all_trades = [] # 保存所有交易明细
daily_signals = 0 # 信号总数
daily_wins = 0
daily_profit = 0 # 价差总和
# 四种信号类型的统计
signal_stats = {
stats = {
"bear_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包跌"},
"bull_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包涨"},
# "bull_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包涨"},
# "bear_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包跌"}
}
# 遍历每根K线寻找信号
for idx in range(1, len(datas) - 1): # 留出未来K线
prev, curr = datas[idx - 1], datas[idx] # 前一笔,当前一笔
entry_candle = datas[idx + 1] # 下一根开盘价作为入场价
future_candles = datas[idx + 1:] # 未来行情
trades = []
entry_open = float(entry_candle['open']) # 开仓价格
direction, signal_type = check_signal(prev, curr) # 判断开仓方向和信号类型
for idx in range(1, len(all_data) - 1):
prev, curr = all_data[idx - 1], all_data[idx]
entry_candle = all_data[idx + 1]
if direction and signal_type:
daily_signals += 1
direction, signal = check_signal(prev, curr)
if not direction:
continue
exit_price, diff, exit_time = simulate_trade(direction, entry_open, future_candles, take_profit_diff=6,
stop_loss_diff=-2)
# 下一个 15 分钟K线的时间范围
next_15min_time = all_data[idx + 1]['id'] if idx + 2 < len(all_data) else all_data[-1]['id']
# 统计该信号类型的表现
signal_stats[signal_type]["count"] += 1
signal_stats[signal_type]["total_profit"] += diff
if diff > 0:
signal_stats[signal_type]["wins"] += 1
daily_wins += 1
entry_price = float(entry_candle['open'])
exit_price, diff, exit_time = simulate_trade(direction, entry_price, entry_candle['id'], next_15min_time, tp=tp,
sl=sl)
daily_profit += diff
if exit_price is None:
continue
# 将时间戳转换为本地时间
local_time = datetime.datetime.fromtimestamp(entry_candle['id'] / 1000)
formatted_time = local_time.strftime("%Y-%m-%d %H:%M:%S")
stats[signal]['count'] += 1
stats[signal]['total_profit'] += diff
if diff > 0:
stats[signal]['wins'] += 1
exit_time = datetime.datetime.fromtimestamp(exit_time / 1000)
exit_time1 = exit_time.strftime("%Y-%m-%d %H:%M:%S")
trades.append({
"entry_time": datetime.datetime.fromtimestamp(entry_candle['id'] / 1000),
"exit_time": datetime.datetime.fromtimestamp(exit_time / 1000),
"signal": stats[signal]['name'],
"direction": "做多" if direction == "long" else "做空",
"entry": entry_price,
"exit": exit_price,
"diff": diff
})
# 保存交易详情
all_trades.append(
(
f"{formatted_time}",
"做多" if direction == "long" else "做空",
signal_stats[signal_type]["name"],
entry_open,
exit_price,
diff,
exit_time1
)
)
return trades, stats
# ===== 输出每笔交易详情 =====
logger.info("===== 每笔交易详情 =====")
n = n1 = 0 # n = 总盈利n1 = 总手续费
for date, direction, signal_name, entry, exit, diff, end_time in all_trades:
profit_amount = diff / entry * 10000 # 计算盈利金额
close_fee = 10000 / entry * exit * 0.0005 # 平仓手续费
logger.info(
f"{date} {direction}({signal_name}) 入场={entry:.2f} 出场={exit:.2f} 出场时间={end_time} "
f"差价={diff:.2f} 盈利={profit_amount:.2f} "
f"开仓手续费=5u 平仓手续费={close_fee:.2f}"
)
n1 += 5 + close_fee
n += profit_amount
# ===============================================================
# 🚀 启动主流程
# ===============================================================
print(f'一共笔数:{len(all_trades)}')
print(f"一共盈利:{n:.2f}")
print(f'一共手续费:{n1:.2f}')
if __name__ == '__main__':
dates = [f"2025-9-{i}" for i in range(1, 31)]
for i in range(1, 11):
for i1 in range(1, 51):
trades, stats = backtest(dates, tp=i1, sl=0 - i)
total_profit = sum(t['diff'] / t['entry'] * 10000 for t in trades)
total_fee = sum(5 + 10000 / t['entry'] * t['exit'] * 0.0005 for t in trades)
# logger.info("===== 每笔交易详情 =====")
# for t in trades:
# logger.info(f"{t['entry_time']} {t['direction']}({t['signal']}) "
# f"入场={t['entry']:.2f} 出场={t['exit']:.2f} 出场时间={t['exit_time']} "
# f"差价={t['diff']:.2f}")
# print(i1, i)
# print(f"\n一共交易笔数{len(trades)}")
# print(f"一共盈利:{total_profit:.2f}")
# print(f"一共手续费:{total_fee:.2f}")
# print(f"净利润:{total_profit - total_fee:.2f}")
if total_profit > total_fee * 0.1:
print(i1, i)
print(f"\n一共交易笔数:{len(trades)}")
print(f"一共盈利:{total_profit:.2f}")
print(f"一共手续费:{total_fee:.2f}")
print(f"净利润:{total_profit - total_fee * 0.1}")
print("\n===== 信号统计 =====")
# for k, v in stats.items():
# win_rate = (v['wins'] / v['count'] * 100) if v['count'] > 0 else 0
# print(f"{v['name']} ({k}) - 信号数: {v['count']} | 胜率: {win_rate:.2f}% | 总盈利: {v['total_profit']:.2f}")