Files
lm_code/交易/weex-结构优化.py

954 lines
35 KiB
Python
Raw Permalink Normal View History

2025-12-18 14:20:15 +08:00
"""
WEEX ETH-USDT 永续合约自动交易系统
优化版本改进代码结构模块化和可读性
"""
import time
import datetime
from typing import Optional, Dict, List, Tuple
from tqdm import tqdm
from loguru import logger
from DrissionPage import ChromiumOptions, ChromiumPage
from curl_cffi import requests
2026-01-13 15:17:08 +08:00
from bit_tools import openBrowser
2025-12-18 14:20:15 +08:00
from 交易.tools import send_dingtalk_message
# ==================== 配置常量 ====================
class Config:
"""配置类:集中管理所有配置常量"""
# TGE浏览器配置
TGE_URL = "http://127.0.0.1:50326"
TGE_AUTHORIZATION = "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91"
# WEEX API配置
CONTRACT_ID = "10000002"
PRODUCT_CODE = "cmt_ethusdt"
KLINE_TYPE = "MINUTE_30"
KLINE_LIMIT = 300
# 交易页面URL
2026-01-12 09:34:55 +08:00
TRADING_URL = "https://www.weex.com/zh-CN/futures/ETH-USDT"
2025-12-18 14:20:15 +08:00
# 交易配置
POSITION_RATIO = 100 # 开仓金额比例余额的1/100
# 重试配置
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 1 # 秒
# ==================== K线分析器 ====================
class KlineAnalyzer:
"""K线分析器负责K线数据分析和信号判断"""
@staticmethod
def is_bullish(candle: Dict) -> bool:
"""判断是否为阳线"""
return float(candle['close']) > float(candle['open'])
@staticmethod
def is_bearish(candle: Dict) -> bool:
"""判断是否为阴线"""
return float(candle['close']) < float(candle['open'])
@staticmethod
def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]:
"""
包住形态信号判定30分钟K线
- 前跌后涨包住 -> 做多
- 前涨后跌包住 -> 做空
Returns:
(direction, signal_type): direction为"long""short"signal_type为信号类型
"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
# 前跌后涨包住 -> 做多
if (KlineAnalyzer.is_bullish(curr) and
2025-12-19 10:35:02 +08:00
KlineAnalyzer.is_bearish(prev) and
int(c_open) <= int(p_close) and
int(c_close) >= int(p_open)):
2025-12-18 14:20:15 +08:00
return "long", "bear_bull_engulf"
# 前涨后跌包住 -> 做空
if (KlineAnalyzer.is_bearish(curr) and
2025-12-19 10:35:02 +08:00
KlineAnalyzer.is_bullish(prev) and
int(c_open) >= int(p_close) and
int(c_close) <= int(p_open)):
2025-12-18 14:20:15 +08:00
return "short", "bull_bear_engulf"
return None, None
# ==================== 浏览器管理器 ====================
class BrowserManager:
"""浏览器管理器:负责浏览器的启动、接管和标签页管理"""
def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict):
self.tge_id = tge_id
self.tge_url = tge_url
self.tge_headers = tge_headers
self.tge_port: Optional[int] = None
self.page: Optional[ChromiumPage] = None
2026-01-13 15:17:08 +08:00
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
2025-12-18 14:20:15 +08:00
try:
2026-01-13 15:17:08 +08:00
bit_port = openBrowser(id=self.tge_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.tge_port = bit_port
2025-12-18 14:20:15 +08:00
return True
2026-01-13 15:17:08 +08:00
except:
2025-12-18 14:20:15 +08:00
return False
def take_over_browser(self) -> bool:
"""接管浏览器"""
if not self.tge_port:
logger.error("浏览器端口未设置")
return False
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
logger.success("成功接管浏览器")
return True
except Exception as e:
logger.error(f"接管浏览器失败: {e}")
return False
def close_extra_tabs(self) -> bool:
"""关闭多余的标签页,只保留第一个"""
if not self.page:
return False
try:
tabs = self.page.get_tabs()
closed_count = 0
for idx, tab in enumerate(tabs):
if idx == 0:
continue
tab.close()
closed_count += 1
if closed_count > 0:
logger.info(f"已关闭{closed_count}个多余标签页")
return True
except Exception as e:
logger.warning(f"关闭多余标签页失败: {e}")
return False
# ==================== WEEX API客户端 ====================
class WEEXApiClient:
"""WEEX API客户端负责与WEEX API的交互"""
def __init__(self):
self.session = requests.Session()
self.headers: Optional[Dict] = None
def update_headers(self, headers: Dict) -> None:
"""更新session的headers"""
if not self.headers:
self.session.headers = headers
else:
self.session.headers.update(headers)
self.headers = headers
def get_kline_data(self, contract_id: str = Config.CONTRACT_ID,
2025-12-19 10:35:02 +08:00
product_code: str = Config.PRODUCT_CODE,
kline_type: str = Config.KLINE_TYPE,
limit: int = Config.KLINE_LIMIT) -> List[Dict]:
2025-12-18 14:20:15 +08:00
"""
获取K线数据
Args:
contract_id: 合约ID
product_code: 产品代码
kline_type: K线类型
limit: 返回数量限制
Returns:
K线数据列表
"""
params = {
'contractId': contract_id,
'productCode': product_code,
'priceType': 'LAST_PRICE',
'klineType': kline_type,
'limit': str(limit),
'timeZone': 'string',
'languageType': '1',
'sign': 'SIGN',
}
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
logger.info(f"获取最新K线数据{attempt + 1}/{Config.MAX_RETRY_ATTEMPTS}次尝试...")
try:
response = self.session.get(
'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2',
params=params,
timeout=15
)
# 检查响应状态码
if response.status_code != 200:
logger.warning(f"获取K线数据失败HTTP状态码{response.status_code}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
continue
# 尝试解析JSON
try:
response_data = response.json()
except Exception as json_error:
logger.warning(f"JSON解析失败响应内容{response.text[:200]}...")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
continue
# 检查数据结构
if "data" not in response_data or "dataList" not in response_data["data"]:
logger.warning(f"响应数据格式不正确:{response_data}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
continue
result = response_data["data"]["dataList"]
kline_data = []
for item in result:
# WEEX API返回格式: [close, high, low, open, id]
kline_data.append({
'id': int(item[4]),
'open': float(item[3]),
'high': float(item[1]),
'low': float(item[2]),
'close': float(item[0])
})
logger.success(f"成功获取{len(kline_data)}条K线数据")
return kline_data
except Exception as e:
logger.warning(f"获取K线数据失败{attempt + 1}次尝试): {e}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
logger.error("获取K线数据失败已重试3次仍未成功")
return []
def get_available_balance(self) -> Optional[float]:
"""获取可用余额"""
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.post(
'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new',
timeout=15
)
balance = float(response.json()["data"]["newContract"]["balanceList"][0]["available"])
logger.success(f"成功获取可用余额: {balance}")
return balance
except Exception as e:
logger.warning(f"获取余额失败(第{attempt + 1}次尝试): {e}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
logger.error("获取余额失败已重试3次仍未成功")
return None
2026-01-21 18:17:27 +08:00
def get_position_status(self) -> Tuple[bool, Optional[List]]:
2025-12-18 14:20:15 +08:00
"""
获取持仓状态
Returns:
2026-01-21 18:17:27 +08:00
(success, position_data): success为是否成功position_data为持仓信息列表
2025-12-18 14:20:15 +08:00
"""
json_data = {
'filterContractIdList': [10000002],
'limit': 100,
'languageType': 0,
'sign': 'SIGN',
'timeZone': 'string',
}
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.post(
2026-01-21 18:14:14 +08:00
# 'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderFillTransactionPage',
'https://http-gateway2.janapw.com/api/v1/private/order/v2/getHistoryOrderFillTransactionPage',
2025-12-18 14:20:15 +08:00
json=json_data,
timeout=15
)
datas = response.json()["data"]["dataList"]
if not datas:
return True, None
2026-01-21 18:14:14 +08:00
latest_order = datas
2025-12-18 14:20:15 +08:00
return True, latest_order
except Exception as e:
logger.warning(f"获取持仓状态失败(第{attempt + 1}次尝试): {e}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return False, None
# ==================== Token管理器 ====================
class TokenManager:
"""Token管理器负责获取和更新认证token"""
def __init__(self, api_client: WEEXApiClient, page: ChromiumPage):
self.api_client = api_client
self.page = page
def get_token(self) -> bool:
"""从浏览器中获取U-TOKEN"""
tab = self.page.new_tab()
tab.listen.start("/user/security/getLanguageType")
try:
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
tab.get(url=Config.TRADING_URL)
res = tab.listen.wait(timeout=5)
if res.request.headers.get("U-TOKEN"):
headers = res.request.headers
self.api_client.update_headers(headers)
logger.success("成功获取token")
return True
except Exception as e:
logger.warning(f"获取token尝试{attempt + 1}失败: {e}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
logger.error("获取token失败已重试3次仍未成功")
return False
finally:
tab.close()
# ==================== 交易执行器 ====================
class TradingExecutor:
"""交易执行器:负责执行交易操作"""
def __init__(self, page: ChromiumPage, api_client: WEEXApiClient):
self.page = page
self.api_client = api_client
def navigate_to_trading_page(self) -> bool:
"""导航到交易页面(选择市价)"""
try:
self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click()
time.sleep(1)
return True
except Exception as e:
logger.error(f"导航到交易页面失败: {e}")
return False
def close_all_positions(self) -> bool:
"""平仓所有持仓(闪电平仓)"""
try:
2026-01-12 09:34:55 +08:00
self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').scroll.to_see(center=True)
2025-12-18 14:20:15 +08:00
time.sleep(1)
2026-01-21 18:14:14 +08:00
self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').click(by_js=True)
2025-12-18 14:20:15 +08:00
time.sleep(3)
logger.success("成功执行平仓操作")
return True
except Exception as e:
logger.error(f"平仓失败: {e}")
return False
def open_long(self, amount: float) -> bool:
"""开多"""
try:
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
time.sleep(1)
2026-01-12 09:34:55 +08:00
self.page.ele('x://*[normalize-space(text()) ="买入开多"]').click(by_js=True)
2025-12-18 14:20:15 +08:00
logger.success(f"成功开多,金额:{amount}")
return True
except Exception as e:
logger.error(f"开多失败: {e}")
return False
def open_short(self, amount: float) -> bool:
"""开空"""
try:
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
time.sleep(1)
2026-01-12 09:34:55 +08:00
self.page.ele('x://*[normalize-space(text()) ="卖出开空"]').click(by_js=True)
2025-12-18 14:20:15 +08:00
logger.success(f"成功开空,金额:{amount}")
return True
except Exception as e:
logger.error(f"开空失败: {e}")
return False
def execute_trade(self, direction: str, current_position: int, amount: float) -> bool:
"""
执行交易操作
Args:
direction: 交易方向 "long" "short"
current_position: 当前持仓状态 -1:, 0:, 1:
amount: 交易金额
Returns:
2025-12-18 16:42:26 +08:00
是否成功如果已持有相同方向仓位返回True但不执行操作
2025-12-18 14:20:15 +08:00
"""
2025-12-18 16:42:26 +08:00
# 检查是否已经持有相同方向的仓位
if (direction == "long" and current_position == PositionManager.POSITION_LONG) or \
2025-12-19 10:35:02 +08:00
(direction == "short" and current_position == PositionManager.POSITION_SHORT):
2025-12-18 16:42:26 +08:00
logger.info(f"已持有{direction}方向仓位,无需重复开仓")
return True # 返回True表示"状态正常",虽然没有执行交易
2025-12-18 14:20:15 +08:00
if not self.navigate_to_trading_page():
return False
try:
# 开多
if direction == "long":
if current_position == 0:
logger.info(f"{datetime.datetime.now()},开多,金额:{amount}")
return self.open_long(amount)
elif current_position == -1:
logger.info(f"{datetime.datetime.now()},反手平空做多,金额:{amount}")
if self.close_all_positions():
time.sleep(1)
return self.open_long(amount)
# 开空
elif direction == "short":
if current_position == 0:
logger.info(f"{datetime.datetime.now()},开空,金额:{amount}")
return self.open_short(amount)
elif current_position == 1:
logger.info(f"{datetime.datetime.now()},反手平多做空,金额:{amount}")
if self.close_all_positions():
time.sleep(1)
return self.open_short(amount)
return False
except Exception as e:
logger.error(f"执行交易失败: {e}")
return False
# ==================== 持仓管理器 ====================
class PositionManager:
"""持仓管理器:负责持仓状态管理和止损"""
# 持仓状态常量
POSITION_SHORT = -1 # 做空
2025-12-19 10:35:02 +08:00
POSITION_NONE = 0 # 无持仓
POSITION_LONG = 1 # 做多
2025-12-18 14:20:15 +08:00
def __init__(self, trading_executor: TradingExecutor):
self.trading_executor = trading_executor
self.current_position: int = self.POSITION_NONE
2026-01-21 18:17:27 +08:00
self.position_data: Optional[List] = None
2025-12-18 14:20:15 +08:00
2026-01-21 18:17:27 +08:00
def update_position(self, position_data: Optional[List]) -> None:
2025-12-18 14:20:15 +08:00
"""
更新持仓状态
Args:
2026-01-21 18:17:27 +08:00
position_data: 持仓数据列表
2025-12-18 14:20:15 +08:00
"""
2026-01-21 18:17:27 +08:00
self.position_data = position_data # 修复统一使用position_data
2025-12-18 14:20:15 +08:00
if not position_data:
self.current_position = self.POSITION_NONE
return
2026-01-21 18:14:14 +08:00
position_data.reverse()
start = 0
start1 = 0
for _, i in enumerate(position_data):
direction = i.get("legacyOrderDirection")
if direction == "CLOSE_SHORT":
start = 0
elif direction == "CLOSE_LONG":
start1 = 0
elif direction == "OPEN_SHORT":
start -= 1
elif direction == "OPEN_LONG":
start1 += 1
# direction = position_data.get("legacyOrderDirection")
if start1:
2025-12-18 14:20:15 +08:00
self.current_position = self.POSITION_LONG
2026-01-21 18:14:14 +08:00
elif start:
2025-12-18 14:20:15 +08:00
self.current_position = self.POSITION_SHORT
else:
self.current_position = self.POSITION_NONE
def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool:
"""
检查止损条件
Args:
kline_1: 倒数第二根K线
kline_2: 倒数第一根K线
Returns:
是否执行了止损
"""
try:
# 持多仓,连续两根阴线,平多
if self.current_position == self.POSITION_LONG:
if (KlineAnalyzer.is_bearish(kline_1) and
2025-12-19 10:35:02 +08:00
KlineAnalyzer.is_bearish(kline_2)):
2025-12-18 14:20:15 +08:00
logger.success(
f"{datetime.datetime.now()},止损信号:连续两根阴线,平多"
)
if self.trading_executor.close_all_positions():
self.current_position = self.POSITION_NONE
return True
# 持空仓,连续两根阳线,平空
elif self.current_position == self.POSITION_SHORT:
if (KlineAnalyzer.is_bullish(kline_1) and
2025-12-19 10:35:02 +08:00
KlineAnalyzer.is_bullish(kline_2)):
2025-12-18 14:20:15 +08:00
logger.success(
f"{datetime.datetime.now()},止损信号:连续两根阳线,平空"
)
if self.trading_executor.close_all_positions():
self.current_position = self.POSITION_NONE
return True
except Exception as e:
logger.error(f"止损检查失败: {e}")
return False
# ==================== 消息发送器 ====================
class MessageSender:
"""消息发送器:负责发送钉钉消息"""
@staticmethod
def send_dingtalk_message(message_content: str, is_error: bool = False) -> None:
"""
发送钉钉消息
Args:
message_content: 消息内容
is_error: 是否为错误消息错误消息会发送多次
"""
if is_error:
prefix = "❌weex"
count = 15
else:
prefix = "🔔weex"
count = 1
for _ in range(count):
send_dingtalk_message(f"{prefix}{message_content}")
@staticmethod
2026-01-21 18:17:27 +08:00
def format_position_message(position_data: Optional[List],
current_position: int,
2025-12-19 10:35:02 +08:00
current_price: float,
available_balance: float) -> str:
2025-12-18 14:20:15 +08:00
"""
格式化持仓信息消息
Args:
2026-01-21 18:17:27 +08:00
position_data: 持仓数据列表
current_position: 当前持仓方向 (-1:, 0:, 1:)
2025-12-18 14:20:15 +08:00
current_price: 当前价格
available_balance: 可用余额
"""
2026-01-21 18:17:27 +08:00
# 根据current_position确定持仓方向
if current_position == PositionManager.POSITION_LONG:
direction = ""
elif current_position == PositionManager.POSITION_SHORT:
direction = ""
else:
direction = ""
2025-12-18 14:20:15 +08:00
2026-01-21 18:17:27 +08:00
# 如果没有持仓,直接返回无持仓消息
if current_position == PositionManager.POSITION_NONE:
2025-12-18 14:20:15 +08:00
return (
"**【WEEX ETHUSDT 永续持仓监控】**\n\n"
2025-12-22 11:07:50 +08:00
f"**持仓方向**:无\n"
2025-12-18 14:20:15 +08:00
f"**当前现价**{current_price:.2f} USDT\n"
f"**账户可用余额**{available_balance:.2f} USDT"
)
2025-12-22 11:07:50 +08:00
2026-01-21 18:17:27 +08:00
# 如果有持仓尝试从position_data中提取持仓信息
if position_data and isinstance(position_data, list) and len(position_data) > 0:
try:
# 从列表中查找最新的开仓记录
open_order = None
total_fill_size = 0.0
total_fill_value = 0.0
for order in reversed(position_data): # 从最新到最旧遍历
legacy_direction = order.get("legacyOrderDirection", "")
# 查找对应方向的开仓记录
if current_position == PositionManager.POSITION_LONG and legacy_direction == "OPEN_LONG":
fill_size = float(order.get('fillSize', 0))
fill_value = float(order.get('fillValue', 0))
if fill_size > 0:
total_fill_size += fill_size
total_fill_value += fill_value
if open_order is None:
open_order = order
elif current_position == PositionManager.POSITION_SHORT and legacy_direction == "OPEN_SHORT":
fill_size = float(order.get('fillSize', 0))
fill_value = float(order.get('fillValue', 0))
if fill_size > 0:
total_fill_size += fill_size
total_fill_value += fill_value
if open_order is None:
open_order = order
# 如果遇到平仓记录,停止累计
if (current_position == PositionManager.POSITION_LONG and legacy_direction == "CLOSE_LONG") or \
(current_position == PositionManager.POSITION_SHORT and legacy_direction == "CLOSE_SHORT"):
break
if total_fill_size > 0:
# 计算开仓均价
open_avg_price = total_fill_value / total_fill_size if total_fill_size > 0 else 0
# 计算浮动盈亏
if direction == "":
unrealized_pnl = total_fill_size * (current_price - open_avg_price)
pnl_rate = (current_price - open_avg_price) / open_avg_price * 100 if open_avg_price > 0 else 0
else: # 空
unrealized_pnl = total_fill_size * (open_avg_price - current_price)
pnl_rate = (open_avg_price - current_price) / open_avg_price * 100 if open_avg_price > 0 else 0
pnl_str = f"{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)"
current_value = total_fill_size * current_price
return (
"**【WEEX ETHUSDT 永续持仓监控】**\n\n"
f"**持仓方向**{direction}\n"
f"**当前现价**{current_price:.2f} USDT\n"
f"**开仓均价**{open_avg_price:.2f} USDT\n"
f"**持仓数量(eth)**{total_fill_size:.3f} ETH\n"
f"**持仓数量(usdt)**{total_fill_value / 100:.2f} USDT\n"
f"**名义价值**{current_value:.2f} USDT\n"
f"**浮动盈亏**{pnl_str}\n"
f"**账户可用余额**{available_balance:.2f} USDT"
)
except Exception as e:
logger.warning(f"解析持仓数据失败: {e}")
# 如果无法解析持仓数据,至少显示持仓方向
return (
"**【WEEX ETHUSDT 永续持仓监控】**\n\n"
f"**持仓方向**{direction}\n"
f"**当前现价**{current_price:.2f} USDT\n"
f"**账户可用余额**{available_balance:.2f} USDT"
)
2025-12-18 14:20:15 +08:00
# ==================== 时间工具类 ====================
class TimeUtils:
"""时间工具类"""
@staticmethod
def get_current_kline_timestamp() -> int:
"""
获取当前K线的时间戳30分钟K线的整点或30分时刻毫秒
Returns:
时间戳毫秒
"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的整点或30分时刻
if current_datetime.minute < 30:
target_datetime = current_datetime.replace(
minute=0, second=0, microsecond=0
)
else:
target_datetime = current_datetime.replace(
minute=30, second=0, microsecond=0
)
return int(target_datetime.timestamp()) * 1000
@staticmethod
def get_formatted_time() -> str:
"""获取格式化的当前时间字符串"""
timestamp = time.time()
local_time = time.localtime(timestamp)
return time.strftime("%Y-%m-%d %H:%M:%S", local_time)
@staticmethod
def get_progress_bar_value() -> int:
"""获取进度条的当前值0-29"""
current_time = time.localtime()
current_minute = current_time.tm_min
return current_minute if current_minute < 30 else current_minute - 30
# ==================== 主交易类 ====================
class WeexTransaction:
"""WEEX自动交易主类"""
2026-01-13 15:17:08 +08:00
def __init__(self, tge_id):
2025-12-18 14:20:15 +08:00
# 配置
self.tge_id = tge_id
self.tge_headers = {
"Authorization": Config.TGE_AUTHORIZATION,
"Content-Type": "application/json"
}
# 组件初始化
self.browser_manager = BrowserManager(tge_id, Config.TGE_URL, self.tge_headers)
self.api_client = WEEXApiClient()
self.position_manager = None # 需要在浏览器初始化后创建
self.trading_executor = None # 需要在浏览器初始化后创建
self.token_manager = None # 需要在浏览器初始化后创建
# 状态
self.pbar: Optional[tqdm] = None
self.last_kline_timestamp: Optional[int] = None
self.kline_1: Optional[Dict] = None
self.kline_2: Optional[Dict] = None
self.kline_3: Optional[Dict] = None
def initialize(self) -> bool:
"""初始化所有组件"""
# 打开浏览器
2026-01-13 15:17:08 +08:00
if not self.browser_manager.openBrowser():
2025-12-18 14:20:15 +08:00
logger.error("打开浏览器失败")
MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True)
return False
# 接管浏览器
if not self.browser_manager.take_over_browser():
logger.error("接管浏览器失败")
MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True)
return False
# 关闭多余标签页
self.browser_manager.close_extra_tabs()
# 初始化需要page的组件
page = self.browser_manager.page
self.trading_executor = TradingExecutor(page, self.api_client)
self.position_manager = PositionManager(self.trading_executor)
self.token_manager = TokenManager(self.api_client, page)
# 打开交易页面
page.get(url=Config.TRADING_URL)
# 初始化时先获取一次token
if not self.token_manager.get_token():
logger.warning("初始化时获取token失败将在获取K线数据时重试")
# 初始化进度条
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)
logger.success("系统初始化完成")
return True
def update_progress_bar(self) -> None:
"""更新进度条"""
if self.pbar:
self.pbar.n = TimeUtils.get_progress_bar_value()
self.pbar.refresh()
def fetch_and_update_kline(self) -> bool:
"""获取并更新K线数据"""
# 先获取token获取K线数据需要token
if not self.token_manager.get_token():
logger.warning("获取token失败无法获取K线数据")
return False
kline_data = self.api_client.get_kline_data()
if not kline_data:
logger.warning("获取K线数据失败")
MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True)
return False
# 排序并取最后三根
sorted_data = sorted(kline_data, key=lambda x: x["id"])
if len(sorted_data) < 3:
logger.error("K线数据不足3根")
return False
self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:]
# 检查是否是新K线
current_kline_id = self.kline_3["id"]
current_timestamp = TimeUtils.get_current_kline_timestamp()
# 判断K线时间是否匹配当前时间并且是新K线
if current_kline_id != current_timestamp:
return False # K线时间不匹配
if self.last_kline_timestamp == current_timestamp:
return False # 已经处理过这个K线
self.last_kline_timestamp = current_timestamp
return True
def sync_position_status(self) -> bool:
"""同步持仓状态"""
success, position_data = self.api_client.get_position_status()
if not success:
logger.error("获取持仓状态失败")
MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True)
return False
self.position_manager.update_position(position_data)
return True
def process_trading_logic(self) -> None:
"""处理交易逻辑"""
page = self.browser_manager.page
# 刷新token因为可能已经过期但这不是必须的因为获取K线时已经获取过了
# 如果token有效这里会快速返回如果无效会重新获取
self.token_manager.get_token()
# 刷新页面
page.get(url=Config.TRADING_URL)
# 同步持仓状态
if not self.sync_position_status():
return
# 检查止损
try:
self.position_manager.check_stop_loss(self.kline_1, self.kline_2)
except Exception as e:
logger.error(f"止损检查出错: {e}")
MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True)
return
# 检查交易信号
direction, signal_type = KlineAnalyzer.check_engulfing_signal(
prev=self.kline_1,
curr=self.kline_2
)
if direction:
2025-12-18 16:42:26 +08:00
# 检查是否已经持有相同方向的仓位
current_pos = self.position_manager.current_position
if (direction == "long" and current_pos == PositionManager.POSITION_LONG) or \
2025-12-19 10:35:02 +08:00
(direction == "short" and current_pos == PositionManager.POSITION_SHORT):
2025-12-18 16:42:26 +08:00
logger.info(f"已持有{direction}方向仓位,继续持仓,不执行新开仓操作")
# 继续持仓,不发送开仓信息,直接返回继续执行后续的持仓信息发送
else:
# 需要执行交易(新开仓或反手)
# 获取可用余额
balance = self.api_client.get_available_balance()
if balance is None:
logger.error("获取可用余额失败")
MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True)
return
2026-01-12 09:34:55 +08:00
amount = int(balance / 100)
2025-12-18 16:42:26 +08:00
# 执行交易
trade_executed = self.trading_executor.execute_trade(
direction=direction,
current_position=self.position_manager.current_position,
amount=amount
2025-12-18 14:20:15 +08:00
)
2025-12-18 16:42:26 +08:00
if not trade_executed:
MessageSender.send_dingtalk_message(
f"交易执行失败,方向:{direction}",
is_error=True
)
return
2025-12-18 14:20:15 +08:00
2025-12-18 16:42:26 +08:00
# 更新持仓状态
if direction == "long":
self.position_manager.current_position = PositionManager.POSITION_LONG
elif direction == "short":
self.position_manager.current_position = PositionManager.POSITION_SHORT
# 发送交易消息(只有真正执行了开仓或反手操作才发送)
MessageSender.send_dingtalk_message(
f"信号:{direction},开仓金额:{amount:.2f}",
is_error=False
)
2025-12-18 14:20:15 +08:00
# 再次同步持仓状态(交易后)
if not self.sync_position_status():
return
# 发送持仓信息
balance = self.api_client.get_available_balance()
current_price = float(self.kline_3["close"])
if balance is not None:
position_info = self.position_manager.position_data
2026-01-21 18:17:27 +08:00
current_pos = self.position_manager.current_position
2025-12-18 14:20:15 +08:00
msg = MessageSender.format_position_message(
position_info,
2026-01-21 18:17:27 +08:00
current_pos,
2025-12-18 14:20:15 +08:00
current_price,
balance
)
MessageSender.send_dingtalk_message(msg, is_error=False)
def run(self) -> None:
"""主运行循环"""
# 初始化
if not self.initialize():
return
logger.info("系统初始化完成,开始运行...")
# 主循环
while True:
try:
# 更新进度条
self.update_progress_bar()
# 获取当前K线时间戳
current_timestamp = TimeUtils.get_current_kline_timestamp()
# 检查是否已经处理过这个K线
if self.last_kline_timestamp == current_timestamp:
time.sleep(10)
continue
# 获取并更新K线数据
if not self.fetch_and_update_kline():
time.sleep(10)
continue
# 处理交易逻辑
self.process_trading_logic()
# 重置进度条
if self.pbar:
self.pbar.reset()
# 等待一段时间
time.sleep(5)
except KeyboardInterrupt:
logger.info("收到中断信号,正在退出...")
break
except Exception as e:
logger.error(f"运行出错: {e}")
MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True)
time.sleep(10)
def action(self) -> None:
"""兼容旧接口的方法名"""
self.run()
if __name__ == '__main__':
2026-01-13 15:17:08 +08:00
transaction = WeexTransaction(tge_id="86837a981aba4576be6916a0ef6ad785")
2025-12-18 14:20:15 +08:00
transaction.action()