From 7c9ede446fa2e3bd5cf17b0b8f335e857fdfdb54 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Tue, 3 Feb 2026 17:51:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=20weex?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bitmart/回测-三分之一策略-精准版.py | 745 ------------------ bitmart/回测-三分之一策略.py | 764 ------------------- bitmart/回测数据-30分钟版.py | 418 ---------- bitmart/回测数据-三分之一策略-5分钟精准版.py | 418 ---------- bitmart/回测数据-五分之一策略-3分钟精准版.py | 418 ---------- bitmart/趋势策略.py | 382 ---------- 6 files changed, 3145 deletions(-) delete mode 100644 bitmart/回测-三分之一策略-精准版.py delete mode 100644 bitmart/回测-三分之一策略.py delete mode 100644 bitmart/回测数据-30分钟版.py delete mode 100644 bitmart/回测数据-三分之一策略-5分钟精准版.py delete mode 100644 bitmart/回测数据-五分之一策略-3分钟精准版.py delete mode 100644 bitmart/趋势策略.py diff --git a/bitmart/回测-三分之一策略-精准版.py b/bitmart/回测-三分之一策略-精准版.py deleted file mode 100644 index 6dc6166..0000000 --- a/bitmart/回测-三分之一策略-精准版.py +++ /dev/null @@ -1,745 +0,0 @@ -""" -BitMart 五分之一回归策略回测(精准版) -使用3分钟K线周期计算触发价格,1分钟K线判断触发顺序 - -========== 策略规则 ========== -1. 触发价格计算(基于有效的前一根K线,实体>=0.1): - - 做多触发价格 = 收盘价 + 实体/5(从收盘价往上涨1/5) - - 做空触发价格 = 收盘价 - 实体/5(从收盘价往下跌1/5) - -2. 信号触发条件: - - 当前K线最高价 >= 做多触发价格 → 做多信号 - - 当前K线最低价 <= 做空触发价格 → 做空信号 - -3. 执行逻辑: - - 做多时遇到做空信号 -> 平多并反手开空 - - 做空时遇到做多信号 -> 平空并反手开多 - - 同一根3分钟K线内只交易一次 - -4. 精准判断(使用1分钟K线): - - 当一根3分钟K线同时触及做多和做空价格时 - - 使用该3分钟K线对应的3根1分钟K线来判断哪个方向先被触发 - - 这样可以更精准地还原真实交易场景 -""" - -import datetime -import calendar -from pathlib import Path -from typing import List, Dict, Optional -from loguru import logger -from peewee import * - -# 数据库配置 -DB_PATH = Path(__file__).parent.parent / 'models' / 'database.db' -db = SqliteDatabase(str(DB_PATH)) - - -# ========================= 数据库模型 ========================= - -class BitMartETH1m(Model): - """1分钟K线模型""" - id = BigIntegerField(primary_key=True) # 时间戳(毫秒级) - open = FloatField(null=True) - high = FloatField(null=True) - low = FloatField(null=True) - close = FloatField(null=True) - - class Meta: - database = db - table_name = 'bitmart_eth_1m' - - -class BitMartETH3m(Model): - """3分钟K线模型""" - id = BigIntegerField(primary_key=True) # 时间戳(毫秒级) - open = FloatField(null=True) - high = FloatField(null=True) - low = FloatField(null=True) - close = FloatField(null=True) - - class Meta: - database = db - table_name = 'bitmart_eth_3m' - - -# 连接数据库 -db.connect(reuse_if_open=True) - - -# ========================= 工具函数 ========================= - -def is_bullish(c): - """判断阳线""" - return float(c['close']) > float(c['open']) - - -def is_bearish(c): - """判断阴线""" - return float(c['close']) < float(c['open']) - - -def get_body_size(candle): - """计算K线实体大小(绝对值)""" - return abs(float(candle['open']) - float(candle['close'])) - - -def find_valid_prev_bar(all_data, current_idx, min_body_size=0.1): - """ - 从当前索引往前查找,直到找到实体>=min_body_size的K线 - 返回:(有效K线的索引, K线数据) 或 (None, None) - """ - if current_idx <= 0: - return None, None - - for i in range(current_idx - 1, -1, -1): - prev = all_data[i] - body_size = get_body_size(prev) - if body_size >= min_body_size: - return i, prev - - return None, None - - -def get_one_fifth_levels(prev): - """ - 计算前一根K线实体的 1/5 双向触发价格 - 返回:(做多触发价格, 做空触发价格) - - 基于收盘价计算(无论阴线阳线): - - 做多触发价格 = 收盘价 + 实体/5(从收盘价往上涨1/5实体) - - 做空触发价格 = 收盘价 - 实体/5(从收盘价往下跌1/5实体) - """ - p_open = float(prev['open']) - p_close = float(prev['close']) - - body = abs(p_open - p_close) - - if body < 0.001: # 十字星,忽略 - return None, None - - # 基于收盘价的双向触发价格 - long_trigger = p_close + body / 5 - short_trigger = p_close - body / 5 - - return long_trigger, short_trigger - - -def get_3m_data_by_date(date_str: str) -> List[Dict]: - """按天获取3分钟K线数据""" - try: - target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') - except ValueError: - logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") - return [] - - start_ts = int(target_date.timestamp() * 1000) - end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 - - query = BitMartETH3m.select().where( - BitMartETH3m.id.between(start_ts, end_ts) - ).order_by(BitMartETH3m.id.asc()) - - data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] - return data - - -def get_1m_data_by_range(start_ts: int, end_ts: int) -> List[Dict]: - """ - 获取指定时间范围内的1分钟K线数据 - :param start_ts: 开始时间戳(毫秒) - :param end_ts: 结束时间戳(毫秒) - :return: 1分钟K线数据列表 - """ - query = BitMartETH1m.select().where( - BitMartETH1m.id.between(start_ts, end_ts - 1) - ).order_by(BitMartETH1m.id.asc()) - - data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] - return data - - -def get_1m_data_for_3m_bar(bar_3m: Dict) -> List[Dict]: - """ - 获取3分钟K线对应的3根1分钟K线 - :param bar_3m: 3分钟K线数据 - :return: 对应的1分钟K线数据列表(最多3根) - """ - start_ts = bar_3m['id'] - end_ts = start_ts + 3 * 60 * 1000 # 3分钟后 - return get_1m_data_by_range(start_ts, end_ts) - - -def determine_trigger_order_by_1m( - bars_1m: List[Dict], - long_trigger: float, - short_trigger: float -) -> str: - """ - 使用1分钟K线精确判断在3分钟周期内,是先触发做多还是做空 - - :param bars_1m: 3根1分钟K线数据 - :param long_trigger: 做多触发价格 - :param short_trigger: 做空触发价格 - :return: 'long', 'short', 或 None - """ - if not bars_1m: - return None - - for bar in bars_1m: - high = float(bar['high']) - low = float(bar['low']) - open_price = float(bar['open']) - - long_triggered = high >= long_trigger - short_triggered = low <= short_trigger - - # 如果只触发了一个方向 - if long_triggered and not short_triggered: - return 'long' - if short_triggered and not long_triggered: - return 'short' - - # 如果两个方向都触发了(在同一根1分钟K线内) - if long_triggered and short_triggered: - # 根据开盘价判断: - # 如果开盘价更接近做空触发价,说明先往下走,先触发做空 - # 如果开盘价更接近做多触发价,说明先往上走,先触发做多 - dist_to_long = abs(long_trigger - open_price) - dist_to_short = abs(short_trigger - open_price) - - if dist_to_short < dist_to_long: - return 'short' - else: - return 'long' - - return None - - -def check_reverse_signal_in_first_minute( - bars_1m: List[Dict], - long_trigger: float, - short_trigger: float, - current_direction: str -) -> bool: - """ - 检查反手信号是否在第一分钟触发 - - :param bars_1m: 3根1分钟K线数据 - :param long_trigger: 做多触发价格 - :param short_trigger: 做空触发价格 - :param current_direction: 当前持仓方向 ('long' 或 'short') - :return: True 表示反手信号在第一分钟触发,False 表示不是 - """ - if not bars_1m: - return False - - # 只检查第一分钟K线 - first_bar = bars_1m[0] - high = float(first_bar['high']) - low = float(first_bar['low']) - - # 如果当前是多仓,检查空信号是否在第一分钟触发 - if current_direction == 'long': - return low <= short_trigger - - # 如果当前是空仓,检查多信号是否在第一分钟触发 - if current_direction == 'short': - return high >= long_trigger - - return False - - -def get_body_percent(candle) -> float: - """ - 计算K线实体占价格的百分比 - :param candle: K线数据 - :return: 实体百分比(如0.1表示0.1%) - """ - body = abs(float(candle['open']) - float(candle['close'])) - price = (float(candle['open']) + float(candle['close'])) / 2 - if price == 0: - return 0 - return (body / price) * 100 - - -def check_breakout_reverse_signal( - all_data_3m: List[Dict], - current_idx: int, - current_position_direction: str, - min_body_percent: float = 0.1 -) -> tuple: - """ - 检查"突破上一根K线高低点"的反手信号 - - 规则: - - 持多单时:当前K线跌破上一根K线最低点 → 反手开空 - 条件:上一根K线是阴线且实体>0.1%波动 - - 持空单时:当前K线涨破上一根K线最高点 → 反手开多 - 条件:上一根K线是阳线且实体>0.1%波动 - - :param all_data_3m: 3分钟K线数据 - :param current_idx: 当前K线索引 - :param current_position_direction: 当前持仓方向 ('long' 或 'short') - :param min_body_percent: 最小实体百分比(默认0.1%) - :return: (方向, 触发价格, 信号类型) 或 (None, None, None) - """ - if current_idx <= 0 or current_position_direction is None: - return None, None, None - - curr = all_data_3m[current_idx] - prev = all_data_3m[current_idx - 1] - - c_high = float(curr['high']) - c_low = float(curr['low']) - prev_high = float(prev['high']) - prev_low = float(prev['low']) - - # 计算上一根K线的实体百分比 - body_percent = get_body_percent(prev) - - # 持多单时:检查是否跌破上一根K线最低点 - if current_position_direction == 'long': - # 条件:上一根K线是阴线且实体>min_body_percent% - if is_bearish(prev) and body_percent >= min_body_percent: - if c_low < prev_low: - # 触发反手开空信号 - logger.debug(f"突破反手信号:持多单,当前K线低点{c_low:.2f}跌破上一根阴线低点{prev_low:.2f},实体{body_percent:.3f}%") - return 'short', prev_low, 'breakout' - - # 持空单时:检查是否涨破上一根K线最高点 - elif current_position_direction == 'short': - # 条件:上一根K线是阳线且实体>min_body_percent% - if is_bullish(prev) and body_percent >= min_body_percent: - if c_high > prev_high: - # 触发反手开多信号 - logger.debug(f"突破反手信号:持空单,当前K线高点{c_high:.2f}涨破上一根阳线高点{prev_high:.2f},实体{body_percent:.3f}%") - return 'long', prev_high, 'breakout' - - return None, None, None - - -def check_trigger_with_1m( - all_data_3m: List[Dict], - current_idx: int, - min_body_size: float = 0.1, - current_position_direction: str = None, - first_minute_only: bool = True -) -> tuple: - """ - 检查当前3分钟K线是否触发了交易信号 - 如果同时触发两个方向,使用1分钟K线精确判断顺序 - - 新增逻辑:如果有持仓且 first_minute_only=True,反手信号必须在第一分钟触发才有效 - - :param all_data_3m: 3分钟K线数据 - :param current_idx: 当前K线索引 - :param min_body_size: 最小实体大小 - :param current_position_direction: 当前持仓方向 ('long', 'short', 或 None) - :param first_minute_only: 是否只在第一分钟触发反手信号才有效 - - 返回:(方向, 触发价格, 有效前一根K线索引, 1分钟数据是否使用) - """ - if current_idx <= 0: - return None, None, None, False - - curr = all_data_3m[current_idx] - - # 查找实体>=min_body_size的前一根K线 - valid_prev_idx, prev = find_valid_prev_bar(all_data_3m, current_idx, min_body_size) - - if prev is None: - return None, None, None, False - - long_trigger, short_trigger = get_one_fifth_levels(prev) - - if long_trigger is None: - return None, None, None, False - - c_high = float(curr['high']) - c_low = float(curr['low']) - - # 检测是否触发 - long_triggered = c_high >= long_trigger - short_triggered = c_low <= short_trigger - - # 如果两个方向都触发,使用1分钟K线精确判断 - if long_triggered and short_triggered: - bars_1m = get_1m_data_for_3m_bar(curr) - - if bars_1m: - direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger) - if direction: - trigger_price = long_trigger if direction == 'long' else short_trigger - - # 检查反手信号是否需要在第一分钟触发 - if first_minute_only and current_position_direction and direction != current_position_direction: - # 这是一个反手信号,检查是否在第一分钟触发 - if not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, current_position_direction): - # 反手信号不是在第一分钟触发,忽略 - logger.debug(f"反手信号 {direction} 不在第一分钟触发,忽略") - return None, None, None, False - - return direction, trigger_price, valid_prev_idx, True - - # 如果没有1分钟数据,使用开盘价距离判断 - c_open = float(curr['open']) - dist_to_long = abs(long_trigger - c_open) - dist_to_short = abs(short_trigger - c_open) - if dist_to_short <= dist_to_long: - return 'short', short_trigger, valid_prev_idx, False - else: - return 'long', long_trigger, valid_prev_idx, False - - if short_triggered: - # 检查是否是反手信号且需要第一分钟触发 - if first_minute_only and current_position_direction == 'long': - bars_1m = get_1m_data_for_3m_bar(curr) - if bars_1m and not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, 'long'): - logger.debug(f"空信号不在第一分钟触发,忽略(当前持多仓)") - return None, None, None, False - return 'short', short_trigger, valid_prev_idx, False - - if long_triggered: - # 检查是否是反手信号且需要第一分钟触发 - if first_minute_only and current_position_direction == 'short': - bars_1m = get_1m_data_for_3m_bar(curr) - if bars_1m and not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, 'short'): - logger.debug(f"多信号不在第一分钟触发,忽略(当前持空仓)") - return None, None, None, False - return 'long', long_trigger, valid_prev_idx, False - - return None, None, None, False - - -# ========================= 回测逻辑 ========================= - -def backtest_one_third_strategy( - dates: List[str], - min_body_size: float = 0.1, - first_minute_only: bool = True, - enable_breakout_reverse: bool = True, - breakout_min_body_percent: float = 0.1 -): - """ - 三分之一策略回测(精准版) - - :param dates: 日期列表 - :param min_body_size: 最小实体大小(绝对值) - :param first_minute_only: 是否只在第一分钟触发反手信号才有效(默认True) - :param enable_breakout_reverse: 是否启用"突破上一根K线高低点"反手信号(默认True) - :param breakout_min_body_percent: 突破反手信号的最小实体百分比(默认0.1%) - :return: (trades, stats) - """ - # 获取所有3分钟K线数据 - all_data: List[Dict] = [] - total_queried = 0 - for d in dates: - day_data = get_3m_data_by_date(d) - all_data.extend(day_data) - if day_data: - total_queried += len(day_data) - - logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条3分钟K线数据") - logger.info(f"反手信号仅第一分钟有效: {first_minute_only}") - logger.info(f"突破反手信号启用: {enable_breakout_reverse},最小实体百分比: {breakout_min_body_percent}%") - - if not all_data: - logger.warning("未获取到任何数据,请检查数据库") - return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0}, - 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}} - - # 按时间戳排序 - all_data.sort(key=lambda x: x['id']) - - # 验证排序结果 - if len(all_data) > 1: - first_ts = all_data[0]['id'] - last_ts = all_data[-1]['id'] - first_time = datetime.datetime.fromtimestamp(first_ts / 1000) - last_time = datetime.datetime.fromtimestamp(last_ts / 1000) - logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M:%S')} 到 {last_time.strftime('%Y-%m-%d %H:%M:%S')}") - - stats = { - 'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'}, - 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'}, - } - - trades: List[Dict] = [] - current_position: Optional[Dict] = None - - # 统计使用1分钟数据精准判断的次数 - precise_count = 0 - fallback_count = 0 - # 统计突破反手信号触发次数 - breakout_reverse_count = 0 - # 记录每根K线是否已经执行过突破反手(当前K线只执行一次) - last_breakout_bar_id = None - - idx = 1 - while idx < len(all_data): - curr = all_data[idx] - curr_bar_id = curr['id'] - - # 获取当前持仓方向(用于判断反手信号是否在第一分钟触发) - current_pos_dir = current_position['direction'] if current_position else None - - # 检测信号(使用1分钟K线精准判断,并考虑反手信号第一分钟限制) - direction, trigger_price, valid_prev_idx, used_1m = check_trigger_with_1m( - all_data, idx, min_body_size, - current_position_direction=current_pos_dir, - first_minute_only=first_minute_only - ) - - # 如果没有五分之一信号,且有持仓,检查突破反手信号 - signal_type = 'one_fifth' # 信号类型:one_fifth(五分之一)或 breakout(突破) - if direction is None and current_position is not None and enable_breakout_reverse: - # 检查当前K线是否已经执行过突破反手 - if last_breakout_bar_id != curr_bar_id: - breakout_dir, breakout_price, breakout_type = check_breakout_reverse_signal( - all_data, idx, current_pos_dir, breakout_min_body_percent - ) - if breakout_dir: - direction = breakout_dir - trigger_price = breakout_price - signal_type = 'breakout' - - if used_1m: - precise_count += 1 - elif direction and signal_type == 'one_fifth': - fallback_count += 1 - - # 无持仓 -> 开仓(只用五分之一信号开仓,突破信号不用于开仓) - if current_position is None: - if direction and signal_type == 'one_fifth': - current_position = { - 'direction': direction, - 'entry_price': trigger_price, - 'entry_time': curr['id'], - 'entry_bar_idx': idx - } - stats[direction]['count'] += 1 - - time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M') - logger.debug(f"[{time_str}] 开仓{direction} @ {trigger_price:.2f}") - - idx += 1 - continue - - # 有持仓 -> 检查是否需要反向 - pos_dir = current_position['direction'] - - if direction and direction != pos_dir: - # 反向信号,平仓并反手 - exit_price = trigger_price - - if pos_dir == 'long': - diff = exit_price - current_position['entry_price'] - else: - diff = current_position['entry_price'] - exit_price - - # 记录信号类型 - signal_type_str = '突破' if signal_type == 'breakout' else '五分之一' - - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff, - 'signal_type': signal_type_str - }) - - stats[pos_dir]['total_profit'] += diff - if diff > 0: - stats[pos_dir]['wins'] += 1 - - # 如果是突破反手信号,记录当前K线ID,防止重复执行 - if signal_type == 'breakout': - last_breakout_bar_id = curr_bar_id - breakout_reverse_count += 1 - - # 反手开仓 - current_position = { - 'direction': direction, - 'entry_price': trigger_price, - 'entry_time': curr['id'], - 'entry_bar_idx': idx - } - stats[direction]['count'] += 1 - - time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M') - logger.debug(f"[{time_str}] 平{pos_dir}反手{direction}({signal_type_str}) @ {trigger_price:.2f} 盈亏={diff:.2f}") - - idx += 1 - - # 尾仓处理:最后一根K线收盘价平仓 - if current_position: - last = all_data[-1] - exit_price = float(last['close']) - pos_dir = current_position['direction'] - - if pos_dir == 'long': - diff = exit_price - current_position['entry_price'] - else: - diff = current_position['entry_price'] - exit_price - - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff - }) - - stats[pos_dir]['total_profit'] += diff - if diff > 0: - stats[pos_dir]['wins'] += 1 - - logger.info(f"回测完成:使用1分钟精准判断 {precise_count} 次,使用开盘价距离判断 {fallback_count} 次,突破反手信号 {breakout_reverse_count} 次") - - return trades, stats - - -# ========================= 运行回测 ========================= - -if __name__ == '__main__': - # ==================== 配置参数 ==================== - START_DATE = "2025-01-01" - END_DATE = "2025-12-31" - MIN_BODY_SIZE = 0.1 # 最小实体大小(绝对值) - FIRST_MINUTE_ONLY = True # 反手信号仅在3分钟K线的第一分钟触发才有效 - - # 突破反手信号配置 - ENABLE_BREAKOUT_REVERSE = True # 是否启用"突破上一根K线高低点"反手信号 - BREAKOUT_MIN_BODY_PERCENT = 0.1 # 突破反手信号的最小实体百分比(0.1表示0.1%) - - # ==================== 生成查询日期列表 ==================== - dates = [] - - try: - start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d') - end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d') - - if start_dt > end_dt: - logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}") - exit(1) - - current_dt = start_dt - while current_dt <= end_dt: - dates.append(current_dt.strftime('%Y-%m-%d')) - current_dt += datetime.timedelta(days=1) - - logger.info(f"回测日期范围:{START_DATE} 到 {END_DATE},共 {len(dates)} 天") - except ValueError as e: - logger.error(f"日期格式错误:{e}") - exit(1) - - # ==================== 执行回测 ==================== - trades, stats = backtest_one_third_strategy( - dates, - MIN_BODY_SIZE, - FIRST_MINUTE_ONLY, - ENABLE_BREAKOUT_REVERSE, - BREAKOUT_MIN_BODY_PERCENT - ) - - # ==================== 输出交易详情 ==================== - logger.info("===== 每笔交易详情 =====") - - # 参数设定 - contract_size = 10000 # 合约规模 - open_fee_fixed = 5 # 固定开仓手续费 - close_fee_rate = 0.0005 # 平仓手续费率 - - total_points_profit = 0 - total_money_profit = 0 - total_fee = 0 - - for t in trades: - entry = t['entry'] - exit_price = t['exit'] - direction = t['direction'] - - # 原始价差 - point_diff = t['diff'] - - # 金额盈利 - money_profit = point_diff / entry * contract_size - - # 手续费 - fee = open_fee_fixed + (contract_size / entry * exit_price * close_fee_rate) - - # 净利润 - net_profit = money_profit - fee - - t.update({ - 'point_diff': point_diff, - 'raw_profit': money_profit, - 'fee': fee, - 'net_profit': net_profit - }) - - total_points_profit += point_diff - total_money_profit += money_profit - total_fee += fee - - logger.info( - f"{t['entry_time']} {direction} " - f"入={entry:.2f} 出={exit_price:.2f} 差价={point_diff:.2f} " - f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" - ) - - # ==================== 汇总统计 ==================== - total_net_profit = total_money_profit - total_fee - - print(f"\n{'='*60}") - print(f"【BitMart 五分之一策略回测结果(3分钟K线 + 1分钟精准判断)】") - print(f"{'='*60}") - print(f"回测周期:{START_DATE} 到 {END_DATE}") - print(f"最小实体要求:{MIN_BODY_SIZE}") - print(f"反手信号仅第一分钟有效:{'是' if FIRST_MINUTE_ONLY else '否'}") - print(f"突破反手信号:{'启用' if ENABLE_BREAKOUT_REVERSE else '禁用'}(最小实体{BREAKOUT_MIN_BODY_PERCENT}%)") - print(f"{'='*60}") - print(f"总交易笔数:{len(trades)}") - print(f"总点差:{total_points_profit:.2f}") - print(f"总原始盈利(未扣费):{total_money_profit:.2f}") - print(f"总手续费:{total_fee:.2f}") - print(f"总净利润:{total_net_profit:.2f}") - print(f"{'='*60}") - - print("\n===== 方向统计 =====") - for k, v in stats.items(): - name = v['name'] - count = v['count'] - wins = v['wins'] - total_p = v['total_profit'] - win_rate = (wins / count * 100) if count > 0 else 0.0 - avg_p = (total_p / count) if count > 0 else 0.0 - print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") - - # 保存交易记录到CSV - if trades: - import csv - csv_path = Path(__file__).parent / 'backtest_one_third_trades.csv' - with open(csv_path, 'w', newline='', encoding='utf-8') as f: - writer = csv.DictWriter(f, fieldnames=[ - 'entry_time', 'exit_time', 'direction', 'entry', 'exit', - 'point_diff', 'raw_profit', 'fee', 'net_profit', 'signal_type' - ]) - writer.writeheader() - for t in trades: - writer.writerow({ - 'entry_time': t['entry_time'], - 'exit_time': t['exit_time'], - 'direction': t['direction'], - 'entry': t['entry'], - 'exit': t['exit'], - 'point_diff': t['point_diff'], - 'raw_profit': t['raw_profit'], - 'fee': t['fee'], - 'net_profit': t['net_profit'], - 'signal_type': t.get('signal_type', '五分之一') - }) - print(f"\n交易记录已保存到:{csv_path}") diff --git a/bitmart/回测-三分之一策略.py b/bitmart/回测-三分之一策略.py deleted file mode 100644 index a03750a..0000000 --- a/bitmart/回测-三分之一策略.py +++ /dev/null @@ -1,764 +0,0 @@ -""" -量化交易回测系统 - 三分之一回归策略(双向触发版) - -========== 策略规则 ========== - -1. 触发价格计算(基于有效的前一根K线,实体>=0.1): - - 做多触发价格 = 收盘价 + 实体/3(从收盘价往上涨1/3) - - 做空触发价格 = 收盘价 - 实体/3(从收盘价往下跌1/3) - -2. 信号触发条件: - - 当前K线最高价 >= 做多触发价格 → 做多信号 - - 当前K线最低价 <= 做空触发价格 → 做空信号 - -3. 执行逻辑: - - 做多时遇到做空信号 -> 平多并反手开空 - - 做空时遇到做多信号 -> 平空并反手开多 - - 同一根K线内只交易一次,防止频繁反手 - -4. 实体过滤: - - 如果前一根K线的实体部分(|open - close|)< 0.1,继续往前查找 - - 直到找到实体>=0.1的K线,再用那根K线来计算触发价格 - -示例1(阳线): - 前一根K线:开盘3000,收盘3100(阳线,实体=100) - - 做多触发价格 = 3100 + 33 = 3133(继续上涨做多) - - 做空触发价格 = 3100 - 33 = 3067(回调做空) - -示例2(阴线): - 前一根K线:开盘3100,收盘3000(阴线,实体=100) - - 做多触发价格 = 3000 + 33 = 3033(反弹做多) - - 做空触发价格 = 3000 - 33 = 2967(继续下跌做空) -""" - -import datetime -import calendar -import os -from typing import List, Dict, Optional -from loguru import logger -import pandas as pd -import mplfinance as mpf -import matplotlib.pyplot as plt -import matplotlib -try: - import plotly.graph_objects as go -except Exception: - go = None -from models.bitmart_klines import BitMartETH5M - -# 配置中文字体 -import matplotlib.font_manager as fm -import warnings - -# 忽略matplotlib的字体警告 -warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib.font_manager') -warnings.filterwarnings('ignore', message='.*Glyph.*missing.*', category=UserWarning) - -# 尝试设置中文字体,按优先级尝试 -chinese_fonts = ['SimHei', 'Microsoft YaHei', 'SimSun', 'KaiTi', 'FangSong', 'STSong', 'STHeiti'] -available_fonts = [f.name for f in fm.fontManager.ttflist] - -# 找到第一个可用的中文字体 -font_found = None -for font_name in chinese_fonts: - if font_name in available_fonts: - font_found = font_name - break - -if font_found: - plt.rcParams['font.sans-serif'] = [font_found] + ['DejaVu Sans'] - logger.info(f"使用中文字体: {font_found}") -else: - # 如果没有找到中文字体,尝试使用系统默认字体 - plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS', 'DejaVu Sans'] - logger.warning("未找到中文字体,使用默认配置") - -plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 -plt.rcParams['font.size'] = 10 # 设置默认字体大小 - -# 尝试清除字体缓存(如果可能) -try: - # 不强制重建,避免性能问题 - pass -except: - pass - -# 获取当前脚本所在目录 -SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) - -# ========================= 工具函数 ========================= - -def is_bullish(c): # 阳线 - return float(c['close']) > float(c['open']) - - -def is_bearish(c): # 阴线 - return float(c['close']) < float(c['open']) - - -def get_body_size(candle): - """计算K线实体大小(绝对值)""" - return abs(float(candle['open']) - float(candle['close'])) - - -def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1): - """ - 从当前索引往前查找,直到找到实体>=min_body_size的K线 - 返回:(有效K线的索引, K线数据) 或 (None, None) - """ - if current_idx <= 0: - return None, None - - for i in range(current_idx - 1, -1, -1): - prev = all_data[i] - body_size = get_body_size(prev) - if body_size >= min_body_size: - return i, prev - - return None, None - - -def get_one_third_levels(prev): - """ - 计算前一根K线实体的 1/3 双向触发价格 - 返回:(做多触发价格, 做空触发价格) - - 基于收盘价计算(无论阴线阳线): - - 做多触发价格 = 收盘价 + 实体/3(从收盘价往上涨1/3实体) - - 做空触发价格 = 收盘价 - 实体/3(从收盘价往下跌1/3实体) - - 示例: - 阳线 open=3000, close=3100, 实体=100 - - 做多触发 = 3100 + 33 = 3133(继续涨) - - 做空触发 = 3100 - 33 = 3067(回调) - - 阴线 open=3100, close=3000, 实体=100 - - 做多触发 = 3000 + 33 = 3033(反弹) - - 做空触发 = 3000 - 33 = 2967(继续跌) - """ - p_open = float(prev['open']) - p_close = float(prev['close']) - - body = abs(p_open - p_close) - - if body < 0.001: # 十字星,忽略 - return None, None - - # 基于收盘价的双向触发价格 - long_trigger = p_close + body / 3 # 从收盘价往上涨1/3触发做多 - short_trigger = p_close - body / 3 # 从收盘价往下跌1/3触发做空 - - return long_trigger, short_trigger - - -def check_trigger(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1): - """ - 检查当前K线是否触发了交易信号(双向检测) - 返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None) - - 规则: - - 当前K线高点 >= 做多触发价格 → 做多信号 - - 当前K线低点 <= 做空触发价格 → 做空信号 - - 如果同时触发两个方向,以先触发的为准(这里简化为优先做空,因为下跌更快) - """ - if current_idx <= 0: - return None, None, None - - curr = all_data[current_idx] - - # 查找实体>=min_body_size的前一根K线 - valid_prev_idx, prev = find_valid_prev_bar(all_data, current_idx, min_body_size) - - if prev is None: - return None, None, None - - long_trigger, short_trigger = get_one_third_levels(prev) - - if long_trigger is None: - return None, None, None - - # 使用影线部分(high/low)来判断 - c_high = float(curr['high']) - c_low = float(curr['low']) - - # 检测是否触发 - long_triggered = c_high >= long_trigger - short_triggered = c_low <= short_trigger - - # 如果两个方向都触发,需要判断哪个先触发 - # 简化处理:比较触发价格距离开盘价的远近,更近的先触发 - if long_triggered and short_triggered: - c_open = float(curr['open']) - dist_to_long = abs(long_trigger - c_open) - dist_to_short = abs(short_trigger - c_open) - if dist_to_short <= dist_to_long: - return 'short', short_trigger, valid_prev_idx - else: - return 'long', long_trigger, valid_prev_idx - - if short_triggered: - return 'short', short_trigger, valid_prev_idx - - if long_triggered: - return 'long', long_trigger, valid_prev_idx - - return None, None, None - - -def get_data_by_date(model, date_str: str): - """按天获取指定表的数据""" - try: - target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') - except ValueError: - logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") - return [] - - start_ts = int(target_date.timestamp() * 1000) - end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 - - query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) - data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] - - if data: - data.sort(key=lambda x: x['id']) - - return data - - -# ========================= 回测逻辑 ========================= - -def backtest_one_third_strategy(dates: List[str]): - """三分之一回归策略回测(优化版)""" - all_data: List[Dict] = [] - - for d in dates: - day_data = get_data_by_date(BitMartETH5M, d) - all_data.extend(day_data) - - logger.info(f"总共查询了 {len(dates)} 天,获取到 {len(all_data)} 条K线数据") - - if not all_data: - logger.warning("未获取到任何数据") - return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0}, - 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}} - - all_data.sort(key=lambda x: x['id']) - - if len(all_data) > 1: - first_time = datetime.datetime.fromtimestamp(all_data[0]['id'] / 1000) - last_time = datetime.datetime.fromtimestamp(all_data[-1]['id'] / 1000) - logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M')} 到 {last_time.strftime('%Y-%m-%d %H:%M')}") - - # 验证排序:打印前5条数据 - logger.info("===== 前5条数据(验证排序)=====") - for i in range(min(5, len(all_data))): - d = all_data[i] - t = datetime.datetime.fromtimestamp(d['id'] / 1000).strftime('%Y-%m-%d %H:%M:%S') - k_type = "阳线" if is_bullish(d) else ("阴线" if is_bearish(d) else "十字星") - logger.info(f" [{i}] {t} | {k_type} | O={d['open']:.2f} H={d['high']:.2f} L={d['low']:.2f} C={d['close']:.2f}") - - stats = { - 'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'}, - 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'}, - } - - # 额外统计信息 - extra_stats = { - 'same_dir_ignored': 0, # 同向信号被忽略次数 - 'no_signal_bars': 0, # 无信号K线数 - 'total_bars': len(all_data) - 1, # 总K线数(排除第一根) - } - - trades: List[Dict] = [] - current_position: Optional[Dict] = None - last_trade_bar: Optional[int] = None # 记录上次交易的K线索引,防止同一K线重复交易 - - for idx in range(1, len(all_data)): - curr = all_data[idx] - - # 使用check_trigger函数,它会自动查找实体>=0.1的前一根K线 - direction, trigger_price, valid_prev_idx = check_trigger(all_data, idx, min_body_size=0.1) - - # 获取有效的前一根K线用于日志输出 - valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None - - # 无信号时跳过 - if direction is None: - extra_stats['no_signal_bars'] += 1 - continue - - # 同一K线内已交易,跳过(与交易代码逻辑一致) - if last_trade_bar == idx: - continue - - # 空仓时,有信号就开仓 - if current_position is None: - if valid_prev is not None: - # 打印开仓时的K线信息 - prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M') - curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M') - prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星") - curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星") - prev_body = get_body_size(valid_prev) - logger.info(f"【开仓】{direction} @ {trigger_price:.2f}") - logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}") - logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}") - - current_position = { - 'direction': direction, - 'entry_price': trigger_price, - 'entry_time': curr['id'], - 'entry_bar': idx - } - stats[direction]['count'] += 1 - last_trade_bar = idx # 记录交易K线 - continue - - # 有仓位时,检查信号 - pos_dir = current_position['direction'] - - # 同向信号,忽略(与交易代码逻辑一致) - if direction == pos_dir: - extra_stats['same_dir_ignored'] += 1 - continue - - # 反向信号,平仓反手 - if valid_prev is not None: - exit_price = trigger_price - - if pos_dir == 'long': - diff = exit_price - current_position['entry_price'] - else: - diff = current_position['entry_price'] - exit_price - - # 打印平仓时的K线信息 - prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M') - curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M') - prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星") - curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星") - prev_body = get_body_size(valid_prev) - logger.info(f"【平仓反手】{pos_dir} -> {direction} @ {exit_price:.2f}, 盈亏: {diff:.2f}") - logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}") - logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}") - - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), - 'entry_time_ms': current_position['entry_time'], - 'exit_time_ms': curr['id'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff, - 'hold_bars': idx - current_position['entry_bar'] # 持仓K线数 - }) - - stats[pos_dir]['total_profit'] += diff - if diff > 0: - stats[pos_dir]['wins'] += 1 - - # 反手开仓 - current_position = { - 'direction': direction, - 'entry_price': trigger_price, - 'entry_time': curr['id'], - 'entry_bar': idx - } - stats[direction]['count'] += 1 - last_trade_bar = idx # 记录交易K线 - - # 尾仓处理 - if current_position: - last = all_data[-1] - last_idx = len(all_data) - 1 - exit_price = float(last['close']) - pos_dir = current_position['direction'] - - if pos_dir == 'long': - diff = exit_price - current_position['entry_price'] - else: - diff = current_position['entry_price'] - exit_price - - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), - 'entry_time_ms': current_position['entry_time'], - 'exit_time_ms': last['id'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff, - 'hold_bars': last_idx - current_position['entry_bar'], # 持仓K线数 - 'is_tail': True # 标记为尾仓平仓 - }) - stats[pos_dir]['total_profit'] += diff - if diff > 0: - stats[pos_dir]['wins'] += 1 - - logger.info(f"【尾仓平仓】{pos_dir} @ {exit_price:.2f}, 盈亏: {diff:.2f}") - - # 打印额外统计信息 - logger.info(f"\n===== 信号统计 =====") - logger.info(f"总K线数: {extra_stats['total_bars']}") - logger.info(f"无信号K线: {extra_stats['no_signal_bars']} ({extra_stats['no_signal_bars']/extra_stats['total_bars']*100:.1f}%)") - logger.info(f"同向信号忽略: {extra_stats['same_dir_ignored']}") - - return trades, stats, all_data, extra_stats - - -# ========================= 绘图函数 ========================= -def plot_trades(all_data: List[Dict], trades: List[Dict], save_path: str = None): - """ - 绘制K线图并标注交易点位 - """ - if not all_data: - logger.warning("没有数据可绘制") - return - - # 转换为 DataFrame - df = pd.DataFrame(all_data) - df['datetime'] = pd.to_datetime(df['id'], unit='ms') - df.set_index('datetime', inplace=True) - df = df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'}) - df = df[['Open', 'High', 'Low', 'Close']] - - # 准备标记点 - buy_signals = [] # 做多开仓 - sell_signals = [] # 做空开仓 - buy_exits = [] # 做多平仓 - sell_exits = [] # 做空平仓 - - for trade in trades: - entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms') - exit_time = pd.to_datetime(trade['exit_time_ms'], unit='ms') - direction = trade['direction'] - entry_price = trade['entry'] - exit_price = trade['exit'] - - if direction == '做多': - buy_signals.append((entry_time, entry_price)) - buy_exits.append((exit_time, exit_price)) - else: - sell_signals.append((entry_time, entry_price)) - sell_exits.append((exit_time, exit_price)) - - # 创建标记序列 - buy_markers = pd.Series(index=df.index, dtype=float) - sell_markers = pd.Series(index=df.index, dtype=float) - buy_exit_markers = pd.Series(index=df.index, dtype=float) - sell_exit_markers = pd.Series(index=df.index, dtype=float) - - for t, p in buy_signals: - if t in buy_markers.index: - buy_markers[t] = p - for t, p in sell_signals: - if t in sell_markers.index: - sell_markers[t] = p - for t, p in buy_exits: - if t in buy_exit_markers.index: - buy_exit_markers[t] = p - for t, p in sell_exits: - if t in sell_exit_markers.index: - sell_exit_markers[t] = p - - # 添加标记 - add_plots = [] - - if buy_markers.notna().any(): - add_plots.append(mpf.make_addplot(buy_markers, type='scatter', markersize=100, - marker='^', color='green', label='做多开仓')) - if sell_markers.notna().any(): - add_plots.append(mpf.make_addplot(sell_markers, type='scatter', markersize=100, - marker='v', color='red', label='做空开仓')) - if buy_exit_markers.notna().any(): - add_plots.append(mpf.make_addplot(buy_exit_markers, type='scatter', markersize=80, - marker='x', color='darkgreen', label='做多平仓')) - if sell_exit_markers.notna().any(): - add_plots.append(mpf.make_addplot(sell_exit_markers, type='scatter', markersize=80, - marker='x', color='darkred', label='做空平仓')) - - # 绘制K线图(更接近交易所风格) - market_colors = mpf.make_marketcolors( - up='#26a69a', # 常见交易所绿色 - down='#ef5350', # 常见交易所红色 - edge='inherit', - wick='inherit', - volume='inherit' - ) - style = mpf.make_mpf_style( - base_mpf_style='binance', - marketcolors=market_colors, - gridstyle='-', - gridcolor='#e6e6e6' - ) - - fig, axes = mpf.plot( - df, - type='candle', - style=style, - title='三分之一回归策略回测', - ylabel='价格', - addplot=add_plots if add_plots else None, - figsize=(16, 9), - returnfig=True - ) - - # 添加图例 - axes[0].legend(['做多开仓 ▲', '做空开仓 ▼', '做多平仓 ✕', '做空平仓 ✕'], loc='upper left') - - # 标注开仓细节(方向、价格、时间) - if trades: - ax = axes[0] - max_annotate = 60 # 过多会拥挤,可按需调大/调小 - annotated = 0 - for i, trade in enumerate(trades): - if annotated >= max_annotate: - break - entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms') - if entry_time not in df.index: - continue - entry_price = trade['entry'] - direction = trade['direction'] - color = 'green' if direction == '做多' else 'red' - text = f"{direction} @ {entry_price:.2f}\n{entry_time.strftime('%m-%d %H:%M')}" - y_offset = 20 if (i % 2 == 0) else -30 - ax.annotate( - text, - xy=(entry_time, entry_price), - xytext=(0, y_offset), - textcoords='offset points', - ha='center', - va='bottom' if y_offset > 0 else 'top', - fontsize=8, - color=color, - arrowprops=dict(arrowstyle='->', color=color, lw=0.6, alpha=0.6) - ) - annotated += 1 - - if save_path: - plt.savefig(save_path, dpi=150, bbox_inches='tight') - logger.info(f"图表已保存到: {save_path}") - - plt.show() - - -def plot_trades_interactive(all_data: List[Dict], trades: List[Dict], html_path: str = None): - """ - 交互式K线图(TradingView风格:支持缩放、时间区间/价格区间平移缩放) - """ - if not all_data: - logger.warning("没有数据可绘制") - return - if go is None: - logger.warning("未安装 plotly,无法绘制交互式图。请先安装:pip install plotly") - return - - df = pd.DataFrame(all_data) - df['datetime'] = pd.to_datetime(df['id'], unit='ms') - df.sort_values('datetime', inplace=True) - - fig = go.Figure( - data=[ - go.Candlestick( - x=df['datetime'], - open=df['open'], - high=df['high'], - low=df['low'], - close=df['close'], - increasing_line_color='#26a69a', - decreasing_line_color='#ef5350', - name='K线' - ) - ] - ) - - # 标注开仓点 - if trades: - entry_x = [] - entry_y = [] - entry_text = [] - entry_color = [] - for t in trades: - entry_x.append(pd.to_datetime(t['entry_time_ms'], unit='ms')) - entry_y.append(t['entry']) - entry_text.append(f"{t['direction']} @ {t['entry']:.2f}
{t['entry_time']}") - entry_color.append('#26a69a' if t['direction'] == '做多' else '#ef5350') - - fig.add_trace( - go.Scatter( - x=entry_x, - y=entry_y, - mode='markers', - marker=dict(size=8, color=entry_color), - name='开仓', - text=entry_text, - hoverinfo='text' - ) - ) - - # TradingView风格:深色背景 + 交互缩放 - fig.update_layout( - title='三分之一回归策略回测(交互式)', - xaxis=dict( - rangeslider=dict(visible=True), - type='date', - showgrid=False - ), - yaxis=dict( - showgrid=False, - fixedrange=False - ), - plot_bgcolor='#0b0e11', - paper_bgcolor='#0b0e11', - font=dict(color='#d1d4dc'), - hovermode='x unified', - dragmode='zoom' - ) - - fig.show() - if html_path: - fig.write_html(html_path) - logger.info(f"交互图已保存到: {html_path}") - - -# ========================= 主程序 ========================= -if __name__ == '__main__': - # ==================== 配置参数 ==================== - START_DATE = "2025-01-01" - END_DATE = "2025-12-31" - - # ==================== 生成日期列表 ==================== - dates = [] - if START_DATE and END_DATE: - start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d') - end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d') - - current_dt = start_dt - while current_dt <= end_dt: - dates.append(current_dt.strftime('%Y-%m-%d')) - current_dt += datetime.timedelta(days=1) - - logger.info(f"查询日期范围:{START_DATE} 到 {END_DATE},共 {len(dates)} 天") - - # ==================== 执行回测 ==================== - trades, stats, all_data, extra_stats = backtest_one_third_strategy(dates) - - # ==================== 输出结果 ==================== - logger.info("===== 每笔交易详情 =====") - - contract_size = 10000 - open_fee_fixed = 5 - close_fee_rate = 0.0005 - - total_points_profit = 0 - total_money_profit = 0 - total_fee = 0 - - for t in trades: - entry = t['entry'] - exit_p = t['exit'] - direction = t['direction'] - - point_diff = t['diff'] - money_profit = point_diff / entry * contract_size - fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate) - net_profit = money_profit - fee - - t.update({ - 'point_diff': point_diff, - 'raw_profit': money_profit, - 'fee': fee, - 'net_profit': net_profit - }) - - total_points_profit += point_diff - total_money_profit += money_profit - total_fee += fee - - logger.info( - f"{t['entry_time']} {direction} " - f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} " - f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f}" - ) - - # ==================== 汇总统计 ==================== - total_net_profit = total_money_profit - total_fee - - # 计算额外统计 - win_count = len([t for t in trades if t['diff'] > 0]) - lose_count = len([t for t in trades if t['diff'] <= 0]) - total_win_rate = (win_count / len(trades) * 100) if trades else 0 - - # 计算平均持仓K线数 - hold_bars_list = [t.get('hold_bars', 0) for t in trades if 'hold_bars' in t] - avg_hold_bars = sum(hold_bars_list) / len(hold_bars_list) if hold_bars_list else 0 - - # 计算最大连续亏损 - max_consecutive_loss = 0 - current_consecutive_loss = 0 - for t in trades: - if t['diff'] <= 0: - current_consecutive_loss += 1 - max_consecutive_loss = max(max_consecutive_loss, current_consecutive_loss) - else: - current_consecutive_loss = 0 - - # 计算最大回撤 - cumulative_profit = 0 - peak = 0 - max_drawdown = 0 - for t in trades: - cumulative_profit += t.get('net_profit', t['diff']) - peak = max(peak, cumulative_profit) - drawdown = peak - cumulative_profit - max_drawdown = max(max_drawdown, drawdown) - - print(f"\n{'='*60}") - print(f"【三分之一回归策略 回测结果】") - print(f"{'='*60}") - print(f"交易笔数:{len(trades)}") - print(f"盈利笔数:{win_count} 亏损笔数:{lose_count}") - print(f"总胜率:{total_win_rate:.2f}%") - print(f"平均持仓K线数:{avg_hold_bars:.1f}") - print(f"最大连续亏损:{max_consecutive_loss} 笔") - print(f"{'='*60}") - print(f"总点差:{total_points_profit:.2f}") - print(f"总原始盈利:{total_money_profit:.2f}") - print(f"总手续费:{total_fee:.2f}") - print(f"总净利润:{total_net_profit:.2f}") - print(f"最大回撤:{max_drawdown:.2f}") - print(f"{'='*60}") - - print("\n===== 方向统计 =====") - for k, v in stats.items(): - count = v['count'] - wins = v['wins'] - total_p = v['total_profit'] - win_rate = (wins / count * 100) if count > 0 else 0.0 - avg_p = (total_p / count) if count > 0 else 0.0 - print(f"{v['name']}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") - - print("\n===== 信号统计 =====") - print(f"总K线数: {extra_stats['total_bars']}") - print(f"无信号K线: {extra_stats['no_signal_bars']} ({extra_stats['no_signal_bars']/extra_stats['total_bars']*100:.1f}%)") - print(f"同向信号忽略: {extra_stats['same_dir_ignored']}") - - # ==================== 绘制图表 ==================== - if trades and all_data: - # 如果数据太多,只绘制最近一部分(比如最近500根K线) - max_bars = 500 - if len(all_data) > max_bars: - logger.info(f"数据量较大({len(all_data)}条),只绘制最近 {max_bars} 根K线") - plot_data = all_data[-max_bars:] - # 过滤出在这个时间范围内的交易 - min_time = datetime.datetime.fromtimestamp(plot_data[0]['id'] / 1000) - plot_trades_filtered = [t for t in trades if t['entry_time'] >= min_time] - else: - plot_data = all_data - plot_trades_filtered = trades - - save_path = os.path.join(SCRIPT_DIR, '回测图表.png') - plot_trades(plot_data, plot_trades_filtered, save_path=save_path) - # 交互式版本(TradingView风格):支持时间区间/价格缩放 - html_path = os.path.join(SCRIPT_DIR, '回测图表_交互式.html') - plot_trades_interactive(plot_data, plot_trades_filtered, html_path=html_path) diff --git a/bitmart/回测数据-30分钟版.py b/bitmart/回测数据-30分钟版.py deleted file mode 100644 index 57e9e73..0000000 --- a/bitmart/回测数据-30分钟版.py +++ /dev/null @@ -1,418 +0,0 @@ -""" -量化交易回测系统 - 30分钟K线策略回测(BitMart数据源) - -========== 策略规则 ========== -重要:所有开仓和平仓操作都在下一根K线的开盘价执行 - -1. 开仓条件(信号出现时,下一根K线开盘价开仓): - - 阳包阴(涨包跌):前一根是跌(阴线),后一根是涨(阳线),且涨的收盘价 > 跌的开盘价 - -> 下一根K线开盘价开多 - - 阴包阳(跌包涨):前一根是涨(阳线),后一根是跌(阴线),且跌的收盘价 < 涨的开盘价 - -> 下一根K线开盘价开空 - -2. 平仓条件(所有平仓都在下一根K线开盘价执行): - - 持有多单时:遇到两根连续的阴线 -> 下一根K线开盘价平仓 - - 持有空单时:遇到两根连续的阳线 -> 下一根K线开盘价平仓 - - 遇到反向信号:下一根K线开盘价平仓并反手开仓 - -3. 续持条件: - - 遇到同向信号:续持 - - 未满足平仓条件:续持 -""" - -import datetime -import calendar -from dataclasses import dataclass -from typing import List, Dict, Optional -from loguru import logger -from models.bitmart import BitMart30 - - -# ========================= 工具函数 ========================= - -def is_bullish(c): # 阳线 - return float(c['close']) > float(c['open']) - - -def is_bearish(c): # 阴线 - return float(c['close']) < float(c['open']) - - -def check_signal(prev, curr): - """ - 包住形态信号判定(优化版): - 只看两种信号,严格按照收盘价与开盘价的比较: - - 1. 跌包涨(前涨后跌)-> 做空: - - 前一根是涨(阳线:close > open) - - 后一根是跌(阴线:close < open) - - 且:跌的收盘价 < 涨的开盘价(curr['close'] < prev['open']) - - 2. 涨包跌(前跌后涨)-> 做多: - - 前一根是跌(阴线:close < open) - - 后一根是涨(阳线:close > open) - - 且:涨的收盘价 > 跌的开盘价(curr['close'] > prev['open']) - """ - p_open = float(prev['open']) - c_close = float(curr['close']) - - # 跌包涨(前涨后跌) -> 做空:跌的收盘价 < 涨的开盘价 - if is_bullish(prev) and is_bearish(curr) and c_close < p_open: - return "short", "bull_bear_engulf" - - # 涨包跌(前跌后涨) -> 做多:涨的收盘价 > 跌的开盘价 - if is_bearish(prev) and is_bullish(curr) and c_close > p_open: - return "long", "bear_bull_engulf" - - return None, None - - -def get_data_by_date(model, date_str: str): - """ - 按天获取指定表的数据(30分钟K线) - 数据格式:时间戳(毫秒级) 开盘价 最高价 最低价 收盘价 - 例如:1767461400000 3106.68 3109.1 3106.22 3107.22 - - 注意:返回的数据已按时间戳(id)升序排序 - """ - try: - target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') - except ValueError: - logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") - return [] - - # 将日期转换为毫秒级时间戳进行查询 - start_ts = int(target_date.timestamp() * 1000) - end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 - - # 查询时按时间戳升序排序 - query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) - data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] - - # 确保数据已排序 - if data: - data.sort(key=lambda x: x['id']) - - return data - - -# ========================= 回测逻辑 ========================= - -def backtest_15m_trend_optimized(dates: List[str]): - all_data: List[Dict] = [] - total_queried = 0 - for d in dates: - day_data = get_data_by_date(BitMart30, d) - all_data.extend(day_data) - if day_data: - total_queried += len(day_data) - - logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条K线数据") - - if not all_data: - logger.warning("未获取到任何数据,请检查:") - logger.warning("1. 数据库连接是否正常") - logger.warning("2. 查询的日期范围是否在数据范围内") - logger.warning("3. 时间戳格式是否正确(毫秒级)") - return [], { - 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, - 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, - } - - # 重要:合并所有数据后,必须先按时间戳(id)排序 - all_data.sort(key=lambda x: x['id']) - - # 验证排序结果 - if len(all_data) > 1: - first_ts = all_data[0]['id'] - last_ts = all_data[-1]['id'] - first_time = datetime.datetime.fromtimestamp(first_ts / 1000) - last_time = datetime.datetime.fromtimestamp(last_ts / 1000) - logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')} 到 {last_time.strftime('%Y-%m-%d %H:%M:%S')}") - - stats = { - 'bear_bull_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '涨包跌'}, - 'bull_bear_engulf': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '跌包涨'}, - } - - trades: List[Dict] = [] - current_position: Optional[Dict] = None # 开仓信息 - consecutive_opposite_count = 0 # 连续反色K线计数 - idx = 1 - - while idx < len(all_data) - 1: - prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] - direction, signal_key = check_signal(prev, curr) - - # 空仓 -> 碰到信号则开仓(下一根K线开盘价) - if current_position is None: - if direction: - entry_price = float(next_bar['open']) - current_position = { - 'direction': direction, - 'signal': stats[signal_key]['name'], - 'signal_key': signal_key, - 'entry_price': entry_price, - 'entry_time': next_bar['id'] - } - consecutive_opposite_count = 0 # 重置连续反色计数 - stats[signal_key]['count'] += 1 - idx += 1 - continue - - # 有仓位状态:检查平仓条件 - pos_dir = current_position['direction'] - pos_sig_key = current_position['signal_key'] - - # 1. 反向信号 -> 下一根K线开盘价平仓并反手开仓 - if direction and direction != pos_dir: - exit_price = float(next_bar['open']) - diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( - current_position['entry_price'] - exit_price) - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), - 'signal': current_position['signal'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff - }) - stats[pos_sig_key]['total_profit'] += diff - if diff > 0: stats[pos_sig_key]['wins'] += 1 - - # 反手开仓 - current_position = { - 'direction': direction, - 'signal': stats[signal_key]['name'], - 'signal_key': signal_key, - 'entry_price': exit_price, - 'entry_time': next_bar['id'] - } - consecutive_opposite_count = 0 # 重置连续反色计数 - stats[signal_key]['count'] += 1 - idx += 1 - continue - - # 2. 检查连续反色K线平仓条件(下一根K线开盘价平仓) - # 持有多单:检查是否连续两根阴线 - if pos_dir == 'long' and is_bearish(curr): - consecutive_opposite_count += 1 - # 如果已经连续两根阴线,下一根K线开盘价平仓 - if consecutive_opposite_count >= 2: - exit_price = float(next_bar['open']) - diff = exit_price - current_position['entry_price'] - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), - 'signal': current_position['signal'], - 'direction': '做多', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff - }) - stats[pos_sig_key]['total_profit'] += diff - if diff > 0: stats[pos_sig_key]['wins'] += 1 - current_position = None - consecutive_opposite_count = 0 - idx += 1 - continue - else: - # 只有一根阴线,续持 - idx += 1 - continue - - # 持有空单:检查是否连续两根阳线 - elif pos_dir == 'short' and is_bullish(curr): - consecutive_opposite_count += 1 - # 如果已经连续两根阳线,下一根K线开盘价平仓 - if consecutive_opposite_count >= 2: - exit_price = float(next_bar['open']) - diff = current_position['entry_price'] - exit_price - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), - 'signal': current_position['signal'], - 'direction': '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff - }) - stats[pos_sig_key]['total_profit'] += diff - if diff > 0: stats[pos_sig_key]['wins'] += 1 - current_position = None - consecutive_opposite_count = 0 - idx += 1 - continue - else: - # 只有一根阳线,续持 - idx += 1 - continue - - # 3. 同向K线或同向信号 -> 续持,重置连续反色计数 - if (pos_dir == 'long' and is_bullish(curr)) or (pos_dir == 'short' and is_bearish(curr)): - consecutive_opposite_count = 0 # 重置连续反色计数 - - # 同向信号 -> 续持 - if direction and direction == pos_dir: - consecutive_opposite_count = 0 # 重置连续反色计数 - idx += 1 - continue - - idx += 1 - - # 尾仓:最后一根收盘价平仓 - if current_position: - last = all_data[-1] - exit_price = float(last['close']) - pos_dir = current_position['direction'] - diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( - current_position['entry_price'] - exit_price) - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), - 'signal': current_position['signal'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff - }) - stats[current_position['signal_key']]['total_profit'] += diff - if diff > 0: stats[current_position['signal_key']]['wins'] += 1 - - return trades, stats - - -# ========================= 运行示例(优化版盈利计算) ========================= -if __name__ == '__main__': - # ==================== 配置参数:指定查询时间范围 ==================== - # 方式1:指定开始和结束日期(推荐) - START_DATE = "2025-01-01" # 开始日期,格式:YYYY-MM-DD - END_DATE = "2025-12-31" # 结束日期,格式:YYYY-MM-DD - - # 方式2:如果上面两个为空,则使用年份和月份范围 - START_YEAR = None # 开始年份,例如:2025 - START_MONTH = None # 开始月份,例如:1 - END_YEAR = None # 结束年份,例如:2025 - END_MONTH = None # 结束月份,例如:12 - - # ==================== 生成查询日期列表 ==================== - dates = [] - - # 优先使用指定的日期范围 - if START_DATE and END_DATE: - try: - start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d') - end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d') - - if start_dt > end_dt: - logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}") - exit(1) - - current_dt = start_dt - while current_dt <= end_dt: - dates.append(current_dt.strftime('%Y-%m-%d')) - current_dt += datetime.timedelta(days=1) - - logger.info(f"使用指定日期范围:{START_DATE} 到 {END_DATE},共 {len(dates)} 天") - except ValueError as e: - logger.error(f"日期格式错误:{e},请使用 YYYY-MM-DD 格式") - exit(1) - - # 如果未指定日期范围,使用年份和月份范围 - elif START_YEAR and END_YEAR: - start_m = START_MONTH if START_MONTH else 1 - end_m = END_MONTH if END_MONTH else 12 - - for year in range(START_YEAR, END_YEAR + 1): - month_start = start_m if year == START_YEAR else 1 - month_end = end_m if year == END_YEAR else 12 - - for month in range(month_start, month_end + 1): - days_in_month = calendar.monthrange(year, month)[1] - for day in range(1, days_in_month + 1): - dates.append(f"{year}-{month:02d}-{day:02d}") - - logger.info(f"使用年份月份范围:{START_YEAR}年{start_m}月 到 {END_YEAR}年{end_m}月,共 {len(dates)} 天") - - # 如果都没有指定,使用默认范围 - else: - logger.warning("未指定日期范围,使用默认:2025年1-12月") - for month in range(1, 13): - days_in_month = calendar.monthrange(2025, month)[1] - for day in range(1, days_in_month + 1): - dates.append(f"2025-{month:02d}-{day:02d}") - - if dates: - logger.info(f"准备查询 {len(dates)} 天的数据,日期范围:{dates[0]} 到 {dates[-1]}") - else: - logger.error("未生成任何查询日期,请检查配置参数") - exit(1) - - trades, stats = backtest_15m_trend_optimized(dates) - - logger.info("===== 每笔交易详情 =====") - - # === 参数设定 === - contract_size = 10000 # 合约规模(1手对应多少基础货币) - open_fee_fixed = 5 # 固定开仓手续费 - close_fee_rate = 0.0005 # 按成交额比例的平仓手续费率 - - total_points_profit = 0 # 累计点差 - total_money_profit = 0 # 累计金额盈利 - total_fee = 0 # 累计手续费 - - for t in trades: - entry = t['entry'] - exit = t['exit'] - direction = t['direction'] - - # === 1️⃣ 原始价差(点差) === - point_diff = (exit - entry) if direction == '做多' else (entry - exit) - - # === 2️⃣ 金额盈利(考虑合约规模) === - money_profit = point_diff / entry * contract_size # 利润以基础货币计(例如USD) - - # === 3️⃣ 手续费计算 === - # 开仓 + 平仓手续费(按比例计算 + 固定) - fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) - - # === 4️⃣ 净利润 === - net_profit = money_profit - fee - - # 保存计算结果 - t.update({ - 'point_diff': point_diff, - 'raw_profit': money_profit, - 'fee': fee, - 'net_profit': net_profit - }) - - total_points_profit += point_diff - total_money_profit += money_profit - total_fee += fee - - # if net_profit < -400: - logger.info( - f"{t['entry_time']} {direction}({t['signal']}) " - f"入={entry:.2f} 出={exit:.2f} 差价={point_diff:.2f} " - f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" - ) - - # === 汇总统计 === - total_net_profit = total_money_profit - total_fee - print(f"\n【BitMart 回测结果】") - print(f"一共交易笔数:{len(trades)}") - print(f"总点差:{total_points_profit:.2f}") - print(f"总原始盈利(未扣费):{total_money_profit:.2f}") - print(f"总手续费:{total_fee:.2f}") - print(f"总净利润:{total_net_profit:.2f}\n") - - print(total_money_profit - total_fee * 0.1) - - print("===== 信号统计 =====") - for k, v in stats.items(): - name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] - win_rate = (wins / count * 100) if count > 0 else 0.0 - avg_p = (total_p / count) if count > 0 else 0.0 - print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") diff --git a/bitmart/回测数据-三分之一策略-5分钟精准版.py b/bitmart/回测数据-三分之一策略-5分钟精准版.py deleted file mode 100644 index 57acdf9..0000000 --- a/bitmart/回测数据-三分之一策略-5分钟精准版.py +++ /dev/null @@ -1,418 +0,0 @@ -""" -量化交易回测系统 - 三分之一策略(5分钟K线 + 1分钟精准判断) - -========== 策略规则(与 交易/bitmart-三分之一策略交易.py 一致)========== -1. 触发价格计算(基于有效的前一根5分钟K线,实体>=0.1): - - 做多触发价格 = 收盘价 + 实体/3(从收盘价往上涨1/3) - - 做空触发价格 = 收盘价 - 实体/3(从收盘价往下跌1/3) - -2. 信号触发条件: - - 当前5分钟K线最高价 >= 做多触发价格 → 做多信号 - - 当前5分钟K线最低价 <= 做空触发价格 → 做空信号 - -3. 执行逻辑: - - 做多时遇到做空信号 -> 平多并反手开空 - - 做空时遇到做多信号 -> 平空并反手开多 - - 同一根5分钟K线内只交易一次 - -4. 精准判断(使用1分钟K线): - - 当一根5分钟K线同时触及做多和做空价格时 - - 使用该5分钟K线对应的5根1分钟K线来判断哪个方向先被触发 - - 使回测更贴近真实成交顺序 -""" - -import datetime -import calendar -from typing import List, Dict, Optional -from loguru import logger -from models.bitmart_klines import BitMartETH5M, BitMartETH1M - - -# ========================= 工具函数 ========================= - -def is_bullish(c): - """阳线""" - return float(c['close']) > float(c['open']) - - -def is_bearish(c): - """阴线""" - return float(c['close']) < float(c['open']) - - -def get_body_size(candle): - """K线实体大小(绝对值)""" - return abs(float(candle['open']) - float(candle['close'])) - - -def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1): - """ - 从当前索引往前查找,直到找到实体>=min_body_size的K线 - 返回:(有效K线的索引, K线数据) 或 (None, None) - """ - if current_idx <= 0: - return None, None - for i in range(current_idx - 1, -1, -1): - prev = all_data[i] - if get_body_size(prev) >= min_body_size: - return i, prev - return None, None - - -def get_one_third_levels(prev: Dict): - """ - 计算前一根K线实体的 1/3 双向触发价格 - 返回:(做多触发价格, 做空触发价格) - """ - p_open = float(prev['open']) - p_close = float(prev['close']) - body = abs(p_open - p_close) - if body < 0.001: - return None, None - long_trigger = p_close + body / 3 - short_trigger = p_close - body / 3 - return long_trigger, short_trigger - - -def get_1m_data_by_range(start_ts_ms: int, end_ts_ms: int) -> List[Dict]: - """ - 获取指定时间范围内的1分钟K线数据(毫秒时间戳) - """ - query = BitMartETH1M.select().where( - BitMartETH1M.id.between(start_ts_ms, end_ts_ms - 1) - ).order_by(BitMartETH1M.id.asc()) - data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] - if data: - data.sort(key=lambda x: x['id']) - return data - - -def get_1m_data_for_5m_bar(bar_5m: Dict) -> List[Dict]: - """获取一根5分钟K线对应的5根1分钟K线""" - start_ts = bar_5m['id'] - end_ts = start_ts + 5 * 60 * 1000 - return get_1m_data_by_range(start_ts, end_ts) - - -def determine_trigger_order_by_1m( - bars_1m: List[Dict], - long_trigger: float, - short_trigger: float -) -> Optional[str]: - """ - 使用1分钟K线判断在一根5分钟周期内,先触发做多还是做空。 - 按时间顺序遍历每根1分钟K线,先触及哪个方向则返回该方向; - 若同一根1分钟K线内两个方向都触及,用开盘价距离判断。 - """ - if not bars_1m: - return None - for bar in bars_1m: - high = float(bar['high']) - low = float(bar['low']) - open_price = float(bar['open']) - long_triggered = high >= long_trigger - short_triggered = low <= short_trigger - if long_triggered and not short_triggered: - return 'long' - if short_triggered and not long_triggered: - return 'short' - if long_triggered and short_triggered: - dist_to_long = abs(long_trigger - open_price) - dist_to_short = abs(short_trigger - open_price) - return 'short' if dist_to_short <= dist_to_long else 'long' - return None - - -def check_trigger_with_1m( - all_data_5m: List[Dict], - current_idx: int, - min_body_size: float = 0.1 -) -> tuple: - """ - 检查当前5分钟K线是否触发交易信号。 - 若同时触发多空,则用该5分钟内的1分钟K线判断先后顺序。 - 返回:(方向, 触发价格, 有效前一根K线索引, 是否使用了1分钟精准判断, 是否双触) - """ - if current_idx <= 0: - return None, None, None, False, False - - curr = all_data_5m[current_idx] - valid_prev_idx, prev = find_valid_prev_bar(all_data_5m, current_idx, min_body_size) - if prev is None: - return None, None, None, False, False - - long_trigger, short_trigger = get_one_third_levels(prev) - if long_trigger is None: - return None, None, None, False, False - - c_high = float(curr['high']) - c_low = float(curr['low']) - long_triggered = c_high >= long_trigger - short_triggered = c_low <= short_trigger - both_triggered = long_triggered and short_triggered - - if both_triggered: - bars_1m = get_1m_data_for_5m_bar(curr) - if bars_1m: - direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger) - if direction: - trigger_price = long_trigger if direction == 'long' else short_trigger - return direction, trigger_price, valid_prev_idx, True, True - c_open = float(curr['open']) - dist_to_long = abs(long_trigger - c_open) - dist_to_short = abs(short_trigger - c_open) - if dist_to_short <= dist_to_long: - return 'short', short_trigger, valid_prev_idx, False, True - return 'long', long_trigger, valid_prev_idx, False, True - - if short_triggered: - return 'short', short_trigger, valid_prev_idx, False, False - if long_triggered: - return 'long', long_trigger, valid_prev_idx, False, False - return None, None, None, False, False - - -def get_data_by_date(model, date_str: str) -> List[Dict]: - """ - 按天获取指定表的K线数据(毫秒时间戳)。 - 返回格式:id(ms), open, high, low, close,已按 id 升序。 - """ - try: - target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') - except ValueError: - logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") - return [] - start_ts = int(target_date.timestamp() * 1000) - end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 - query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) - data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] - if data: - data.sort(key=lambda x: x['id']) - return data - - -# ========================= 回测逻辑 ========================= - -def backtest_one_third_precise(dates: List[str], min_body_size: float = 0.1): - """ - 三分之一策略回测(5分钟K线 + 1分钟精准判断)。 - 风格与 回测数据-30分钟版 一致:按日期拉取、合并排序、统计与输出。 - """ - all_data: List[Dict] = [] - total_queried = 0 - for d in dates: - day_data = get_data_by_date(BitMartETH5M, d) - all_data.extend(day_data) - if day_data: - total_queried += len(day_data) - - logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条5分钟K线数据") - - if not all_data: - logger.warning("未获取到任何数据,请检查:") - logger.warning("1. 数据库连接与 bitmart_eth_5m / bitmart_eth_1m 表是否存在") - logger.warning("2. 是否已用 抓取多周期K线.py 抓取过 1 分钟和 5 分钟数据") - return [], { - 'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'}, - 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'}, - }, {'precise_1m_count': 0, 'fallback_count': 0} - - all_data.sort(key=lambda x: x['id']) - if len(all_data) > 1: - first_ts = all_data[0]['id'] - last_ts = all_data[-1]['id'] - first_time = datetime.datetime.fromtimestamp(first_ts / 1000) - last_time = datetime.datetime.fromtimestamp(last_ts / 1000) - logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')} 到 {last_time.strftime('%Y-%m-%d %H:%M:%S')}") - - stats = { - 'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'}, - 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'}, - } - extra = {'precise_1m_count': 0, 'fallback_count': 0} - - trades: List[Dict] = [] - current_position: Optional[Dict] = None - last_trade_bar: Optional[int] = None - - for idx in range(1, len(all_data)): - curr = all_data[idx] - direction, trigger_price, valid_prev_idx, used_1m, both_triggered = check_trigger_with_1m(all_data, idx, min_body_size) - valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None - - if used_1m: - extra['precise_1m_count'] += 1 - elif both_triggered and direction: - extra['fallback_count'] += 1 - - if direction is None: - continue - if last_trade_bar == idx: - continue - - if current_position is None: - current_position = { - 'direction': direction, - 'entry_price': trigger_price, - 'entry_time': curr['id'], - 'entry_bar': idx, - } - stats[direction]['count'] += 1 - last_trade_bar = idx - continue - - pos_dir = current_position['direction'] - if direction == pos_dir: - continue - - exit_price = trigger_price - if pos_dir == 'long': - diff = exit_price - current_position['entry_price'] - else: - diff = current_position['entry_price'] - exit_price - - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), - 'entry_time_ms': current_position['entry_time'], - 'exit_time_ms': curr['id'], - 'signal': '三分之一', - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff, - }) - stats[pos_dir]['total_profit'] += diff - if diff > 0: - stats[pos_dir]['wins'] += 1 - - current_position = { - 'direction': direction, - 'entry_price': trigger_price, - 'entry_time': curr['id'], - 'entry_bar': idx, - } - stats[direction]['count'] += 1 - last_trade_bar = idx - - if current_position: - last = all_data[-1] - exit_price = float(last['close']) - pos_dir = current_position['direction'] - diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( - current_position['entry_price'] - exit_price) - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), - 'entry_time_ms': current_position['entry_time'], - 'exit_time_ms': last['id'], - 'signal': '三分之一', - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff, - }) - stats[pos_dir]['total_profit'] += diff - if diff > 0: - stats[pos_dir]['wins'] += 1 - - logger.info(f"回测完成:使用1分钟精准判断 {extra['precise_1m_count']} 次,使用开盘价距离判断 {extra['fallback_count']} 次") - return trades, stats, extra - - -# ========================= 运行示例(与 回测数据-30分钟版 风格一致)========================= -if __name__ == '__main__': - START_DATE = "2025-01-01" - END_DATE = "2025-12-31" - START_YEAR = None - START_MONTH = None - END_YEAR = None - END_MONTH = None - - dates = [] - if START_DATE and END_DATE: - try: - start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d') - end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d') - if start_dt > end_dt: - logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}") - exit(1) - current_dt = start_dt - while current_dt <= end_dt: - dates.append(current_dt.strftime('%Y-%m-%d')) - current_dt += datetime.timedelta(days=1) - logger.info(f"使用指定日期范围:{START_DATE} 到 {END_DATE},共 {len(dates)} 天") - except ValueError as e: - logger.error(f"日期格式错误:{e},请使用 YYYY-MM-DD 格式") - exit(1) - elif START_YEAR and END_YEAR: - start_m = START_MONTH if START_MONTH else 1 - end_m = END_MONTH if END_MONTH else 12 - for year in range(START_YEAR, END_YEAR + 1): - month_start = start_m if year == START_YEAR else 1 - month_end = end_m if year == END_YEAR else 12 - for month in range(month_start, month_end + 1): - days_in_month = calendar.monthrange(year, month)[1] - for day in range(1, days_in_month + 1): - dates.append(f"{year}-{month:02d}-{day:02d}") - logger.info(f"使用年份月份范围:{START_YEAR}年{start_m}月 到 {END_YEAR}年{end_m}月,共 {len(dates)} 天") - else: - logger.warning("未指定日期范围,使用默认:2025年1-12月") - for month in range(1, 13): - days_in_month = calendar.monthrange(2025, month)[1] - for day in range(1, days_in_month + 1): - dates.append(f"2025-{month:02d}-{day:02d}") - - if not dates: - logger.error("未生成任何查询日期,请检查配置参数") - exit(1) - - trades, stats, extra = backtest_one_third_precise(dates, min_body_size=0.1) - - logger.info("===== 每笔交易详情 =====") - contract_size = 10000 - open_fee_fixed = 5 - close_fee_rate = 0.0005 - total_points_profit = 0 - total_money_profit = 0 - total_fee = 0 - - for t in trades: - entry = t['entry'] - exit_p = t['exit'] - direction = t['direction'] - point_diff = (exit_p - entry) if direction == '做多' else (entry - exit_p) - money_profit = point_diff / entry * contract_size - fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate) - net_profit = money_profit - fee - t.update({ - 'point_diff': point_diff, - 'raw_profit': money_profit, - 'fee': fee, - 'net_profit': net_profit - }) - total_points_profit += point_diff - total_money_profit += money_profit - total_fee += fee - logger.info( - f"{t['entry_time']} {direction}({t['signal']}) " - f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} " - f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" - ) - - total_net_profit = total_money_profit - total_fee - print("\n【BitMart 三分之一策略回测结果(5分钟K线 + 1分钟精准判断)】") - print(f"一共交易笔数:{len(trades)}") - print(f"总点差:{total_points_profit:.2f}") - print(f"总原始盈利(未扣费):{total_money_profit:.2f}") - print(f"总手续费:{total_fee:.2f}") - print(f"总净利润:{total_net_profit:.2f}\n") - - print("===== 信号统计 =====") - for k, v in stats.items(): - name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] - win_rate = (wins / count * 100) if count > 0 else 0.0 - avg_p = (total_p / count) if count > 0 else 0.0 - print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") - print(f"使用1分钟K线精准判断(双触)次数: {extra['precise_1m_count']}") - print(f"使用开盘价距离判断次数: {extra['fallback_count']}") diff --git a/bitmart/回测数据-五分之一策略-3分钟精准版.py b/bitmart/回测数据-五分之一策略-3分钟精准版.py deleted file mode 100644 index 2244fbc..0000000 --- a/bitmart/回测数据-五分之一策略-3分钟精准版.py +++ /dev/null @@ -1,418 +0,0 @@ -""" -量化交易回测系统 - 五分之一策略(3分钟K线 + 1分钟精准判断) - -========== 策略规则 ========== -1. 触发价格计算(基于有效的前一根3分钟K线,实体>=0.1): - - 做多触发价格 = 收盘价 + 实体/5(从收盘价往上涨1/5) - - 做空触发价格 = 收盘价 - 实体/5(从收盘价往下跌1/5) - -2. 信号触发条件: - - 当前3分钟K线最高价 >= 做多触发价格 → 做多信号 - - 当前3分钟K线最低价 <= 做空触发价格 → 做空信号 - -3. 执行逻辑: - - 做多时遇到做空信号 -> 平多并反手开空 - - 做空时遇到做多信号 -> 平空并反手开多 - - 同一根3分钟K线内只交易一次 - -4. 精准判断(使用1分钟K线): - - 当一根3分钟K线同时触及做多和做空价格时 - - 使用该3分钟K线对应的3根1分钟K线来判断哪个方向先被触发 - - 使回测更贴近真实成交顺序 -""" - -import datetime -import calendar -from typing import List, Dict, Optional -from loguru import logger -from models.bitmart_klines import BitMartETH3M, BitMartETH1M - - -# ========================= 工具函数 ========================= - -def is_bullish(c): - """阳线""" - return float(c['close']) > float(c['open']) - - -def is_bearish(c): - """阴线""" - return float(c['close']) < float(c['open']) - - -def get_body_size(candle): - """K线实体大小(绝对值)""" - return abs(float(candle['open']) - float(candle['close'])) - - -def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1): - """ - 从当前索引往前查找,直到找到实体>=min_body_size的K线 - 返回:(有效K线的索引, K线数据) 或 (None, None) - """ - if current_idx <= 0: - return None, None - for i in range(current_idx - 1, -1, -1): - prev = all_data[i] - if get_body_size(prev) >= min_body_size: - return i, prev - return None, None - - -def get_one_fifth_levels(prev: Dict): - """ - 计算前一根K线实体的 1/5 双向触发价格 - 返回:(做多触发价格, 做空触发价格) - """ - p_open = float(prev['open']) - p_close = float(prev['close']) - body = abs(p_open - p_close) - if body < 0.001: - return None, None - long_trigger = p_close + body / 5 - short_trigger = p_close - body / 5 - return long_trigger, short_trigger - - -def get_1m_data_by_range(start_ts_ms: int, end_ts_ms: int) -> List[Dict]: - """ - 获取指定时间范围内的1分钟K线数据(毫秒时间戳) - """ - query = BitMartETH1M.select().where( - BitMartETH1M.id.between(start_ts_ms, end_ts_ms - 1) - ).order_by(BitMartETH1M.id.asc()) - data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] - if data: - data.sort(key=lambda x: x['id']) - return data - - -def get_1m_data_for_3m_bar(bar_3m: Dict) -> List[Dict]: - """获取一根3分钟K线对应的3根1分钟K线""" - start_ts = bar_3m['id'] - end_ts = start_ts + 3 * 60 * 1000 - return get_1m_data_by_range(start_ts, end_ts) - - -def determine_trigger_order_by_1m( - bars_1m: List[Dict], - long_trigger: float, - short_trigger: float -) -> Optional[str]: - """ - 使用1分钟K线判断在一根3分钟周期内,先触发做多还是做空。 - 按时间顺序遍历每根1分钟K线,先触及哪个方向则返回该方向; - 若同一根1分钟K线内两个方向都触及,用开盘价距离判断。 - """ - if not bars_1m: - return None - for bar in bars_1m: - high = float(bar['high']) - low = float(bar['low']) - open_price = float(bar['open']) - long_triggered = high >= long_trigger - short_triggered = low <= short_trigger - if long_triggered and not short_triggered: - return 'long' - if short_triggered and not long_triggered: - return 'short' - if long_triggered and short_triggered: - dist_to_long = abs(long_trigger - open_price) - dist_to_short = abs(short_trigger - open_price) - return 'short' if dist_to_short <= dist_to_long else 'long' - return None - - -def check_trigger_with_1m( - all_data_3m: List[Dict], - current_idx: int, - min_body_size: float = 0.1 -) -> tuple: - """ - 检查当前3分钟K线是否触发交易信号。 - 若同时触发多空,则用该3分钟内的1分钟K线判断先后顺序。 - 返回:(方向, 触发价格, 有效前一根K线索引, 是否使用了1分钟精准判断, 是否双触) - """ - if current_idx <= 0: - return None, None, None, False, False - - curr = all_data_3m[current_idx] - valid_prev_idx, prev = find_valid_prev_bar(all_data_3m, current_idx, min_body_size) - if prev is None: - return None, None, None, False, False - - long_trigger, short_trigger = get_one_fifth_levels(prev) - if long_trigger is None: - return None, None, None, False, False - - c_high = float(curr['high']) - c_low = float(curr['low']) - long_triggered = c_high >= long_trigger - short_triggered = c_low <= short_trigger - both_triggered = long_triggered and short_triggered - - if both_triggered: - bars_1m = get_1m_data_for_3m_bar(curr) - if bars_1m: - direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger) - if direction: - trigger_price = long_trigger if direction == 'long' else short_trigger - return direction, trigger_price, valid_prev_idx, True, True - c_open = float(curr['open']) - dist_to_long = abs(long_trigger - c_open) - dist_to_short = abs(short_trigger - c_open) - if dist_to_short <= dist_to_long: - return 'short', short_trigger, valid_prev_idx, False, True - return 'long', long_trigger, valid_prev_idx, False, True - - if short_triggered: - return 'short', short_trigger, valid_prev_idx, False, False - if long_triggered: - return 'long', long_trigger, valid_prev_idx, False, False - return None, None, None, False, False - - -def get_data_by_date(model, date_str: str) -> List[Dict]: - """ - 按天获取指定表的K线数据(毫秒时间戳)。 - 返回格式:id(ms), open, high, low, close,已按 id 升序。 - """ - try: - target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') - except ValueError: - logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") - return [] - start_ts = int(target_date.timestamp() * 1000) - end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 - query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) - data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] - if data: - data.sort(key=lambda x: x['id']) - return data - - -# ========================= 回测逻辑 ========================= - -def backtest_one_fifth_precise(dates: List[str], min_body_size: float = 0.1): - """ - 五分之一策略回测(3分钟K线 + 1分钟精准判断)。 - 风格与 回测数据-30分钟版 一致:按日期拉取、合并排序、统计与输出。 - """ - all_data: List[Dict] = [] - total_queried = 0 - for d in dates: - day_data = get_data_by_date(BitMartETH3M, d) - all_data.extend(day_data) - if day_data: - total_queried += len(day_data) - - logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条3分钟K线数据") - - if not all_data: - logger.warning("未获取到任何数据,请检查:") - logger.warning("1. 数据库连接与 bitmart_eth_3m / bitmart_eth_1m 表是否存在") - logger.warning("2. 是否已用 抓取多周期K线.py 抓取过 1 分钟和 3 分钟数据") - return [], { - 'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'}, - 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'}, - }, {'precise_1m_count': 0, 'fallback_count': 0} - - all_data.sort(key=lambda x: x['id']) - if len(all_data) > 1: - first_ts = all_data[0]['id'] - last_ts = all_data[-1]['id'] - first_time = datetime.datetime.fromtimestamp(first_ts / 1000) - last_time = datetime.datetime.fromtimestamp(last_ts / 1000) - logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')} 到 {last_time.strftime('%Y-%m-%d %H:%M:%S')}") - - stats = { - 'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'}, - 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'}, - } - extra = {'precise_1m_count': 0, 'fallback_count': 0} - - trades: List[Dict] = [] - current_position: Optional[Dict] = None - last_trade_bar: Optional[int] = None - - for idx in range(1, len(all_data)): - curr = all_data[idx] - direction, trigger_price, valid_prev_idx, used_1m, both_triggered = check_trigger_with_1m(all_data, idx, min_body_size) - valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None - - if used_1m: - extra['precise_1m_count'] += 1 - elif both_triggered and direction: - extra['fallback_count'] += 1 - - if direction is None: - continue - if last_trade_bar == idx: - continue - - if current_position is None: - current_position = { - 'direction': direction, - 'entry_price': trigger_price, - 'entry_time': curr['id'], - 'entry_bar': idx, - } - stats[direction]['count'] += 1 - last_trade_bar = idx - continue - - pos_dir = current_position['direction'] - if direction == pos_dir: - continue - - exit_price = trigger_price - if pos_dir == 'long': - diff = exit_price - current_position['entry_price'] - else: - diff = current_position['entry_price'] - exit_price - - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), - 'entry_time_ms': current_position['entry_time'], - 'exit_time_ms': curr['id'], - 'signal': '五分之一', - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff, - }) - stats[pos_dir]['total_profit'] += diff - if diff > 0: - stats[pos_dir]['wins'] += 1 - - current_position = { - 'direction': direction, - 'entry_price': trigger_price, - 'entry_time': curr['id'], - 'entry_bar': idx, - } - stats[direction]['count'] += 1 - last_trade_bar = idx - - if current_position: - last = all_data[-1] - exit_price = float(last['close']) - pos_dir = current_position['direction'] - diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( - current_position['entry_price'] - exit_price) - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), - 'entry_time_ms': current_position['entry_time'], - 'exit_time_ms': last['id'], - 'signal': '五分之一', - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff, - }) - stats[pos_dir]['total_profit'] += diff - if diff > 0: - stats[pos_dir]['wins'] += 1 - - logger.info(f"回测完成:使用1分钟精准判断 {extra['precise_1m_count']} 次,使用开盘价距离判断 {extra['fallback_count']} 次") - return trades, stats, extra - - -# ========================= 运行示例(与 回测数据-30分钟版 风格一致)========================= -if __name__ == '__main__': - START_DATE = "2025-01-01" - END_DATE = "2025-12-31" - START_YEAR = None - START_MONTH = None - END_YEAR = None - END_MONTH = None - - dates = [] - if START_DATE and END_DATE: - try: - start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d') - end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d') - if start_dt > end_dt: - logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}") - exit(1) - current_dt = start_dt - while current_dt <= end_dt: - dates.append(current_dt.strftime('%Y-%m-%d')) - current_dt += datetime.timedelta(days=1) - logger.info(f"使用指定日期范围:{START_DATE} 到 {END_DATE},共 {len(dates)} 天") - except ValueError as e: - logger.error(f"日期格式错误:{e},请使用 YYYY-MM-DD 格式") - exit(1) - elif START_YEAR and END_YEAR: - start_m = START_MONTH if START_MONTH else 1 - end_m = END_MONTH if END_MONTH else 12 - for year in range(START_YEAR, END_YEAR + 1): - month_start = start_m if year == START_YEAR else 1 - month_end = end_m if year == END_YEAR else 12 - for month in range(month_start, month_end + 1): - days_in_month = calendar.monthrange(year, month)[1] - for day in range(1, days_in_month + 1): - dates.append(f"{year}-{month:02d}-{day:02d}") - logger.info(f"使用年份月份范围:{START_YEAR}年{start_m}月 到 {END_YEAR}年{end_m}月,共 {len(dates)} 天") - else: - logger.warning("未指定日期范围,使用默认:2025年1-12月") - for month in range(1, 13): - days_in_month = calendar.monthrange(2025, month)[1] - for day in range(1, days_in_month + 1): - dates.append(f"2025-{month:02d}-{day:02d}") - - if not dates: - logger.error("未生成任何查询日期,请检查配置参数") - exit(1) - - trades, stats, extra = backtest_one_fifth_precise(dates, min_body_size=0.1) - - logger.info("===== 每笔交易详情 =====") - contract_size = 10000 - open_fee_fixed = 5 - close_fee_rate = 0.0005 - total_points_profit = 0 - total_money_profit = 0 - total_fee = 0 - - for t in trades: - entry = t['entry'] - exit_p = t['exit'] - direction = t['direction'] - point_diff = (exit_p - entry) if direction == '做多' else (entry - exit_p) - money_profit = point_diff / entry * contract_size - fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate) - net_profit = money_profit - fee - t.update({ - 'point_diff': point_diff, - 'raw_profit': money_profit, - 'fee': fee, - 'net_profit': net_profit - }) - total_points_profit += point_diff - total_money_profit += money_profit - total_fee += fee - logger.info( - f"{t['entry_time']} {direction}({t['signal']}) " - f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} " - f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}" - ) - - total_net_profit = total_money_profit - total_fee - print("\n【BitMart 五分之一策略回测结果(3分钟K线 + 1分钟精准判断)】") - print(f"一共交易笔数:{len(trades)}") - print(f"总点差:{total_points_profit:.2f}") - print(f"总原始盈利(未扣费):{total_money_profit:.2f}") - print(f"总手续费:{total_fee:.2f}") - print(f"总净利润:{total_net_profit:.2f}\n") - - print("===== 信号统计 =====") - for k, v in stats.items(): - name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] - win_rate = (wins / count * 100) if count > 0 else 0.0 - avg_p = (total_p / count) if count > 0 else 0.0 - print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") - print(f"使用1分钟K线精准判断(双触)次数: {extra['precise_1m_count']}") - print(f"使用开盘价距离判断次数: {extra['fallback_count']}") diff --git a/bitmart/趋势策略.py b/bitmart/趋势策略.py deleted file mode 100644 index 15e2cca..0000000 --- a/bitmart/趋势策略.py +++ /dev/null @@ -1,382 +0,0 @@ -import time -import datetime -import openBrowser - -from tqdm import tqdm -from loguru import logger -from bit_tools import openBrowser -from DrissionPage import ChromiumPage -from DrissionPage import ChromiumOptions - -from bitmart.api_contract import APIContract - - -class BitmartFuturesTransaction: - def __init__(self, bit_id): - - self.page: ChromiumPage | None = None - - self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" - self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" - self.memo = "合约交易" - - self.contract_symbol = "ETHUSDT" - - self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) - - self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 - self.direction = None - - self.pbar = tqdm(total=30, desc="等待K线", ncols=80) - - self.last_kline_time = None - - self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位) - self.open_type = "cross" # 全仓模式(你的“成本开仓”需求) - self.risk_percent = 0.01 # 每次开仓使用可用余额的 1% - - self.open_avg_price = None # 开仓价格 - self.current_amount = None # 持仓量 - - self.bit_id = bit_id - - def get_klines(self): - """获取最近3根30分钟K线(step=30)""" - try: - end_time = int(time.time()) - # 获取足够多的条目确保有最新3根 - response = self.contractAPI.get_kline( - contract_symbol=self.contract_symbol, - step=30, # 30分钟 - start_time=end_time - 3600 * 10, # 取最近10小时 - end_time=end_time - )[0]["data"] - - # 每根: [timestamp, open, high, low, close, volume] - formatted = [] - for k in response: - formatted.append({ - 'id': int(k["timestamp"]), - 'open': float(k["open_price"]), - 'high': float(k["high_price"]), - 'low': float(k["low_price"]), - 'close': float(k["close_price"]) - }) - formatted.sort(key=lambda x: x['id']) - return formatted # 最近3根: kline_1 (最老), kline_2, kline_3 (最新) - except Exception as e: - logger.error(f"获取K线异常: {e}") - self.ding(error=True, msg="获取K线异常") - return None - - def get_current_price(self): - """获取当前最新价格,用于计算张数""" - try: - end_time = int(time.time()) - response = self.contractAPI.get_kline( - contract_symbol=self.contract_symbol, - step=1, # 1分钟 - start_time=end_time - 3600 * 3, # 取最近10小时 - end_time=end_time - )[0] - if response['code'] == 1000: - return float(response['data'][-1]["close_price"]) - return None - except Exception as e: - logger.error(f"获取价格异常: {e}") - return None - - def get_available_balance(self): - """获取合约账户可用USDT余额""" - try: - response = self.contractAPI.get_assets_detail()[0] - if response['code'] == 1000: - data = response['data'] - if isinstance(data, dict): - return float(data.get('available_balance', 0)) - elif isinstance(data, list): - for asset in data: - if asset.get('currency') == 'USDT': - return float(asset.get('available_balance', 0)) - return None - except Exception as e: - logger.error(f"余额查询异常: {e}") - return None - - # 获取当前持仓方向 - def get_position_status(self): - """获取当前持仓方向""" - try: - response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] - if response['code'] == 1000: - positions = response['data'] - if not positions: - self.start = 0 - return True - self.start = 1 if positions[0]['position_type'] == 1 else -1 - self.open_avg_price = positions[0]['open_avg_price'] - self.current_amount = positions[0]['current_amount'] - self.position_cross = positions[0]["position_cross"] - return True - - else: - return False - - except Exception as e: - logger.error(f"持仓查询异常: {e}") - return False - - # 设置杠杆和全仓 - def set_leverage(self): - """程序启动时设置全仓 + 高杠杆""" - try: - response = self.contractAPI.post_submit_leverage( - contract_symbol=self.contract_symbol, - leverage=self.leverage, - open_type=self.open_type - )[0] - if response['code'] == 1000: - logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") - return True - else: - logger.error(f"杠杆设置失败: {response}") - return False - except Exception as e: - logger.error(f"设置杠杆异常: {e}") - return False - - def openBrowser(self): - """打开 TGE 对应浏览器实例""" - try: - bit_port = openBrowser(id=self.bit_id) - co = ChromiumOptions() - co.set_local_port(port=bit_port) - self.page = ChromiumPage(addr_or_opts=co) - return True - except: - return False - - def take_over_browser(self): - """接管浏览器""" - try: - co = ChromiumOptions() - co.set_local_port(self.tge_port) - self.page = ChromiumPage(addr_or_opts=co) - self.page.set.window.max() - return True - except: - return False - - def close_extra_tabs(self): - """关闭多余 tab""" - try: - for idx, tab in enumerate(self.page.get_tabs()): - if idx > 0: - tab.close() - return True - except: - return False - - def click_safe(self, xpath, sleep=0.5): - """安全点击""" - try: - ele = self.page.ele(xpath) - if not ele: - return False - ele.scroll.to_see(center=True) - time.sleep(sleep) - ele.click() - return True - except: - return False - - def 平仓(self): - self.click_safe('x://span[normalize-space(text()) ="市价"]') - - def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None): - """ - marketPriceLongOrder 市价最多或者做空,1是做多,-1是做空 - limitPriceShortOrder 限价最多或者做空 - """ - - if marketPriceLongOrder == -1: - self.click_safe('x://button[normalize-space(text()) ="市价"]') - self.page.ele('x://*[@id="size_0"]').input(size) - self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') - elif marketPriceLongOrder == 1: - self.click_safe('x://button[normalize-space(text()) ="市价"]') - self.page.ele('x://*[@id="size_0"]').input(size) - self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') - - if limitPriceShortOrder == -1: - self.click_safe('x://button[normalize-space(text()) ="限价"]') - self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True) - time.sleep(1) - self.page.ele('x://*[@id="size_0"]').input(1) - self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') - elif limitPriceShortOrder == 1: - self.click_safe('x://button[normalize-space(text()) ="限价"]') - self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True) - time.sleep(1) - self.page.ele('x://*[@id="size_0"]').input(1) - self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') - - def ding(self, text, error=False): - logger.info(text) - - def close_extra_tabs_in_browser(self): - - try: - for _, i in enumerate(self.page.get_tabs()): - if _ == 0: - continue - - i.close() - - return True - except: - pass - - return False - - def get_now_time(self): - # 获取当前时间戳 - current_timestamp = time.time() - # 将当前时间戳转换为 datetime 对象 - current_datetime = datetime.datetime.fromtimestamp(current_timestamp) - - # 计算距离当前时间最近的整点或 30 分时刻 - if current_datetime.minute < 30: - target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) - else: - target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0) - - # 将目标 datetime 对象转换为时间戳 - target_timestamp = target_datetime.timestamp() - - return int(target_timestamp) - - def is_bullish(self, c): # 阳线 - return float(c['close']) > float(c['open']) - - def is_bearish(self, c): # 阴线 - return float(c['close']) < float(c['open']) - - def check_signal(self, prev, curr): - """ - 包住形态信号判定(优化版): - 只看两种信号,严格按照收盘价与开盘价的比较: - - 1. 阳包阴(涨包跌,前跌后涨)-> 做多: - - 前一根是跌(阴线:close < open) - - 后一根是涨(阳线:close > open) - - 且:涨的收盘价 > 跌的开盘价(curr['close'] > prev['open']) - - 2. 阴包阳(跌包涨,前涨后跌)-> 做空: - - 前一根是涨(阳线:close > open) - - 后一根是跌(阴线:close < open) - - 且:跌的收盘价 < 涨的开盘价(curr['close'] < prev['open']) - """ - p_open = float(prev['open']) - c_close = float(curr['close']) - - # 阳包阴(涨包跌,前跌后涨) -> 做多:涨的收盘价 > 跌的开盘价 - if self.is_bearish(prev) and self.is_bullish(curr) and c_close > p_open: - return "long", "bear_bull_engulf" - - # 阴包阳(跌包涨,前涨后跌) -> 做空:跌的收盘价 < 涨的开盘价 - if self.is_bullish(prev) and self.is_bearish(curr) and c_close < p_open: - return "short", "bull_bear_engulf" - - return None, None - - def action(self): - # 启动时设置全仓高杠杆 - if not self.set_leverage(): - logger.error("杠杆设置失败,程序继续运行但可能下单失败") - return - - # 1. 打开浏览器 - if not self.openBrowser(): - self.ding("打开 TGE 失败!", error=True) - return - logger.info("TGE 端口获取成功") - - if self.close_extra_tabs_in_browser(): - logger.info('关闭多余标签页成功!!!') - else: - logger.info('关闭多余标签页失败!!!') - - self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") - - self.click_safe('x://button[normalize-space(text()) ="市价"]') - - self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc:进度条说明,ncols:长度 - - self.time_start = None # 时间状态 避免同一个时段,发生太多消息 - while True: - - # 获取当前时间 - current_time = time.localtime() - current_minute = current_time.tm_min - - if current_minute < 30: - self.pbar.n = current_minute - self.pbar.refresh() - else: - self.pbar.n = current_minute - 30 - self.pbar.refresh() - - if self.time_start == self.get_now_time(): - time.sleep(5) - continue - - new_price_datas = self.get_klines() - if not new_price_datas: - logger.info("获取最新价格有问题!!!") - - new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"]) - self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:] - - # 判断抓取的数据是否正确 - if self.get_now_time() != self.kline_3["id"]: - continue - - self.time_start = self.get_now_time() - - if self.get_position_status(): - logger.info("获取仓位信息成功!!!") - else: - logger.info("获取仓位信息失败!!!") - - self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0) - continue - - if self.start == 1: - if self.is_bearish(self.kline_1) and self.is_bearish(self.kline_2): - self.平仓() - elif self.start == -1: - if self.is_bullish(self.kline_1) and self.is_bullish(self.kline_2): - self.平仓() - - self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号 - - if self.direction == "long": - if self.start == -1: - self.平仓() - self.开单(marketPriceLongOrder=1, size=self.get_available_balance() * self.risk_percent) - elif self.start == 0: - self.开单(marketPriceLongOrder=1, size=self.get_available_balance() * self.risk_percent) - - if self.direction == "short": - if self.start == 1: - self.平仓() - self.开单(marketPriceLongOrder=-1, size=self.get_available_balance() * self.risk_percent) - elif self.start == 0: - self.开单(marketPriceLongOrder=-1, size=self.get_available_balance() * self.risk_percent) - - self.pbar.reset() # 重置进度条 - - -if __name__ == '__main__': - BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()