diff --git a/bitmart/test_api.py b/bitmart/test_api.py new file mode 100644 index 0000000..29b472a --- /dev/null +++ b/bitmart/test_api.py @@ -0,0 +1,29 @@ +"""测试 BitMart API K线获取""" +import time +from bitmart.api_contract import APIContract + +api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" +secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" +memo = "数据抓取" + +contractAPI = APIContract(api_key, secret_key, memo, timeout=(5, 15)) + +# 测试获取最近1小时的15分钟K线 +end_time = int(time.time()) +start_time = end_time - 3600 # 1小时前 + +print(f"当前时间戳: {end_time}") +print(f"开始时间戳: {start_time}") +print(f"当前时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))}") +print(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}") + +try: + response = contractAPI.get_kline( + contract_symbol="ETHUSDT", + step=15, + start_time=start_time, + end_time=end_time + ) + print(f"\n响应: {response}") +except Exception as e: + print(f"\n错误: {e}") diff --git a/bitmart/回测-三分之一策略.py b/bitmart/回测-三分之一策略.py new file mode 100644 index 0000000..a9e3b06 --- /dev/null +++ b/bitmart/回测-三分之一策略.py @@ -0,0 +1,656 @@ +""" +量化交易回测系统 - 三分之一回归策略(优化版) + +========== 策略规则 ========== + +1. 开多条件: + - 找到实体>=0.1的前一根K线(如果前一根实体<0.1,继续往前找) + - 前一根是阴线(close < open) + - 当前K线的最高价(包括影线)涨到前一根阴线实体的 1/3 处 + - 即:当前high >= prev_close + (prev_open - prev_close) / 3 + +2. 平多/开空条件: + - 找到实体>=0.1的前一根K线(如果前一根实体<0.1,继续往前找) + - 前一根是阳线(close > open) + - 当前K线的最低价(包括影线)跌到前一根阳线实体的 1/3 处 + - 即:当前low <= prev_close - (prev_close - prev_open) / 3 + +3. 执行逻辑: + - 做多时遇到开空信号 -> 平多并反手开空 + - 做空时遇到开多信号 -> 平空并反手开多 + +4. 实体过滤: + - 如果前一根K线的实体部分(|open - close|)< 0.1,继续往前查找 + - 直到找到实体>=0.1的K线,再用那根K线来判断1/3位置 +""" + +import datetime +import calendar +import os +from typing import List, Dict, Optional +from loguru import logger +import pandas as pd +import mplfinance as mpf +import matplotlib.pyplot as plt +import matplotlib +try: + import plotly.graph_objects as go +except Exception: + go = None +from models.bitmart_klines import BitMartETH15M + +# 配置中文字体 +import matplotlib.font_manager as fm +import warnings + +# 忽略matplotlib的字体警告 +warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib.font_manager') +warnings.filterwarnings('ignore', message='.*Glyph.*missing.*', category=UserWarning) + +# 尝试设置中文字体,按优先级尝试 +chinese_fonts = ['SimHei', 'Microsoft YaHei', 'SimSun', 'KaiTi', 'FangSong', 'STSong', 'STHeiti'] +available_fonts = [f.name for f in fm.fontManager.ttflist] + +# 找到第一个可用的中文字体 +font_found = None +for font_name in chinese_fonts: + if font_name in available_fonts: + font_found = font_name + break + +if font_found: + plt.rcParams['font.sans-serif'] = [font_found] + ['DejaVu Sans'] + logger.info(f"使用中文字体: {font_found}") +else: + # 如果没有找到中文字体,尝试使用系统默认字体 + plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS', 'DejaVu Sans'] + logger.warning("未找到中文字体,使用默认配置") + +plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 +plt.rcParams['font.size'] = 10 # 设置默认字体大小 + +# 尝试清除字体缓存(如果可能) +try: + # 不强制重建,避免性能问题 + pass +except: + pass + +# 获取当前脚本所在目录 +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) + +# ========================= 工具函数 ========================= + +def is_bullish(c): # 阳线 + return float(c['close']) > float(c['open']) + + +def is_bearish(c): # 阴线 + return float(c['close']) < float(c['open']) + + +def get_body_size(candle): + """计算K线实体大小(绝对值)""" + return abs(float(candle['open']) - float(candle['close'])) + + +def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1): + """ + 从当前索引往前查找,直到找到实体>=min_body_size的K线 + 返回:(有效K线的索引, K线数据) 或 (None, None) + """ + if current_idx <= 0: + return None, None + + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + body_size = get_body_size(prev) + if body_size >= min_body_size: + return i, prev + + return None, None + + +def get_one_third_level(prev): + """ + 计算前一根K线实体的 1/3 回归位置 + 返回:(触发价格, 方向) + - 如果前一根是阴线:返回向上1/3价格,方向为 'long' + - 如果前一根是阳线:返回向下1/3价格,方向为 'short' + """ + p_open = float(prev['open']) + p_close = float(prev['close']) + + if is_bearish(prev): # 阴线,向上回归 + # 阴线实体 = open - close + body = p_open - p_close + trigger_price = p_close + body / 3 # 从低点涨 1/3 + return trigger_price, 'long' + + elif is_bullish(prev): # 阳线,向下回归 + # 阳线实体 = close - open + body = p_close - p_open + trigger_price = p_close - body / 3 # 从高点跌 1/3 + return trigger_price, 'short' + + return None, None + + +def check_trigger(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1): + """ + 检查当前K线是否触发了交易信号 + 返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None) + 规则:考虑影线部分(high/low),因为实际交易中价格会到达影线位置 + """ + if current_idx <= 0: + return None, None, None + + curr = all_data[current_idx] + + # 查找实体>=0.1的前一根K线 + valid_prev_idx, prev = find_valid_prev_bar(all_data, current_idx, min_body_size) + + if prev is None: + return None, None, None + + trigger_price, direction = get_one_third_level(prev) + + if trigger_price is None: + return None, None, None + + # 使用影线部分(high/low)来判断,因为实际交易中价格会到达这些位置 + c_high = float(curr['high']) + c_low = float(curr['low']) + + # 做多:前一根阴线,当前K线的最高价(包括影线)达到触发价格 + if direction == 'long' and c_high >= trigger_price: + return 'long', trigger_price, valid_prev_idx + + # 做空:前一根阳线,当前K线的最低价(包括影线)达到触发价格 + if direction == 'short' and c_low <= trigger_price: + return 'short', trigger_price, valid_prev_idx + + return None, None, None + + +def get_data_by_date(model, date_str: str): + """按天获取指定表的数据""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + if data: + data.sort(key=lambda x: x['id']) + + return data + + +# ========================= 回测逻辑 ========================= + +def backtest_one_third_strategy(dates: List[str]): + """三分之一回归策略回测""" + all_data: List[Dict] = [] + + for d in dates: + day_data = get_data_by_date(BitMartETH15M, d) + all_data.extend(day_data) + + logger.info(f"总共查询了 {len(dates)} 天,获取到 {len(all_data)} 条K线数据") + + if not all_data: + logger.warning("未获取到任何数据") + return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0}, + 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}} + + all_data.sort(key=lambda x: x['id']) + + if len(all_data) > 1: + first_time = datetime.datetime.fromtimestamp(all_data[0]['id'] / 1000) + last_time = datetime.datetime.fromtimestamp(all_data[-1]['id'] / 1000) + logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M')} 到 {last_time.strftime('%Y-%m-%d %H:%M')}") + + # 验证排序:打印前5条数据 + logger.info("===== 前5条数据(验证排序)=====") + for i in range(min(5, len(all_data))): + d = all_data[i] + t = datetime.datetime.fromtimestamp(d['id'] / 1000).strftime('%Y-%m-%d %H:%M:%S') + k_type = "阳线" if is_bullish(d) else ("阴线" if is_bearish(d) else "十字星") + logger.info(f" [{i}] {t} | {k_type} | O={d['open']:.2f} H={d['high']:.2f} L={d['low']:.2f} C={d['close']:.2f}") + + stats = { + 'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'}, + 'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'}, + } + + trades: List[Dict] = [] + current_position: Optional[Dict] = None + + for idx in range(1, len(all_data)): + curr = all_data[idx] + + # 使用新的check_trigger函数,它会自动查找实体>=0.1的前一根K线 + direction, trigger_price, valid_prev_idx = check_trigger(all_data, idx, min_body_size=0.1) + + # 获取有效的前一根K线用于日志输出 + valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None + + # 空仓时,有信号就开仓 + if current_position is None: + if direction and valid_prev is not None: + # 调试:打印开仓时的K线信息 + prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M') + curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M') + prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星") + curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星") + prev_body = get_body_size(valid_prev) + logger.info(f"【开仓】{direction} @ {trigger_price:.2f}") + logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}") + logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}") + + current_position = { + 'direction': direction, + 'entry_price': trigger_price, + 'entry_time': curr['id'], + 'entry_bar': idx + } + stats[direction]['count'] += 1 + continue + + # 有仓位时,检查是否有反向信号 + pos_dir = current_position['direction'] + + if direction and direction != pos_dir and valid_prev is not None: + # 平仓 + exit_price = trigger_price + + if pos_dir == 'long': + diff = exit_price - current_position['entry_price'] + else: + diff = current_position['entry_price'] - exit_price + + # 调试:打印平仓时的K线信息 + prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M') + curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M') + prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星") + curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星") + prev_body = get_body_size(valid_prev) + logger.info(f"【平仓反手】{pos_dir} -> {direction} @ {exit_price:.2f}, 盈亏: {diff:.2f}") + logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}") + logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}") + + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000), + 'entry_time_ms': current_position['entry_time'], + 'exit_time_ms': curr['id'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + + stats[pos_dir]['total_profit'] += diff + if diff > 0: + stats[pos_dir]['wins'] += 1 + + # 反手开仓 + current_position = { + 'direction': direction, + 'entry_price': trigger_price, + 'entry_time': curr['id'], + 'entry_bar': idx + } + stats[direction]['count'] += 1 + + # 尾仓处理 + if current_position: + last = all_data[-1] + exit_price = float(last['close']) + pos_dir = current_position['direction'] + + if pos_dir == 'long': + diff = exit_price - current_position['entry_price'] + else: + diff = current_position['entry_price'] - exit_price + + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000), + 'entry_time_ms': current_position['entry_time'], + 'exit_time_ms': last['id'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_dir]['total_profit'] += diff + if diff > 0: + stats[pos_dir]['wins'] += 1 + + return trades, stats, all_data + + +# ========================= 绘图函数 ========================= +def plot_trades(all_data: List[Dict], trades: List[Dict], save_path: str = None): + """ + 绘制K线图并标注交易点位 + """ + if not all_data: + logger.warning("没有数据可绘制") + return + + # 转换为 DataFrame + df = pd.DataFrame(all_data) + df['datetime'] = pd.to_datetime(df['id'], unit='ms') + df.set_index('datetime', inplace=True) + df = df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'}) + df = df[['Open', 'High', 'Low', 'Close']] + + # 准备标记点 + buy_signals = [] # 做多开仓 + sell_signals = [] # 做空开仓 + buy_exits = [] # 做多平仓 + sell_exits = [] # 做空平仓 + + for trade in trades: + entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms') + exit_time = pd.to_datetime(trade['exit_time_ms'], unit='ms') + direction = trade['direction'] + entry_price = trade['entry'] + exit_price = trade['exit'] + + if direction == '做多': + buy_signals.append((entry_time, entry_price)) + buy_exits.append((exit_time, exit_price)) + else: + sell_signals.append((entry_time, entry_price)) + sell_exits.append((exit_time, exit_price)) + + # 创建标记序列 + buy_markers = pd.Series(index=df.index, dtype=float) + sell_markers = pd.Series(index=df.index, dtype=float) + buy_exit_markers = pd.Series(index=df.index, dtype=float) + sell_exit_markers = pd.Series(index=df.index, dtype=float) + + for t, p in buy_signals: + if t in buy_markers.index: + buy_markers[t] = p + for t, p in sell_signals: + if t in sell_markers.index: + sell_markers[t] = p + for t, p in buy_exits: + if t in buy_exit_markers.index: + buy_exit_markers[t] = p + for t, p in sell_exits: + if t in sell_exit_markers.index: + sell_exit_markers[t] = p + + # 添加标记 + add_plots = [] + + if buy_markers.notna().any(): + add_plots.append(mpf.make_addplot(buy_markers, type='scatter', markersize=100, + marker='^', color='green', label='做多开仓')) + if sell_markers.notna().any(): + add_plots.append(mpf.make_addplot(sell_markers, type='scatter', markersize=100, + marker='v', color='red', label='做空开仓')) + if buy_exit_markers.notna().any(): + add_plots.append(mpf.make_addplot(buy_exit_markers, type='scatter', markersize=80, + marker='x', color='darkgreen', label='做多平仓')) + if sell_exit_markers.notna().any(): + add_plots.append(mpf.make_addplot(sell_exit_markers, type='scatter', markersize=80, + marker='x', color='darkred', label='做空平仓')) + + # 绘制K线图(更接近交易所风格) + market_colors = mpf.make_marketcolors( + up='#26a69a', # 常见交易所绿色 + down='#ef5350', # 常见交易所红色 + edge='inherit', + wick='inherit', + volume='inherit' + ) + style = mpf.make_mpf_style( + base_mpf_style='binance', + marketcolors=market_colors, + gridstyle='-', + gridcolor='#e6e6e6' + ) + + fig, axes = mpf.plot( + df, + type='candle', + style=style, + title='三分之一回归策略回测', + ylabel='价格', + addplot=add_plots if add_plots else None, + figsize=(16, 9), + returnfig=True + ) + + # 添加图例 + axes[0].legend(['做多开仓 ▲', '做空开仓 ▼', '做多平仓 ✕', '做空平仓 ✕'], loc='upper left') + + # 标注开仓细节(方向、价格、时间) + if trades: + ax = axes[0] + max_annotate = 60 # 过多会拥挤,可按需调大/调小 + annotated = 0 + for i, trade in enumerate(trades): + if annotated >= max_annotate: + break + entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms') + if entry_time not in df.index: + continue + entry_price = trade['entry'] + direction = trade['direction'] + color = 'green' if direction == '做多' else 'red' + text = f"{direction} @ {entry_price:.2f}\n{entry_time.strftime('%m-%d %H:%M')}" + y_offset = 20 if (i % 2 == 0) else -30 + ax.annotate( + text, + xy=(entry_time, entry_price), + xytext=(0, y_offset), + textcoords='offset points', + ha='center', + va='bottom' if y_offset > 0 else 'top', + fontsize=8, + color=color, + arrowprops=dict(arrowstyle='->', color=color, lw=0.6, alpha=0.6) + ) + annotated += 1 + + if save_path: + plt.savefig(save_path, dpi=150, bbox_inches='tight') + logger.info(f"图表已保存到: {save_path}") + + plt.show() + + +def plot_trades_interactive(all_data: List[Dict], trades: List[Dict], html_path: str = None): + """ + 交互式K线图(TradingView风格:支持缩放、时间区间/价格区间平移缩放) + """ + if not all_data: + logger.warning("没有数据可绘制") + return + if go is None: + logger.warning("未安装 plotly,无法绘制交互式图。请先安装:pip install plotly") + return + + df = pd.DataFrame(all_data) + df['datetime'] = pd.to_datetime(df['id'], unit='ms') + df.sort_values('datetime', inplace=True) + + fig = go.Figure( + data=[ + go.Candlestick( + x=df['datetime'], + open=df['open'], + high=df['high'], + low=df['low'], + close=df['close'], + increasing_line_color='#26a69a', + decreasing_line_color='#ef5350', + name='K线' + ) + ] + ) + + # 标注开仓点 + if trades: + entry_x = [] + entry_y = [] + entry_text = [] + entry_color = [] + for t in trades: + entry_x.append(pd.to_datetime(t['entry_time_ms'], unit='ms')) + entry_y.append(t['entry']) + entry_text.append(f"{t['direction']} @ {t['entry']:.2f}
{t['entry_time']}") + entry_color.append('#26a69a' if t['direction'] == '做多' else '#ef5350') + + fig.add_trace( + go.Scatter( + x=entry_x, + y=entry_y, + mode='markers', + marker=dict(size=8, color=entry_color), + name='开仓', + text=entry_text, + hoverinfo='text' + ) + ) + + # TradingView风格:深色背景 + 交互缩放 + fig.update_layout( + title='三分之一回归策略回测(交互式)', + xaxis=dict( + rangeslider=dict(visible=True), + type='date', + showgrid=False + ), + yaxis=dict( + showgrid=False, + fixedrange=False + ), + plot_bgcolor='#0b0e11', + paper_bgcolor='#0b0e11', + font=dict(color='#d1d4dc'), + hovermode='x unified', + dragmode='zoom' + ) + + fig.show() + if html_path: + fig.write_html(html_path) + logger.info(f"交互图已保存到: {html_path}") + + +# ========================= 主程序 ========================= +if __name__ == '__main__': + # ==================== 配置参数 ==================== + START_DATE = "2025-01-01" + END_DATE = "2025-12-31" + + # ==================== 生成日期列表 ==================== + dates = [] + if START_DATE and END_DATE: + start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d') + end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d') + + current_dt = start_dt + while current_dt <= end_dt: + dates.append(current_dt.strftime('%Y-%m-%d')) + current_dt += datetime.timedelta(days=1) + + logger.info(f"查询日期范围:{START_DATE} 到 {END_DATE},共 {len(dates)} 天") + + # ==================== 执行回测 ==================== + trades, stats, all_data = backtest_one_third_strategy(dates) + + # ==================== 输出结果 ==================== + logger.info("===== 每笔交易详情 =====") + + contract_size = 10000 + open_fee_fixed = 5 + close_fee_rate = 0.0005 + + total_points_profit = 0 + total_money_profit = 0 + total_fee = 0 + + for t in trades: + entry = t['entry'] + exit_p = t['exit'] + direction = t['direction'] + + point_diff = t['diff'] + money_profit = point_diff / entry * contract_size + fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate) + net_profit = money_profit - fee + + t.update({ + 'point_diff': point_diff, + 'raw_profit': money_profit, + 'fee': fee, + 'net_profit': net_profit + }) + + total_points_profit += point_diff + total_money_profit += money_profit + total_fee += fee + + logger.info( + f"{t['entry_time']} {direction} " + f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} " + f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f}" + ) + + # ==================== 汇总统计 ==================== + total_net_profit = total_money_profit - total_fee + + print(f"\n{'='*60}") + print(f"【三分之一回归策略 回测结果】") + print(f"{'='*60}") + print(f"交易笔数:{len(trades)}") + print(f"总点差:{total_points_profit:.2f}") + print(f"总原始盈利:{total_money_profit:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net_profit:.2f}") + print(f"{'='*60}") + + print("\n===== 方向统计 =====") + for k, v in stats.items(): + count = v['count'] + wins = v['wins'] + total_p = v['total_profit'] + win_rate = (wins / count * 100) if count > 0 else 0.0 + avg_p = (total_p / count) if count > 0 else 0.0 + print(f"{v['name']}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}") + + # ==================== 绘制图表 ==================== + if trades and all_data: + # 如果数据太多,只绘制最近一部分(比如最近500根K线) + max_bars = 500 + if len(all_data) > max_bars: + logger.info(f"数据量较大({len(all_data)}条),只绘制最近 {max_bars} 根K线") + plot_data = all_data[-max_bars:] + # 过滤出在这个时间范围内的交易 + min_time = datetime.datetime.fromtimestamp(plot_data[0]['id'] / 1000) + plot_trades_filtered = [t for t in trades if t['entry_time'] >= min_time] + else: + plot_data = all_data + plot_trades_filtered = trades + + save_path = os.path.join(SCRIPT_DIR, '回测图表.png') + plot_trades(plot_data, plot_trades_filtered, save_path=save_path) + # 交互式版本(TradingView风格):支持时间区间/价格缩放 + html_path = os.path.join(SCRIPT_DIR, '回测图表_交互式.html') + plot_trades_interactive(plot_data, plot_trades_filtered, html_path=html_path) diff --git a/bitmart/回测图表.png b/bitmart/回测图表.png new file mode 100644 index 0000000..a944312 Binary files /dev/null and b/bitmart/回测图表.png differ diff --git a/bitmart/回测图表_交互式.html b/bitmart/回测图表_交互式.html new file mode 100644 index 0000000..c13a2f4 --- /dev/null +++ b/bitmart/回测图表_交互式.html @@ -0,0 +1,3888 @@ + + + +
+
+ + \ No newline at end of file diff --git a/bitmart/抓取多周期K线.py b/bitmart/抓取多周期K线.py new file mode 100644 index 0000000..ff19b00 --- /dev/null +++ b/bitmart/抓取多周期K线.py @@ -0,0 +1,279 @@ +""" +BitMart 多周期K线数据抓取脚本 +支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据 +自动创建对应的数据库表 +""" + +import time +import datetime +from pathlib import Path +from loguru import logger +from peewee import * +from bitmart.api_contract import APIContract + +# 数据库配置 +DB_PATH = Path(__file__).parent.parent / 'models' / 'database.db' +db = SqliteDatabase(str(DB_PATH)) + +# K线周期配置:step值 -> 表名后缀 +KLINE_CONFIGS = { + 1: '1m', # 1分钟 + 3: '3m', # 3分钟 + 5: '5m', # 5分钟 + 15: '15m', # 15分钟 + 30: '30m', # 30分钟 + 60: '1h', # 1小时 +} + + +def create_kline_model(step: int): + """ + 动态创建K线数据模型 + :param step: K线周期(分钟) + :return: Model类 + """ + suffix = KLINE_CONFIGS.get(step, f'{step}m') + tbl_name = f'bitmart_eth_{suffix}' + + # 使用 type() 动态创建类,避免闭包问题 + attrs = { + 'id': BigIntegerField(primary_key=True), + 'open': FloatField(null=True), + 'high': FloatField(null=True), + 'low': FloatField(null=True), + 'close': FloatField(null=True), + } + + # 创建 Meta 类 + meta_attrs = { + 'database': db, + 'table_name': tbl_name, + } + Meta = type('Meta', (), meta_attrs) + attrs['Meta'] = Meta + + # 动态创建 Model 类 + model_name = f'BitMartETH{suffix.upper()}' + KlineModel = type(model_name, (Model,), attrs) + + return KlineModel + + +class BitMartMultiKlineCollector: + """多周期K线数据抓取器""" + + def __init__(self): + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "数据抓取" + self.contract_symbol = "ETHUSDT" + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + # 存储各周期的模型 + self.models = {} + + # 初始化数据库连接和表 + self._init_database() + + def _init_database(self): + """初始化数据库,创建所有周期的表""" + db.connect(reuse_if_open=True) + + for step in KLINE_CONFIGS.keys(): + model = create_kline_model(step) + self.models[step] = model + # 创建表(如果不存在) + db.create_tables([model], safe=True) + logger.info(f"初始化表: {model._meta.table_name}") + + def get_klines(self, step: int, start_time: int, end_time: int): + """ + 获取K线数据 + :param step: K线周期(分钟) + :param start_time: 开始时间戳(秒级) + :param end_time: 结束时间戳(秒级) + :return: K线数据列表 + """ + try: + # 确保是整数 + start_time = int(start_time) + end_time = int(end_time) + + logger.debug(f"API请求: step={step}, start={start_time}, end={end_time}") + + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=step, + start_time=start_time, + end_time=end_time + )[0] + + if response['code'] != 1000: + logger.error(f"获取 {step}分钟 K线失败: {response}") + return [] + + klines = response.get('data', []) + formatted = [] + for k in klines: + timestamp_ms = int(k["timestamp"]) * 1000 + formatted.append({ + 'id': timestamp_ms, + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + + formatted.sort(key=lambda x: x['id']) + return formatted + except Exception as e: + logger.error(f"获取 {step}分钟 K线异常: {e}") + return [] + + def save_klines(self, step: int, klines: list): + """ + 保存K线数据到数据库 + :param step: K线周期 + :param klines: K线数据列表 + :return: 保存的数量 + """ + model = self.models.get(step) + if not model: + logger.error(f"未找到 {step}分钟 的数据模型") + return 0 + + saved_count = 0 + for kline in klines: + try: + model.get_or_create( + id=kline['id'], + defaults={ + 'open': kline['open'], + 'high': kline['high'], + 'low': kline['low'], + 'close': kline['close'], + } + ) + saved_count += 1 + except Exception as e: + logger.error(f"保存 {step}分钟 K线数据失败 {kline['id']}: {e}") + + return saved_count + + def collect_single_period(self, step: int, start_date: str = None, days: int = None): + """ + 抓取单个周期的历史数据(从当前时间向前抓取,直到遇到API限制) + :param step: K线周期(分钟) + :param start_date: 起始日期 'YYYY-MM-DD'(目标,可能无法达到) + :param days: 抓取天数(目标,可能无法达到) + """ + suffix = KLINE_CONFIGS.get(step, f'{step}m') + now = int(time.time()) + + if start_date: + start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d') + target_start_time = int(start_dt.timestamp()) + logger.info(f"开始抓取 {suffix} K线数据: 目标从 {start_date} 开始(从现在向前抓取)") + elif days: + target_start_time = now - 3600 * 24 * days + logger.info(f"开始抓取 {suffix} K线数据: 目标最近 {days} 天") + else: + target_start_time = now - 3600 * 24 * 30 + logger.info(f"开始抓取 {suffix} K线数据: 目标最近 30 天") + + # 根据周期调整批次大小 + if step <= 5: + batch_seconds = 3600 * 6 # 小周期每次6小时 + elif step <= 30: + batch_seconds = 3600 * 24 # 中周期每次1天 + else: + batch_seconds = 3600 * 24 * 3 # 大周期每次3天 + + total_saved = 0 + fail_count = 0 + max_fail = 3 + + # 从当前时间向前抓取 + current_end = now + while current_end > target_start_time: + current_start = current_end - batch_seconds + + # 打印时间范围 + start_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start)) + end_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end)) + logger.info(f"[{suffix}] 抓取: {start_str} -> {end_str}") + + klines = self.get_klines(step, current_start, current_end) + if klines: + saved = self.save_klines(step, klines) + total_saved += saved + logger.info(f"[{suffix}] 保存 {saved} 条,累计 {total_saved} 条") + fail_count = 0 + else: + fail_count += 1 + logger.warning(f"[{suffix}] 未获取到数据 (连续失败 {fail_count} 次)") + if fail_count >= max_fail: + earliest = time.strftime('%Y-%m-%d', time.localtime(current_end)) + logger.warning(f"[{suffix}] 达到API历史数据限制,最早可用数据约 {earliest}") + break + + current_end = current_start + time.sleep(0.3) # API请求间隔 + + logger.success(f"[{suffix}] 抓取完成,共保存 {total_saved} 条数据") + return total_saved + + def collect_all_periods(self, start_date: str = None, days: int = None, + periods: list = None): + """ + 抓取所有周期的历史数据 + :param start_date: 起始日期 'YYYY-MM-DD' + :param days: 抓取天数 + :param periods: 要抓取的周期列表,如 [1, 5, 15],默认全部 + """ + if periods is None: + periods = list(KLINE_CONFIGS.keys()) + + logger.info(f"开始抓取多周期K线数据,周期: {[KLINE_CONFIGS[p] for p in periods]}") + + results = {} + for step in periods: + if step not in KLINE_CONFIGS: + logger.warning(f"不支持的周期: {step}分钟,跳过") + continue + + logger.info(f"\n{'='*50}") + logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线") + logger.info(f"{'='*50}") + + saved = self.collect_single_period(step, start_date, days) + results[KLINE_CONFIGS[step]] = saved + + time.sleep(1) # 不同周期之间间隔 + + logger.info(f"\n{'='*50}") + logger.info("所有周期抓取完成!统计:") + for period, count in results.items(): + logger.info(f" {period}: {count} 条") + logger.info(f"{'='*50}") + + return results + + def close(self): + """关闭数据库连接""" + if not db.is_closed(): + db.close() + + +if __name__ == '__main__': + collector = BitMartMultiKlineCollector() + + try: + # 抓取尽可能多的历史数据(从现在向前,直到遇到API限制自动停止) + # 目标:2025-01-01,但实际能抓取多少取决于 BitMart API 的历史数据限制 + collector.collect_all_periods( + start_date='2025-01-01', # 目标起始日期 + periods=[1, 3, 5, 15, 30, 60] # 所有周期 + ) + finally: + collector.close() diff --git a/models/bitmart_klines.py b/models/bitmart_klines.py new file mode 100644 index 0000000..d80f162 --- /dev/null +++ b/models/bitmart_klines.py @@ -0,0 +1,97 @@ +""" +BitMart 多周期K线数据模型 +包含 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据表 +""" + +from peewee import * +from models import db + + +# ==================== 1分钟 K线 ==================== +class BitMartETH1M(Model): + id = BigIntegerField(primary_key=True) # 时间戳(毫秒级) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_1m' + + +# ==================== 3分钟 K线 ==================== +class BitMartETH3M(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_3m' + + +# ==================== 5分钟 K线 ==================== +class BitMartETH5M(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_5m' + + +# ==================== 15分钟 K线 ==================== +class BitMartETH15M(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_15m' + + +# ==================== 30分钟 K线 ==================== +class BitMartETH30M(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_30m' + + +# ==================== 1小时 K线 ==================== +class BitMartETH1H(Model): + id = BigIntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_eth_1h' + + +# 连接数据库并创建表 +db.connect(reuse_if_open=True) +db.create_tables([ + BitMartETH1M, + BitMartETH3M, + BitMartETH5M, + BitMartETH15M, + BitMartETH30M, + BitMartETH1H, +], safe=True) diff --git a/交易/bitmart-三分之一策略交易.py b/交易/bitmart-三分之一策略交易.py new file mode 100644 index 0000000..0bb5ec0 --- /dev/null +++ b/交易/bitmart-三分之一策略交易.py @@ -0,0 +1,557 @@ +""" +BitMart 三分之一回归策略交易 +使用5分钟K线周期 + +策略规则: +1. 开多条件: + - 找到实体>=0.1的前一根K线(如果前一根实体<0.1,继续往前找) + - 前一根是阴线(close < open) + - 当前K线的最高价(包括影线)涨到前一根阴线实体的 1/3 处 + - 即:当前high >= prev_close + (prev_open - prev_close) / 3 + +2. 平多/开空条件: + - 找到实体>=0.1的前一根K线 + - 前一根是阳线(close > open) + - 当前K线的最低价(包括影线)跌到前一根阳线实体的 1/3 处 + - 即:当前low <= prev_close - (prev_close - prev_open) / 3 + +3. 执行逻辑: + - 做多时遇到开空信号 -> 平多并反手开空 + - 做空时遇到开多信号 -> 平空并反手开多 +""" + +import time +import datetime + +from tqdm import tqdm +from loguru import logger +from bit_tools import openBrowser +from DrissionPage import ChromiumPage +from DrissionPage import ChromiumOptions + +from bitmart.api_contract import APIContract +from 交易.tools import send_dingtalk_message + + +class BitmartOneThirdStrategy: + def __init__(self, bit_id): + + self.page: ChromiumPage | None = None + + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "合约交易" + + self.contract_symbol = "ETHUSDT" + + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 + self.direction = None + + self.pbar = tqdm(total=5, desc="等待K线", ncols=80) # 5分钟周期 + + self.last_kline_time = None + + self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位) + self.open_type = "cross" # 全仓模式 + self.risk_percent = 0.01 # 每次开仓使用可用余额的 1% + + self.open_avg_price = None # 开仓价格 + self.current_amount = None # 持仓量 + + self.bit_id = bit_id + + # 三分之一策略参数 + self.min_body_size = 0.1 # 最小实体大小 + self.kline_step = 5 # K线周期(5分钟) + self.kline_count = 20 # 获取的K线数量,用于向前查找有效K线 + + # ========================= 三分之一策略核心函数 ========================= + + def is_bullish(self, c): + """判断阳线""" + return float(c['close']) > float(c['open']) + + def is_bearish(self, c): + """判断阴线""" + return float(c['close']) < float(c['open']) + + def get_body_size(self, candle): + """计算K线实体大小(绝对值)""" + return abs(float(candle['open']) - float(candle['close'])) + + def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1): + """ + 从当前索引往前查找,直到找到实体>=min_body_size的K线 + 返回:(有效K线的索引, K线数据) 或 (None, None) + """ + if current_idx <= 0: + return None, None + + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + body_size = self.get_body_size(prev) + if body_size >= min_body_size: + return i, prev + + return None, None + + def get_one_third_level(self, prev): + """ + 计算前一根K线实体的 1/3 回归位置 + 返回:(触发价格, 方向) + - 如果前一根是阴线:返回向上1/3价格,方向为 'long' + - 如果前一根是阳线:返回向下1/3价格,方向为 'short' + """ + p_open = float(prev['open']) + p_close = float(prev['close']) + + if self.is_bearish(prev): # 阴线,向上回归 + # 阴线实体 = open - close + body = p_open - p_close + trigger_price = p_close + body / 3 # 从低点涨 1/3 + return trigger_price, 'long' + + elif self.is_bullish(prev): # 阳线,向下回归 + # 阳线实体 = close - open + body = p_close - p_open + trigger_price = p_close - body / 3 # 从高点跌 1/3 + return trigger_price, 'short' + + return None, None + + def check_trigger(self, all_data, current_idx): + """ + 检查当前K线是否触发了交易信号 + 返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None) + 规则:考虑影线部分(high/low),因为实际交易中价格会到达影线位置 + """ + if current_idx <= 0: + return None, None, None + + curr = all_data[current_idx] + + # 查找实体>=min_body_size的前一根K线 + valid_prev_idx, prev = self.find_valid_prev_bar(all_data, current_idx, self.min_body_size) + + if prev is None: + return None, None, None + + trigger_price, direction = self.get_one_third_level(prev) + + if trigger_price is None: + return None, None, None + + # 使用影线部分(high/low)来判断 + c_high = float(curr['high']) + c_low = float(curr['low']) + + # 做多:前一根阴线,当前K线的最高价(包括影线)达到触发价格 + if direction == 'long' and c_high >= trigger_price: + return 'long', trigger_price, valid_prev_idx + + # 做空:前一根阳线,当前K线的最低价(包括影线)达到触发价格 + if direction == 'short' and c_low <= trigger_price: + return 'short', trigger_price, valid_prev_idx + + return None, None, None + + # ========================= BitMart API 函数 ========================= + + def get_klines(self): + """获取最近N根5分钟K线""" + try: + end_time = int(time.time()) + # 获取足够多的K线用于向前查找有效K线 + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=self.kline_step, # 5分钟 + start_time=end_time - 3600 * 3, # 取最近3小时 + end_time=end_time + )[0]["data"] + + # 每根: [timestamp, open, high, low, close, volume] + formatted = [] + for k in response: + formatted.append({ + 'id': int(k["timestamp"]), + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + formatted.sort(key=lambda x: x['id']) + return formatted + except Exception as e: + error_msg = str(e) + # 检查是否是429限流错误 + if "429" in error_msg or "too many requests" in error_msg.lower(): + logger.warning(f"API限流,等待60秒后重试: {e}") + time.sleep(60) + else: + logger.error(f"获取K线异常: {e}") + self.ding(msg="获取K线异常", error=True) + return None + + def get_current_price(self): + """获取当前最新价格""" + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, # 1分钟 + start_time=end_time - 3600 * 3, + end_time=end_time + )[0] + if response['code'] == 1000: + return float(response['data'][-1]["close_price"]) + return None + except Exception as e: + logger.error(f"获取价格异常: {e}") + return None + + def get_available_balance(self): + """获取合约账户可用USDT余额""" + try: + response = self.contractAPI.get_assets_detail()[0] + if response['code'] == 1000: + data = response['data'] + if isinstance(data, dict): + return float(data.get('available_balance', 0)) + elif isinstance(data, list): + for asset in data: + if asset.get('currency') == 'USDT': + return float(asset.get('available_balance', 0)) + return None + except Exception as e: + logger.error(f"余额查询异常: {e}") + return None + + def get_position_status(self): + """获取当前持仓方向""" + try: + response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] + if response['code'] == 1000: + positions = response['data'] + if not positions: + self.start = 0 + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + return True + self.start = 1 if positions[0]['position_type'] == 1 else -1 + self.open_avg_price = positions[0]['open_avg_price'] + self.current_amount = positions[0]['current_amount'] + self.position_cross = positions[0]["position_cross"] + return True + else: + return False + except Exception as e: + logger.error(f"持仓查询异常: {e}") + return False + + def set_leverage(self): + """程序启动时设置全仓 + 高杠杆""" + try: + response = self.contractAPI.post_submit_leverage( + contract_symbol=self.contract_symbol, + leverage=self.leverage, + open_type=self.open_type + )[0] + if response['code'] == 1000: + logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") + return True + else: + logger.error(f"杠杆设置失败: {response}") + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + return False + + # ========================= 浏览器自动化函数 ========================= + + def openBrowser(self): + """打开 TGE 对应浏览器实例""" + try: + bit_port = openBrowser(id=self.bit_id) + co = ChromiumOptions() + co.set_local_port(port=bit_port) + self.page = ChromiumPage(addr_or_opts=co) + return True + except: + return False + + def close_extra_tabs_in_browser(self): + """关闭多余 tab""" + try: + for idx, tab in enumerate(self.page.get_tabs()): + if idx > 0: + tab.close() + return True + except: + return False + + def click_safe(self, xpath, sleep=0.5): + """安全点击""" + try: + ele = self.page.ele(xpath) + if not ele: + return False + ele.scroll.to_see(center=True) + time.sleep(sleep) + ele.click(by_js=True) + return True + except: + return False + + def 平仓(self): + """市价平仓""" + logger.info("执行平仓操作...") + self.click_safe('x://span[normalize-space(text()) ="市价"]') + time.sleep(0.5) + self.ding(msg="执行平仓操作") + + def 开单(self, marketPriceLongOrder=0, size=None): + """ + 市价开单 + marketPriceLongOrder: 1 做多, -1 做空 + """ + if size is None or size <= 0: + logger.warning("开单金额无效") + return False + + direction_str = "做多" if marketPriceLongOrder == 1 else "做空" + logger.info(f"执行{direction_str}操作,金额: {size}") + + try: + if marketPriceLongOrder == -1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') + elif marketPriceLongOrder == 1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') + + self.ding(msg=f"执行{direction_str}操作,金额: {size}") + return True + except Exception as e: + logger.error(f"开单异常: {e}") + return False + + def ding(self, msg, error=False): + """统一消息格式""" + prefix = "❌三分之一策略:" if error else "🔔三分之一策略:" + if error: + logger.error(msg) + for i in range(10): + send_dingtalk_message(f"{prefix}{msg}") + else: + logger.info(msg) + send_dingtalk_message(f"{prefix}{msg}") + + # ========================= 时间计算函数 ========================= + + def get_now_time(self): + """获取当前5分钟整点时间戳""" + current_timestamp = time.time() + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + # 计算距离当前时间最近的5分钟整点 + minute = current_datetime.minute + target_minute = (minute // 5) * 5 # 向下取整到5分钟 + target_datetime = current_datetime.replace(minute=target_minute, second=0, microsecond=0) + + return int(target_datetime.timestamp()) + + def get_time_to_next_5min(self): + """获取距离下一个5分钟的秒数""" + current_timestamp = time.time() + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + minute = current_datetime.minute + next_5min = ((minute // 5) + 1) * 5 + if next_5min >= 60: + next_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1) + else: + next_datetime = current_datetime.replace(minute=next_5min, second=0, microsecond=0) + + return (next_datetime - current_datetime).total_seconds() + + # ========================= 主运行函数 ========================= + + def action(self): + """主运行逻辑""" + # 启动时设置全仓高杠杆 + if not self.set_leverage(): + logger.error("杠杆设置失败,程序继续运行但可能下单失败") + return + + # 1. 打开浏览器 + if not self.openBrowser(): + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + + if self.close_extra_tabs_in_browser(): + logger.info('关闭多余标签页成功') + else: + logger.info('关闭多余标签页失败') + + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + + self.click_safe('x://button[normalize-space(text()) ="市价"]') + + self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80) + + self.time_start = None # 时间状态,避免同一时段重复处理 + + while True: + # 更新进度条 + current_time = time.localtime() + current_minute = current_time.tm_min + self.pbar.n = current_minute % 5 + self.pbar.refresh() + + # 检查是否已处理过当前时间段 + if self.time_start == self.get_now_time(): + time.sleep(3) + continue + + # 获取K线数据 + kline_data = self.get_klines() + if not kline_data: + logger.warning("获取K线数据失败") + time.sleep(5) + continue + + # 检查数据是否是最新的 + if len(kline_data) < 3: + logger.warning("K线数据不足") + time.sleep(5) + continue + + # 判断最新K线时间 + latest_kline_time = kline_data[-1]['id'] + if self.get_now_time() != latest_kline_time: + time.sleep(3) + continue + + self.time_start = self.get_now_time() + + # 获取持仓状态 + if not self.get_position_status(): + logger.warning("获取仓位信息失败") + self.ding(msg="获取仓位信息失败!", error=True) + continue + + logger.info(f"当前持仓状态: {self.start} (1=多, -1=空, 0=无)") + + # ========== 三分之一策略信号检测 ========== + current_idx = len(kline_data) - 1 + direction, trigger_price, valid_prev_idx = self.check_trigger(kline_data, current_idx) + + if direction: + # 获取有效前一根K线用于日志 + valid_prev = kline_data[valid_prev_idx] if valid_prev_idx is not None else None + curr = kline_data[current_idx] + + if valid_prev: + prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M') + curr_time = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M') + prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线" + prev_body = self.get_body_size(valid_prev) + + logger.info(f"检测到{direction}信号,触发价格: {trigger_price:.2f}") + logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f}") + logger.info(f" 当前根[{curr_time}]: H={curr['high']:.2f} L={curr['low']:.2f}") + + # ========== 执行交易逻辑 ========== + balance = self.get_available_balance() + if balance is None: + balance = 0 + trade_size = balance * self.risk_percent + + if direction == "long": + if self.start == -1: # 当前空仓,平空开多 + logger.info("平空仓,反手开多") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + elif self.start == 0: # 当前无仓,直接开多 + logger.info("无仓位,开多") + self.开单(marketPriceLongOrder=1, size=trade_size) + # 已有多仓则不操作 + + elif direction == "short": + if self.start == 1: # 当前多仓,平多开空 + logger.info("平多仓,反手开空") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + elif self.start == 0: # 当前无仓,直接开空 + logger.info("无仓位,开空") + self.开单(marketPriceLongOrder=-1, size=trade_size) + # 已有空仓则不操作 + + # ========== 发送持仓信息 ========== + self._send_position_message(kline_data[-1]) + + self.pbar.reset() + + def _send_position_message(self, latest_kline): + """发送持仓信息到钉钉""" + current_price = float(latest_kline["close"]) + balance = self.get_available_balance() + self.balance = balance if balance is not None else 0.0 + + if self.start != 0: + open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0 + current_amount = float(self.current_amount) if self.current_amount else 0.0 + position_cross = float(self.position_cross) if hasattr(self, 'position_cross') and self.position_cross else 0.0 + + # 计算浮动盈亏 + if self.start == 1: # 多头 + unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price) + else: # 空头 + unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price) + + # 计算收益率 + if open_avg_price > 0: + if self.start == 1: + pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000 + else: + pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000 + rate_str = f" ({pnl_rate:+.2f}%)" + else: + rate_str = "" + + direction_str = "空" if self.start == -1 else "多" + pnl_str = f"{unrealized_pnl:+.2f} USDT" + + msg = ( + f"【三分之一策略 {self.contract_symbol} 5分钟】\n" + f"当前方向:{direction_str}\n" + f"当前现价:{current_price:.2f} USDT\n" + f"开仓均价:{open_avg_price:.2f} USDT\n" + f"持仓量(eth):{float(current_amount) / 1000} eth\n" + f"持仓量(usdt):{position_cross} usdt\n" + f"浮动盈亏:{pnl_str}{rate_str}\n" + f"账户可用余额:{self.balance:.2f} usdt" + ) + else: + msg = ( + f"【三分之一策略 {self.contract_symbol} 5分钟】\n" + f"当前方向:无\n" + f"当前现价:{current_price:.2f} USDT\n" + f"账户可用余额:{self.balance:.2f} usdt" + ) + + self.ding(msg=msg) + + +if __name__ == '__main__': + # 启动三分之一策略交易 + BitmartOneThirdStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action()