日志展示优化

This commit is contained in:
ddrwode
2026-02-09 15:12:48 +08:00
parent b9cd4ffc67
commit 18452ce5b9

644
bitmart/交易.py Normal file
View File

@@ -0,0 +1,644 @@
import time
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
class BitmartFuturesTransaction:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.pbar = tqdm(total=30, desc="等待K线", ncols=80) # 可选:用于长时间等待时展示进度
self.last_kline_time = None # 上一次处理的K线时间戳用于判断是否是新K线
# 反手频率控制
self.reverse_cooldown_seconds = 1.5 * 60 # 反手冷却时间(秒)
self.reverse_min_move_pct = 0.05 # 反手最小价差过滤(百分比)
self.last_reverse_time = None # 上次反手时间
# 开仓频率控制
self.open_cooldown_seconds = 60 # 开仓冷却时间(秒),两次开仓至少间隔此时长
self.last_open_time = None # 上次开仓时间
self.last_open_kline_id = None # 上次开仓所在 K 线 id同一根 K 线只允许开仓一次
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0 # 未使用;若启用则可为每次开仓占可用余额的百分比
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
self.default_order_size = 25 # 开仓/反手张数,统一在此修改
# 策略相关变量
self.prev_kline = None # 上一根K线
self.current_kline = None # 当前K线
self.prev_entity = None # 上一根K线实体大小
self.current_open = None # 当前K线开盘价
def get_klines(self):
"""获取最近2根K线当前K线和上一根K线"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新的K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=5, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
# 返回最近2根K线倒数第二根上一根和最后一根当前
if len(formatted) >= 2:
return formatted[-2], formatted[-1]
return None, None
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(text="获取K线异常", error=True)
return None, None
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 1, # 取最近1小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.unrealized_pnl = None
return True
pos = positions[0]
self.start = 1 if pos['position_type'] == 1 else -1
self.open_avg_price = float(pos['open_avg_price'])
self.current_amount = float(pos['current_amount'])
self.position_cross = pos["position_cross"]
# 直接从API获取未实现盈亏Bitmart返回的是 unrealized_value 字段)
self.unrealized_pnl = float(pos.get('unrealized_value', 0))
logger.debug(f"持仓详情: 方向={self.start}, 开仓均价={self.open_avg_price}, "
f"持仓量={self.current_amount}, 未实现盈亏={self.unrealized_pnl:.2f}")
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def get_unrealized_pnl_usd(self):
"""
获取当前持仓未实现盈亏美元直接使用API返回值
"""
if self.start == 0 or self.unrealized_pnl is None:
return None
return self.unrealized_pnl
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
# ele.scroll.to_see(center=True)
# time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
"""平仓操作"""
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价做多或者做空1是做多-1是做空
limitPriceShortOrder 限价做多或者做空
"""
if marketPriceLongOrder == -1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, text, error=False):
"""日志通知"""
if error:
logger.error(text)
else:
logger.info(text)
def calculate_entity(self, kline):
"""计算K线实体大小绝对值"""
return abs(kline['close'] - kline['open'])
def calculate_upper_shadow(self, kline):
"""计算上阴线(上影线)涨幅百分比"""
# 上阴线 = (最高价 - max(开盘价, 收盘价)) / max(开盘价, 收盘价)
body_top = max(kline['open'], kline['close'])
if body_top == 0:
return 0
return (kline['high'] - body_top) / body_top * 100
def calculate_lower_shadow(self, kline):
"""计算下阴线(下影线)跌幅百分比"""
# 下阴线 = (min(开盘价, 收盘价) - 最低价) / min(开盘价, 收盘价)
body_bottom = min(kline['open'], kline['close'])
if body_bottom == 0:
return 0
return (body_bottom - kline['low']) / body_bottom * 100
def get_entity_edge(self, kline):
"""获取K线实体边收盘价或开盘价取决于是阳线还是阴线"""
# 阳线(收盘>开盘):实体上边=收盘价,实体下边=开盘价
# 阴线(收盘<开盘):实体上边=开盘价,实体下边=收盘价
return {
'upper': max(kline['open'], kline['close']), # 实体上边
'lower': min(kline['open'], kline['close']) # 实体下边
}
def check_signal(self, current_price, prev_kline, current_kline):
"""
检查交易信号
返回: ('long', trigger_price) / ('short', trigger_price) / None
"""
# 计算上一根K线实体
prev_entity = self.calculate_entity(prev_kline)
# 实体过小不交易(实体 < 0.1
if prev_entity < 0.1:
logger.info(f"上一根K线实体过小: {prev_entity:.4f},跳过信号检测")
return None
# 获取上一根K线的实体上下边
prev_entity_edge = self.get_entity_edge(prev_kline)
prev_entity_upper = prev_entity_edge['upper'] # 实体上边
prev_entity_lower = prev_entity_edge['lower'] # 实体下边
# 优化:以下两种情况以当前这根的开盘价作为计算基准
# 1) 上一根阳线 且 当前开盘价 > 上一根收盘价(跳空高开)
# 2) 上一根阴线 且 当前开盘价 < 上一根收盘价(跳空低开)
prev_is_bullish_for_calc = prev_kline['close'] > prev_kline['open']
prev_is_bearish_for_calc = prev_kline['close'] < prev_kline['open']
current_open_above_prev_close = current_kline['open'] > prev_kline['close']
current_open_below_prev_close = current_kline['open'] < prev_kline['close']
use_current_open_as_base = (prev_is_bullish_for_calc and current_open_above_prev_close) or (prev_is_bearish_for_calc and current_open_below_prev_close)
if use_current_open_as_base:
# 以当前K线开盘价为基准计算跳空时用当前开盘价参与计算
calc_lower = current_kline['open']
calc_upper = current_kline['open'] # 同一基准,上下四分之一对称
long_trigger = calc_lower + prev_entity / 4
short_trigger = calc_upper - prev_entity / 4
long_breakout = calc_upper + prev_entity / 4
short_breakout = calc_lower - prev_entity / 4
else:
# 原有计算方式
long_trigger = prev_entity_lower + prev_entity / 4 # 做多触发价 = 实体下边 + 实体/4下四分之一处
short_trigger = prev_entity_upper - prev_entity / 4 # 做空触发价 = 实体上边 - 实体/4上四分之一处
long_breakout = prev_entity_upper + prev_entity / 4 # 做多突破价 = 实体上边 + 实体/4
short_breakout = prev_entity_lower - prev_entity / 4 # 做空突破价 = 实体下边 - 实体/4
# 上一根阴线 + 当前阳线做多形态不按上一根K线上三分之一做空
prev_is_bearish = prev_kline['close'] < prev_kline['open']
current_is_bullish = current_kline['close'] > current_kline['open']
skip_short_by_upper_third = prev_is_bearish and current_is_bullish
# 上一根阳线 + 当前阴线做空形态不按上一根K线下三分之一做多
prev_is_bullish = prev_kline['close'] > prev_kline['open']
current_is_bearish = current_kline['close'] < current_kline['open']
skip_long_by_lower_third = prev_is_bullish and current_is_bearish
if use_current_open_as_base:
if prev_is_bullish_for_calc and current_open_above_prev_close:
logger.info(f"上一根阳线且当前开盘价({current_kline['open']:.2f})>上一根收盘价({prev_kline['close']:.2f}),以当前开盘价为基准计算")
else:
logger.info(f"上一根阴线且当前开盘价({current_kline['open']:.2f})<上一根收盘价({prev_kline['close']:.2f}),以当前开盘价为基准计算")
logger.info(f"当前价格: {current_price:.2f}, 上一根实体: {prev_entity:.4f}")
logger.info(f"上一根实体上边: {prev_entity_upper:.2f}, 下边: {prev_entity_lower:.2f}")
logger.info(f"做多触发价(下1/4): {long_trigger:.2f}, 做空触发价(上1/4): {short_trigger:.2f}")
logger.info(f"突破做多价(上1/4外): {long_breakout:.2f}, 突破做空价(下1/4外): {short_breakout:.2f}")
if skip_short_by_upper_third:
logger.info("上一根阴线+当前阳线(做多形态),不按上四分之一做空")
if skip_long_by_lower_third:
logger.info("上一根阳线+当前阴线(做空形态),不按下四分之一做多")
# 无持仓时检查开仓信号
if self.start == 0:
if current_price >= long_breakout and not skip_long_by_lower_third:
logger.info(f"触发做多信号!价格 {current_price:.2f} >= 突破价(上1/4外) {long_breakout:.2f}")
return ('long', long_breakout)
elif current_price <= short_breakout and not skip_short_by_upper_third:
logger.info(f"触发做空信号!价格 {current_price:.2f} <= 突破价(下1/4外) {short_breakout:.2f}")
return ('short', short_breakout)
# 持仓时检查反手信号
elif self.start == 1: # 持多仓
# 反手条件1: 价格跌到上一根K线的上三分之一处做空触发价上一根阴线+当前阳线做多时跳过
if current_price <= short_trigger and not skip_short_by_upper_third:
logger.info(f"持多反手做空!价格 {current_price:.2f} <= 触发价(上1/4) {short_trigger:.2f}")
return ('reverse_short', short_trigger)
# 反手条件2: 上一根K线上阴线涨幅>0.01%,当前跌到上一根实体下边
upper_shadow_pct = self.calculate_upper_shadow(prev_kline)
if upper_shadow_pct > 0.01 and current_price <= prev_entity_lower:
logger.info(f"持多反手做空!上阴线涨幅 {upper_shadow_pct:.4f}% > 0.01%"
f"价格 {current_price:.2f} <= 实体下边 {prev_entity_lower:.2f}")
return ('reverse_short', prev_entity_lower)
elif self.start == -1: # 持空仓
# 反手条件1: 价格涨到上一根K线的下三分之一处做多触发价上一根阳线+当前阴线做空时跳过
if current_price >= long_trigger and not skip_long_by_lower_third:
logger.info(f"持空反手做多!价格 {current_price:.2f} >= 触发价(下1/4) {long_trigger:.2f}")
return ('reverse_long', long_trigger)
# 反手条件2: 上一根K线下阴线跌幅>0.01%,当前涨到上一根实体上边
lower_shadow_pct = self.calculate_lower_shadow(prev_kline)
if lower_shadow_pct > 0.01 and current_price >= prev_entity_upper:
logger.info(f"持空反手做多!下阴线跌幅 {lower_shadow_pct:.4f}% > 0.01%"
f"价格 {current_price:.2f} >= 实体上边 {prev_entity_upper:.2f}")
return ('reverse_long', prev_entity_upper)
return None
def can_open(self, current_kline_id):
"""开仓前过滤:同一根 K 线只开一次 + 开仓冷却时间。仅用于 long/short 新开仓。"""
now = time.time()
if self.last_open_kline_id is not None and self.last_open_kline_id == current_kline_id:
logger.info(f"开仓频率控制:本 K 线({current_kline_id})已开过仓,跳过")
return False
if self.last_open_time is not None and now - self.last_open_time < self.open_cooldown_seconds:
remain = self.open_cooldown_seconds - (now - self.last_open_time)
logger.info(f"开仓冷却中,剩余 {remain:.0f}")
return False
return True
def can_reverse(self, current_price, trigger_price):
"""反手前过滤:冷却时间 + 最小价差"""
now = time.time()
if self.last_reverse_time and now - self.last_reverse_time < self.reverse_cooldown_seconds:
remain = self.reverse_cooldown_seconds - (now - self.last_reverse_time)
logger.info(f"反手冷却中,剩余 {remain:.0f}")
return False
if trigger_price and trigger_price > 0:
move_pct = abs(current_price - trigger_price) / trigger_price * 100
if move_pct < self.reverse_min_move_pct:
logger.info(f"反手价差不足: {move_pct:.4f}% < {self.reverse_min_move_pct}%")
return False
return True
def verify_no_position(self, max_retries=5, retry_interval=3):
"""
验证当前无持仓
返回: True 表示无持仓可以开仓False 表示有持仓不能开仓
"""
for i in range(max_retries):
if self.get_position_status():
if self.start == 0:
logger.info(f"确认无持仓,可以开仓")
return True
else:
logger.warning(
f"仍有持仓 (方向: {self.start}),等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
else:
logger.warning(f"查询持仓状态失败,等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
logger.error(f"经过 {max_retries} 次重试仍有持仓或查询失败,放弃开仓")
return False
def verify_position_direction(self, expected_direction):
"""
验证当前持仓方向是否与预期一致
expected_direction: 1 多仓, -1 空仓
返回: True 表示持仓方向正确False 表示不正确
"""
if self.get_position_status():
if self.start == expected_direction:
logger.info(f"持仓方向验证成功: {self.start}")
return True
else:
logger.warning(f"持仓方向不符: 期望 {expected_direction}, 实际 {self.start}")
return False
else:
logger.error("查询持仓状态失败")
return False
def execute_trade(self, signal, size=None):
"""执行交易。size 不传或为 None 时使用 default_order_size。"""
signal_type, trigger_price = signal
size = self.default_order_size if size is None else size
if signal_type == 'long':
# 开多前先确认无持仓
logger.info(f"准备开多,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开多前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(f"确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(1):
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
logger.success("开多成功")
return True
else:
logger.error("开多后持仓验证失败")
return False
elif signal_type == 'short':
# 开空前先确认无持仓
logger.info(f"准备开空,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开空前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(f"确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(-1):
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
logger.success("开空成功")
return True
else:
logger.error("开空后持仓验证失败")
return False
elif signal_type == 'reverse_long':
# 平空 + 开多(反手做多):先平仓,确认无仓后再开多,避免双向持仓
logger.info(f"执行反手做多,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1) # 给交易所处理平仓的时间
# 轮询确认已无持仓再开多(最多等约 10 秒)
for _ in range(10):
if self.get_position_status() and self.start == 0:
break
time.sleep(1)
if self.start != 0:
logger.warning("反手做多:平仓后仍有持仓,放弃本次开多")
return False
logger.info("已确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3)
if self.verify_position_direction(1):
logger.success("反手做多成功")
self.last_reverse_time = time.time()
time.sleep(20)
return True
else:
logger.error("反手做多后持仓验证失败")
return False
elif signal_type == 'reverse_short':
# 平多 + 开空(反手做空):先平仓,确认无仓后再开空
logger.info(f"执行反手做空,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.start == 0:
break
time.sleep(1)
if self.start != 0:
logger.warning("反手做空:平仓后仍有持仓,放弃本次开空")
return False
logger.info("已确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3)
if self.verify_position_direction(-1):
logger.success("反手做空成功")
self.last_reverse_time = time.time()
time.sleep(20)
return True
else:
logger.error("反手做空后持仓验证失败")
return False
return False
def action(self):
"""主循环"""
logger.info("开始运行四分之一策略交易...")
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
page_start = True
while True:
if page_start:
# 打开浏览器
for i in range(5):
if self.openBrowser():
logger.info("浏览器打开成功")
break
else:
self.ding("打开浏览器失败!", error=True)
return
# 进入交易页面
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(vals=25, clear=True)
page_start = False
try:
# 1. 获取K线数据当前K线和上一根K线
prev_kline, current_kline = self.get_klines()
if not prev_kline or not current_kline:
logger.warning("获取K线失败等待重试...")
time.sleep(5)
continue
# 记录进入新的K线
current_kline_time = current_kline['id']
if self.last_kline_time != current_kline_time:
self.last_kline_time = current_kline_time
logger.info(f"进入新K线: {current_kline_time}")
# 2. 获取当前价格
current_price = self.get_current_price()
if not current_price:
logger.warning("获取价格失败,等待重试...")
time.sleep(2)
continue
# 3. 每次循环都通过SDK获取真实持仓状态避免状态不同步导致双向持仓
if not self.get_position_status():
logger.warning("获取持仓状态失败,等待重试...")
time.sleep(2)
continue
logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)")
# 4. 检查信号
signal = self.check_signal(current_price, prev_kline, current_kline)
# 5. 反手过滤:冷却时间 + 最小价差
if signal and signal[0].startswith('reverse_'):
if not self.can_reverse(current_price, signal[1]):
signal = None
# 5.5 开仓频率过滤:同一根 K 线只开一次 + 开仓冷却
if signal and signal[0] in ('long', 'short'):
if not self.can_open(current_kline_time):
signal = None
else:
self._current_kline_id_for_open = current_kline_time # 供 execute_trade 成功后记录
# 6. 有信号则执行交易
if signal:
trade_success = self.execute_trade(signal)
if trade_success:
logger.success(f"交易执行完成: {signal[0]}, 当前持仓状态: {self.start}")
page_start = True
else:
logger.warning(f"交易执行失败或被阻止: {signal[0]}")
# 短暂等待后继续循环同一根K线遇到信号就操作
time.sleep(0.1)
if page_start:
self.page.close()
time.sleep(5)
except KeyboardInterrupt:
logger.info("用户中断,程序退出")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(5)
if __name__ == '__main__':
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()