Files
lm_code/bitmart/交易.py
2026-02-11 14:57:34 +08:00

967 lines
47 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import json
from datetime import datetime
from pathlib import Path
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
PRECOMPUTED_TRADE_PARAMS = {
# 你已经算好参数时,把 enabled 改成 True并在下面填入你的值。
# 直接运行本文件即可生效,不需要任何命令行参数。
"enabled": False,
"params": {}
}
class BitmartFuturesTransaction:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.pbar = tqdm(total=30, desc="等待K线", ncols=80) # 可选:用于长时间等待时展示进度
self.last_kline_time = None # 上一次处理的K线时间戳用于判断是否是新K线
# 反手频率控制
self.reverse_cooldown_seconds = 60 # 反手冷却时间(秒)
self.last_reverse_time = None # 上次反手时间
self.max_reverse_times_per_kline = 1 # 同一根 K 线最多反手次数
self.reverse_count_kline_id = None # 反手计数对应的 K 线 id
self.reverse_count_in_kline = 0 # 当前 K 线已反手次数
# 开仓频率控制
self.open_cooldown_seconds = 60 # 开仓冷却时间(秒),两次开仓至少间隔此时长
self.last_open_time = None # 上次开仓时间
self.last_open_kline_id = None # 上次开仓所在 K 线 id同一根 K 线只允许开仓一次
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0 # 未使用;若启用则可为每次开仓占可用余额的百分比
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
self.default_order_size = 25 # 开仓/反手张数,统一在此修改
# 策略相关变量
self.prev_kline = None # 上一根K线
self.current_kline = None # 当前K线
self.prev_entity = None # 上一根K线实体大小
self.current_open = None # 当前K线开盘价
# 策略优化参数
self.min_prev_entity_pct = 0.1 # 上一根K线实体涨幅%),仅当实体涨幅 >= 此值才用作“上一根”
# 自动止盈基于当前K线振幅四等分多仓用 open~high空仓用 open~low
self.take_profit_close_quartile = 3 # 达到极值后,回到 3/4 位置平仓
self.take_profit_reentry_quartile = 2 # 止盈后,继续回到 2/4 位置同向再开仓
self.take_profit_min_gain_pct_from_entry = 0.4 # 相对当前K线开盘价的最小涨跌幅阈值%),超过才允许止盈
self.take_profit_triggered_kline_id = None # 多仓本K线内是否出现过 open~high 的极值触发
self.take_profit_reentry_threshold = None # 多仓止盈平仓后,价格 < 此值则开多(同向)
self.take_profit_triggered_kline_id_short = None # 空仓本K线内是否出现过 open~low 的极值触发
self.take_profit_reentry_threshold_short = None # 空仓止盈平仓后,价格 > 此值则开空(同向)
self.last_take_profit_kline_id = None # 本K线已止盈一次不区分多空则不再止盈
self.optimized_params_file = Path(__file__).resolve().parent / "atr_best_params.json"
self.strategy_log_dir = Path(__file__).resolve().parent # 策略开仓日志目录
self.apply_precomputed_params()
self.load_optimized_params()
def get_klines(self):
"""获取最近2根K线当前K线和上一根K线"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新的K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=5, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
# 返回最近多根K线列表供主循环按「实体涨幅>=min_prev_entity_pct」向前选取上一根
if len(formatted) >= 2:
return formatted
return None
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(text="获取K线异常", error=True)
return None
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 1, # 取最近1小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.unrealized_pnl = None
return True
pos = positions[0]
self.start = 1 if pos['position_type'] == 1 else -1
self.open_avg_price = float(pos['open_avg_price'])
self.current_amount = float(pos['current_amount'])
self.position_cross = pos["position_cross"]
# 直接从API获取未实现盈亏Bitmart返回的是 unrealized_value 字段)
self.unrealized_pnl = float(pos.get('unrealized_value', 0))
logger.debug(f"持仓详情: 方向={self.start}, 开仓均价={self.open_avg_price}, "
f"持仓量={self.current_amount}, 未实现盈亏={self.unrealized_pnl:.2f}")
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def get_unrealized_pnl_usd(self):
"""
获取当前持仓未实现盈亏美元直接使用API返回值
"""
if self.start == 0 or self.unrealized_pnl is None:
return None
return self.unrealized_pnl
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
# ele.scroll.to_see(center=True)
# time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
"""平仓操作"""
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价做多或者做空1是做多-1是做空
limitPriceShortOrder 限价做多或者做空
"""
if marketPriceLongOrder == -1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, text, error=False):
"""日志通知"""
if error:
logger.error(text)
else:
logger.info(text)
def _log_take_profit_action(self, operation: str, reason: str):
"""止盈相关操作日志:时间、操作、原因"""
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"[止盈日志] 时间={time_str} | 操作={operation} | 原因={reason}")
def _write_open_log(self, operation: str, reason: str, current_kline: dict, prev_kline: dict | None):
"""写入策略开仓日志文件时间、参考的两根K线、操作、开仓原因、计算参数"""
try:
date_str = datetime.now().strftime("%Y%m%d")
log_file = self.strategy_log_dir / f"strategy_log_{date_str}.txt"
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 当前K线信息
cur = current_kline
cur_line = f"id={cur['id']} open={cur['open']:.2f} high={cur['high']:.2f} low={cur['low']:.2f} close={cur['close']:.2f}"
# 上一根K线信息和计算参数
if prev_kline:
prev = prev_kline
prev_entity = abs(prev['close'] - prev['open'])
prev_entity_pct = (prev_entity / prev['open']) * 100 if prev['open'] else 0
prev_line = (
f"id={prev['id']} open={prev['open']:.2f} high={prev['high']:.2f} "
f"low={prev['low']:.2f} close={prev['close']:.2f}\n"
f" 实体大小={prev_entity:.4f} ({prev_entity_pct:.3f}%) "
f"实体/3={prev_entity/3:.4f}"
)
# 计算触发价
base_open = cur['open']
long_trigger = base_open + prev_entity / 3
short_trigger = base_open - prev_entity / 3
calc_params = (
f"\n计算参数:\n"
f" ├─ 实体涨幅阈值: {self.min_prev_entity_pct}%\n"
f" ├─ 当前K线开盘价: {base_open:.2f}\n"
f" ├─ 上一根实体大小: {prev_entity:.4f}\n"
f" ├─ 上一根实体/3: {prev_entity/3:.4f}\n"
f" ├─ 做多触发价: {base_open:.2f} + {prev_entity/3:.4f} = {long_trigger:.2f}\n"
f" └─ 做空触发价: {base_open:.2f} - {prev_entity/3:.4f} = {short_trigger:.2f}"
)
else:
prev_line = "(无)"
calc_params = ""
block = (
f"\n{'='*80}\n"
f"时间: {time_str}\n"
f"操作: {operation}\n"
f"参考K线:\n"
f" 当前K线: {cur_line}\n"
f" 上一根K线: {prev_line}"
f"{calc_params}\n"
f"开仓原因: {reason}\n"
f"{'='*80}\n"
)
with open(log_file, "a", encoding="utf-8") as f:
f.write(block)
logger.info(f"已写入开仓日志: {log_file.name}")
except Exception as e:
logger.warning(f"写入开仓日志失败: {e}")
def load_optimized_params(self):
"""从本地优化结果文件加载参数(可选)。"""
try:
if PRECOMPUTED_TRADE_PARAMS.get("enabled"):
logger.info("已启用 PRECOMPUTED_TRADE_PARAMS跳过 atr_best_params.json 加载")
return
if not self.optimized_params_file.exists():
return
data = json.loads(self.optimized_params_file.read_text(encoding="utf-8"))
if data.get("apply_live") is not True:
logger.info(f"检测到优化参数文件但 apply_live != true跳过加载: {self.optimized_params_file}")
return
params = data.get("params_for_trade_py", data)
allow_keys = {"min_prev_entity_pct"}
applied = []
for key, val in params.items():
if key in allow_keys and hasattr(self, key):
setattr(self, key, val)
applied.append(key)
if applied:
logger.info(f"已加载优化参数文件: {self.optimized_params_file},字段数: {len(applied)}")
except Exception as e:
logger.warning(f"加载优化参数文件失败,将继续使用默认参数: {e}")
def apply_precomputed_params(self):
"""
直接应用本文件内预计算参数(不依赖命令)。
当 PRECOMPUTED_TRADE_PARAMS.enabled=True 时生效。
"""
try:
conf = PRECOMPUTED_TRADE_PARAMS or {}
if conf.get("enabled") is not True:
return
params = conf.get("params") or {}
if not isinstance(params, dict):
logger.warning("PRECOMPUTED_TRADE_PARAMS.params 不是字典,忽略")
return
allow_keys = {"min_prev_entity_pct"}
applied = []
for key, val in params.items():
if key in allow_keys and hasattr(self, key):
setattr(self, key, val)
applied.append(key)
logger.info(f"已应用文件内预计算参数,字段数: {len(applied)}")
except Exception as e:
logger.warning(f"应用文件内预计算参数失败,将继续使用默认参数: {e}")
def calculate_entity(self, kline):
"""计算K线实体大小绝对值"""
return abs(kline['close'] - kline['open'])
def get_entity_edge(self, kline):
"""获取K线实体边收盘价或开盘价取决于是阳线还是阴线"""
# 阳线(收盘>开盘):实体上边=收盘价,实体下边=开盘价
# 阴线(收盘<开盘):实体上边=开盘价,实体下边=收盘价
return {
'upper': max(kline['open'], kline['close']), # 实体上边
'lower': min(kline['open'], kline['close']) # 实体下边
}
def calc_long_take_profit_levels(self, current_kline):
"""
多仓止盈阈值(自动四等分):
基于当前K线 open~high 计算:
- extreme: high4/4 极值)
- close: open + 3/4 * (high-open)
- reentry: open + 2/4 * (high-open)
"""
k_open = float(current_kline['open'])
k_high = float(current_kline['high'])
if k_high <= k_open:
return None
move = k_high - k_open
close_threshold = k_open + move * (self.take_profit_close_quartile / 4)
reentry_threshold = k_open + move * (self.take_profit_reentry_quartile / 4)
return {
"open": k_open,
"extreme": k_high,
"close_threshold": close_threshold,
"reentry_threshold": reentry_threshold
}
def calc_short_take_profit_levels(self, current_kline):
"""
空仓止盈阈值(自动四等分):
基于当前K线 open~low 计算:
- extreme: low4/4 极值)
- close: open - 3/4 * (open-low)
- reentry: open - 2/4 * (open-low)
"""
k_open = float(current_kline['open'])
k_low = float(current_kline['low'])
if k_low >= k_open:
return None
move = k_open - k_low
close_threshold = k_open - move * (self.take_profit_close_quartile / 4)
reentry_threshold = k_open - move * (self.take_profit_reentry_quartile / 4)
return {
"open": k_open,
"extreme": k_low,
"close_threshold": close_threshold,
"reentry_threshold": reentry_threshold
}
def check_signal(self, current_price, prev_kline, current_kline):
"""
检查交易信号(三分之一策略)
规则基于当前K线开盘价 ± 上一根K线实体的1/3
- 做多触发价 = 当前开盘价 + 上一根实体/3
- 做空触发价 = 当前开盘价 - 上一根实体/3
返回: ('long', trigger_price) / ('short', trigger_price) /
('reverse_long', trigger_price) / ('reverse_short', trigger_price) / None
"""
# 计算上一根K线实体主循环已保证所选 prev 实体涨幅 >= min_prev_entity_pct
prev_entity = self.calculate_entity(prev_kline)
prev_entity_pct = (prev_entity / prev_kline['open']) * 100 if prev_kline['open'] else 0
# 计算触发价当前K线开盘价 ± 上一根实体/3
base_open = current_kline['open']
long_trigger = base_open + prev_entity / 3
short_trigger = base_open - prev_entity / 3
# 详细日志:显示计算依据
logger.info("=" * 80)
logger.info(f"【信号检测】当前价格: {current_price:.2f}")
logger.info(f"【参考K线】上一根K线 ID: {prev_kline['id']}")
logger.info(f" ├─ 开盘价: {prev_kline['open']:.2f}")
logger.info(f" ├─ 收盘价: {prev_kline['close']:.2f}")
logger.info(f" ├─ 实体大小: {prev_entity:.4f} ({prev_entity_pct:.3f}%)")
logger.info(f" └─ 实体/3: {prev_entity/3:.4f}")
logger.info(f"【当前K线】ID: {current_kline['id']}, 开盘价: {base_open:.2f}")
logger.info(f"【触发价格】")
logger.info(f" ├─ 做多触发价 = {base_open:.2f} + {prev_entity/3:.4f} = {long_trigger:.2f}")
logger.info(f" └─ 做空触发价 = {base_open:.2f} - {prev_entity/3:.4f} = {short_trigger:.2f}")
logger.info("=" * 80)
# 无持仓时检查开仓信号
if self.start == 0:
if current_price >= long_trigger:
logger.success(f"✅ 触发做多信号!")
logger.success(f" 当前价格 {current_price:.2f} >= 做多触发价 {long_trigger:.2f}")
logger.success(f" 超出触发价: {current_price - long_trigger:.4f} ({(current_price - long_trigger)/long_trigger*100:.3f}%)")
return ('long', long_trigger)
elif current_price <= short_trigger:
logger.success(f"✅ 触发做空信号!")
logger.success(f" 当前价格 {current_price:.2f} <= 做空触发价 {short_trigger:.2f}")
logger.success(f" 低于触发价: {short_trigger - current_price:.4f} ({(short_trigger - current_price)/short_trigger*100:.3f}%)")
return ('short', short_trigger)
# 持多仓时检查反手信号
elif self.start == 1:
if current_price <= short_trigger:
logger.warning(f"⚠️ 持多反手做空信号!")
logger.warning(f" 当前价格 {current_price:.2f} <= 做空触发价 {short_trigger:.2f}")
logger.warning(f" 低于触发价: {short_trigger - current_price:.4f} ({(short_trigger - current_price)/short_trigger*100:.3f}%)")
return ('reverse_short', short_trigger)
# 持空仓时检查反手信号
elif self.start == -1:
if current_price >= long_trigger:
logger.warning(f"⚠️ 持空反手做多信号!")
logger.warning(f" 当前价格 {current_price:.2f} >= 做多触发价 {long_trigger:.2f}")
logger.warning(f" 超出触发价: {current_price - long_trigger:.4f} ({(current_price - long_trigger)/long_trigger*100:.3f}%)")
return ('reverse_long', long_trigger)
return None
def can_open(self, current_kline_id):
"""开仓前过滤:同一根 K 线只开一次 + 开仓冷却时间。仅用于 long/short 新开仓。"""
now = time.time()
if self.last_open_kline_id is not None and self.last_open_kline_id == current_kline_id:
logger.info(f"开仓频率控制:本 K 线({current_kline_id})已开过仓,跳过")
return False
if self.last_open_time is not None and now - self.last_open_time < self.open_cooldown_seconds:
remain = self.open_cooldown_seconds - (now - self.last_open_time)
logger.info(f"开仓冷却中,剩余 {remain:.0f}")
return False
return True
def can_reverse(self, current_kline_id):
"""反手前过滤:冷却时间 + 同一根 K 线最多反手次数。"""
now = time.time()
if self.last_reverse_time and now - self.last_reverse_time < self.reverse_cooldown_seconds:
remain = self.reverse_cooldown_seconds - (now - self.last_reverse_time)
logger.info(f"反手冷却中,剩余 {remain:.0f}")
return False
if self.reverse_count_kline_id != current_kline_id:
self.reverse_count_kline_id = current_kline_id
self.reverse_count_in_kline = 0
if self.reverse_count_in_kline >= self.max_reverse_times_per_kline:
logger.info(
f"本 K 线({current_kline_id})反手次数已达上限({self.max_reverse_times_per_kline}),跳过"
)
return False
return True
def verify_no_position(self, max_retries=5, retry_interval=3):
"""
验证当前无持仓
返回: True 表示无持仓可以开仓False 表示有持仓不能开仓
"""
for i in range(max_retries):
if self.get_position_status():
if self.start == 0:
logger.info(f"确认无持仓,可以开仓")
return True
else:
logger.warning(
f"仍有持仓 (方向: {self.start}),等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
else:
logger.warning(f"查询持仓状态失败,等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
logger.error(f"经过 {max_retries} 次重试仍有持仓或查询失败,放弃开仓")
return False
def verify_position_direction(self, expected_direction):
"""
验证当前持仓方向是否与预期一致
expected_direction: 1 多仓, -1 空仓
返回: True 表示持仓方向正确False 表示不正确
"""
if self.get_position_status():
if self.start == expected_direction:
logger.info(f"持仓方向验证成功: {self.start}")
return True
else:
logger.warning(f"持仓方向不符: 期望 {expected_direction}, 实际 {self.start}")
return False
else:
logger.error("查询持仓状态失败")
return False
def execute_trade(self, signal, size=None):
"""执行交易。size 不传或为 None 时使用 default_order_size。"""
signal_type, trigger_price = signal
size = self.default_order_size if size is None else size
if signal_type == 'long':
# 开多前先确认无持仓
logger.info(f"准备开多,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开多前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(f"确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(1):
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
logger.success("开多成功")
return True
else:
logger.error("开多后持仓验证失败")
return False
elif signal_type == 'short':
# 开空前先确认无持仓
logger.info(f"准备开空,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开空前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(f"确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(-1):
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
logger.success("开空成功")
return True
else:
logger.error("开空后持仓验证失败")
return False
elif signal_type == 'reverse_long':
# 平空 + 开多(反手做多):先平仓,确认无仓后再开多,避免双向持仓
logger.info(f"执行反手做多,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1) # 给交易所处理平仓的时间
# 轮询确认已无持仓再开多(最多等约 10 秒)
for _ in range(10):
if self.get_position_status() and self.start == 0:
break
time.sleep(1)
if self.start != 0:
logger.warning("反手做多:平仓后仍有持仓,放弃本次开多")
return False
logger.info("已确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3)
if self.verify_position_direction(1):
logger.success("反手做多成功")
self.last_reverse_time = time.time()
time.sleep(20)
return True
else:
logger.error("反手做多后持仓验证失败")
return False
elif signal_type == 'reverse_short':
# 平多 + 开空(反手做空):先平仓,确认无仓后再开空
logger.info(f"执行反手做空,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.start == 0:
break
time.sleep(1)
if self.start != 0:
logger.warning("反手做空:平仓后仍有持仓,放弃本次开空")
return False
logger.info("已确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3)
if self.verify_position_direction(-1):
logger.success("反手做空成功")
self.last_reverse_time = time.time()
time.sleep(20)
return True
else:
logger.error("反手做空后持仓验证失败")
return False
return False
def action(self):
"""主循环"""
logger.info("开始运行三分之一策略交易5分钟K线...")
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
page_start = True
while True:
if page_start:
# 打开浏览器
for i in range(5):
if self.openBrowser():
logger.info("浏览器打开成功")
break
else:
self.ding("打开浏览器失败!", error=True)
return
# 进入交易页面
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(vals=25, clear=True)
page_start = False
try:
# ========== 1. 获取K线数据 ==========
# 当前K线=最后一根;上一根=从后往前第一根实体涨幅>=min_prev_entity_pct 的K线
formatted = self.get_klines()
if not formatted or len(formatted) < 2:
logger.warning("获取K线失败等待重试...")
time.sleep(5)
continue
current_kline = formatted[-1]
prev_kline = None
# 向前搜索符合条件的K线实体涨幅 >= 阈值)
logger.debug(f"开始搜索上一根有效K线实体涨幅阈值: {self.min_prev_entity_pct}%")
for i in range(len(formatted) - 2, -1, -1):
k = formatted[i]
entity = abs(k['close'] - k['open'])
entity_pct = entity / k['open'] * 100 if k['open'] else 0
logger.debug(f" 检查K线 id={k['id']}: 实体涨幅={entity_pct:.3f}%")
if entity_pct >= self.min_prev_entity_pct:
prev_kline = k
logger.info(f"✓ 找到上一根有效K线: id={k['id']}, 实体涨幅={entity_pct:.3f}%")
break
if prev_kline is None:
logger.warning(f"✗ 没有实体涨幅>={self.min_prev_entity_pct:.3f}%的上一根K线跳过信号检测")
time.sleep(0.1)
continue
# 记录进入新的K线
current_kline_time = current_kline['id']
if self.last_kline_time != current_kline_time:
self.last_kline_time = current_kline_time
logger.info(f"进入新K线: {current_kline_time}")
# 新K线重置反手计数同一根K线最多允许 2 次反手)
self.reverse_count_kline_id = current_kline_time
self.reverse_count_in_kline = 0
# 新K线内重新判断多/空止盈触发
if self.start == 1:
self.take_profit_triggered_kline_id = None
if self.start == -1:
self.take_profit_triggered_kline_id_short = None
# ========== 2. 获取当前价格 ==========
current_price = self.get_current_price()
if not current_price:
logger.warning("获取价格失败,等待重试...")
time.sleep(2)
continue
# ========== 3. 获取持仓状态 ==========
# 每次循环都通过SDK获取真实持仓状态避免状态不同步导致双向持仓
if not self.get_position_status():
logger.warning("获取持仓状态失败,等待重试...")
time.sleep(2)
continue
logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)")
# ========== 4. 止盈逻辑(自动四等分) ==========
signal = None
# 4.1 多仓止盈open~high 达到极值后,回到 3/4 平仓
if self.start == 1:
# 同一根K线多仓只允许止盈一次避免“止盈->再开->再止盈”循环
if self.last_take_profit_kline_id == current_kline_time:
signal = self.check_signal(current_price, prev_kline, current_kline)
else:
long_levels = self.calc_long_take_profit_levels(current_kline)
if long_levels is not None:
# 计算涨幅百分比
gain_pct = (long_levels["extreme"] - long_levels["open"]) / long_levels["open"] * 100
# 止盈启动前提当前K线涨幅 > 0.4%,标记为已触发
if gain_pct > self.take_profit_min_gain_pct_from_entry:
self.take_profit_triggered_kline_id = current_kline_time
# 止盈执行条件:已触发 且 价格回调到3/4位置
if (self.take_profit_triggered_kline_id == current_kline_time
and current_price <= long_levels["close_threshold"]):
reason = (
f"当前K线开盘价={long_levels['open']:.2f},最高价={long_levels['extreme']:.2f}(4/4)"
f"涨幅={gain_pct:.2f}%;现价{current_price:.2f}已回调至≤{long_levels['close_threshold']:.2f}(3/4)"
f"按规则止盈平多"
)
self._log_take_profit_action("止盈平多仓", reason)
self.平仓()
self.take_profit_reentry_threshold = long_levels["reentry_threshold"]
self.take_profit_triggered_kline_id = None
self.last_take_profit_kline_id = current_kline_time
page_start = True
time.sleep(1)
else:
signal = self.check_signal(current_price, prev_kline, current_kline)
else:
signal = self.check_signal(current_price, prev_kline, current_kline)
# 4.2 多仓止盈后再开仓:价格继续回调到 2/4 才同向开多
elif self.start == 0 and self.take_profit_reentry_threshold is not None:
if current_price <= self.take_profit_reentry_threshold:
reason = (
f"止盈平仓后价格继续回调到{self.take_profit_reentry_threshold:.2f}(2/4),按规则同向开多"
)
self._log_take_profit_action("止盈后同向开多", reason)
self.开单(marketPriceLongOrder=1, size=self.default_order_size)
time.sleep(3)
if self.verify_position_direction(1):
self.last_open_time = time.time()
self.last_open_kline_id = current_kline_time
self._write_open_log("止盈后同向开多", reason, current_kline, prev_kline)
logger.success("止盈后再开多成功")
page_start = True
else:
logger.error("止盈后再开多验证失败")
self.take_profit_reentry_threshold = None
else:
signal = self.check_signal(current_price, prev_kline, current_kline)
# 4.3 空仓止盈open~low 达到极值后,回到 3/4 平仓
elif self.start == -1:
# 同一根K线空仓只允许止盈一次避免“止盈->再开->再止盈”循环
if self.last_take_profit_kline_id == current_kline_time:
signal = self.check_signal(current_price, prev_kline, current_kline)
else:
short_levels = self.calc_short_take_profit_levels(current_kline)
if short_levels is not None:
# 计算跌幅百分比
drop_pct = (short_levels["open"] - short_levels["extreme"]) / short_levels["open"] * 100
# 止盈启动前提当前K线跌幅 > 0.4%,标记为已触发
if drop_pct > self.take_profit_min_gain_pct_from_entry:
self.take_profit_triggered_kline_id_short = current_kline_time
# 止盈执行条件:已触发 且 价格反弹到3/4位置
if (self.take_profit_triggered_kline_id_short == current_kline_time
and current_price >= short_levels["close_threshold"]):
reason = (
f"当前K线开盘价={short_levels['open']:.2f},最低价={short_levels['extreme']:.2f}(4/4)"
f"跌幅={drop_pct:.2f}%;现价{current_price:.2f}已反弹至≥{short_levels['close_threshold']:.2f}(3/4)"
f"按规则止盈平空"
)
self._log_take_profit_action("止盈平空仓", reason)
self.平仓()
self.take_profit_reentry_threshold_short = short_levels["reentry_threshold"]
self.take_profit_triggered_kline_id_short = None
self.last_take_profit_kline_id = current_kline_time
page_start = True
time.sleep(1)
else:
signal = self.check_signal(current_price, prev_kline, current_kline)
else:
signal = self.check_signal(current_price, prev_kline, current_kline)
# 4.4 空仓止盈后再开仓:价格反弹到 2/4 才同向开空
elif self.start == 0 and self.take_profit_reentry_threshold_short is not None:
if current_price >= self.take_profit_reentry_threshold_short:
reason = (
f"止盈平空后现价{current_price:.2f}反弹到{self.take_profit_reentry_threshold_short:.2f}(2/4),按规则同向开空"
)
self._log_take_profit_action("止盈后同向开空", reason)
self.开单(marketPriceLongOrder=-1, size=self.default_order_size)
time.sleep(3)
if self.verify_position_direction(-1):
self.last_open_time = time.time()
self.last_open_kline_id = current_kline_time
self._write_open_log("止盈后同向开空", reason, current_kline, prev_kline)
logger.success("止盈后再开空成功")
page_start = True
else:
logger.error("止盈后再开空验证失败")
self.take_profit_reentry_threshold_short = None
else:
signal = self.check_signal(current_price, prev_kline, current_kline)
else:
# 4.5 无止盈条件时,检查开仓/反手信号
signal = self.check_signal(current_price, prev_kline, current_kline)
# ========== 5. 信号过滤(频率控制) ==========
# 5.1 反手过滤:冷却时间
if signal and signal[0].startswith('reverse_'):
if not self.can_reverse(current_kline_time):
signal = None
# 5.2 开仓频率过滤:同一根 K 线只开一次 + 开仓冷却
if signal and signal[0] in ('long', 'short'):
if not self.can_open(current_kline_time):
signal = None
else:
self._current_kline_id_for_open = current_kline_time # 供 execute_trade 成功后记录
# ========== 6. 执行交易 ==========
if signal:
trade_success = self.execute_trade(signal)
if trade_success:
if signal[0] in ('reverse_long', 'reverse_short'):
if self.reverse_count_kline_id != current_kline_time:
self.reverse_count_kline_id = current_kline_time
self.reverse_count_in_kline = 0
self.reverse_count_in_kline += 1
# 写入开仓日志参考的两根K线 + 开仓原因 + 详细计算参数
sig_type, trigger_price = signal
op_name = {"long": "开多", "short": "开空", "reverse_long": "反手做多", "reverse_short": "反手做空"}.get(sig_type, sig_type)
# 计算详细参数
prev_entity = abs(prev_kline['close'] - prev_kline['open'])
prev_entity_pct = (prev_entity / prev_kline['open']) * 100 if prev_kline['open'] else 0
base_open = current_kline['open']
price_diff = abs(current_price - trigger_price)
price_diff_pct = (price_diff / trigger_price) * 100 if trigger_price else 0
if sig_type == "long":
open_reason = (
f"三分之一策略开多:当前价{current_price:.2f} >= 触发价{trigger_price:.2f}\n"
f" 计算: 触发价 = 当前开盘{base_open:.2f} + 实体/3({prev_entity/3:.4f}) = {trigger_price:.2f}\n"
f" 超出触发价: {price_diff:.4f} ({price_diff_pct:.3f}%)"
)
elif sig_type == "short":
open_reason = (
f"三分之一策略开空:当前价{current_price:.2f} <= 触发价{trigger_price:.2f}\n"
f" 计算: 触发价 = 当前开盘{base_open:.2f} - 实体/3({prev_entity/3:.4f}) = {trigger_price:.2f}\n"
f" 低于触发价: {price_diff:.4f} ({price_diff_pct:.3f}%)"
)
elif sig_type == "reverse_long":
open_reason = (
f"持空反手做多:当前价{current_price:.2f} >= 触发价{trigger_price:.2f}\n"
f" 计算: 触发价 = 当前开盘{base_open:.2f} + 实体/3({prev_entity/3:.4f}) = {trigger_price:.2f}\n"
f" 超出触发价: {price_diff:.4f} ({price_diff_pct:.3f}%)"
)
else:
open_reason = (
f"持多反手做空:当前价{current_price:.2f} <= 触发价{trigger_price:.2f}\n"
f" 计算: 触发价 = 当前开盘{base_open:.2f} - 实体/3({prev_entity/3:.4f}) = {trigger_price:.2f}\n"
f" 低于触发价: {price_diff:.4f} ({price_diff_pct:.3f}%)"
)
self._write_open_log(op_name, open_reason, current_kline, prev_kline)
logger.success(f"🎉 交易执行完成: {op_name}, 当前持仓状态: {self.start}")
page_start = True
else:
logger.warning(f"交易执行失败或被阻止: {signal[0]}")
# 短暂等待后继续循环同一根K线遇到信号就操作
time.sleep(0.1)
if page_start:
self.page.close()
time.sleep(5)
except KeyboardInterrupt:
logger.info("用户中断,程序退出")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(5)
if __name__ == '__main__':
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()