日志展示优化

This commit is contained in:
ddrwode
2026-02-10 09:54:32 +08:00
parent 9665764194
commit d94de1bf64

View File

@@ -1,34 +1,29 @@
""" """
BitMart 分之一回归策略交易(双向触发版) BitMart 分之一策略交易(3分钟精准版)
使用5分钟K线周期,实时监测 使用3分钟K线周期计算触发价格实时监测同根K线内多空都触及时用开盘价距离判断先后
策略规则: 策略规则(与 bitmart/回测数据-五分之一策略-3分钟精准版.py 完全一致)
1. 触发价格计算基于有效的前一根K线实体>=0.1 1. 触发价格计算(基于有效的前一根3分钟K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3 - 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3 - 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5
2. 信号触发条件: 2. 信号触发条件:
- 当前K线最高价 >= 做多触发价格 → 做多信号 - 当前3分钟K线最高价 >= 做多触发价格 → 做多信号
- 当前K线最低价 <= 做空触发价格 → 做空信号 - 当前3分钟K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑: 3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空 - 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多 - 做空时遇到做多信号 -> 平空并反手开多
- 同一根K线内只交易一次,防止频繁反手 - 同一根3分钟K线内只交易一次
示例1线): 4. 同根触发判断无需1分钟K线):
前一根K线开盘3000收盘3100阳线实体=100 - 当一根3分钟K线同时触及做多和做空价格时
- 做多触发价格 = 3100 + 33 = 3133继续上涨做多 - 使用该3分钟K线开盘价与触发价的距离判断先后
- 做空触发价格 = 3100 - 33 = 3067回调做空←当前跌到这里就做空
示例2阴线
前一根K线开盘3100收盘3000阴线实体=100
- 做多触发价格 = 3000 + 33 = 3033反弹做多
- 做空触发价格 = 3000 - 33 = 2967继续下跌做空
""" """
import random
import time import time
import datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm from tqdm import tqdm
from loguru import logger from loguru import logger
@@ -39,10 +34,12 @@ from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract from bitmart.api_contract import APIContract
from 交易.tools import send_dingtalk_message from 交易.tools import send_dingtalk_message
# 创建全局线程池用于异步发送钉钉消息
ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk")
class BitmartOneThirdStrategy:
class BitmartOneFifthStrategy:
def __init__(self, bit_id): def __init__(self, bit_id):
self.page: ChromiumPage | None = None self.page: ChromiumPage | None = None
self.api_key = "6104088c65a68d7e53df5d9395b67d78e555293a" self.api_key = "6104088c65a68d7e53df5d9395b67d78e555293a"
@@ -50,187 +47,102 @@ class BitmartOneThirdStrategy:
self.memo = "me" self.memo = "me"
self.contract_symbol = "ETHUSDT" self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None self.direction = None
self.pbar = tqdm(total=5, desc="等待K线", ncols=80) # 5分钟周期 self.pbar = tqdm(total=3, desc="等待K线", ncols=80) # 3分钟周期
self.last_kline_time = None self.last_kline_time = None
self.leverage = "40"
self.open_type = "cross"
self.risk_percent = 0.1
self.leverage = "40" # 高杠杆(全仓模式下可开更大仓位) self.open_avg_price = None
self.open_type = "cross" # 全仓模式 self.current_amount = None
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id self.bit_id = bit_id
# 分之一策略参数 # 分之一策略参数(与回测一致)
self.min_body_size = 0.1 # 最小实体大小 self.min_body_size = 0.1
self.kline_step = 5 # K线周期5分钟 self.kline_step = 3 # 3分钟K线
self.kline_count = 20 # 获取的K线数量用于向前查找有效K线 self.kline_count = 20
# 实时监测参数 self.check_interval = 3
self.check_interval = 3 # 检测间隔(秒) self.last_trigger_kline_id = None
self.last_trigger_kline_id = None # 记录上次触发信号的K线ID避免同一K线重复触发 self.last_trigger_direction = None
self.last_trigger_direction = None # 记录上次触发的方向 self.last_trade_kline_id = None
self.last_trade_kline_id = None # 记录上次实际交易的K线ID防止同一K线内频繁反手
# ========================= 分之一策略核心函数 ========================= # ========================= 分之一策略核心 =========================
def is_bullish(self, c): def is_bullish(self, c):
"""判断阳线"""
return float(c['close']) > float(c['open']) return float(c['close']) > float(c['open'])
def is_bearish(self, c): def is_bearish(self, c):
"""判断阴线"""
return float(c['close']) < float(c['open']) return float(c['close']) < float(c['open'])
def get_body_size(self, candle): def get_body_size(self, candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close'])) return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1): def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0: if current_idx <= 0:
return None, None return None, None
for i in range(current_idx - 1, -1, -1): for i in range(current_idx - 1, -1, -1):
prev = all_data[i] prev = all_data[i]
body_size = self.get_body_size(prev) if self.get_body_size(prev) >= min_body_size:
if body_size >= min_body_size:
return i, prev return i, prev
return None, None return None, None
def get_one_third_levels(self, prev): def get_one_fifth_levels(self, prev):
""" """
计算前一根K线实体的 1/3 双向触发价格 计算前一根K线实体的 1/5 双向触发价格
返回:(做多触发价格, 做空触发价格) 做多触发 = 收盘价 + 实体/5做空触发 = 收盘价 - 实体/5
基于收盘价计算(无论阴线阳线):
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3实体
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3实体
示例:
阳线 open=3000, close=3100, 实体=100
- 做多触发 = 3100 + 33 = 3133继续涨
- 做空触发 = 3100 - 33 = 3067回调
阴线 open=3100, close=3000, 实体=100
- 做多触发 = 3000 + 33 = 3033反弹
- 做空触发 = 3000 - 33 = 2967继续跌
""" """
p_open = float(prev['open']) p_open = float(prev['open'])
p_close = float(prev['close']) p_close = float(prev['close'])
body = abs(p_open - p_close) body = abs(p_open - p_close)
if body < 0.001:
if body < 0.001: # 十字星,忽略
return None, None return None, None
long_trigger = p_close + body / 5
# 基于收盘价的双向触发价格 short_trigger = p_close - body / 5
long_trigger = p_close + body / 3 # 从收盘价往上涨1/3触发做多
short_trigger = p_close - body / 3 # 从收盘价往下跌1/3触发做空
return long_trigger, short_trigger return long_trigger, short_trigger
def check_trigger(self, all_data, current_idx):
"""
检查当前K线是否触发了交易信号双向检测
返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None)
规则:
- 当前K线高点 >= 做多触发价格 → 做多信号
- 当前K线低点 <= 做空触发价格 → 做空信号
"""
if current_idx <= 0:
return None, None, None
curr = all_data[current_idx]
# 查找实体>=min_body_size的前一根K线
valid_prev_idx, prev = self.find_valid_prev_bar(all_data, current_idx, self.min_body_size)
if prev is None:
return None, None, None
long_trigger, short_trigger = self.get_one_third_levels(prev)
if long_trigger is None:
return None, None, None
# 使用影线部分high/low来判断
c_high = float(curr['high'])
c_low = float(curr['low'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 如果两个方向都触发,判断哪个先触发
if long_triggered and short_triggered:
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx
else:
return 'long', long_trigger, valid_prev_idx
if short_triggered:
return 'short', short_trigger, valid_prev_idx
if long_triggered:
return 'long', long_trigger, valid_prev_idx
return None, None, None
def check_realtime_trigger(self, kline_data): def check_realtime_trigger(self, kline_data):
""" """
实时检测当前K线是否触发信号双向检测 检查当前3分钟K线是否触发交易信号(与回测逻辑完全一致
基于已收盘的K线计算触发价格用当前正在形成的K线判断 若同时触发多空则用开盘价距离判断先后顺序不请求1分钟K线
参数:
kline_data: K线数据列表
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None) 返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
""" """
if len(kline_data) < 2: if len(kline_data) < 2:
return None, None, None, None return None, None, None, None
# 当前正在形成的K线最后一根未收盘
curr = kline_data[-1] curr = kline_data[-1]
curr_kline_id = curr['id'] curr_kline_id = curr['id']
valid_prev_idx, prev = self.find_valid_prev_bar(
# 从倒数第二根开始往前找有效K线已收盘的K线 kline_data, len(kline_data) - 1, self.min_body_size
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size) )
if prev is None: if prev is None:
return None, None, None, None return None, None, None, None
long_trigger, short_trigger = self.get_one_third_levels(prev) long_trigger, short_trigger = self.get_one_fifth_levels(prev)
if long_trigger is None: if long_trigger is None:
return None, None, None, None return None, None, None, None
# 使用当前K线的实时高低点来判断
c_high = float(curr['high']) c_high = float(curr['high'])
c_low = float(curr['low']) c_low = float(curr['low'])
# 检测是否触发
long_triggered = c_high >= long_trigger long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger short_triggered = c_low <= short_trigger
both_triggered = long_triggered and short_triggered
# 确定触发方向
direction = None direction = None
trigger_price = None trigger_price = None
if long_triggered and short_triggered: # 如果同时触发多空用开盘价距离判断先后顺序避免请求1分钟K线
# 两个方向都触发,判断哪个先(距离开盘价更近的先触发) if both_triggered:
c_open = float(curr['open']) c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open) dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open) dist_to_short = abs(short_trigger - c_open)
@@ -250,27 +162,24 @@ class BitmartOneThirdStrategy:
if direction is None: if direction is None:
return None, None, None, None return None, None, None, None
# 检查是否在同一根K线内已经触发相同方向 # 避免同一根K线内重复触发相同信号
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction: if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
return None, None, None, None return None, None, None, None
return direction, trigger_price, prev, curr return direction, trigger_price, prev, curr
# ========================= BitMart API 函数 ========================= # ========================= BitMart API =========================
def get_klines(self): def get_klines(self):
"""获取最近N根5分钟K线""" """获取最近3分钟K线"""
try: try:
end_time = int(time.time()) end_time = int(time.time())
# 获取足够多的K线用于向前查找有效K线
response = self.contractAPI.get_kline( response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol, contract_symbol=self.contract_symbol,
step=self.kline_step, # 5分钟 step=self.kline_step,
start_time=end_time - 3600 * 3, # 取最近3小时 start_time=end_time - 3600 * 3,
end_time=end_time end_time=end_time
)[0]["data"] )[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = [] formatted = []
for k in response: for k in response:
formatted.append({ formatted.append({
@@ -284,7 +193,6 @@ class BitmartOneThirdStrategy:
return formatted return formatted
except Exception as e: except Exception as e:
error_msg = str(e) error_msg = str(e)
# 检查是否是429限流错误
if "429" in error_msg or "too many requests" in error_msg.lower(): if "429" in error_msg or "too many requests" in error_msg.lower():
logger.warning(f"API限流等待60秒后重试: {e}") logger.warning(f"API限流等待60秒后重试: {e}")
time.sleep(60) time.sleep(60)
@@ -294,12 +202,11 @@ class BitmartOneThirdStrategy:
return None return None
def get_current_price(self): def get_current_price(self):
"""获取当前最新价格"""
try: try:
end_time = int(time.time()) end_time = int(time.time())
response = self.contractAPI.get_kline( response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol, contract_symbol=self.contract_symbol,
step=1, # 1分钟 step=1,
start_time=end_time - 3600 * 3, start_time=end_time - 3600 * 3,
end_time=end_time end_time=end_time
)[0] )[0]
@@ -311,14 +218,13 @@ class BitmartOneThirdStrategy:
return None return None
def get_available_balance(self): def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try: try:
response = self.contractAPI.get_assets_detail()[0] response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000: if response['code'] == 1000:
data = response['data'] data = response['data']
if isinstance(data, dict): if isinstance(data, dict):
return float(data.get('available_balance', 0)) return float(data.get('available_balance', 0))
elif isinstance(data, list): if isinstance(data, list):
for asset in data: for asset in data:
if asset.get('currency') == 'USDT': if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0)) return float(asset.get('available_balance', 0))
@@ -328,7 +234,6 @@ class BitmartOneThirdStrategy:
return None return None
def get_position_status(self): def get_position_status(self):
"""获取当前持仓方向"""
try: try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000: if response['code'] == 1000:
@@ -344,14 +249,12 @@ class BitmartOneThirdStrategy:
self.current_amount = positions[0]['current_amount'] self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"] self.position_cross = positions[0]["position_cross"]
return True return True
else: return False
return False
except Exception as e: except Exception as e:
logger.error(f"持仓查询异常: {e}") logger.error(f"持仓查询异常: {e}")
return False return False
def set_leverage(self): def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try: try:
response = self.contractAPI.post_submit_leverage( response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol, contract_symbol=self.contract_symbol,
@@ -361,38 +264,34 @@ class BitmartOneThirdStrategy:
if response['code'] == 1000: if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True return True
else: logger.error(f"杠杆设置失败: {response}")
logger.error(f"杠杆设置失败: {response}") return False
return False
except Exception as e: except Exception as e:
logger.error(f"设置杠杆异常: {e}") logger.error(f"设置杠杆异常: {e}")
return False return False
# ========================= 浏览器自动化函数 ========================= # ========================= 浏览器 =========================
def openBrowser(self): def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try: try:
bit_port = openBrowser(id=self.bit_id) bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions() co = ChromiumOptions()
co.set_local_port(port=bit_port) co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co) self.page = ChromiumPage(addr_or_opts=co)
return True return True
except: except Exception:
return False return False
def close_extra_tabs_in_browser(self): def close_extra_tabs_in_browser(self):
"""关闭多余 tab"""
try: try:
for idx, tab in enumerate(self.page.get_tabs()): for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0: if idx > 0:
tab.close() tab.close()
return True return True
except: except Exception:
return False return False
def click_safe(self, xpath, sleep=0.5): def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try: try:
ele = self.page.ele(xpath) ele = self.page.ele(xpath)
if not ele: if not ele:
@@ -401,29 +300,21 @@ class BitmartOneThirdStrategy:
time.sleep(sleep) time.sleep(sleep)
ele.click(by_js=True) ele.click(by_js=True)
return True return True
except: except Exception:
return False return False
def 平仓(self): def 平仓(self):
"""市价平仓"""
logger.info("执行平仓操作...") logger.info("执行平仓操作...")
self.click_safe('x://span[normalize-space(text()) ="市价"]') self.click_safe('x://span[normalize-space(text()) ="市价"]')
time.sleep(0.5) time.sleep(0.5)
self.ding(msg="执行平仓操作") self.ding(msg="执行平仓操作")
def 开单(self, marketPriceLongOrder=0, size=None): def 开单(self, marketPriceLongOrder=0, size=None):
"""
市价开单
marketPriceLongOrder: 1 做多, -1 做空
"""
if size is None or size <= 0: if size is None or size <= 0:
logger.warning("开单金额无效") logger.warning("开单金额无效")
return False return False
direction_str = "做多" if marketPriceLongOrder == 1 else "做空" direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
logger.info(f"执行{direction_str}操作,金额: {size}") logger.info(f"执行{direction_str}操作,金额: {size}")
size = 40
try: try:
if marketPriceLongOrder == -1: if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]') self.click_safe('x://button[normalize-space(text()) ="市价"]')
@@ -433,7 +324,6 @@ class BitmartOneThirdStrategy:
self.click_safe('x://button[normalize-space(text()) ="市价"]') self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size) self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.ding(msg=f"执行{direction_str}操作,金额: {size}") self.ding(msg=f"执行{direction_str}操作,金额: {size}")
return True return True
except Exception as e: except Exception as e:
@@ -441,242 +331,182 @@ class BitmartOneThirdStrategy:
return False return False
def ding(self, msg, error=False): def ding(self, msg, error=False):
"""统一消息格式""" """
prefix = "❌三分之一策略:" if error else "🔔三分之一策略:" 异步发送钉钉消息不阻塞主程序的K线获取和交易操作
"""
prefix = "❌五分之一策略:" if error else "🔔五分之一策略:"
full_msg = f"{prefix}{msg}"
if error: if error:
logger.error(msg) logger.error(msg)
# 异步发送多条错误消息
for i in range(10): for i in range(10):
send_dingtalk_message(f"{prefix}{msg}") ding_executor.submit(self._send_ding_safe, full_msg)
else: else:
logger.info(msg) logger.info(msg)
send_dingtalk_message(f"{prefix}{msg}") # 异步发送单条消息
ding_executor.submit(self._send_ding_safe, full_msg)
# ========================= 时间计算函数 ========================= def _send_ding_safe(self, msg):
"""
安全发送钉钉消息,捕获异常防止线程崩溃
"""
try:
send_dingtalk_message(msg)
except Exception as e:
logger.warning(f"钉钉消息发送失败: {e}")
def get_now_time(self): # ========================= 主循环 =========================
"""获取当前5分钟整点时间戳"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的5分钟整点
minute = current_datetime.minute
target_minute = (minute // 5) * 5 # 向下取整到5分钟
target_datetime = current_datetime.replace(minute=target_minute, second=0, microsecond=0)
return int(target_datetime.timestamp())
def get_time_to_next_5min(self):
"""获取距离下一个5分钟的秒数"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
minute = current_datetime.minute
next_5min = ((minute // 5) + 1) * 5
if next_5min >= 60:
next_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)
else:
next_datetime = current_datetime.replace(minute=next_5min, second=0, microsecond=0)
return (next_datetime - current_datetime).total_seconds()
# ========================= 主运行函数 =========================
def action(self): def action(self):
"""主运行逻辑 - 实时监测版本"""
# 启动时设置全仓高杠杆
if not self.set_leverage(): if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败") logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return return
# 1. 打开浏览器
if not self.openBrowser(): if not self.openBrowser():
self.ding("打开浏览器失败!", error=True) self.ding("打开浏览器失败!", error=True)
return return
logger.info("浏览器打开成功") logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser(): if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功') logger.info('关闭多余标签页成功')
else: else:
logger.info('关闭多余标签页失败') logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2) time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]') self.click_safe('x://button[normalize-space(text()) ="市价"]')
logger.info(f"五分之一策略3分钟K线开始实时监测检测间隔: {self.check_interval}")
logger.info(f"开始实时监测,检测间隔: {self.check_interval}")
# 用于定时发送持仓信息每5分钟发一次
last_report_time = 0 last_report_time = 0
report_interval = 300 # 5分钟报告一次持仓 report_interval = 300
while True: while True:
# 1. 打开浏览器
for i in range(5): for i in range(5):
if self.openBrowser(): if self.openBrowser():
break break
time.sleep(5) time.sleep(5)
else: else:
self.ding("打开浏览器失败!", error=True) self.ding("打开浏览器失败!", error=True)
return return
logger.info("浏览器打开成功") logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser(): if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功') logger.info('关闭多余标签页成功')
else:
logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2) time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]') self.click_safe('x://button[normalize-space(text()) ="市价"]')
try: try:
# 获取K线数据
kline_data = self.get_klines() kline_data = self.get_klines()
if not kline_data: if not kline_data or len(kline_data) < 3:
logger.warning("获取K线数据失败,等待重试...") logger.warning("K线数据不足,等待重试...")
time.sleep(self.check_interval) time.sleep(self.check_interval)
continue continue
if len(kline_data) < 3:
logger.warning("K线数据不足")
time.sleep(self.check_interval)
continue
# 获取当前K线信息用于日志
curr = kline_data[-1] curr = kline_data[-1]
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S') curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
# ========== 实时信号检测 ========== # 获取持仓状态
if not self.get_position_status():
logger.warning("获取仓位信息失败,使用缓存的持仓状态")
# 检测五分之一策略信号(与回测逻辑完全一致)
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data) direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
if direction: if direction:
curr_kline_id = curr_kline['id'] curr_kline_id = curr_kline['id']
# 检查是否在同一K线内已经交易过防止频繁反手
if self.last_trade_kline_id == curr_kline_id: if self.last_trade_kline_id == curr_kline_id:
logger.debug(f"同一K线内已交易跳过本次{direction}信号")
# 更新触发记录,避免重复日志
self.last_trigger_kline_id = curr_kline_id self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction self.last_trigger_direction = direction
time.sleep(self.check_interval) time.sleep(self.check_interval)
continue continue
# 获取持仓状态
if not self.get_position_status():
logger.warning("获取仓位信息失败")
time.sleep(self.check_interval)
continue
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M') prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线" prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev) prev_body = self.get_body_size(valid_prev)
# 检查信号与持仓是否同向(避免重复日志)
if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1):
# 信号与持仓同向,静默忽略
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
logger.info(f"{'=' * 50}") logger.info(f"{'=' * 50}")
logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}") logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}")
logger.info( logger.info(
f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}") f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}")
logger.info( logger.info(
f" 当前K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}") f" 当前3分钟K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}")
logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)") logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)")
# ========== 执行交易逻辑 ==========
balance = self.get_available_balance() balance = self.get_available_balance()
if balance is None: trade_size = (balance or 0) * self.risk_percent
balance = 0
trade_size = balance * self.risk_percent
executed = False executed = False
# 执行交易逻辑(与回测一致)
if direction == "long": if direction == "long":
if self.start == -1: # 当前空仓,平空开多 if self.start == -1:
# 持空仓遇到做多信号 -> 平空并反手开多
logger.info("📈 平空仓,反手开多") logger.info("📈 平空仓,反手开多")
self.平仓() self.平仓()
time.sleep(1) time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size) self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True executed = True
elif self.start == 0: # 当前无仓,直接开多 elif self.start == 0:
# 无仓位遇到做多信号 -> 开多
logger.info("📈 无仓位,开多") logger.info("📈 无仓位,开多")
self.开单(marketPriceLongOrder=1, size=trade_size) self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True executed = True
elif self.start == 1:
# 持多仓遇到做多信号 -> 不操作
logger.debug("已持有多仓,忽略做多信号")
elif direction == "short": elif direction == "short":
if self.start == 1: # 当前多仓,平多开空 if self.start == 1:
# 持多仓遇到做空信号 -> 平多并反手开空
logger.info("📉 平多仓,反手开空") logger.info("📉 平多仓,反手开空")
self.平仓() self.平仓()
time.sleep(1) time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size) self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True executed = True
elif self.start == 0: # 当前无仓,直接开空 elif self.start == 0:
# 无仓位遇到做空信号 -> 开空
logger.info("📉 无仓位,开空") logger.info("📉 无仓位,开空")
self.开单(marketPriceLongOrder=-1, size=trade_size) self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True executed = True
elif self.start == -1:
# 持空仓遇到做空信号 -> 不操作
logger.debug("已持有空仓,忽略做空信号")
# 记录本次触发
self.last_trigger_kline_id = curr_kline_id self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction self.last_trigger_direction = direction
if executed: if executed:
# 记录交易K线防止同一K线内频繁反手
self.last_trade_kline_id = curr_kline_id self.last_trade_kline_id = curr_kline_id
# 交易后立即发送持仓信息
self.get_position_status() self.get_position_status()
self._send_position_message(curr_kline) self._send_position_message(curr_kline)
last_report_time = time.time() last_report_time = time.time()
logger.info(f"{'=' * 50}") logger.info(f"{'=' * 50}")
else: else:
# 没有信号时,显示实时价格
logger.debug( logger.debug(
f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}") f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
# ========== 定时发送持仓信息 ========== if time.time() - last_report_time >= report_interval:
current_time = time.time()
if current_time - last_report_time >= report_interval:
if self.get_position_status(): if self.get_position_status():
self._send_position_message(kline_data[-1]) self._send_position_message(kline_data[-1])
last_report_time = current_time last_report_time = time.time()
# 等待下次检测
time.sleep(self.check_interval) time.sleep(self.check_interval)
except Exception as e: except Exception as e:
logger.error(f"主循环异常: {e}") logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval) time.sleep(self.check_interval)
time.sleep(3)
time.sleep(15) if random.randint(1,10)>7:
self.page.close() self.page.close()
time.sleep(15) time.sleep(15)
def _send_position_message(self, latest_kline): def _send_position_message(self, latest_kline):
"""发送持仓信息到钉钉"""
current_price = float(latest_kline["close"]) current_price = float(latest_kline["close"])
balance = self.get_available_balance() balance = self.get_available_balance()
self.balance = balance if balance is not None else 0.0 self.balance = balance if balance is not None else 0.0
if self.start != 0: if self.start != 0:
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0 open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
current_amount = float(self.current_amount) if self.current_amount else 0.0 current_amount = float(self.current_amount) if self.current_amount else 0.0
position_cross = float(self.position_cross) if hasattr(self, position_cross = float(self.position_cross) if getattr(self, 'position_cross', None) else 0.0
'position_cross') and self.position_cross else 0.0 if self.start == 1:
# 计算浮动盈亏
if self.start == 1: # 多头
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price) unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
else: # 空头 else:
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price) unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
# 计算收益率
if open_avg_price > 0: if open_avg_price > 0:
if self.start == 1: if self.start == 1:
pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000 pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
@@ -685,12 +515,10 @@ class BitmartOneThirdStrategy:
rate_str = f" ({pnl_rate:+.2f}%)" rate_str = f" ({pnl_rate:+.2f}%)"
else: else:
rate_str = "" rate_str = ""
direction_str = "" if self.start == -1 else "" direction_str = "" if self.start == -1 else ""
pnl_str = f"{unrealized_pnl:+.2f} USDT" pnl_str = f"{unrealized_pnl:+.2f} USDT"
msg = ( msg = (
f"分之一策略 {self.contract_symbol} 5分钟】\n" f"分之一策略 {self.contract_symbol} 3分钟】\n"
f"当前方向:{direction_str}\n" f"当前方向:{direction_str}\n"
f"当前现价:{current_price:.2f} USDT\n" f"当前现价:{current_price:.2f} USDT\n"
f"开仓均价:{open_avg_price:.2f} USDT\n" f"开仓均价:{open_avg_price:.2f} USDT\n"
@@ -701,15 +529,21 @@ class BitmartOneThirdStrategy:
) )
else: else:
msg = ( msg = (
f"分之一策略 {self.contract_symbol} 5分钟】\n" f"分之一策略 {self.contract_symbol} 3分钟】\n"
f"当前方向:无\n" f"当前方向:无\n"
f"当前现价:{current_price:.2f} USDT\n" f"当前现价:{current_price:.2f} USDT\n"
f"账户可用余额:{self.balance:.2f} usdt" f"账户可用余额:{self.balance:.2f} usdt"
) )
self.ding(msg=msg) self.ding(msg=msg)
if __name__ == '__main__': if __name__ == '__main__':
# 启动三分之一策略交易 try:
BitmartOneThirdStrategy(bit_id="62f9107d0c674925972084e282df55b3").action() BitmartOneFifthStrategy(bit_id="62f9107d0c674925972084e282df55b3").action()
except KeyboardInterrupt:
logger.info("程序被用户中断")
finally:
# 关闭线程池,等待所有钉钉消息发送完成
logger.info("正在关闭钉钉消息线程池...")
ding_executor.shutdown(wait=True)
logger.info("线程池已关闭")