This commit is contained in:
27942
2025-10-14 15:59:27 +08:00
parent 03cc1cf986
commit aab552d7db
2 changed files with 137 additions and 115 deletions

View File

@@ -1,7 +1,7 @@
import datetime
from loguru import logger
from models.weex import Weex15
from models.weex import Weex15, Weex1
def is_bullish(candle):
@@ -30,16 +30,16 @@ def check_signal(prev, curr):
if c_open <= p_close and c_close >= p_open:
return "long", "bear_bull_engulf"
# 情况2前涨后跌且跌线包住前涨线 -> 做空信号
if is_bearish(curr) and is_bullish(prev):
if c_open >= p_close and c_close <= p_open:
return "short", "bull_bear_engulf"
# # 情况2前涨后跌且跌线包住前涨线 -> 做空信号
# if is_bearish(curr) and is_bullish(prev):
# if c_open >= p_close and c_close <= p_open:
# return "short", "bull_bear_engulf"
# # 情况3前涨后涨且后涨线包住前涨线 -> 做多信号
# if is_bullish(curr) and is_bullish(prev):
# if c_open <= p_open and c_close >= p_close:
# return "long", "bull_bull_engulf"
#
# # 情况4前跌后跌且后跌线包住前跌线 -> 做空信号
# if is_bearish(curr) and is_bearish(prev):
# if c_open >= p_open and c_close <= p_close:
@@ -119,7 +119,7 @@ def get_data_by_date(date_str):
end_timestamp = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
# 查询该天的数据,并按照 id 字段从小到大排序
query = Weex15.select().where(Weex15.id.between(start_timestamp, end_timestamp)).order_by(Weex15.id.asc())
query = Weex1.select().where(Weex1.id.between(start_timestamp, end_timestamp)).order_by(Weex1.id.asc())
results = list(query)
# 将结果转换为列表嵌套字典的形式
@@ -142,86 +142,96 @@ if __name__ == '__main__':
# 示例调用
datas = []
for i in range(1, 31):
date_str = f'2025-6-{i}'
date_str = f'2025-9-{i}'
data = get_data_by_date(date_str)
datas.extend(data)
datas = sorted(datas, key=lambda x: x["id"])
zh_project = 0 # 累计盈亏
all_trades = [] # 保存所有交易明细
daily_signals = 0 # 信号总数
daily_wins = 0
daily_profit = 0 # 价差总和
for i in range(1, 11):
for i1 in range(1, 51):
# 四种信号类型的统计
signal_stats = {
"bear_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包跌"},
"bull_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包涨"},
# "bull_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包涨"},
# "bear_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包跌"}
}
zh_project = 0 # 累计盈亏
all_trades = [] # 保存所有交易明细
daily_signals = 0 # 信号总数
daily_wins = 0
daily_profit = 0 # 价差总和
# 遍历每根K线寻找信号
for idx in range(1, len(datas) - 1): # 留出未来K线
prev, curr = datas[idx - 1], datas[idx] # 前一笔,当前一笔
entry_candle = datas[idx + 1] # 下一根开盘价作为入场价
future_candles = datas[idx + 1:] # 未来行情
# 四种信号类型的统计
signal_stats = {
"bear_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包跌"},
"bull_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包涨"},
# "bull_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包涨"},
# "bear_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包跌"}
}
entry_open = float(entry_candle['open']) # 开仓价格
direction, signal_type = check_signal(prev, curr) # 判断开仓方向和信号类型
# 遍历每根K线寻找信号
for idx in range(1, len(datas) - 1): # 留出未来K线
prev, curr = datas[idx - 1], datas[idx] # 前一笔,当前一笔
entry_candle = datas[idx + 1] # 下一根开盘价作为入场价
future_candles = datas[idx + 1:] # 未来行情
if direction and signal_type:
daily_signals += 1
entry_open = float(entry_candle['open']) # 开仓价格
direction, signal_type = check_signal(prev, curr) # 判断开仓方向和信号类型
exit_price, diff, exit_time = simulate_trade(direction, entry_open, future_candles, take_profit_diff=6,
stop_loss_diff=-2)
if direction and signal_type:
daily_signals += 1
# 统计该信号类型的表现
signal_stats[signal_type]["count"] += 1
signal_stats[signal_type]["total_profit"] += diff
if diff > 0:
signal_stats[signal_type]["wins"] += 1
daily_wins += 1
exit_price, diff, exit_time = simulate_trade(
direction,
entry_open,
future_candles,
take_profit_diff=i1,
stop_loss_diff=-i
)
daily_profit += diff
# 统计该信号类型的表现
signal_stats[signal_type]["count"] += 1
signal_stats[signal_type]["total_profit"] += diff
if diff > 0:
signal_stats[signal_type]["wins"] += 1
daily_wins += 1
# 将时间戳转换为本地时间
local_time = datetime.datetime.fromtimestamp(entry_candle['id'] / 1000)
formatted_time = local_time.strftime("%Y-%m-%d %H:%M:%S")
daily_profit += diff
exit_time = datetime.datetime.fromtimestamp(exit_time / 1000)
exit_time1 = exit_time.strftime("%Y-%m-%d %H:%M:%S")
# 将时间戳转换为本地时间
local_time = datetime.datetime.fromtimestamp(entry_candle['id'] / 1000)
formatted_time = local_time.strftime("%Y-%m-%d %H:%M:%S")
# 保存交易详情
all_trades.append(
(
f"{formatted_time}",
"做多" if direction == "long" else "做空",
signal_stats[signal_type]["name"],
entry_open,
exit_price,
diff,
exit_time1
)
)
exit_time = datetime.datetime.fromtimestamp(exit_time / 1000)
exit_time1 = exit_time.strftime("%Y-%m-%d %H:%M:%S")
# ===== 输出每笔交易详情 =====
logger.info("===== 每笔交易详情 =====")
n = n1 = 0 # n = 总盈利n1 = 总手续费
for date, direction, signal_name, entry, exit, diff, end_time in all_trades:
profit_amount = diff / entry * 10000 # 计算盈利金额
close_fee = 10000 / entry * exit * 0.0005 # 平仓手续费
# 保存交易详情
all_trades.append(
(
f"{formatted_time}",
"做多" if direction == "long" else "做空",
signal_stats[signal_type]["name"],
entry_open,
exit_price,
diff,
exit_time1
)
)
logger.info(
f"{date} {direction}({signal_name}) 入场={entry:.2f} 出场={exit:.2f} 出场时间={end_time} "
f"差价={diff:.2f} 盈利={profit_amount:.2f} "
f"开仓手续费=5u 平仓手续费={close_fee:.2f}"
)
n1 += 5 + close_fee
n += profit_amount
# ===== 输出每笔交易详情 =====
logger.info("===== 每笔交易详情 =====")
logger.info(f"{i},{i1}")
n = n1 = 0 # n = 总盈利n1 = 总手续费
for date, direction, signal_name, entry, exit, diff, end_time in all_trades:
profit_amount = diff / entry * 10000 # 计算盈利金额
close_fee = 10000 / entry * exit * 0.0005 # 平仓手续费
print(f'一共笔数:{len(all_trades)}')
print(f"一共盈利:{n:.2f}")
print(f'一共手续费:{n1:.2f}')
# logger.info(
# f"{date} {direction}({signal_name}) 入场={entry:.2f} 出场={exit:.2f} 出场时间={end_time} "
# f"差价={diff:.2f} 盈利={profit_amount:.2f} "
# f"开仓手续费=5u 平仓手续费={close_fee:.2f}"
# )
n1 += 5 + close_fee
n += profit_amount
if n > n1 * 0.1:
print(f'一共笔数:{len(all_trades)}')
print(f"一共盈利:{n:.2f}")
print(f'一共手续费:{n1:.2f}')

View File

@@ -131,9 +131,9 @@ def check_signal(prev, curr):
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
return "long", "bear_bull_engulf"
# # 前涨后跌包住 -> 做空
# if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
# return "short", "bull_bear_engulf"
# 前涨后跌包住 -> 做空
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
return "short", "bull_bear_engulf"
return None, None
@@ -147,20 +147,26 @@ def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl
用 1 分钟数据进行精细化止盈止损模拟
entry_time: 当前信号的 entry candle id毫秒时间戳
next_15min_time: 下一个15min时间戳用于界定止盈止损分析范围
direction信号类型
entry_price开仓价格
entry_time开仓时间
next_15min_time15分钟未来行情
"""
# 查 15 分钟之间的 1 分钟数据
future_candles = get_future_data_1min(entry_time, next_15min_time)
if not future_candles:
return None, 0, None
tp_price = entry_price + tp if direction == "long" else entry_price - tp
sl_price = entry_price + sl if direction == "long" else entry_price - sl
tp_price = entry_price + tp if direction == "long" else entry_price - tp # 止盈价位
sl_price = entry_price + sl if direction == "long" else entry_price - sl # 止损价位
for candle in future_candles:
open_p, high, low = map(float, (candle['open'], candle['high'], candle['low']))
if direction == "long":
if open_p >= tp_price: # 开盘跳空止盈
if direction == "long": # long
if open_p >= tp_price: # 开盘跳空止盈 涨信号,
return open_p, open_p - entry_price, candle['id']
if open_p <= sl_price: # 开盘跳空止损
return open_p, open_p - entry_price, candle['id']
@@ -169,8 +175,8 @@ def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl
if low <= sl_price:
return sl_price, sl, candle['id']
else: # short
if open_p <= tp_price:
else: # short 跌信号
if open_p <= tp_price: #
return open_p, entry_price - open_p, candle['id']
if open_p >= sl_price:
return open_p, entry_price - open_p, candle['id']
@@ -190,7 +196,7 @@ def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl
# 📊 主回测流程
# ===============================================================
def backtest(dates, tp=8, sl=-1):
def backtest(dates, tp, sl):
"""
datas日期的列表
@@ -202,7 +208,7 @@ def backtest(dates, tp=8, sl=-1):
all_data = []
for date_str in dates:
all_data.extend(get_data_by_date(Weex15, date_str)) # 获取天的数据15分钟k线数据
all_data.extend(get_data_by_date(Weex15, date_str)) # 获取天的数据15分钟k线数据
all_data.sort(key=lambda x: x['id'])
@@ -215,18 +221,24 @@ def backtest(dates, tp=8, sl=-1):
for idx in range(1, len(all_data) - 1):
prev, curr = all_data[idx - 1], all_data[idx] # 前一笔,当前一笔
entry_candle = all_data[idx + 1] # 开仓
entry_candle = all_data[idx + 1] # 下一笔开仓k线
direction, signal = check_signal(prev, curr)
if not direction:
continue
# 下一个 15 分钟K线的时间范围
next_15min_time = all_data[idx + 1]['id'] if idx + 2 < len(all_data) else all_data[-1]['id']
next_15min_time = all_data[idx + 5]['id'] if idx + 5 < len(all_data) else all_data[-1]['id']
entry_price = float(entry_candle['open'])
exit_price, diff, exit_time = simulate_trade(direction, entry_price, entry_candle['id'], next_15min_time, tp=tp,
sl=sl)
entry_price = float(entry_candle['open']) # 开仓价格
exit_price, diff, exit_time = simulate_trade(
direction,
entry_price,
entry_candle['id'],
next_15min_time,
tp=tp,
sl=sl
)
if exit_price is None:
continue
@@ -254,36 +266,36 @@ def backtest(dates, tp=8, sl=-1):
# ===============================================================
if __name__ == '__main__':
dates = [f"2025-7-{i}" for i in range(1, 31)]
for i in range(1, 11):
for i1 in range(1, 51):
trades, stats = backtest(dates, tp=i1, sl=0 - i)
dates = [f"2025-9-{i}" for i in range(1, 31)]
total_profit = sum(t['diff'] / t['entry'] * 10000 for t in trades)
total_fee = sum(5 + 10000 / t['entry'] * t['exit'] * 0.0005 for t in trades)
trades, stats = backtest(dates, tp=10, sl=-2)
# logger.info("===== 每笔交易详情 =====")
# for t in trades:
# logger.info(f"{t['entry_time']} {t['direction']}({t['signal']}) "
# f"入场={t['entry']:.2f} 出场={t['exit']:.2f} 出场时间={t['exit_time']} "
# f"差价={t['diff']:.2f}")
#
# print(i1, i)
# print(f"\n一共交易笔数{len(trades)}")
# print(f"一共盈利:{total_profit:.2f}")
# print(f"一共手续费:{total_fee:.2f}")
# print(f"净利润:{total_profit - total_fee:.2f}")
logger.info("===== 每笔交易详情 =====")
for t in trades:
logger.info(
f"{t['entry_time']} {t['direction']}({t['signal']}) "
f"入场={t['entry']:.2f} 出场={t['exit']:.2f} 出场时间={t['exit_time']} "
f"差价={t['diff']:.2f}"
)
if total_profit > total_fee * 0.1:
print(i1, i)
print(f"\n一共交易笔数:{len(trades)}")
print(f"一共盈利:{total_profit:.2f}")
print(f"一共手续费:{total_fee:.2f}")
print(f"净利润:{total_profit - total_fee * 0.1}")
total_profit = sum(t['diff'] / t['entry'] * 10000 for t in trades)
total_fee = sum(5 + 10000 / t['entry'] * t['exit'] * 0.0005 for t in trades)
print("\n===== 信号统计 =====")
print(f"\n一共交易笔数:{len(trades)}")
print(f"一共盈利:{total_profit:.2f}")
print(f"一共手续费:{total_fee:.2f}")
print(f"净利润:{total_profit - total_fee:.2f}")
# for k, v in stats.items():
# win_rate = (v['wins'] / v['count'] * 100) if v['count'] > 0 else 0
# print(
# f"{v['name']} ({k}) - 信号数: {v['count']} | 胜率: {win_rate:.2f}% | 总盈利: {v['total_profit']:.2f}")
# if total_profit > total_fee * 0.1:
# print(i1, i)
# print(f"\n一共交易笔数{len(trades)}")
# print(f"一共盈利:{total_profit:.2f}")
# print(f"一共手续费:{total_fee:.2f}")
# print(f"净利润:{total_profit - total_fee * 0.1}")
#
# print("\n===== 信号统计 =====")
#
# for k, v in stats.items():
# win_rate = (v['wins'] / v['count'] * 100) if v['count'] > 0 else 0
# print(
# f"{v['name']} ({k}) - 信号数: {v['count']} | 胜率: {win_rate:.2f}% | 总盈利: {v['total_profit']:.2f}")