From 90dcfb6deee7e6d9d42eaff5c1be70e729a6827c Mon Sep 17 00:00:00 2001 From: 27942 <2794236280@qq.com> Date: Mon, 13 Oct 2025 17:49:07 +0800 Subject: [PATCH] dededdew --- weex/读取数据库分析数据2.0.py | 328 ++++++++++++++++------------------ 1 file changed, 153 insertions(+), 175 deletions(-) diff --git a/weex/读取数据库分析数据2.0.py b/weex/读取数据库分析数据2.0.py index 3a3ca84..6ff0543 100644 --- a/weex/读取数据库分析数据2.0.py +++ b/weex/读取数据库分析数据2.0.py @@ -1,227 +1,205 @@ import datetime - from loguru import logger -from models.weex import Weex15 +from peewee import fn +from models.weex import Weex15, Weex1 -def is_bullish(candle): - """判断是否是阳线(开盘价 < 收盘价,即涨)""" - return float(candle['open']) < float(candle['close']) +# =============================================================== +# 📊 数据获取模块 +# =============================================================== + +def get_data_by_date(model, date_str): + """按天获取指定表的数据""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = (model + .select() + .where(model.id.between(start_ts, end_ts)) + .order_by(model.id.asc())) + + return [ + {'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} + for i in query + ] -def is_bearish(candle): - """判断是否是阴线(开盘价 > 收盘价,即跌)""" - return float(candle['open']) > float(candle['close']) +def get_future_data_1min(start_ts, end_ts): + """获取指定时间范围内的 1 分钟数据""" + query = (Weex1 + .select() + .where(Weex1.id.between(start_ts, end_ts)) + .order_by(Weex1.id.asc())) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + +# =============================================================== +# 📈 信号判定模块 +# =============================================================== + +def is_bullish(c): return float(c['open']) < float(c['close']) + + +def is_bearish(c): return float(c['open']) > float(c['close']) def check_signal(prev, curr): - """ - 判断是否出现包住形态,返回信号类型和方向: - 1. 前跌后涨包住 -> 做多信号 (bear_bull_engulf) - 2. 前涨后跌包住 -> 做空信号 (bull_bear_engulf) - 3. 前涨后涨包住 -> 做多信号 (bull_bull_engulf) - 4. 前跌后跌包住 -> 做空信号 (bear_bear_engulf) - """ - p_open, p_close = float(prev['open']), float(prev['close']) # 前一笔 - c_open, c_close = float(curr['open']), float(curr['close']) # 当前一笔 + """判断是否出现包住形态""" + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) - # 情况1:前跌后涨,且涨线包住前跌线 -> 做多信号 - if is_bullish(curr) and is_bearish(prev): - if c_open <= p_close and c_close >= p_open: - return "long", "bear_bull_engulf" + # 前跌后涨包住 -> 做多 + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" - # 情况2:前涨后跌,且跌线包住前涨线 -> 做空信号 - if is_bearish(curr) and is_bullish(prev): - if c_open >= p_close and c_close <= p_open: - return "short", "bull_bear_engulf" - - # # 情况3:前涨后涨,且后涨线包住前涨线 -> 做多信号 - # if is_bullish(curr) and is_bullish(prev): - # if c_open <= p_open and c_close >= p_close: - # return "long", "bull_bull_engulf" - # - # # 情况4:前跌后跌,且后跌线包住前跌线 -> 做空信号 - # if is_bearish(curr) and is_bearish(prev): - # if c_open >= p_open and c_close <= p_close: - # return "short", "bear_bear_engulf" + # 前涨后跌包住 -> 做空 + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" return None, None -def simulate_trade(direction, entry_price, future_candles, take_profit_diff=30, stop_loss_diff=-10): +# =============================================================== +# 💹 回测模拟模块(使用 1 分钟数据) +# =============================================================== + +def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl=-1): """ - 模拟交易(逐根K线回测) - 改进版:考虑开盘跳空触发止盈/止损的情况 + 用 1 分钟数据进行精细化止盈止损模拟 + entry_time: 当前信号的 entry candle id(毫秒时间戳) + next_15min_time: 下一个15min时间戳,用于界定止盈止损分析范围 """ + # 查 15 分钟之间的 1 分钟数据 + future_candles = get_future_data_1min(entry_time, next_15min_time) + if not future_candles: + return None, 0, None + + tp_price = entry_price + tp if direction == "long" else entry_price - tp + sl_price = entry_price + sl if direction == "long" else entry_price - sl + for candle in future_candles: - open_p = float(candle['open']) - high = float(candle['high']) - low = float(candle['low']) - close = float(candle['close']) + open_p, high, low = map(float, (candle['open'], candle['high'], candle['low'])) if direction == "long": - tp_price = entry_price + take_profit_diff - sl_price = entry_price + stop_loss_diff - - # 🧩 开盘就跳空止盈 - if open_p >= tp_price: + if open_p >= tp_price: # 开盘跳空止盈 return open_p, open_p - entry_price, candle['id'] - - # 🧩 开盘就跳空止损 - if open_p <= sl_price: + if open_p <= sl_price: # 开盘跳空止损 return open_p, open_p - entry_price, candle['id'] - - # 正常区间内触发止盈/止损 if high >= tp_price: - return tp_price, take_profit_diff, candle['id'] + return tp_price, tp, candle['id'] if low <= sl_price: - return sl_price, stop_loss_diff, candle['id'] + return sl_price, sl, candle['id'] - elif direction == "short": - tp_price = entry_price - take_profit_diff - sl_price = entry_price - stop_loss_diff - - # 🧩 开盘就跳空止盈 + else: # short if open_p <= tp_price: return open_p, entry_price - open_p, candle['id'] - - # 🧩 开盘就跳空止损 if open_p >= sl_price: return open_p, entry_price - open_p, candle['id'] - - # 正常区间内触发止盈/止损 if low <= tp_price: - return tp_price, take_profit_diff, candle['id'] + return tp_price, tp, candle['id'] if high >= sl_price: - return sl_price, stop_loss_diff, candle['id'] + return sl_price, sl, candle['id'] - # 未触发止盈止损,按最后收盘价平仓 - final_price = float(future_candles[-1]['close']) - if direction == "long": - diff_money = final_price - entry_price - else: - diff_money = entry_price - final_price - - return final_price, diff_money, future_candles[-1]['id'] + # 未触发止盈止损,用最后一根收盘价平仓 + final = future_candles[-1] + final_price = float(final['close']) + diff = (final_price - entry_price) if direction == "long" else (entry_price - final_price) + return final_price, diff, final['id'] -def get_data_by_date(date_str): - # 将日期字符串转换为 datetime 对象 - try: - target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') - except ValueError: - print("日期格式不正确,请使用 YYYY-MM-DD 格式。") - return [] +# =============================================================== +# 📊 主回测流程 +# =============================================================== - # 计算该天的开始时间戳(毫秒级) - start_timestamp = int(target_date.timestamp() * 1000) - # 计算该天的结束时间戳(毫秒级),即下一天 00:00:00 的时间戳减去 1 毫秒 - end_timestamp = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 +def backtest(dates, tp=8, sl=-1): + all_data = [] + for date_str in dates: + all_data.extend(get_data_by_date(Weex15, date_str)) - # 查询该天的数据,并按照 id 字段从小到大排序 - query = Weex15.select().where(Weex15.id.between(start_timestamp, end_timestamp)).order_by(Weex15.id.asc()) - results = list(query) + all_data.sort(key=lambda x: x['id']) - # 将结果转换为列表嵌套字典的形式 - data_list = [] - for item in results: - item_dict = { - 'id': item.id, - 'open': item.open, - 'high': item.high, - 'low': item.low, - 'close': item.close - } - data_list.append(item_dict) - - return data_list - - -if __name__ == '__main__': - - # 示例调用 - datas = [] - for i in range(1, 31): - date_str = f'2025-8-{i}' - data = get_data_by_date(date_str) - - datas.extend(data) - - datas = sorted(datas, key=lambda x: x["id"]) - - zh_project = 0 # 累计盈亏 - all_trades = [] # 保存所有交易明细 - daily_signals = 0 # 信号总数 - daily_wins = 0 - daily_profit = 0 # 价差总和 - - # 四种信号类型的统计 - signal_stats = { + stats = { "bear_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包跌"}, "bull_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包涨"}, - # "bull_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包涨"}, - # "bear_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包跌"} } - # 遍历每根K线,寻找信号 - for idx in range(1, len(datas) - 1): # 留出未来K线 - prev, curr = datas[idx - 1], datas[idx] # 前一笔,当前一笔 - entry_candle = datas[idx + 1] # 下一根开盘价作为入场价 - future_candles = datas[idx + 1:] # 未来行情 + trades = [] - entry_open = float(entry_candle['open']) # 开仓价格 - direction, signal_type = check_signal(prev, curr) # 判断开仓方向和信号类型 + for idx in range(1, len(all_data) - 1): + prev, curr = all_data[idx - 1], all_data[idx] + entry_candle = all_data[idx + 1] - if direction and signal_type: - daily_signals += 1 + direction, signal = check_signal(prev, curr) + if not direction: + continue - exit_price, diff, exit_time = simulate_trade(direction, entry_open, future_candles, take_profit_diff=6, - stop_loss_diff=-2) + # 下一个 15 分钟K线的时间范围 + next_15min_time = all_data[idx + 1]['id'] if idx + 2 < len(all_data) else all_data[-1]['id'] - # 统计该信号类型的表现 - signal_stats[signal_type]["count"] += 1 - signal_stats[signal_type]["total_profit"] += diff - if diff > 0: - signal_stats[signal_type]["wins"] += 1 - daily_wins += 1 + entry_price = float(entry_candle['open']) + exit_price, diff, exit_time = simulate_trade(direction, entry_price, entry_candle['id'], next_15min_time, tp=tp, + sl=sl) - daily_profit += diff + if exit_price is None: + continue - # 将时间戳转换为本地时间 - local_time = datetime.datetime.fromtimestamp(entry_candle['id'] / 1000) - formatted_time = local_time.strftime("%Y-%m-%d %H:%M:%S") + stats[signal]['count'] += 1 + stats[signal]['total_profit'] += diff + if diff > 0: + stats[signal]['wins'] += 1 - exit_time = datetime.datetime.fromtimestamp(exit_time / 1000) - exit_time1 = exit_time.strftime("%Y-%m-%d %H:%M:%S") + trades.append({ + "entry_time": datetime.datetime.fromtimestamp(entry_candle['id'] / 1000), + "exit_time": datetime.datetime.fromtimestamp(exit_time / 1000), + "signal": stats[signal]['name'], + "direction": "做多" if direction == "long" else "做空", + "entry": entry_price, + "exit": exit_price, + "diff": diff + }) - # 保存交易详情 - all_trades.append( - ( - f"{formatted_time}号", - "做多" if direction == "long" else "做空", - signal_stats[signal_type]["name"], - entry_open, - exit_price, - diff, - exit_time1 - ) - ) + return trades, stats - # ===== 输出每笔交易详情 ===== - logger.info("===== 每笔交易详情 =====") - n = n1 = 0 # n = 总盈利,n1 = 总手续费 - for date, direction, signal_name, entry, exit, diff, end_time in all_trades: - profit_amount = diff / entry * 10000 # 计算盈利金额 - close_fee = 10000 / entry * exit * 0.0005 # 平仓手续费 - logger.info( - f"{date} {direction}({signal_name}) 入场={entry:.2f} 出场={exit:.2f} 出场时间={end_time} " - f"差价={diff:.2f} 盈利={profit_amount:.2f} " - f"开仓手续费=5u 平仓手续费={close_fee:.2f}" - ) - n1 += 5 + close_fee - n += profit_amount +# =============================================================== +# 🚀 启动主流程 +# =============================================================== - print(f'一共笔数:{len(all_trades)}') - print(f"一共盈利:{n:.2f}") - print(f'一共手续费:{n1:.2f}') +if __name__ == '__main__': + dates = [f"2025-9-{i}" for i in range(1, 31)] + for i in range(1, 11): + for i1 in range(1, 51): + trades, stats = backtest(dates, tp=i1, sl=0 - i) + + total_profit = sum(t['diff'] / t['entry'] * 10000 for t in trades) + total_fee = sum(5 + 10000 / t['entry'] * t['exit'] * 0.0005 for t in trades) + + # logger.info("===== 每笔交易详情 =====") + # for t in trades: + # logger.info(f"{t['entry_time']} {t['direction']}({t['signal']}) " + # f"入场={t['entry']:.2f} 出场={t['exit']:.2f} 出场时间={t['exit_time']} " + # f"差价={t['diff']:.2f}") + + # print(i1, i) + # print(f"\n一共交易笔数:{len(trades)}") + # print(f"一共盈利:{total_profit:.2f}") + # print(f"一共手续费:{total_fee:.2f}") + # print(f"净利润:{total_profit - total_fee:.2f}") + + if total_profit > total_fee * 0.1: + print(i1, i) + print(f"\n一共交易笔数:{len(trades)}") + print(f"一共盈利:{total_profit:.2f}") + print(f"一共手续费:{total_fee:.2f}") + print(f"净利润:{total_profit - total_fee * 0.1}") + + print("\n===== 信号统计 =====") + # for k, v in stats.items(): + # win_rate = (v['wins'] / v['count'] * 100) if v['count'] > 0 else 0 + # print(f"{v['name']} ({k}) - 信号数: {v['count']} | 胜率: {win_rate:.2f}% | 总盈利: {v['total_profit']:.2f}")