Files
lm_code/bitmart/指数平均线.py
2025-12-19 19:16:41 +08:00

768 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
量化交易回测系统 - EMA交叉策略双EMA/三EMA
"""
import csv
import datetime
import numpy as np
from typing import List, Dict, Optional, Tuple
# ========================= EMA计算函数 =========================
def calculate_ema(prices: List[float], period: int) -> List[Optional[float]]:
"""
计算指数移动平均线(EMA)
Args:
prices: 价格列表(通常是收盘价)
period: EMA周期
Returns:
EMA值列表前period-1个为None
"""
if len(prices) < period:
return [None] * len(prices)
ema_values = [None] * (period - 1)
# 计算初始SMA作为EMA的起点
sma = sum(prices[:period]) / period
# EMA计算公式EMA_today = (Price_today * (2/(period+1))) + (EMA_yesterday * (1 - (2/(period+1))))
multiplier = 2 / (period + 1)
# 第一个EMA值
ema = sma
ema_values.append(ema)
# 计算后续EMA值
for price in prices[period:]:
ema = (price * multiplier) + (ema * (1 - multiplier))
ema_values.append(ema)
return ema_values
# ========================= 策略核心函数 =========================
def check_ema_cross(ema_fast: List[Optional[float]],
ema_slow: List[Optional[float]],
idx: int) -> Optional[str]:
"""
检查EMA金叉/死叉
Args:
ema_fast: 快线EMA值列表
ema_slow: 慢线EMA值列表
idx: 当前K线索引
Returns:
"golden" - 金叉(做多信号)
"dead" - 死叉(做空信号)
None - 无交叉
"""
if idx < 1:
return None
# 确保有足够的数据
if ema_fast[idx] is None or ema_fast[idx - 1] is None:
return None
if ema_slow[idx] is None or ema_slow[idx - 1] is None:
return None
# 前一根K线快线在慢线下方当前K线快线上穿慢线 -> 金叉
if ema_fast[idx - 1] < ema_slow[idx - 1] and ema_fast[idx] > ema_slow[idx]:
return "golden"
# 前一根K线快线在慢线上方当前K线快线下穿慢线 -> 死叉
if ema_fast[idx - 1] > ema_slow[idx - 1] and ema_fast[idx] < ema_slow[idx]:
return "dead"
return None
def check_triple_ema_cross(ema_fast: List[Optional[float]],
ema_mid: List[Optional[float]],
ema_slow: List[Optional[float]],
idx: int) -> Optional[str]:
"""
检查三EMA交叉更稳定的信号
规则:快线 > 中线 > 慢线 -> 多头排列 -> 做多
快线 < 中线 < 慢线 -> 空头排列 -> 做空
Args:
ema_fast: 快线如EMA7
ema_mid: 中线如EMA14
ema_slow: 慢线如EMA30
idx: 当前K线索引
Returns:
"golden" - 金叉多头排列
"dead" - 死叉空头排列
None - 无明确信号
"""
if idx < 1:
return None
# 确保有足够的数据
if any(ema[idx] is None or ema[idx - 1] is None for ema in [ema_fast, ema_mid, ema_slow]):
return None
# 检查是否形成多头排列EMA快 > 中 > 慢)
current_fast = ema_fast[idx]
current_mid = ema_mid[idx]
current_slow = ema_slow[idx]
prev_fast = ema_fast[idx - 1]
prev_mid = ema_mid[idx - 1]
prev_slow = ema_slow[idx - 1]
# 多头排列条件:快线 > 中线 > 慢线
is_golden_triple = current_fast > current_mid > current_slow
was_not_golden = not (prev_fast > prev_mid > prev_slow)
# 空头排列条件:快线 < 中线 < 慢线
is_dead_triple = current_fast < current_mid < current_slow
was_not_dead = not (prev_fast < prev_mid < prev_slow)
if is_golden_triple and was_not_golden:
return "golden"
elif is_dead_triple and was_not_dead:
return "dead"
return None
# ========================= 回测引擎 =========================
class EMABacktester:
"""EMA交叉策略回测器"""
def __init__(self,
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9, # 用于MACD信号线
use_triple_ema: bool = False,
mid_period: int = 14, # 三EMA时的中间周期
use_macd_confirmation: bool = False,
stop_loss_pct: float = 0.02, # 2%止损
take_profit_pct: float = 0.05, # 5%止盈
trailing_stop_pct: float = 0.03): # 3%移动止损
self.fast_period = fast_period
self.slow_period = slow_period
self.signal_period = signal_period
self.use_triple_ema = use_triple_ema
self.mid_period = mid_period
self.use_macd_confirmation = use_macd_confirmation
self.stop_loss_pct = stop_loss_pct
self.take_profit_pct = take_profit_pct
self.trailing_stop_pct = trailing_stop_pct
self.stats = {
'golden_cross': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '金叉做多'},
'dead_cross': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '死叉做空'},
}
def calculate_macd(self, prices: List[float]) -> Tuple[List[Optional[float]],
List[Optional[float]],
List[Optional[float]]]:
"""
计算MACD指标
Returns: (MACD线, 信号线, 柱状图)
"""
# 计算快慢EMA
ema_fast = calculate_ema(prices, self.fast_period)
ema_slow = calculate_ema(prices, self.slow_period)
# 计算MACD线 = EMA快线 - EMA慢线
macd_line = []
for i in range(len(prices)):
if ema_fast[i] is not None and ema_slow[i] is not None:
macd_line.append(ema_fast[i] - ema_slow[i])
else:
macd_line.append(None)
# 计算信号线MACD的EMA
signal_line = calculate_ema([x for x in macd_line if x is not None] if any(macd_line) else [],
self.signal_period)
# 补全None值
signal_line_extended = [None] * (len(prices) - len(signal_line)) + signal_line if len(signal_line) < len(
prices) else signal_line
# 计算柱状图
histogram = []
for i in range(len(prices)):
if macd_line[i] is not None and signal_line_extended[i] is not None:
histogram.append(macd_line[i] - signal_line_extended[i])
else:
histogram.append(None)
return macd_line, signal_line_extended, histogram
def backtest(self, data: List[Dict]) -> Tuple[List[Dict], Dict]:
"""
执行EMA交叉策略回测
Args:
data: K线数据列表
Returns:
trades: 交易记录列表
stats: 统计数据
"""
# 提取收盘价
close_prices = [float(c['close']) for c in data]
# 计算EMA
ema_fast = calculate_ema(close_prices, self.fast_period)
ema_slow = calculate_ema(close_prices, self.slow_period)
# 如果需要三EMA计算中间EMA
ema_mid = None
if self.use_triple_ema:
ema_mid = calculate_ema(close_prices, self.mid_period)
# 如果需要MACD确认计算MACD
macd_line, signal_line, histogram = None, None, None
if self.use_macd_confirmation:
macd_line, signal_line, histogram = self.calculate_macd(close_prices)
trades: List[Dict] = []
current_position: Optional[Dict] = None
highest_price_since_entry = 0 # 用于移动止损
lowest_price_since_entry = float('inf') # 用于移动止损
# 遍历K线数据跳过前几个没有EMA值的
start_idx = max(self.fast_period, self.slow_period)
if self.use_triple_ema:
start_idx = max(start_idx, self.mid_period)
for idx in range(start_idx, len(data)):
current_bar = data[idx]
current_price = float(current_bar['close'])
open_price = float(current_bar['open'])
# ========== 信号检测 ==========
signal = None
# 基础双EMA交叉信号
if not self.use_triple_ema:
signal = check_ema_cross(ema_fast, ema_slow, idx)
# 三EMA排列信号
else:
signal = check_triple_ema_cross(ema_fast, ema_mid, ema_slow, idx)
# MACD确认可选
if signal and self.use_macd_confirmation:
macd_confirmed = False
if signal == "golden":
# 金叉确认MACD线在信号线上方且柱状图为正
if macd_line[idx] is not None and signal_line[idx] is not None:
macd_confirmed = (macd_line[idx] > signal_line[idx]) and (
histogram[idx] is not None and histogram[idx] > 0)
elif signal == "dead":
# 死叉确认MACD线在信号线下方且柱状图为负
if macd_line[idx] is not None and signal_line[idx] is not None:
macd_confirmed = (macd_line[idx] < signal_line[idx]) and (
histogram[idx] is not None and histogram[idx] < 0)
if not macd_confirmed:
signal = None
# ========== 空仓时开仓 ==========
if current_position is None and signal:
# 下一根K线开盘价入场
if idx + 1 < len(data):
entry_price = float(data[idx + 1]['open'])
if signal == "golden": # 做多
current_position = {
'direction': 'long',
'entry_price': entry_price,
'entry_time': data[idx + 1]['id'],
'entry_idx': idx + 1,
'signal': 'golden_cross',
'highest_price': entry_price, # 用于移动止损
'lowest_price': entry_price, # 用于空头的移动止损
}
self.stats['golden_cross']['count'] += 1
elif signal == "dead": # 做空
current_position = {
'direction': 'short',
'entry_price': entry_price,
'entry_time': data[idx + 1]['id'],
'entry_idx': idx + 1,
'signal': 'dead_cross',
'highest_price': entry_price,
'lowest_price': entry_price,
}
self.stats['dead_cross']['count'] += 1
# 跳过下一根因为已经在这根K线开盘入场
continue
# ========== 持仓时处理 ==========
if current_position:
pos_dir = current_position['direction']
entry_price = current_position['entry_price']
signal_key = current_position['signal']
# 更新最高/最低价(用于移动止损)
if pos_dir == 'long':
current_position['highest_price'] = max(current_position['highest_price'], current_price)
else: # short
current_position['lowest_price'] = min(current_position['lowest_price'], current_price)
# ========== 止损止盈检查 ==========
should_exit = False
exit_reason = ""
exit_price = current_price # 默认用收盘价平仓
# 固定止损
if pos_dir == 'long':
stop_loss_price = entry_price * (1 - self.stop_loss_pct)
if current_price <= stop_loss_price:
should_exit = True
exit_reason = "止损"
exit_price = stop_loss_price
# 固定止盈
take_profit_price = entry_price * (1 + self.take_profit_pct)
if current_price >= take_profit_price:
should_exit = True
exit_reason = "止盈"
exit_price = take_profit_price
# 移动止损
trailing_stop_price = current_position['highest_price'] * (1 - self.trailing_stop_pct)
if current_price <= trailing_stop_price:
should_exit = True
exit_reason = "移动止损"
exit_price = trailing_stop_price
else: # short
stop_loss_price = entry_price * (1 + self.stop_loss_pct)
if current_price >= stop_loss_price:
should_exit = True
exit_reason = "止损"
exit_price = stop_loss_price
# 固定止盈
take_profit_price = entry_price * (1 - self.take_profit_pct)
if current_price <= take_profit_price:
should_exit = True
exit_reason = "止盈"
exit_price = take_profit_price
# 移动止损
trailing_stop_price = current_position['lowest_price'] * (1 + self.trailing_stop_pct)
if current_price >= trailing_stop_price:
should_exit = True
exit_reason = "移动止损"
exit_price = trailing_stop_price
# ========== 反向信号检查 ==========
if signal and (
(signal == "dead" and pos_dir == "long") or
(signal == "golden" and pos_dir == "short")
):
should_exit = True
exit_reason = "反向信号"
# 反向信号用下一根开盘价平仓
if idx + 1 < len(data):
exit_price = float(data[idx + 1]['open'])
# ========== 执行平仓 ==========
if should_exit:
# 计算盈亏
if pos_dir == 'long':
diff = exit_price - entry_price
else: # short
diff = entry_price - exit_price
# 记录交易
trade = {
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time']),
'exit_time': datetime.datetime.fromtimestamp(current_bar['id']),
'signal': self.stats[signal_key]['name'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': entry_price,
'exit': exit_price,
'diff': diff,
'exit_reason': exit_reason,
'holding_bars': idx - current_position['entry_idx'] + 1,
}
trades.append(trade)
# 更新统计
self.stats[signal_key]['total_profit'] += diff
if diff > 0:
self.stats[signal_key]['wins'] += 1
# 平仓
current_position = None
# 如果是因为反向信号平仓,立即反手开仓
if exit_reason == "反向信号" and signal and idx + 1 < len(data):
if signal == "golden": # 反手做多
current_position = {
'direction': 'long',
'entry_price': exit_price, # 同价反手
'entry_time': data[idx + 1]['id'],
'entry_idx': idx + 1,
'signal': 'golden_cross',
'highest_price': exit_price,
'lowest_price': exit_price,
}
self.stats['golden_cross']['count'] += 1
elif signal == "dead": # 反手做空
current_position = {
'direction': 'short',
'entry_price': exit_price,
'entry_time': data[idx + 1]['id'],
'entry_idx': idx + 1,
'signal': 'dead_cross',
'highest_price': exit_price,
'lowest_price': exit_price,
}
self.stats['dead_cross']['count'] += 1
# 跳过下一根K线
continue
# ========== 尾仓处理 ==========
if current_position:
last_bar = data[-1]
exit_price = float(last_bar['close'])
pos_dir = current_position['direction']
entry_price = current_position['entry_price']
signal_key = current_position['signal']
diff = (exit_price - entry_price) if pos_dir == 'long' else (entry_price - exit_price)
trade = {
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time']),
'exit_time': datetime.datetime.fromtimestamp(last_bar['id']),
'signal': self.stats[signal_key]['name'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': entry_price,
'exit': exit_price,
'diff': diff,
'exit_reason': "尾仓平仓",
'holding_bars': len(data) - current_position['entry_idx'],
}
trades.append(trade)
self.stats[signal_key]['total_profit'] += diff
if diff > 0:
self.stats[signal_key]['wins'] += 1
return trades, self.stats
# ========================= 可视化分析 =========================
def analyze_trades(trades: List[Dict], stats: Dict) -> Dict:
"""深入分析交易结果"""
if not trades:
return {}
# 基础统计
total_trades = len(trades)
winning_trades = [t for t in trades if t['diff'] > 0]
losing_trades = [t for t in trades if t['diff'] <= 0]
win_rate = len(winning_trades) / total_trades * 100 if total_trades > 0 else 0
# 盈亏统计
total_profit = sum(t['diff'] for t in trades)
avg_profit_per_trade = total_profit / total_trades if total_trades > 0 else 0
# 胜率相关
avg_win = np.mean([t['diff'] for t in winning_trades]) if winning_trades else 0
avg_loss = np.mean([t['diff'] for t in losing_trades]) if losing_trades else 0
# 盈亏比
profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')
# 最大连续盈利/亏损
max_consecutive_wins = 0
max_consecutive_losses = 0
current_wins = 0
current_losses = 0
for trade in trades:
if trade['diff'] > 0:
current_wins += 1
current_losses = 0
max_consecutive_wins = max(max_consecutive_wins, current_wins)
else:
current_losses += 1
current_wins = 0
max_consecutive_losses = max(max_consecutive_losses, current_losses)
# 持仓时间分析
holding_bars = [t.get('holding_bars', 0) for t in trades]
avg_holding_bars = np.mean(holding_bars) if holding_bars else 0
# 按平仓原因分析
exit_reasons = {}
for trade in trades:
reason = trade.get('exit_reason', '未知')
exit_reasons[reason] = exit_reasons.get(reason, 0) + 1
return {
'total_trades': total_trades,
'win_rate': win_rate,
'total_profit': total_profit,
'avg_profit_per_trade': avg_profit_per_trade,
'avg_win': avg_win,
'avg_loss': avg_loss,
'profit_factor': profit_factor,
'max_consecutive_wins': max_consecutive_wins,
'max_consecutive_losses': max_consecutive_losses,
'avg_holding_bars': avg_holding_bars,
'exit_reasons': exit_reasons,
'winning_trades_count': len(winning_trades),
'losing_trades_count': len(losing_trades),
}
# ========================= 主程序 =========================
if __name__ == '__main__':
# 从CSV文件读取数据
csv_file = "kline_3.csv" # 请替换为你的CSV文件路径
read_data = []
try:
with open(csv_file, 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
read_data.append({
'id': int(row['id']),
'open': float(row['open']),
'high': float(row['high']),
'low': float(row['low']),
'close': float(row['close'])
})
print(f"成功读取 {len(read_data)} 条K线数据")
except FileNotFoundError:
print(f"文件 {csv_file} 未找到,请检查路径")
exit(1)
except Exception as e:
print(f"读取CSV文件时出错: {e}")
exit(1)
# 按时间排序
read_data.sort(key=lambda x: x['id'])
print("\n" + "=" * 60)
print("EMA交叉策略回测系统")
print("=" * 60)
# 策略参数选择
print("\n请选择EMA策略类型")
print("1. 双EMA交叉策略 (默认: EMA12/EMA26)")
print("2. 三EMA排列策略 (默认: EMA7/EMA14/EMA30)")
print("3. EMA+MACD双重确认策略")
choice = input("\n请输入选择 (1-3默认1): ").strip()
if choice == "2":
# 三EMA策略
backtester = EMABacktester(
fast_period=7,
slow_period=30,
use_triple_ema=True,
mid_period=14,
use_macd_confirmation=False,
stop_loss_pct=0.01, # 1.5%止损
take_profit_pct=0.4, # 4%止盈
trailing_stop_pct=0.04 # 2%移动止损
)
strategy_name = "三EMA排列策略(7/14/30)"
elif choice == "3":
# EMA+MACD策略
backtester = EMABacktester(
fast_period=12,
slow_period=26,
signal_period=9,
use_triple_ema=False,
use_macd_confirmation=True,
stop_loss_pct=0.1, # 1%止损
take_profit_pct=0.4, # 3%止盈
trailing_stop_pct=0.04 # 1.5%移动止损
)
strategy_name = "EMA+MACD双重确认策略(12/26/9)"
else:
# 默认双EMA策略
backtester = EMABacktester(
fast_period=12,
slow_period=26,
use_triple_ema=False,
use_macd_confirmation=False,
stop_loss_pct=0.02, # 2%止损
take_profit_pct=0.05, # 5%止盈
trailing_stop_pct=0.03 # 3%移动止损
)
strategy_name = "双EMA交叉策略(12/26)"
print(f"\n使用策略: {strategy_name}")
print("开始回测...")
# 执行回测
trades, stats = backtester.backtest(read_data)
# ========== 交易详情和盈利计算 ==========
print(f"\n{'=' * 60}")
print(f"回测结果 - {strategy_name}")
print(f"{'=' * 60}")
# 参数设定
contract_size = 10000 # 合约规模
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 平仓手续费率
total_points_profit = 0 # 累计点差
total_money_profit = 0 # 累计金额盈利
total_fee = 0 # 累计手续费
print(f"\n交易详情 (共{len(trades)}笔):")
print("-" * 100)
for i, t in enumerate(trades, 1):
entry = t['entry']
exit = t['exit']
direction = t['direction']
# 原始价差
point_diff = (exit - entry) if direction == '做多' else (entry - exit)
# 金额盈利
money_profit = point_diff / entry * contract_size
# 手续费
fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate)
# 净利润
net_profit = money_profit - fee
# 保存结果
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
# 输出交易详情
profit_color = "\033[92m" if net_profit > 0 else "\033[91m"
reset_color = "\033[0m"
print(f"{i:3d}. {t['entry_time'].strftime('%Y-%m-%d %H:%M')} -> "
f"{t['exit_time'].strftime('%Y-%m-%d %H:%M')} "
f"{direction}({t['signal']}) "
f"入={entry:.2f} 出={exit:.2f} "
f"{profit_color}净利={net_profit:+.2f}{reset_color} "
f"(持有:{t.get('holding_bars', '?')}根K线, 原因:{t.get('exit_reason', '未知')})")
# ========== 汇总统计 ==========
total_net_profit = total_money_profit - total_fee
print(f"\n{'=' * 60}")
print("汇总统计")
print(f"{'=' * 60}")
print(f"总交易笔数: {len(trades)}")
print(f"总点差: {total_points_profit:.2f}")
print(f"总原始盈利(未扣费): {total_money_profit:.2f}")
print(f"总手续费: {total_fee:.2f}")
print(f"总净利润: {total_net_profit:.2f}")
# 深入分析
analysis = analyze_trades(trades, stats)
if analysis:
print(f"\n策略分析:")
print(f"- 胜率: {analysis['win_rate']:.2f}%")
print(f"- 平均每笔盈利: {analysis['avg_profit_per_trade']:.2f}")
print(f"- 平均盈利: {analysis['avg_win']:.2f}")
print(f"- 平均亏损: {analysis['avg_loss']:.2f}")
print(f"- 盈亏比: {analysis['profit_factor']:.2f}")
print(f"- 最大连续盈利: {analysis['max_consecutive_wins']}")
print(f"- 最大连续亏损: {analysis['max_consecutive_losses']}")
print(f"- 平均持仓K线数: {analysis['avg_holding_bars']:.1f}")
print(f"\n平仓原因统计:")
for reason, count in analysis['exit_reasons'].items():
percentage = count / len(trades) * 100
print(f" - {reason}: {count} 笔 ({percentage:.1f}%)")
# ========== 信号统计 ==========
print(f"\n{'=' * 60}")
print("信号统计")
print(f"{'=' * 60}")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
if count > 0:
win_rate = (wins / count * 100)
avg_p = total_p / count
profit_color = "\033[92m" if total_p > 0 else "\033[91m"
reset_color = "\033[0m"
print(f"{name}:")
print(f" 信号次数: {count}")
print(f" 胜率: {win_rate:.2f}%")
print(f" 总价差: {profit_color}{total_p:.2f}{reset_color}")
print(f" 平均价差: {avg_p:.2f}")
# ========== 风险指标 ==========
if len(trades) > 1:
returns = [t['net_profit'] for t in trades]
# 夏普比率(简化版)
avg_return = np.mean(returns)
std_return = np.std(returns)
sharpe_ratio = avg_return / std_return if std_return > 0 else 0
# 最大回撤
cumulative_returns = np.cumsum(returns)
running_max = np.maximum.accumulate(cumulative_returns)
drawdown = cumulative_returns - running_max
max_drawdown = abs(np.min(drawdown)) if len(drawdown) > 0 else 0
print(f"\n{'=' * 60}")
print("风险指标")
print(f"{'=' * 60}")
print(f"夏普比率(简化): {sharpe_ratio:.4f}")
print(f"最大回撤: {max_drawdown:.2f}")
# 盈亏分布
print(f"\n盈亏分布:")
profit_ranges = {
"大亏 (< -100)": len([r for r in returns if r < -100]),
"中亏 (-100 ~ -50)": len([r for r in returns if -100 <= r < -50]),
"小亏 (-50 ~ 0)": len([r for r in returns if -50 <= r < 0]),
"小盈 (0 ~ 50)": len([r for r in returns if 0 <= r < 50]),
"中盈 (50 ~ 100)": len([r for r in returns if 50 <= r < 100]),
"大盈 (> 100)": len([r for r in returns if r >= 100]),
}
for range_name, count in profit_ranges.items():
if count > 0:
percentage = count / len(returns) * 100
print(f" {range_name}: {count} 笔 ({percentage:.1f}%)")
print(f"\n{'=' * 60}")
print("回测完成!")
print(f"{'=' * 60}")