Files
lm_code/三分之一策略交易.py
Administrator fd151f7a80 haha
2026-02-04 00:58:52 +08:00

425 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
class BitmartFuturesTransaction:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=30, desc="等待K线", ncols=80)
self.last_kline_time = None # 上一次处理的K线时间戳用于判断是否是新K线
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
# 策略相关变量
self.prev_kline = None # 上一根K线
self.current_kline = None # 当前K线
self.prev_entity = None # 上一根K线实体大小
self.current_open = None # 当前K线开盘价
def get_klines(self):
"""获取最近2根K线当前K线和上一根K线"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新的K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=5, # 30分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
# 返回最近2根K线倒数第二根上一根和最后一根当前
if len(formatted) >= 2:
return formatted[-2], formatted[-1]
return None, None
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(text="获取K线异常", error=True)
return None, None
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 1, # 取最近1小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = float(positions[0]['open_avg_price'])
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click()
return True
except:
return False
def 平仓(self):
"""平仓操作"""
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价做多或者做空1是做多-1是做空
limitPriceShortOrder 限价做多或者做空
"""
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, text, error=False):
"""日志通知"""
if error:
logger.error(text)
else:
logger.info(text)
def calculate_entity(self, kline):
"""计算K线实体大小绝对值"""
return abs(kline['close'] - kline['open'])
def calculate_upper_shadow(self, kline):
"""计算上阴线(上影线)涨幅百分比"""
# 上阴线 = (最高价 - max(开盘价, 收盘价)) / max(开盘价, 收盘价)
body_top = max(kline['open'], kline['close'])
if body_top == 0:
return 0
return (kline['high'] - body_top) / body_top * 100
def calculate_lower_shadow(self, kline):
"""计算下阴线(下影线)跌幅百分比"""
# 下阴线 = (min(开盘价, 收盘价) - 最低价) / min(开盘价, 收盘价)
body_bottom = min(kline['open'], kline['close'])
if body_bottom == 0:
return 0
return (body_bottom - kline['low']) / body_bottom * 100
def get_entity_edge(self, kline):
"""获取K线实体边收盘价或开盘价取决于是阳线还是阴线"""
# 阳线(收盘>开盘):实体上边=收盘价,实体下边=开盘价
# 阴线(收盘<开盘):实体上边=开盘价,实体下边=收盘价
return {
'upper': max(kline['open'], kline['close']), # 实体上边
'lower': min(kline['open'], kline['close']) # 实体下边
}
def check_signal(self, current_price, prev_kline, current_kline):
"""
检查交易信号
返回: ('long', trigger_price) / ('short', trigger_price) / None
"""
# 计算上一根K线实体
prev_entity = self.calculate_entity(prev_kline)
# 实体过小不交易(实体 < 0.1
if prev_entity < 0.1:
logger.info(f"上一根K线实体过小: {prev_entity:.4f},跳过信号检测")
return None
# 获取上一根K线的实体上下边
prev_entity_edge = self.get_entity_edge(prev_kline)
prev_entity_upper = prev_entity_edge['upper'] # 实体上边
prev_entity_lower = prev_entity_edge['lower'] # 实体下边
# 计算触发价基于上一根K线实体位置
long_trigger = prev_entity_lower + prev_entity / 3 # 做多触发价 = 实体下边 + 实体/3下三分之一处
short_trigger = prev_entity_upper - prev_entity / 3 # 做空触发价 = 实体上边 - 实体/3上三分之一处
logger.info(f"当前价格: {current_price:.2f}, 上一根实体: {prev_entity:.4f}")
logger.info(f"上一根实体上边: {prev_entity_upper:.2f}, 下边: {prev_entity_lower:.2f}")
logger.info(f"做多触发价(下1/3): {long_trigger:.2f}, 做空触发价(上1/3): {short_trigger:.2f}")
# 无持仓时检查开仓信号
if self.start == 0:
if current_price >= long_trigger:
logger.info(f"触发做多信号!价格 {current_price:.2f} >= 触发价(下1/3) {long_trigger:.2f}")
return ('long', long_trigger)
elif current_price <= short_trigger:
logger.info(f"触发做空信号!价格 {current_price:.2f} <= 触发价(上1/3) {short_trigger:.2f}")
return ('short', short_trigger)
# 持仓时检查反手信号
elif self.start == 1: # 持多仓
# 反手条件1: 价格跌到上一根K线的上三分之一处做空触发价
if current_price <= short_trigger:
logger.info(f"持多反手做空!价格 {current_price:.2f} <= 触发价(上1/3) {short_trigger:.2f}")
return ('reverse_short', short_trigger)
# 反手条件2: 上一根K线上阴线涨幅>0.01%,当前跌到上一根实体下边
upper_shadow_pct = self.calculate_upper_shadow(prev_kline)
if upper_shadow_pct > 0.01 and current_price <= prev_entity_lower:
logger.info(f"持多反手做空!上阴线涨幅 {upper_shadow_pct:.4f}% > 0.01%"
f"价格 {current_price:.2f} <= 实体下边 {prev_entity_lower:.2f}")
return ('reverse_short', prev_entity_lower)
elif self.start == -1: # 持空仓
# 反手条件1: 价格涨到上一根K线的下三分之一处做多触发价
if current_price >= long_trigger:
logger.info(f"持空反手做多!价格 {current_price:.2f} >= 触发价(下1/3) {long_trigger:.2f}")
return ('reverse_long', long_trigger)
# 反手条件2: 上一根K线下阴线跌幅>0.01%,当前涨到上一根实体上边
lower_shadow_pct = self.calculate_lower_shadow(prev_kline)
if lower_shadow_pct > 0.01 and current_price >= prev_entity_upper:
logger.info(f"持空反手做多!下阴线跌幅 {lower_shadow_pct:.4f}% > 0.01%"
f"价格 {current_price:.2f} >= 实体上边 {prev_entity_upper:.2f}")
return ('reverse_long', prev_entity_upper)
return None
def execute_trade(self, signal, size=1):
"""执行交易"""
signal_type, trigger_price = signal
size= 25
if signal_type == 'long':
# 开多
logger.info(f"执行开多,触发价: {trigger_price:.2f}")
self.开单(marketPriceLongOrder=1, size=size)
self.start = 1
elif signal_type == 'short':
# 开空
logger.info(f"执行开空,触发价: {trigger_price:.2f}")
self.开单(marketPriceLongOrder=-1, size=size)
self.start = -1
elif signal_type == 'reverse_long':
# 平空 + 开多(反手做多)
logger.info(f"执行反手做多,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=size)
self.start = 1
elif signal_type == 'reverse_short':
# 平多 + 开空(反手做空)
logger.info(f"执行反手做空,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=size)
self.start = -1
def action(self):
"""主循环"""
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
# 打开浏览器
if not self.openBrowser():
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
# 进入交易页面
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(3)
logger.info("开始运行三分之一策略交易...")
# 标记是否刚执行过交易(用于跳过从交易所获取持仓状态,避免延迟问题)
just_traded = False
while True:
try:
# 1. 获取K线数据当前K线和上一根K线
prev_kline, current_kline = self.get_klines()
if not prev_kline or not current_kline:
logger.warning("获取K线失败等待重试...")
time.sleep(5)
continue
# 2. 获取当前价格
current_price = self.get_current_price()
if not current_price:
logger.warning("获取价格失败,等待重试...")
time.sleep(2)
continue
# 3. 获取持仓状态如果刚交易过信任本地状态跳过API查询避免延迟
if not just_traded:
if not self.get_position_status():
logger.warning("获取持仓状态失败,等待重试...")
time.sleep(2)
continue
else:
logger.info(f"刚执行交易,信任本地持仓状态: {self.start}")
just_traded = False # 重置标记
# 4. 检查信号
signal = self.check_signal(current_price, prev_kline, current_kline)
# 5. 有信号则执行交易
if signal:
self.execute_trade(signal, size=1)
logger.success(f"交易执行完成: {signal[0]}, 当前持仓状态: {self.start}")
just_traded = True # 标记刚执行过交易
# 交易后立即再次检查是否有反手信号同一根K线内可能多次反手
time.sleep(1) # 短暂等待
# 重新获取价格,检查是否需要再次反手
new_price = self.get_current_price()
if new_price:
new_signal = self.check_signal(new_price, prev_kline, current_kline)
if new_signal:
logger.info(f"检测到连续反手信号!当前价格: {new_price:.2f}")
self.execute_trade(new_signal, size=1)
logger.success(f"连续反手执行完成: {new_signal[0]}, 当前持仓状态: {self.start}")
time.sleep(1) # 交易后稍等
continue # 立即进入下一次循环继续监控
# 6. 短暂等待后继续循环同一根K线遇到信号就操作
time.sleep(3)
except KeyboardInterrupt:
logger.info("用户中断,程序退出")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(5)
if __name__ == '__main__':
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()