Files
lm_code/bitmart/回测-三分之一策略-精准版.py
2026-02-02 00:34:04 +08:00

746 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
BitMart 五分之一回归策略回测(精准版)
使用3分钟K线周期计算触发价格1分钟K线判断触发顺序
========== 策略规则 ==========
1. 触发价格计算基于有效的前一根K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5
- 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5
2. 信号触发条件:
- 当前K线最高价 >= 做多触发价格 → 做多信号
- 当前K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根3分钟K线内只交易一次
4. 精准判断使用1分钟K线
- 当一根3分钟K线同时触及做多和做空价格时
- 使用该3分钟K线对应的3根1分钟K线来判断哪个方向先被触发
- 这样可以更精准地还原真实交易场景
"""
import datetime
import calendar
from pathlib import Path
from typing import List, Dict, Optional
from loguru import logger
from peewee import *
# 数据库配置
DB_PATH = Path(__file__).parent.parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
# ========================= 数据库模型 =========================
class BitMartETH1m(Model):
"""1分钟K线模型"""
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1m'
class BitMartETH3m(Model):
"""3分钟K线模型"""
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_3m'
# 连接数据库
db.connect(reuse_if_open=True)
# ========================= 工具函数 =========================
def is_bullish(c):
"""判断阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(c):
"""判断阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data, current_idx, min_body_size=0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_fifth_levels(prev):
"""
计算前一根K线实体的 1/5 双向触发价格
返回:(做多触发价格, 做空触发价格)
基于收盘价计算(无论阴线阳线):
- 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5实体
- 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5实体
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001: # 十字星,忽略
return None, None
# 基于收盘价的双向触发价格
long_trigger = p_close + body / 5
short_trigger = p_close - body / 5
return long_trigger, short_trigger
def get_3m_data_by_date(date_str: str) -> List[Dict]:
"""按天获取3分钟K线数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = BitMartETH3m.select().where(
BitMartETH3m.id.between(start_ts, end_ts)
).order_by(BitMartETH3m.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
return data
def get_1m_data_by_range(start_ts: int, end_ts: int) -> List[Dict]:
"""
获取指定时间范围内的1分钟K线数据
:param start_ts: 开始时间戳(毫秒)
:param end_ts: 结束时间戳(毫秒)
:return: 1分钟K线数据列表
"""
query = BitMartETH1m.select().where(
BitMartETH1m.id.between(start_ts, end_ts - 1)
).order_by(BitMartETH1m.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
return data
def get_1m_data_for_3m_bar(bar_3m: Dict) -> List[Dict]:
"""
获取3分钟K线对应的3根1分钟K线
:param bar_3m: 3分钟K线数据
:return: 对应的1分钟K线数据列表最多3根
"""
start_ts = bar_3m['id']
end_ts = start_ts + 3 * 60 * 1000 # 3分钟后
return get_1m_data_by_range(start_ts, end_ts)
def determine_trigger_order_by_1m(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float
) -> str:
"""
使用1分钟K线精确判断在3分钟周期内是先触发做多还是做空
:param bars_1m: 3根1分钟K线数据
:param long_trigger: 做多触发价格
:param short_trigger: 做空触发价格
:return: 'long', 'short', 或 None
"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar['high'])
low = float(bar['low'])
open_price = float(bar['open'])
long_triggered = high >= long_trigger
short_triggered = low <= short_trigger
# 如果只触发了一个方向
if long_triggered and not short_triggered:
return 'long'
if short_triggered and not long_triggered:
return 'short'
# 如果两个方向都触发了在同一根1分钟K线内
if long_triggered and short_triggered:
# 根据开盘价判断:
# 如果开盘价更接近做空触发价,说明先往下走,先触发做空
# 如果开盘价更接近做多触发价,说明先往上走,先触发做多
dist_to_long = abs(long_trigger - open_price)
dist_to_short = abs(short_trigger - open_price)
if dist_to_short < dist_to_long:
return 'short'
else:
return 'long'
return None
def check_reverse_signal_in_first_minute(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float,
current_direction: str
) -> bool:
"""
检查反手信号是否在第一分钟触发
:param bars_1m: 3根1分钟K线数据
:param long_trigger: 做多触发价格
:param short_trigger: 做空触发价格
:param current_direction: 当前持仓方向 ('long''short')
:return: True 表示反手信号在第一分钟触发False 表示不是
"""
if not bars_1m:
return False
# 只检查第一分钟K线
first_bar = bars_1m[0]
high = float(first_bar['high'])
low = float(first_bar['low'])
# 如果当前是多仓,检查空信号是否在第一分钟触发
if current_direction == 'long':
return low <= short_trigger
# 如果当前是空仓,检查多信号是否在第一分钟触发
if current_direction == 'short':
return high >= long_trigger
return False
def get_body_percent(candle) -> float:
"""
计算K线实体占价格的百分比
:param candle: K线数据
:return: 实体百分比如0.1表示0.1%
"""
body = abs(float(candle['open']) - float(candle['close']))
price = (float(candle['open']) + float(candle['close'])) / 2
if price == 0:
return 0
return (body / price) * 100
def check_breakout_reverse_signal(
all_data_3m: List[Dict],
current_idx: int,
current_position_direction: str,
min_body_percent: float = 0.1
) -> tuple:
"""
检查"突破上一根K线高低点"的反手信号
规则:
- 持多单时当前K线跌破上一根K线最低点 → 反手开空
条件上一根K线是阴线且实体>0.1%波动
- 持空单时当前K线涨破上一根K线最高点 → 反手开多
条件上一根K线是阳线且实体>0.1%波动
:param all_data_3m: 3分钟K线数据
:param current_idx: 当前K线索引
:param current_position_direction: 当前持仓方向 ('long''short')
:param min_body_percent: 最小实体百分比默认0.1%
:return: (方向, 触发价格, 信号类型) 或 (None, None, None)
"""
if current_idx <= 0 or current_position_direction is None:
return None, None, None
curr = all_data_3m[current_idx]
prev = all_data_3m[current_idx - 1]
c_high = float(curr['high'])
c_low = float(curr['low'])
prev_high = float(prev['high'])
prev_low = float(prev['low'])
# 计算上一根K线的实体百分比
body_percent = get_body_percent(prev)
# 持多单时检查是否跌破上一根K线最低点
if current_position_direction == 'long':
# 条件上一根K线是阴线且实体>min_body_percent%
if is_bearish(prev) and body_percent >= min_body_percent:
if c_low < prev_low:
# 触发反手开空信号
logger.debug(f"突破反手信号持多单当前K线低点{c_low:.2f}跌破上一根阴线低点{prev_low:.2f},实体{body_percent:.3f}%")
return 'short', prev_low, 'breakout'
# 持空单时检查是否涨破上一根K线最高点
elif current_position_direction == 'short':
# 条件上一根K线是阳线且实体>min_body_percent%
if is_bullish(prev) and body_percent >= min_body_percent:
if c_high > prev_high:
# 触发反手开多信号
logger.debug(f"突破反手信号持空单当前K线高点{c_high:.2f}涨破上一根阳线高点{prev_high:.2f},实体{body_percent:.3f}%")
return 'long', prev_high, 'breakout'
return None, None, None
def check_trigger_with_1m(
all_data_3m: List[Dict],
current_idx: int,
min_body_size: float = 0.1,
current_position_direction: str = None,
first_minute_only: bool = True
) -> tuple:
"""
检查当前3分钟K线是否触发了交易信号
如果同时触发两个方向使用1分钟K线精确判断顺序
新增逻辑:如果有持仓且 first_minute_only=True反手信号必须在第一分钟触发才有效
:param all_data_3m: 3分钟K线数据
:param current_idx: 当前K线索引
:param min_body_size: 最小实体大小
:param current_position_direction: 当前持仓方向 ('long', 'short', 或 None)
:param first_minute_only: 是否只在第一分钟触发反手信号才有效
返回:(方向, 触发价格, 有效前一根K线索引, 1分钟数据是否使用)
"""
if current_idx <= 0:
return None, None, None, False
curr = all_data_3m[current_idx]
# 查找实体>=min_body_size的前一根K线
valid_prev_idx, prev = find_valid_prev_bar(all_data_3m, current_idx, min_body_size)
if prev is None:
return None, None, None, False
long_trigger, short_trigger = get_one_fifth_levels(prev)
if long_trigger is None:
return None, None, None, False
c_high = float(curr['high'])
c_low = float(curr['low'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 如果两个方向都触发使用1分钟K线精确判断
if long_triggered and short_triggered:
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m:
direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger)
if direction:
trigger_price = long_trigger if direction == 'long' else short_trigger
# 检查反手信号是否需要在第一分钟触发
if first_minute_only and current_position_direction and direction != current_position_direction:
# 这是一个反手信号,检查是否在第一分钟触发
if not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, current_position_direction):
# 反手信号不是在第一分钟触发,忽略
logger.debug(f"反手信号 {direction} 不在第一分钟触发,忽略")
return None, None, None, False
return direction, trigger_price, valid_prev_idx, True
# 如果没有1分钟数据使用开盘价距离判断
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx, False
else:
return 'long', long_trigger, valid_prev_idx, False
if short_triggered:
# 检查是否是反手信号且需要第一分钟触发
if first_minute_only and current_position_direction == 'long':
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m and not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, 'long'):
logger.debug(f"空信号不在第一分钟触发,忽略(当前持多仓)")
return None, None, None, False
return 'short', short_trigger, valid_prev_idx, False
if long_triggered:
# 检查是否是反手信号且需要第一分钟触发
if first_minute_only and current_position_direction == 'short':
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m and not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, 'short'):
logger.debug(f"多信号不在第一分钟触发,忽略(当前持空仓)")
return None, None, None, False
return 'long', long_trigger, valid_prev_idx, False
return None, None, None, False
# ========================= 回测逻辑 =========================
def backtest_one_third_strategy(
dates: List[str],
min_body_size: float = 0.1,
first_minute_only: bool = True,
enable_breakout_reverse: bool = True,
breakout_min_body_percent: float = 0.1
):
"""
三分之一策略回测(精准版)
:param dates: 日期列表
:param min_body_size: 最小实体大小(绝对值)
:param first_minute_only: 是否只在第一分钟触发反手信号才有效默认True
:param enable_breakout_reverse: 是否启用"突破上一根K线高低点"反手信号默认True
:param breakout_min_body_percent: 突破反手信号的最小实体百分比默认0.1%
:return: (trades, stats)
"""
# 获取所有3分钟K线数据
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_3m_data_by_date(d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条3分钟K线数据")
logger.info(f"反手信号仅第一分钟有效: {first_minute_only}")
logger.info(f"突破反手信号启用: {enable_breakout_reverse},最小实体百分比: {breakout_min_body_percent}%")
if not all_data:
logger.warning("未获取到任何数据,请检查数据库")
return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}}
# 按时间戳排序
all_data.sort(key=lambda x: x['id'])
# 验证排序结果
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
trades: List[Dict] = []
current_position: Optional[Dict] = None
# 统计使用1分钟数据精准判断的次数
precise_count = 0
fallback_count = 0
# 统计突破反手信号触发次数
breakout_reverse_count = 0
# 记录每根K线是否已经执行过突破反手当前K线只执行一次
last_breakout_bar_id = None
idx = 1
while idx < len(all_data):
curr = all_data[idx]
curr_bar_id = curr['id']
# 获取当前持仓方向(用于判断反手信号是否在第一分钟触发)
current_pos_dir = current_position['direction'] if current_position else None
# 检测信号使用1分钟K线精准判断并考虑反手信号第一分钟限制
direction, trigger_price, valid_prev_idx, used_1m = check_trigger_with_1m(
all_data, idx, min_body_size,
current_position_direction=current_pos_dir,
first_minute_only=first_minute_only
)
# 如果没有五分之一信号,且有持仓,检查突破反手信号
signal_type = 'one_fifth' # 信号类型one_fifth五分之一或 breakout突破
if direction is None and current_position is not None and enable_breakout_reverse:
# 检查当前K线是否已经执行过突破反手
if last_breakout_bar_id != curr_bar_id:
breakout_dir, breakout_price, breakout_type = check_breakout_reverse_signal(
all_data, idx, current_pos_dir, breakout_min_body_percent
)
if breakout_dir:
direction = breakout_dir
trigger_price = breakout_price
signal_type = 'breakout'
if used_1m:
precise_count += 1
elif direction and signal_type == 'one_fifth':
fallback_count += 1
# 无持仓 -> 开仓(只用五分之一信号开仓,突破信号不用于开仓)
if current_position is None:
if direction and signal_type == 'one_fifth':
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar_idx': idx
}
stats[direction]['count'] += 1
time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
logger.debug(f"[{time_str}] 开仓{direction} @ {trigger_price:.2f}")
idx += 1
continue
# 有持仓 -> 检查是否需要反向
pos_dir = current_position['direction']
if direction and direction != pos_dir:
# 反向信号,平仓并反手
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
# 记录信号类型
signal_type_str = '突破' if signal_type == 'breakout' else '五分之一'
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
'signal_type': signal_type_str
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
# 如果是突破反手信号记录当前K线ID防止重复执行
if signal_type == 'breakout':
last_breakout_bar_id = curr_bar_id
breakout_reverse_count += 1
# 反手开仓
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar_idx': idx
}
stats[direction]['count'] += 1
time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
logger.debug(f"[{time_str}] 平{pos_dir}反手{direction}({signal_type_str}) @ {trigger_price:.2f} 盈亏={diff:.2f}")
idx += 1
# 尾仓处理最后一根K线收盘价平仓
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"回测完成使用1分钟精准判断 {precise_count} 次,使用开盘价距离判断 {fallback_count} 次,突破反手信号 {breakout_reverse_count}")
return trades, stats
# ========================= 运行回测 =========================
if __name__ == '__main__':
# ==================== 配置参数 ====================
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
MIN_BODY_SIZE = 0.1 # 最小实体大小(绝对值)
FIRST_MINUTE_ONLY = True # 反手信号仅在3分钟K线的第一分钟触发才有效
# 突破反手信号配置
ENABLE_BREAKOUT_REVERSE = True # 是否启用"突破上一根K线高低点"反手信号
BREAKOUT_MIN_BODY_PERCENT = 0.1 # 突破反手信号的最小实体百分比0.1表示0.1%
# ==================== 生成查询日期列表 ====================
dates = []
try:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
if start_dt > end_dt:
logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}")
exit(1)
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"回测日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
except ValueError as e:
logger.error(f"日期格式错误:{e}")
exit(1)
# ==================== 执行回测 ====================
trades, stats = backtest_one_third_strategy(
dates,
MIN_BODY_SIZE,
FIRST_MINUTE_ONLY,
ENABLE_BREAKOUT_REVERSE,
BREAKOUT_MIN_BODY_PERCENT
)
# ==================== 输出交易详情 ====================
logger.info("===== 每笔交易详情 =====")
# 参数设定
contract_size = 10000 # 合约规模
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 平仓手续费率
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_price = t['exit']
direction = t['direction']
# 原始价差
point_diff = t['diff']
# 金额盈利
money_profit = point_diff / entry * contract_size
# 手续费
fee = open_fee_fixed + (contract_size / entry * exit_price * close_fee_rate)
# 净利润
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction} "
f"入={entry:.2f} 出={exit_price:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
# ==================== 汇总统计 ====================
total_net_profit = total_money_profit - total_fee
print(f"\n{'='*60}")
print(f"【BitMart 五分之一策略回测结果3分钟K线 + 1分钟精准判断")
print(f"{'='*60}")
print(f"回测周期:{START_DATE}{END_DATE}")
print(f"最小实体要求:{MIN_BODY_SIZE}")
print(f"反手信号仅第一分钟有效:{'' if FIRST_MINUTE_ONLY else ''}")
print(f"突破反手信号:{'启用' if ENABLE_BREAKOUT_REVERSE else '禁用'}(最小实体{BREAKOUT_MIN_BODY_PERCENT}%")
print(f"{'='*60}")
print(f"总交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}")
print(f"{'='*60}")
print("\n===== 方向统计 =====")
for k, v in stats.items():
name = v['name']
count = v['count']
wins = v['wins']
total_p = v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
# 保存交易记录到CSV
if trades:
import csv
csv_path = Path(__file__).parent / 'backtest_one_third_trades.csv'
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[
'entry_time', 'exit_time', 'direction', 'entry', 'exit',
'point_diff', 'raw_profit', 'fee', 'net_profit', 'signal_type'
])
writer.writeheader()
for t in trades:
writer.writerow({
'entry_time': t['entry_time'],
'exit_time': t['exit_time'],
'direction': t['direction'],
'entry': t['entry'],
'exit': t['exit'],
'point_diff': t['point_diff'],
'raw_profit': t['raw_profit'],
'fee': t['fee'],
'net_profit': t['net_profit'],
'signal_type': t.get('signal_type', '五分之一')
})
print(f"\n交易记录已保存到:{csv_path}")