Compare commits

...

10 Commits

Author SHA1 Message Date
ddrwode
f68b23fcf1 优化交易代码 2026-02-02 13:11:09 +08:00
27942
6c43150cbe 加入精准回测数据 2026-02-02 11:33:39 +08:00
ddrwode
0b8e7a59a2 优化交易代码 2026-02-02 11:09:02 +08:00
ddrwode
8cfaf1d5e9 优化交易代码 2026-02-02 11:00:45 +08:00
ddrwode
b398e1cac2 优化交易代码 2026-02-02 10:55:54 +08:00
Administrator
ea909a02d8 bitmart优化完成 2026-02-02 10:48:39 +08:00
27942
1043d000be 加入精准回测数据 2026-02-02 01:13:14 +08:00
27942
a65f5d8167 加入精准回测数据 2026-02-02 01:10:16 +08:00
27942
251f794ea8 加入精准回测数据 2026-02-02 00:34:04 +08:00
27942
03e209f28a 加入精准回测数据 2026-02-02 00:16:35 +08:00
10 changed files with 84088 additions and 85097 deletions

24
1111
View File

@@ -21,9 +21,27 @@ db2e87093c120766bca60d2282y72760688
yx20250715@gmail.com
Abc12345678
yx20250715@gmail.com
248.23
2611
基于前一根有效 K 线(实体 ≥ 0.1
做多触发价 = 当前 k 线开盘价 + 实体 / 5收盘价向上 1/5 实体)
做空触发价 = 当前 k 线开盘价 - 实体 / 5收盘价向下 1/5 实体)
若已有持仓,在 3 分钟 K 线的第一分钟可单独检测反手:
持空反手做多:价格涨到 开仓价 + 前一根实体 / 5
持多反手做空:价格跌到 开仓价 - 前一根实体 / 5
555.73
2278

File diff suppressed because it is too large Load Diff

View File

@@ -215,15 +215,128 @@ def determine_trigger_order_by_1m(
return None
def check_reverse_signal_in_first_minute(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float,
current_direction: str
) -> bool:
"""
检查反手信号是否在第一分钟触发
:param bars_1m: 3根1分钟K线数据
:param long_trigger: 做多触发价格
:param short_trigger: 做空触发价格
:param current_direction: 当前持仓方向 ('long''short')
:return: True 表示反手信号在第一分钟触发False 表示不是
"""
if not bars_1m:
return False
# 只检查第一分钟K线
first_bar = bars_1m[0]
high = float(first_bar['high'])
low = float(first_bar['low'])
# 如果当前是多仓,检查空信号是否在第一分钟触发
if current_direction == 'long':
return low <= short_trigger
# 如果当前是空仓,检查多信号是否在第一分钟触发
if current_direction == 'short':
return high >= long_trigger
return False
def get_body_percent(candle) -> float:
"""
计算K线实体占价格的百分比
:param candle: K线数据
:return: 实体百分比如0.1表示0.1%
"""
body = abs(float(candle['open']) - float(candle['close']))
price = (float(candle['open']) + float(candle['close'])) / 2
if price == 0:
return 0
return (body / price) * 100
def check_breakout_reverse_signal(
all_data_3m: List[Dict],
current_idx: int,
current_position_direction: str,
min_body_percent: float = 0.1
) -> tuple:
"""
检查"突破上一根K线高低点"的反手信号
规则:
- 持多单时当前K线跌破上一根K线最低点 → 反手开空
条件上一根K线是阴线且实体>0.1%波动
- 持空单时当前K线涨破上一根K线最高点 → 反手开多
条件上一根K线是阳线且实体>0.1%波动
:param all_data_3m: 3分钟K线数据
:param current_idx: 当前K线索引
:param current_position_direction: 当前持仓方向 ('long''short')
:param min_body_percent: 最小实体百分比默认0.1%
:return: (方向, 触发价格, 信号类型) 或 (None, None, None)
"""
if current_idx <= 0 or current_position_direction is None:
return None, None, None
curr = all_data_3m[current_idx]
prev = all_data_3m[current_idx - 1]
c_high = float(curr['high'])
c_low = float(curr['low'])
prev_high = float(prev['high'])
prev_low = float(prev['low'])
# 计算上一根K线的实体百分比
body_percent = get_body_percent(prev)
# 持多单时检查是否跌破上一根K线最低点
if current_position_direction == 'long':
# 条件上一根K线是阴线且实体>min_body_percent%
if is_bearish(prev) and body_percent >= min_body_percent:
if c_low < prev_low:
# 触发反手开空信号
logger.debug(f"突破反手信号持多单当前K线低点{c_low:.2f}跌破上一根阴线低点{prev_low:.2f},实体{body_percent:.3f}%")
return 'short', prev_low, 'breakout'
# 持空单时检查是否涨破上一根K线最高点
elif current_position_direction == 'short':
# 条件上一根K线是阳线且实体>min_body_percent%
if is_bullish(prev) and body_percent >= min_body_percent:
if c_high > prev_high:
# 触发反手开多信号
logger.debug(f"突破反手信号持空单当前K线高点{c_high:.2f}涨破上一根阳线高点{prev_high:.2f},实体{body_percent:.3f}%")
return 'long', prev_high, 'breakout'
return None, None, None
def check_trigger_with_1m(
all_data_3m: List[Dict],
current_idx: int,
min_body_size: float = 0.1
min_body_size: float = 0.1,
current_position_direction: str = None,
first_minute_only: bool = True
) -> tuple:
"""
检查当前3分钟K线是否触发了交易信号
如果同时触发两个方向使用1分钟K线精确判断顺序
新增逻辑:如果有持仓且 first_minute_only=True反手信号必须在第一分钟触发才有效
:param all_data_3m: 3分钟K线数据
:param current_idx: 当前K线索引
:param min_body_size: 最小实体大小
:param current_position_direction: 当前持仓方向 ('long', 'short', 或 None)
:param first_minute_only: 是否只在第一分钟触发反手信号才有效
返回:(方向, 触发价格, 有效前一根K线索引, 1分钟数据是否使用)
"""
if current_idx <= 0:
@@ -257,6 +370,15 @@ def check_trigger_with_1m(
direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger)
if direction:
trigger_price = long_trigger if direction == 'long' else short_trigger
# 检查反手信号是否需要在第一分钟触发
if first_minute_only and current_position_direction and direction != current_position_direction:
# 这是一个反手信号,检查是否在第一分钟触发
if not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, current_position_direction):
# 反手信号不是在第一分钟触发,忽略
logger.debug(f"反手信号 {direction} 不在第一分钟触发,忽略")
return None, None, None, False
return direction, trigger_price, valid_prev_idx, True
# 如果没有1分钟数据使用开盘价距离判断
@@ -269,9 +391,21 @@ def check_trigger_with_1m(
return 'long', long_trigger, valid_prev_idx, False
if short_triggered:
# 检查是否是反手信号且需要第一分钟触发
if first_minute_only and current_position_direction == 'long':
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m and not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, 'long'):
logger.debug(f"空信号不在第一分钟触发,忽略(当前持多仓)")
return None, None, None, False
return 'short', short_trigger, valid_prev_idx, False
if long_triggered:
# 检查是否是反手信号且需要第一分钟触发
if first_minute_only and current_position_direction == 'short':
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m and not check_reverse_signal_in_first_minute(bars_1m, long_trigger, short_trigger, 'short'):
logger.debug(f"多信号不在第一分钟触发,忽略(当前持空仓)")
return None, None, None, False
return 'long', long_trigger, valid_prev_idx, False
return None, None, None, False
@@ -279,12 +413,21 @@ def check_trigger_with_1m(
# ========================= 回测逻辑 =========================
def backtest_one_third_strategy(dates: List[str], min_body_size: float = 0.1):
def backtest_one_third_strategy(
dates: List[str],
min_body_size: float = 0.1,
first_minute_only: bool = True,
enable_breakout_reverse: bool = True,
breakout_min_body_percent: float = 0.1
):
"""
三分之一策略回测(精准版)
:param dates: 日期列表
:param min_body_size: 最小实体大小
:param min_body_size: 最小实体大小(绝对值)
:param first_minute_only: 是否只在第一分钟触发反手信号才有效默认True
:param enable_breakout_reverse: 是否启用"突破上一根K线高低点"反手信号默认True
:param breakout_min_body_percent: 突破反手信号的最小实体百分比默认0.1%
:return: (trades, stats)
"""
# 获取所有3分钟K线数据
@@ -297,6 +440,8 @@ def backtest_one_third_strategy(dates: List[str], min_body_size: float = 0.1):
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条3分钟K线数据")
logger.info(f"反手信号仅第一分钟有效: {first_minute_only}")
logger.info(f"突破反手信号启用: {enable_breakout_reverse},最小实体百分比: {breakout_min_body_percent}%")
if not all_data:
logger.warning("未获取到任何数据,请检查数据库")
@@ -325,24 +470,47 @@ def backtest_one_third_strategy(dates: List[str], min_body_size: float = 0.1):
# 统计使用1分钟数据精准判断的次数
precise_count = 0
fallback_count = 0
# 统计突破反手信号触发次数
breakout_reverse_count = 0
# 记录每根K线是否已经执行过突破反手当前K线只执行一次
last_breakout_bar_id = None
idx = 1
while idx < len(all_data):
curr = all_data[idx]
curr_bar_id = curr['id']
# 检测信号使用1分钟K线精准判断
# 获取当前持仓方向(用于判断反手信号是否在第一分钟触发
current_pos_dir = current_position['direction'] if current_position else None
# 检测信号使用1分钟K线精准判断并考虑反手信号第一分钟限制
direction, trigger_price, valid_prev_idx, used_1m = check_trigger_with_1m(
all_data, idx, min_body_size
all_data, idx, min_body_size,
current_position_direction=current_pos_dir,
first_minute_only=first_minute_only
)
# 如果没有五分之一信号,且有持仓,检查突破反手信号
signal_type = 'one_fifth' # 信号类型one_fifth五分之一或 breakout突破
if direction is None and current_position is not None and enable_breakout_reverse:
# 检查当前K线是否已经执行过突破反手
if last_breakout_bar_id != curr_bar_id:
breakout_dir, breakout_price, breakout_type = check_breakout_reverse_signal(
all_data, idx, current_pos_dir, breakout_min_body_percent
)
if breakout_dir:
direction = breakout_dir
trigger_price = breakout_price
signal_type = 'breakout'
if used_1m:
precise_count += 1
elif direction:
elif direction and signal_type == 'one_fifth':
fallback_count += 1
# 无持仓 -> 开仓
# 无持仓 -> 开仓(只用五分之一信号开仓,突破信号不用于开仓)
if current_position is None:
if direction:
if direction and signal_type == 'one_fifth':
current_position = {
'direction': direction,
'entry_price': trigger_price,
@@ -369,19 +537,28 @@ def backtest_one_third_strategy(dates: List[str], min_body_size: float = 0.1):
else:
diff = current_position['entry_price'] - exit_price
# 记录信号类型
signal_type_str = '突破' if signal_type == 'breakout' else '五分之一'
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
'diff': diff,
'signal_type': signal_type_str
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
# 如果是突破反手信号记录当前K线ID防止重复执行
if signal_type == 'breakout':
last_breakout_bar_id = curr_bar_id
breakout_reverse_count += 1
# 反手开仓
current_position = {
'direction': direction,
@@ -392,7 +569,7 @@ def backtest_one_third_strategy(dates: List[str], min_body_size: float = 0.1):
stats[direction]['count'] += 1
time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
logger.debug(f"[{time_str}] 平{pos_dir}反手{direction} @ {trigger_price:.2f} 盈亏={diff:.2f}")
logger.debug(f"[{time_str}] 平{pos_dir}反手{direction}({signal_type_str}) @ {trigger_price:.2f} 盈亏={diff:.2f}")
idx += 1
@@ -420,7 +597,7 @@ def backtest_one_third_strategy(dates: List[str], min_body_size: float = 0.1):
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"回测完成使用1分钟精准判断 {precise_count} 次,使用开盘价距离判断 {fallback_count}")
logger.info(f"回测完成使用1分钟精准判断 {precise_count} 次,使用开盘价距离判断 {fallback_count} 次,突破反手信号 {breakout_reverse_count}")
return trades, stats
@@ -431,7 +608,12 @@ if __name__ == '__main__':
# ==================== 配置参数 ====================
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
MIN_BODY_SIZE = 0.1 # 最小实体大小
MIN_BODY_SIZE = 0.1 # 最小实体大小(绝对值)
FIRST_MINUTE_ONLY = True # 反手信号仅在3分钟K线的第一分钟触发才有效
# 突破反手信号配置
ENABLE_BREAKOUT_REVERSE = True # 是否启用"突破上一根K线高低点"反手信号
BREAKOUT_MIN_BODY_PERCENT = 0.1 # 突破反手信号的最小实体百分比0.1表示0.1%
# ==================== 生成查询日期列表 ====================
dates = []
@@ -455,7 +637,13 @@ if __name__ == '__main__':
exit(1)
# ==================== 执行回测 ====================
trades, stats = backtest_one_third_strategy(dates, MIN_BODY_SIZE)
trades, stats = backtest_one_third_strategy(
dates,
MIN_BODY_SIZE,
FIRST_MINUTE_ONLY,
ENABLE_BREAKOUT_REVERSE,
BREAKOUT_MIN_BODY_PERCENT
)
# ==================== 输出交易详情 ====================
logger.info("===== 每笔交易详情 =====")
@@ -511,6 +699,8 @@ if __name__ == '__main__':
print(f"{'='*60}")
print(f"回测周期:{START_DATE}{END_DATE}")
print(f"最小实体要求:{MIN_BODY_SIZE}")
print(f"反手信号仅第一分钟有效:{'' if FIRST_MINUTE_ONLY else ''}")
print(f"突破反手信号:{'启用' if ENABLE_BREAKOUT_REVERSE else '禁用'}(最小实体{BREAKOUT_MIN_BODY_PERCENT}%")
print(f"{'='*60}")
print(f"总交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
@@ -536,7 +726,7 @@ if __name__ == '__main__':
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[
'entry_time', 'exit_time', 'direction', 'entry', 'exit',
'point_diff', 'raw_profit', 'fee', 'net_profit'
'point_diff', 'raw_profit', 'fee', 'net_profit', 'signal_type'
])
writer.writeheader()
for t in trades:
@@ -549,6 +739,7 @@ if __name__ == '__main__':
'point_diff': t['point_diff'],
'raw_profit': t['raw_profit'],
'fee': t['fee'],
'net_profit': t['net_profit']
'net_profit': t['net_profit'],
'signal_type': t.get('signal_type', '五分之一')
})
print(f"\n交易记录已保存到:{csv_path}")

View File

@@ -0,0 +1,715 @@
"""
BitMart 三分之一回归策略交易(双向触发版)
使用5分钟K线周期实时监测
策略规则:
1. 触发价格计算基于有效的前一根K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3
2. 信号触发条件:
- 当前K线最高价 >= 做多触发价格 → 做多信号
- 当前K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根K线内只交易一次防止频繁反手
示例1阳线
前一根K线开盘3000收盘3100阳线实体=100
- 做多触发价格 = 3100 + 33 = 3133继续上涨做多
- 做空触发价格 = 3100 - 33 = 3067回调做空←当前跌到这里就做空
示例2阴线
前一根K线开盘3100收盘3000阴线实体=100
- 做多触发价格 = 3000 + 33 = 3033反弹做多
- 做空触发价格 = 3000 - 33 = 2967继续下跌做空
"""
import time
import datetime
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
from 交易.tools import send_dingtalk_message
class BitmartOneThirdStrategy:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "6104088c65a68d7e53df5d9395b67d78e555293a"
self.secret_key = "a8b14312330d8e6b9b09acfd972b34e32022fdfa9f2b06f0a0a31723b873fd01"
self.memo = "me"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=5, desc="等待K线", ncols=80) # 5分钟周期
self.last_kline_time = None
self.leverage = "40" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
# 三分之一策略参数
self.min_body_size = 0.1 # 最小实体大小
self.kline_step = 5 # K线周期5分钟
self.kline_count = 20 # 获取的K线数量用于向前查找有效K线
# 实时监测参数
self.check_interval = 3 # 检测间隔(秒)
self.last_trigger_kline_id = None # 记录上次触发信号的K线ID避免同一K线重复触发
self.last_trigger_direction = None # 记录上次触发的方向
self.last_trade_kline_id = None # 记录上次实际交易的K线ID防止同一K线内频繁反手
# ========================= 三分之一策略核心函数 =========================
def is_bullish(self, c):
"""判断阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(self, c):
"""判断阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(self, candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = self.get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_third_levels(self, prev):
"""
计算前一根K线实体的 1/3 双向触发价格
返回:(做多触发价格, 做空触发价格)
基于收盘价计算(无论阴线阳线):
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3实体
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3实体
示例:
阳线 open=3000, close=3100, 实体=100
- 做多触发 = 3100 + 33 = 3133继续涨
- 做空触发 = 3100 - 33 = 3067回调
阴线 open=3100, close=3000, 实体=100
- 做多触发 = 3000 + 33 = 3033反弹
- 做空触发 = 3000 - 33 = 2967继续跌
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001: # 十字星,忽略
return None, None
# 基于收盘价的双向触发价格
long_trigger = p_close + body / 3 # 从收盘价往上涨1/3触发做多
short_trigger = p_close - body / 3 # 从收盘价往下跌1/3触发做空
return long_trigger, short_trigger
def check_trigger(self, all_data, current_idx):
"""
检查当前K线是否触发了交易信号双向检测
返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None)
规则:
- 当前K线高点 >= 做多触发价格 → 做多信号
- 当前K线低点 <= 做空触发价格 → 做空信号
"""
if current_idx <= 0:
return None, None, None
curr = all_data[current_idx]
# 查找实体>=min_body_size的前一根K线
valid_prev_idx, prev = self.find_valid_prev_bar(all_data, current_idx, self.min_body_size)
if prev is None:
return None, None, None
long_trigger, short_trigger = self.get_one_third_levels(prev)
if long_trigger is None:
return None, None, None
# 使用影线部分high/low来判断
c_high = float(curr['high'])
c_low = float(curr['low'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 如果两个方向都触发,判断哪个先触发
if long_triggered and short_triggered:
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx
else:
return 'long', long_trigger, valid_prev_idx
if short_triggered:
return 'short', short_trigger, valid_prev_idx
if long_triggered:
return 'long', long_trigger, valid_prev_idx
return None, None, None
def check_realtime_trigger(self, kline_data):
"""
实时检测当前K线是否触发信号双向检测
基于已收盘的K线计算触发价格用当前正在形成的K线判断
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
"""
if len(kline_data) < 2:
return None, None, None, None
# 当前正在形成的K线最后一根未收盘
curr = kline_data[-1]
curr_kline_id = curr['id']
# 从倒数第二根开始往前找有效K线已收盘的K线
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size)
if prev is None:
return None, None, None, None
long_trigger, short_trigger = self.get_one_third_levels(prev)
if long_trigger is None:
return None, None, None, None
# 使用当前K线的实时高低点来判断
c_high = float(curr['high'])
c_low = float(curr['low'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 确定触发方向
direction = None
trigger_price = None
if long_triggered and short_triggered:
# 两个方向都触发,判断哪个先(距离开盘价更近的先触发)
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
direction = 'short'
trigger_price = short_trigger
else:
direction = 'long'
trigger_price = long_trigger
elif short_triggered:
direction = 'short'
trigger_price = short_trigger
elif long_triggered:
direction = 'long'
trigger_price = long_trigger
if direction is None:
return None, None, None, None
# 检查是否在同一根K线内已经触发过相同方向
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
return None, None, None, None
return direction, trigger_price, prev, curr
# ========================= BitMart API 函数 =========================
def get_klines(self):
"""获取最近N根5分钟K线"""
try:
end_time = int(time.time())
# 获取足够多的K线用于向前查找有效K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=self.kline_step, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
error_msg = str(e)
# 检查是否是429限流错误
if "429" in error_msg or "too many requests" in error_msg.lower():
logger.warning(f"API限流等待60秒后重试: {e}")
time.sleep(60)
else:
logger.error(f"获取K线异常: {e}")
self.ding(msg="获取K线异常", error=True)
return None
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ========================= 浏览器自动化函数 =========================
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def close_extra_tabs_in_browser(self):
"""关闭多余 tab"""
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
"""市价平仓"""
logger.info("执行平仓操作...")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
time.sleep(0.5)
self.ding(msg="执行平仓操作")
def 开单(self, marketPriceLongOrder=0, size=None):
"""
市价开单
marketPriceLongOrder: 1 做多, -1 做空
"""
if size is None or size <= 0:
logger.warning("开单金额无效")
return False
direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
logger.info(f"执行{direction_str}操作,金额: {size}")
size = 50
try:
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.ding(msg=f"执行{direction_str}操作,金额: {size}")
return True
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def ding(self, msg, error=False):
"""统一消息格式"""
prefix = "❌三分之一策略:" if error else "🔔三分之一策略:"
if error:
logger.error(msg)
for i in range(10):
send_dingtalk_message(f"{prefix}{msg}")
else:
logger.info(msg)
send_dingtalk_message(f"{prefix}{msg}")
# ========================= 时间计算函数 =========================
def get_now_time(self):
"""获取当前5分钟整点时间戳"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的5分钟整点
minute = current_datetime.minute
target_minute = (minute // 5) * 5 # 向下取整到5分钟
target_datetime = current_datetime.replace(minute=target_minute, second=0, microsecond=0)
return int(target_datetime.timestamp())
def get_time_to_next_5min(self):
"""获取距离下一个5分钟的秒数"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
minute = current_datetime.minute
next_5min = ((minute // 5) + 1) * 5
if next_5min >= 60:
next_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)
else:
next_datetime = current_datetime.replace(minute=next_5min, second=0, microsecond=0)
return (next_datetime - current_datetime).total_seconds()
# ========================= 主运行函数 =========================
def action(self):
"""主运行逻辑 - 实时监测版本"""
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
# 1. 打开浏览器
if not self.openBrowser():
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
else:
logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
logger.info(f"开始实时监测,检测间隔: {self.check_interval}")
# 用于定时发送持仓信息每5分钟发一次
last_report_time = 0
report_interval = 300 # 5分钟报告一次持仓
while True:
# 1. 打开浏览器
for i in range(5):
if self.openBrowser():
break
time.sleep(5)
else:
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
else:
logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
try:
# 获取K线数据
kline_data = self.get_klines()
if not kline_data:
logger.warning("获取K线数据失败等待重试...")
time.sleep(self.check_interval)
continue
if len(kline_data) < 3:
logger.warning("K线数据不足")
time.sleep(self.check_interval)
continue
# 获取当前K线信息用于日志
curr = kline_data[-1]
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
# ========== 实时信号检测 ==========
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
if direction:
curr_kline_id = curr_kline['id']
# 检查是否在同一K线内已经交易过防止频繁反手
if self.last_trade_kline_id == curr_kline_id:
logger.debug(f"同一K线内已交易跳过本次{direction}信号")
# 更新触发记录,避免重复日志
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
# 获取持仓状态
if not self.get_position_status():
logger.warning("获取仓位信息失败")
time.sleep(self.check_interval)
continue
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev)
# 检查信号与持仓是否同向(避免重复日志)
if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1):
# 信号与持仓同向,静默忽略
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
logger.info(f"{'=' * 50}")
logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}")
logger.info(
f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}")
logger.info(
f" 当前K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}")
logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)")
# ========== 执行交易逻辑 ==========
balance = self.get_available_balance()
if balance is None:
balance = 0
trade_size = balance * self.risk_percent
executed = False
if direction == "long":
if self.start == -1: # 当前空仓,平空开多
logger.info("📈 平空仓,反手开多")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif self.start == 0: # 当前无仓,直接开多
logger.info("📈 无仓位,开多")
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif direction == "short":
if self.start == 1: # 当前多仓,平多开空
logger.info("📉 平多仓,反手开空")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
elif self.start == 0: # 当前无仓,直接开空
logger.info("📉 无仓位,开空")
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
# 记录本次触发
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
if executed:
# 记录交易K线防止同一K线内频繁反手
self.last_trade_kline_id = curr_kline_id
# 交易后立即发送持仓信息
self.get_position_status()
self._send_position_message(curr_kline)
last_report_time = time.time()
logger.info(f"{'=' * 50}")
else:
# 没有信号时,显示实时价格
logger.debug(
f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
# ========== 定时发送持仓信息 ==========
current_time = time.time()
if current_time - last_report_time >= report_interval:
if self.get_position_status():
self._send_position_message(kline_data[-1])
last_report_time = current_time
# 等待下次检测
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval)
time.sleep(15)
self.page.close()
time.sleep(15)
def _send_position_message(self, latest_kline):
"""发送持仓信息到钉钉"""
current_price = float(latest_kline["close"])
balance = self.get_available_balance()
self.balance = balance if balance is not None else 0.0
if self.start != 0:
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
current_amount = float(self.current_amount) if self.current_amount else 0.0
position_cross = float(self.position_cross) if hasattr(self,
'position_cross') and self.position_cross else 0.0
# 计算浮动盈亏
if self.start == 1: # 多头
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
else: # 空头
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
# 计算收益率
if open_avg_price > 0:
if self.start == 1:
pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
else:
pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000
rate_str = f" ({pnl_rate:+.2f}%)"
else:
rate_str = ""
direction_str = "" if self.start == -1 else ""
pnl_str = f"{unrealized_pnl:+.2f} USDT"
msg = (
f"【三分之一策略 {self.contract_symbol} 5分钟】\n"
f"当前方向:{direction_str}\n"
f"当前现价:{current_price:.2f} USDT\n"
f"开仓均价:{open_avg_price:.2f} USDT\n"
f"持仓量(eth){float(current_amount) / 1000} eth\n"
f"持仓量(usdt){position_cross} usdt\n"
f"浮动盈亏:{pnl_str}{rate_str}\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
else:
msg = (
f"【三分之一策略 {self.contract_symbol} 5分钟】\n"
f"当前方向:无\n"
f"当前现价:{current_price:.2f} USDT\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
self.ding(msg=msg)
if __name__ == '__main__':
# 启动三分之一策略交易
BitmartOneThirdStrategy(bit_id="62f9107d0c674925972084e282df55b3").action()

View File

@@ -0,0 +1,25 @@
# 基于开盘价的五分之一策略
策略规则1111
- **做多触发价** = 当前K线开盘价 + 前一根实体/5
- **做空触发价** = 当前K线开盘价 - 前一根实体/5
- **前一根有效K线**:实体 ≥ 0.1
## 执行逻辑
- 当前K线最高价 ≥ 做多触发价 → 做多信号
- 当前K线最低价 ≤ 做空触发价 → 做空信号
- 同根K线多空都触及时用1分钟K线判断先后
- 触及信号则开仓或反手同根3分钟K线只交易一次
## 运行
```bash
cd /path/to/lm_code
python open_fifth_strategy/main.py
```
## 配置
`config.py` 中修改 API、合约、杠杆等参数。

View File

@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
"""基于开盘价的五分之一策略"""

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
基于开盘价的五分之一策略 - 配置文件
策略规则1111
- 做多触发价 = 当前K线开盘价 + 前一根实体/5
- 做空触发价 = 当前K线开盘价 - 前一根实体/5
- 前一根有效K线实体 >= 0.1
"""
# BitMart API请勿提交敏感信息到版本库
API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
MEMO = "合约交易"
# 交易参数
CONTRACT_SYMBOL = "ETHUSDT"
KLINE_STEP = 3 # 3分钟K线
MIN_BODY_SIZE = 0.1 # 有效K线最小实体
CHECK_INTERVAL = 3 # 检测间隔(秒)
LEVERAGE = "100"
OPEN_TYPE = "cross" # 全仓
RISK_PERCENT = 0.01 # 每次开仓占用可用余额的比例
# 比特浏览器ID用于网页下单
BIT_ID = "f2320f57e24c45529a009e1541e25961"

536
open_fifth_strategy/main.py Normal file
View File

@@ -0,0 +1,536 @@
# -*- coding: utf-8 -*-
"""
BitMart 基于开盘价的五分之一策略交易
策略规则1111
- 做多触发价 = 当前K线开盘价 + 实体/5
- 做空触发价 = 当前K线开盘价 - 实体/5
- 基于前一根有效K线实体 >= 0.1
执行:触及做多/做空触发价则开仓或反手同根K线只交易一次
运行python open_fifth_strategy/main.py在项目根目录 lm_code 下)
"""
import sys
from pathlib import Path
_root = Path(__file__).resolve().parent.parent
if str(_root) not in sys.path:
sys.path.insert(0, str(_root))
import random
import time
from concurrent.futures import ThreadPoolExecutor
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
from 交易.tools import send_dingtalk_message
from open_fifth_strategy.config import (
API_KEY,
SECRET_KEY,
MEMO,
CONTRACT_SYMBOL,
KLINE_STEP,
MIN_BODY_SIZE,
CHECK_INTERVAL,
LEVERAGE,
OPEN_TYPE,
RISK_PERCENT,
BIT_ID,
)
ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk")
class OpenBasedFifthStrategy:
"""基于开盘价的五分之一策略"""
def __init__(self, bit_id=None):
self.page = None
self.api_key = API_KEY
self.secret_key = SECRET_KEY
self.memo = MEMO
self.contract_symbol = CONTRACT_SYMBOL
self.contractAPI = APIContract(
self.api_key, self.secret_key, self.memo, timeout=(5, 15)
)
self.start = 0 # 持仓: -1空, 0无, 1多
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
self.bit_id = bit_id or BIT_ID
self.min_body_size = MIN_BODY_SIZE
self.kline_step = KLINE_STEP
self.check_interval = CHECK_INTERVAL
self.leverage = LEVERAGE
self.open_type = OPEN_TYPE
self.risk_percent = RISK_PERCENT
self.last_trigger_kline_id = None
self.last_trigger_direction = None
self.last_trade_kline_id = None
# ==================== 策略核心 ====================
def get_body_size(self, candle):
return abs(float(candle["open"]) - float(candle["close"]))
def find_valid_prev_bar(self, all_data, current_idx):
"""找前一根有效K线实体>=min_body_size"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if self.get_body_size(prev) >= self.min_body_size:
return i, prev
return None, None
def get_trigger_levels(self, prev, curr_open):
"""
基于当前K线开盘价计算触发价1111
做多触发 = 当前K线开盘价 + 实体/5
做空触发 = 当前K线开盘价 - 实体/5
"""
body = self.get_body_size(prev)
if body < 0.001:
return None, None
curr_o = float(curr_open)
return curr_o + body / 5, curr_o - body / 5
def get_1m_bars_for_3m_bar(self, bar_3m):
"""获取当前3分钟K线对应的3根1分钟K线"""
try:
start_ts = int(bar_3m["id"])
end_ts = start_ts + 3 * 60
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1,
start_time=start_ts,
end_time=end_ts,
)[0]
if response.get("code") != 1000:
return []
data = response.get("data", [])
out = []
for k in data:
out.append(
{
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
}
)
out.sort(key=lambda x: x["id"])
return out
except Exception as e:
logger.warning(f"获取1分钟K线失败: {e}")
return []
def determine_trigger_order_by_1m(self, bars_1m, long_trigger, short_trigger):
"""同根K线多空都触及时用1分钟K线判断先后"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar["high"])
low = float(bar["low"])
open_price = float(bar["open"])
long_ok = high >= long_trigger
short_ok = low <= short_trigger
if long_ok and not short_ok:
return "long"
if short_ok and not long_ok:
return "short"
if long_ok and short_ok:
d_long = abs(long_trigger - open_price)
d_short = abs(short_trigger - open_price)
return "short" if d_short < d_long else "long"
return None
def check_trigger(self, kline_data):
"""
检测当前K线是否触发信号1111当前开盘价±实体/5
返回:(方向, 触发价, 有效前一根, 当前K线) 或 (None,...)
"""
if len(kline_data) < 2:
return None, None, None, None
curr = kline_data[-1]
curr_kline_id = curr["id"]
curr_open = curr["open"]
_, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
if prev is None:
return None, None, None, None
long_trigger, short_trigger = self.get_trigger_levels(prev, curr_open)
if long_trigger is None:
return None, None, None, None
c_high = float(curr["high"])
c_low = float(curr["low"])
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
direction = None
trigger_price = None
if long_triggered and short_triggered:
bars_1m = self.get_1m_bars_for_3m_bar(curr)
if bars_1m:
direction = self.determine_trigger_order_by_1m(
bars_1m, long_trigger, short_trigger
)
trigger_price = long_trigger if direction == "long" else short_trigger
if direction is None:
c_open_f = float(curr["open"])
d_long = abs(long_trigger - c_open_f)
d_short = abs(short_trigger - c_open_f)
direction = "short" if d_short <= d_long else "long"
trigger_price = long_trigger if direction == "long" else short_trigger
elif short_triggered:
direction = "short"
trigger_price = short_trigger
elif long_triggered:
direction = "long"
trigger_price = long_trigger
if direction is None:
return None, None, None, None
if (
self.last_trigger_kline_id == curr_kline_id
and self.last_trigger_direction == direction
):
return None, None, None, None
return direction, trigger_price, prev, curr
# ==================== BitMart API ====================
def get_klines(self):
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=self.kline_step,
start_time=end_time - 3600 * 3,
end_time=end_time,
)[0]["data"]
formatted = []
for k in response:
formatted.append(
{
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
}
)
formatted.sort(key=lambda x: x["id"])
return formatted
except Exception as e:
if "429" in str(e) or "too many requests" in str(e).lower():
logger.warning(f"API限流等待60秒: {e}")
time.sleep(60)
else:
logger.error(f"获取K线异常: {e}")
self.ding("获取K线异常", error=True)
return None
def get_available_balance(self):
try:
response = self.contractAPI.get_assets_detail()[0]
if response["code"] == 1000:
data = response["data"]
if isinstance(data, dict):
return float(data.get("available_balance", 0))
for asset in data:
if asset.get("currency") == "USDT":
return float(asset.get("available_balance", 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
try:
response = self.contractAPI.get_position(
contract_symbol=self.contract_symbol
)[0]
if response["code"] == 1000:
positions = response["data"]
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
return True
self.start = 1 if positions[0]["position_type"] == 1 else -1
self.open_avg_price = positions[0]["open_avg_price"]
self.current_amount = positions[0]["current_amount"]
self.position_cross = positions[0]["position_cross"]
return True
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type,
)[0]
if response["code"] == 1000:
logger.success(f"全仓 {self.leverage}x 杠杆设置成功")
return True
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ==================== 浏览器 ====================
def _open_browser(self):
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except Exception:
return False
def close_extra_tabs(self):
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except Exception:
return False
def click_safe(self, xpath, sleep=0.5):
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click(by_js=True)
return True
except Exception:
return False
def 平仓(self):
logger.info("执行平仓...")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
time.sleep(0.5)
self.ding("执行平仓操作")
def 开单(self, marketPriceLongOrder=0, size=None):
if size is None or size <= 0:
logger.warning("开单金额无效")
return False
direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
logger.info(f"执行{direction_str},金额: {size}")
size = max(1, min(25, int(size)))
try:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(str(size))
if marketPriceLongOrder == -1:
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
else:
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.ding(f"执行{direction_str},金额: {size}")
return True
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def ding(self, msg, error=False):
prefix = "❌五分之一:" if error else "🔔五分之一:"
full_msg = f"{prefix}{msg}"
if error:
logger.error(msg)
for _ in range(3):
ding_executor.submit(self._send_ding_safe, full_msg)
else:
logger.info(msg)
ding_executor.submit(self._send_ding_safe, full_msg)
def _send_ding_safe(self, msg):
try:
send_dingtalk_message(msg)
except Exception as e:
logger.warning(f"消息发送失败: {e}")
def _send_position_message(self, latest_kline):
current_price = float(latest_kline["close"])
balance = self.get_available_balance()
self.balance = balance if balance is not None else 0.0
if self.start != 0:
open_avg_price = float(self.open_avg_price)
current_amount = float(self.current_amount)
position_cross = float(getattr(self, "position_cross", 0) or 0)
if self.start == 1:
unrealized_pnl = current_amount * 0.001 * (
current_price - open_avg_price
)
else:
unrealized_pnl = current_amount * 0.001 * (
open_avg_price - current_price
)
pnl_rate = (
(current_price - open_avg_price) / open_avg_price * 100
if self.start == 1
else (open_avg_price - current_price) / open_avg_price * 100
)
direction_str = "" if self.start == -1 else ""
msg = (
f"【五分之一 {self.contract_symbol}\n"
f"方向:{direction_str}\n"
f"现价:{current_price:.2f}\n"
f"开仓均价:{open_avg_price:.2f}\n"
f"浮动盈亏:{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)\n"
f"余额:{self.balance:.2f}"
)
else:
msg = (
f"【五分之一 {self.contract_symbol}\n"
f"方向:无\n"
f"现价:{current_price:.2f}\n"
f"余额:{self.balance:.2f}"
)
self.ding(msg)
# ==================== 主循环 ====================
def action(self):
if not self.set_leverage():
logger.error("杠杆设置失败")
return
if not self._open_browser():
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
self.close_extra_tabs()
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
logger.info(
f"五分之一策略3分钟K线开始监测间隔: {self.check_interval}"
)
last_report_time = 0
report_interval = 300
while True:
for _ in range(5):
if self._open_browser():
break
time.sleep(5)
else:
self.ding("打开浏览器失败!", error=True)
return
self.close_extra_tabs()
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
try:
kline_data = self.get_klines()
if not kline_data or len(kline_data) < 3:
logger.warning("K线数据不足...")
time.sleep(self.check_interval)
continue
curr = kline_data[-1]
if not self.get_position_status():
logger.warning("获取仓位失败,使用缓存")
direction, trigger_price, valid_prev, curr_kline = (
self.check_trigger(kline_data)
)
if direction:
curr_kline_id = curr_kline["id"]
if self.last_trade_kline_id == curr_kline_id:
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
if (direction == "long" and self.start == 1) or (
direction == "short" and self.start == -1
):
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
balance = self.get_available_balance()
trade_size = (balance or 0) * self.risk_percent
executed = False
if direction == "long":
if self.start == -1:
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif self.start == 0:
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif direction == "short":
if self.start == 1:
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
elif self.start == 0:
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
if executed:
self.last_trade_kline_id = curr_kline_id
self.get_position_status()
self._send_position_message(curr_kline)
last_report_time = time.time()
if time.time() - last_report_time >= report_interval:
if self.get_position_status():
self._send_position_message(kline_data[-1])
last_report_time = time.time()
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval)
time.sleep(3)
if random.randint(1, 10) > 7:
self.page.close()
time.sleep(15)
if __name__ == "__main__":
try:
OpenBasedFifthStrategy(bit_id=BIT_ID).action()
except KeyboardInterrupt:
logger.info("程序被用户中断")
finally:
ding_executor.shutdown(wait=True)
logger.info("已退出")

Binary file not shown.

View File

@@ -19,7 +19,7 @@ BitMart 五分之一回归策略交易(精准版)
4. 精准判断使用1分钟K线
- 当当前3分钟K线同时触及做多和做空价格时拉取该3分钟对应的3根1分钟K线判断哪个方向先被触发
"""
import random
import time
import datetime
from concurrent.futures import ThreadPoolExecutor
@@ -71,6 +71,18 @@ class BitmartOneFifthStrategy:
self.last_trigger_kline_id = None
self.last_trigger_direction = None
self.last_trade_kline_id = None
# 反手信号当前价格容差(美元)
# 当检测到反手信号时,当前价格必须在触发价附近才执行
# 避免"先涨后跌"或"先跌后涨"的情况下错误开仓
self.reverse_price_tolerance = 2.0 # 2美元容差
# 基于开仓价格的反手信号参数
# 记录开仓时使用的前一根K线实体大小用于计算反手触发价
self.entry_prev_body = None # 开仓时前一根K线的实体大小
self.entry_price = None # 开仓价格(用于计算反手触发价)
self.entry_kline_id = None # 开仓时的K线ID用于判断是否在同一根K线内
self.first_minute_reverse_executed = False # 当前K线是否已执行过第一分钟反手
# ========================= 五分之一策略核心 =========================
@@ -158,10 +170,105 @@ class BitmartOneFifthStrategy:
return 'short' if d_short < d_long else 'long'
return None
def check_realtime_trigger(self, kline_data):
def check_first_minute_reverse_signal(self, curr_kline, kline_data):
"""
检测基于开仓价格的反手信号只在3分钟K线的第一分钟有效
反手规则:
- 空仓反手开多:开空仓后,价格涨到 开仓价格 + 前一根K线实体/5 → 平空开多
- 多仓反手开空:开多仓后,价格跌到 开仓价格 - 前一根K线实体/5 → 平多开空
:param curr_kline: 当前3分钟K线数据
:param kline_data: 所有K线数据用于获取前一根K线实体
:return: (方向, 触发价格) 或 (None, None)
"""
# 检查是否有持仓
if self.start == 0:
return None, None
curr_kline_id = curr_kline['id']
# 如果K线切换了重置第一分钟反手标记
if self.entry_kline_id != curr_kline_id:
self.first_minute_reverse_executed = False
self.entry_kline_id = curr_kline_id # 更新当前K线ID
# 检查当前K线是否已执行过第一分钟反手
if self.first_minute_reverse_executed:
return None, None
# 获取开仓价格如果没有记录使用API返回的开仓均价
entry_price = self.entry_price
if entry_price is None and self.open_avg_price:
entry_price = float(self.open_avg_price)
if entry_price is None:
return None, None
# 获取前一根有效K线的实体大小
valid_prev_idx, valid_prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size)
if valid_prev is None:
return None, None
prev_body = self.get_body_size(valid_prev)
# 计算反手触发价格
reverse_offset = prev_body / 5
# 获取第一分钟K线
bars_1m = self.get_1m_bars_for_3m_bar(curr_kline)
if not bars_1m or len(bars_1m) < 1:
return None, None
first_1m = bars_1m[0] # 第一分钟K线
first_1m_high = float(first_1m['high'])
first_1m_low = float(first_1m['low'])
first_1m_close = float(first_1m['close'])
if self.start == -1:
# 持有空仓,检测反手开多信号
# 反手触发价 = 开仓价格 + 前一根实体/5
reverse_long_trigger = entry_price + reverse_offset
# 检查第一分钟是否触及反手触发价
if first_1m_high >= reverse_long_trigger:
# 第一分钟高点触及反手触发价,并且当前价格在触发价附近
if first_1m_close >= reverse_long_trigger - self.reverse_price_tolerance:
logger.info(f"🔄 第一分钟反手信号检测:持空仓,第一分钟高点{first_1m_high:.2f}>=反手触发价{reverse_long_trigger:.2f}")
logger.info(f" 开仓价={entry_price:.2f}, 前一根实体={prev_body:.2f}, 实体/5={reverse_offset:.2f}")
return 'long', reverse_long_trigger
else:
logger.debug(f"第一分钟反手信号被过滤:当前价格{first_1m_close:.2f}已远离触发价{reverse_long_trigger:.2f}")
elif self.start == 1:
# 持有多仓,检测反手开空信号
# 反手触发价 = 开仓价格 - 前一根实体/5
reverse_short_trigger = entry_price - reverse_offset
# 检查第一分钟是否触及反手触发价
if first_1m_low <= reverse_short_trigger:
# 第一分钟低点触及反手触发价,并且当前价格在触发价附近
if first_1m_close <= reverse_short_trigger + self.reverse_price_tolerance:
logger.info(f"🔄 第一分钟反手信号检测:持多仓,第一分钟低点{first_1m_low:.2f}<=反手触发价{reverse_short_trigger:.2f}")
logger.info(f" 开仓价={entry_price:.2f}, 前一根实体={prev_body:.2f}, 实体/5={reverse_offset:.2f}")
return 'short', reverse_short_trigger
else:
logger.debug(f"第一分钟反手信号被过滤:当前价格{first_1m_close:.2f}已远离触发价{reverse_short_trigger:.2f}")
return None, None
def check_realtime_trigger(self, kline_data, current_position=0):
"""
实时检测当前3分钟K线是否触发信号
若多空都触发优先用1分钟K线判断先后否则用开盘价距离
参数:
kline_data: K线数据列表
current_position: 当前持仓状态 (1=多, -1=空, 0=无)
逻辑优化:
- 当已有持仓时,只关心反向信号(有多仓只看空信号,有空仓只看多信号)
- 无仓位时用1分钟K线判断先触发的方向
- 【重要】反手信号不仅要求K线高/低点触及触发价,还要求当前价格在触发价附近
避免"先涨后跌""先跌后涨"的情况下错误开仓
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
"""
if len(kline_data) < 2:
@@ -181,31 +288,57 @@ class BitmartOneFifthStrategy:
c_high = float(curr['high'])
c_low = float(curr['low'])
c_close = float(curr['close']) # 当前价格(用于检查价格是否仍在触发价附近)
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
direction = None
trigger_price = None
if long_triggered and short_triggered:
bars_1m = self.get_1m_bars_for_3m_bar(curr)
if bars_1m:
direction = self.determine_trigger_order_by_1m(
bars_1m, long_trigger, short_trigger
)
trigger_price = long_trigger if direction == 'long' else short_trigger
if direction is None:
c_open = float(curr['open'])
d_long = abs(long_trigger - c_open)
d_short = abs(short_trigger - c_open)
direction = 'short' if d_short <= d_long else 'long'
trigger_price = long_trigger if direction == 'long' else short_trigger
elif short_triggered:
direction = 'short'
trigger_price = short_trigger
elif long_triggered:
direction = 'long'
trigger_price = long_trigger
# 关键优化:根据当前持仓状态决定关注哪个方向的信号
if current_position == 1:
# 当前是多仓,只关心空信号(用于平多开空)
if short_triggered:
# 【重要】额外检查:当前价格必须在做空触发价附近或下方
# 如果价格已经涨回去了(当前价格远高于做空触发价),则不触发
if c_close <= short_trigger + self.reverse_price_tolerance:
direction = 'short'
trigger_price = short_trigger
else:
logger.debug(f"空信号被过滤:当前价格{c_close:.2f}已远离做空触发价{short_trigger:.2f}(容差{self.reverse_price_tolerance}")
elif current_position == -1:
# 当前是空仓,只关心多信号(用于平空开多)
if long_triggered:
# 【重要】额外检查:当前价格必须在做多触发价附近或上方
# 如果价格已经跌回去了(当前价格远低于做多触发价),则不触发
if c_close >= long_trigger - self.reverse_price_tolerance:
direction = 'long'
trigger_price = long_trigger
else:
logger.debug(f"多信号被过滤:当前价格{c_close:.2f}已远离做多触发价{long_trigger:.2f}(容差{self.reverse_price_tolerance}")
else:
# 无仓位,按原逻辑判断先触发的方向
if long_triggered and short_triggered:
bars_1m = self.get_1m_bars_for_3m_bar(curr)
if bars_1m:
direction = self.determine_trigger_order_by_1m(
bars_1m, long_trigger, short_trigger
)
trigger_price = long_trigger if direction == 'long' else short_trigger
if direction is None:
c_open = float(curr['open'])
d_long = abs(long_trigger - c_open)
d_short = abs(short_trigger - c_open)
direction = 'short' if d_short <= d_long else 'long'
trigger_price = long_trigger if direction == 'long' else short_trigger
elif short_triggered:
direction = 'short'
trigger_price = short_trigger
elif long_triggered:
direction = 'long'
trigger_price = long_trigger
if direction is None:
return None, None, None, None
@@ -383,7 +516,7 @@ class BitmartOneFifthStrategy:
"""
prefix = "❌五分之一策略:" if error else "🔔五分之一策略:"
full_msg = f"{prefix}{msg}"
if error:
logger.error(msg)
# 异步发送多条错误消息
@@ -393,7 +526,7 @@ class BitmartOneFifthStrategy:
logger.info(msg)
# 异步发送单条消息
ding_executor.submit(self._send_ding_safe, full_msg)
def _send_ding_safe(self, msg):
"""
安全发送钉钉消息,捕获异常防止线程崩溃
@@ -449,7 +582,74 @@ class BitmartOneFifthStrategy:
curr = kline_data[-1]
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
# 优化:先获取持仓状态,再检测信号(传入持仓状态以便只关注反向信号)
if not self.get_position_status():
logger.warning("获取仓位信息失败,使用缓存的持仓状态")
# ========== 第一分钟反手信号检测 ==========
# 如果有持仓,先检测基于开仓价格的第一分钟反手信号
first_min_direction = None
first_min_trigger_price = None
if self.start != 0:
first_min_direction, first_min_trigger_price = self.check_first_minute_reverse_signal(curr, kline_data)
# 如果检测到第一分钟反手信号,优先执行
if first_min_direction:
curr_kline_id = curr['id']
if self.last_trade_kline_id != curr_kline_id:
logger.info(f"{'=' * 50}")
# 安全获取开仓价格和前一根实体
entry_price_display = self.entry_price if self.entry_price else (float(self.open_avg_price) if self.open_avg_price else 0)
entry_body_display = self.entry_prev_body if self.entry_prev_body else 0
logger.info(f"🔄 第一分钟反手信号触发!方向: {first_min_direction}, 触发价: {first_min_trigger_price:.2f}")
logger.info(f" 开仓价: {entry_price_display:.2f}, 前一根实体/5: {entry_body_display/5:.2f}")
logger.info(f" 当前持仓: {self.start} (1=多, -1=空)")
balance = self.get_available_balance()
trade_size = (balance or 0) * self.risk_percent
if first_min_direction == 'long' and self.start == -1:
logger.info("📈 第一分钟反手:平空仓,反手开多")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
self.first_minute_reverse_executed = True
self.last_trade_kline_id = curr_kline_id
# 更新开仓信息
valid_prev_idx, valid_prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size)
if valid_prev:
self.entry_prev_body = self.get_body_size(valid_prev)
self.entry_price = float(curr['close'])
self.entry_kline_id = curr_kline_id
self.get_position_status()
self._send_position_message(curr)
elif first_min_direction == 'short' and self.start == 1:
logger.info("📉 第一分钟反手:平多仓,反手开空")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
self.first_minute_reverse_executed = True
self.last_trade_kline_id = curr_kline_id
# 更新开仓信息
valid_prev_idx, valid_prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size)
if valid_prev:
self.entry_prev_body = self.get_body_size(valid_prev)
self.entry_price = float(curr['close'])
self.entry_kline_id = curr_kline_id
self.get_position_status()
self._send_position_message(curr)
logger.info(f"{'=' * 50}")
time.sleep(self.check_interval)
continue
# ========== 原有的五分之一信号检测 ==========
# 传入当前持仓状态,确保有持仓时只关注反向信号
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(
kline_data, current_position=self.start
)
if direction:
curr_kline_id = curr_kline['id']
@@ -458,16 +658,15 @@ class BitmartOneFifthStrategy:
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
if not self.get_position_status():
logger.warning("获取仓位信息失败")
time.sleep(self.check_interval)
continue
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev)
# 由于 check_realtime_trigger 已经考虑了持仓状态,这里理论上不会出现同向信号
# 但保留这个检查作为安全措施
if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1):
logger.debug(f"同向信号被过滤: direction={direction}, position={self.start}")
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
@@ -512,6 +711,13 @@ class BitmartOneFifthStrategy:
self.last_trigger_direction = direction
if executed:
self.last_trade_kline_id = curr_kline_id
# 记录开仓信息,用于第一分钟反手信号检测
self.entry_price = trigger_price # 开仓触发价格
self.entry_prev_body = self.get_body_size(valid_prev) # 前一根K线实体大小
self.entry_kline_id = curr_kline_id # 开仓时的K线ID
self.first_minute_reverse_executed = False # 重置第一分钟反手标记
logger.info(f" 记录开仓信息:开仓价={self.entry_price:.2f}, 前一根实体={self.entry_prev_body:.2f}, K线ID={self.entry_kline_id}")
self.get_position_status()
self._send_position_message(curr_kline)
last_report_time = time.time()
@@ -529,9 +735,11 @@ class BitmartOneFifthStrategy:
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval)
time.sleep(15)
self.page.close()
time.sleep(15)
time.sleep(3)
if random.randint(1,10)>7:
self.page.close()
time.sleep(15)
def _send_position_message(self, latest_kline):
current_price = float(latest_kline["close"])