Files
lm_code/交易/weex-结构优化.py
2026-01-21 18:17:27 +08:00

954 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WEEX ETH-USDT 永续合约自动交易系统
优化版本:改进代码结构、模块化和可读性
"""
import time
import datetime
from typing import Optional, Dict, List, Tuple
from tqdm import tqdm
from loguru import logger
from DrissionPage import ChromiumOptions, ChromiumPage
from curl_cffi import requests
from bit_tools import openBrowser
from 交易.tools import send_dingtalk_message
# ==================== 配置常量 ====================
class Config:
"""配置类:集中管理所有配置常量"""
# TGE浏览器配置
TGE_URL = "http://127.0.0.1:50326"
TGE_AUTHORIZATION = "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91"
# WEEX API配置
CONTRACT_ID = "10000002"
PRODUCT_CODE = "cmt_ethusdt"
KLINE_TYPE = "MINUTE_30"
KLINE_LIMIT = 300
# 交易页面URL
TRADING_URL = "https://www.weex.com/zh-CN/futures/ETH-USDT"
# 交易配置
POSITION_RATIO = 100 # 开仓金额比例余额的1/100
# 重试配置
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 1 # 秒
# ==================== K线分析器 ====================
class KlineAnalyzer:
"""K线分析器负责K线数据分析和信号判断"""
@staticmethod
def is_bullish(candle: Dict) -> bool:
"""判断是否为阳线"""
return float(candle['close']) > float(candle['open'])
@staticmethod
def is_bearish(candle: Dict) -> bool:
"""判断是否为阴线"""
return float(candle['close']) < float(candle['open'])
@staticmethod
def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]:
"""
包住形态信号判定30分钟K线
- 前跌后涨包住 -> 做多
- 前涨后跌包住 -> 做空
Returns:
(direction, signal_type): direction为"long""short"signal_type为信号类型
"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
# 前跌后涨包住 -> 做多
if (KlineAnalyzer.is_bullish(curr) and
KlineAnalyzer.is_bearish(prev) and
int(c_open) <= int(p_close) and
int(c_close) >= int(p_open)):
return "long", "bear_bull_engulf"
# 前涨后跌包住 -> 做空
if (KlineAnalyzer.is_bearish(curr) and
KlineAnalyzer.is_bullish(prev) and
int(c_open) >= int(p_close) and
int(c_close) <= int(p_open)):
return "short", "bull_bear_engulf"
return None, None
# ==================== 浏览器管理器 ====================
class BrowserManager:
"""浏览器管理器:负责浏览器的启动、接管和标签页管理"""
def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict):
self.tge_id = tge_id
self.tge_url = tge_url
self.tge_headers = tge_headers
self.tge_port: Optional[int] = None
self.page: Optional[ChromiumPage] = None
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.tge_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.tge_port = bit_port
return True
except:
return False
def take_over_browser(self) -> bool:
"""接管浏览器"""
if not self.tge_port:
logger.error("浏览器端口未设置")
return False
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
logger.success("成功接管浏览器")
return True
except Exception as e:
logger.error(f"接管浏览器失败: {e}")
return False
def close_extra_tabs(self) -> bool:
"""关闭多余的标签页,只保留第一个"""
if not self.page:
return False
try:
tabs = self.page.get_tabs()
closed_count = 0
for idx, tab in enumerate(tabs):
if idx == 0:
continue
tab.close()
closed_count += 1
if closed_count > 0:
logger.info(f"已关闭{closed_count}个多余标签页")
return True
except Exception as e:
logger.warning(f"关闭多余标签页失败: {e}")
return False
# ==================== WEEX API客户端 ====================
class WEEXApiClient:
"""WEEX API客户端负责与WEEX API的交互"""
def __init__(self):
self.session = requests.Session()
self.headers: Optional[Dict] = None
def update_headers(self, headers: Dict) -> None:
"""更新session的headers"""
if not self.headers:
self.session.headers = headers
else:
self.session.headers.update(headers)
self.headers = headers
def get_kline_data(self, contract_id: str = Config.CONTRACT_ID,
product_code: str = Config.PRODUCT_CODE,
kline_type: str = Config.KLINE_TYPE,
limit: int = Config.KLINE_LIMIT) -> List[Dict]:
"""
获取K线数据
Args:
contract_id: 合约ID
product_code: 产品代码
kline_type: K线类型
limit: 返回数量限制
Returns:
K线数据列表
"""
params = {
'contractId': contract_id,
'productCode': product_code,
'priceType': 'LAST_PRICE',
'klineType': kline_type,
'limit': str(limit),
'timeZone': 'string',
'languageType': '1',
'sign': 'SIGN',
}
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
logger.info(f"获取最新K线数据{attempt + 1}/{Config.MAX_RETRY_ATTEMPTS}次尝试...")
try:
response = self.session.get(
'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2',
params=params,
timeout=15
)
# 检查响应状态码
if response.status_code != 200:
logger.warning(f"获取K线数据失败HTTP状态码{response.status_code}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
continue
# 尝试解析JSON
try:
response_data = response.json()
except Exception as json_error:
logger.warning(f"JSON解析失败响应内容{response.text[:200]}...")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
continue
# 检查数据结构
if "data" not in response_data or "dataList" not in response_data["data"]:
logger.warning(f"响应数据格式不正确:{response_data}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
continue
result = response_data["data"]["dataList"]
kline_data = []
for item in result:
# WEEX API返回格式: [close, high, low, open, id]
kline_data.append({
'id': int(item[4]),
'open': float(item[3]),
'high': float(item[1]),
'low': float(item[2]),
'close': float(item[0])
})
logger.success(f"成功获取{len(kline_data)}条K线数据")
return kline_data
except Exception as e:
logger.warning(f"获取K线数据失败{attempt + 1}次尝试): {e}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
logger.error("获取K线数据失败已重试3次仍未成功")
return []
def get_available_balance(self) -> Optional[float]:
"""获取可用余额"""
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.post(
'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new',
timeout=15
)
balance = float(response.json()["data"]["newContract"]["balanceList"][0]["available"])
logger.success(f"成功获取可用余额: {balance}")
return balance
except Exception as e:
logger.warning(f"获取余额失败(第{attempt + 1}次尝试): {e}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
logger.error("获取余额失败已重试3次仍未成功")
return None
def get_position_status(self) -> Tuple[bool, Optional[List]]:
"""
获取持仓状态
Returns:
(success, position_data): success为是否成功position_data为持仓信息列表
"""
json_data = {
'filterContractIdList': [10000002],
'limit': 100,
'languageType': 0,
'sign': 'SIGN',
'timeZone': 'string',
}
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.post(
# 'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderFillTransactionPage',
'https://http-gateway2.janapw.com/api/v1/private/order/v2/getHistoryOrderFillTransactionPage',
json=json_data,
timeout=15
)
datas = response.json()["data"]["dataList"]
if not datas:
return True, None
latest_order = datas
return True, latest_order
except Exception as e:
logger.warning(f"获取持仓状态失败(第{attempt + 1}次尝试): {e}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return False, None
# ==================== Token管理器 ====================
class TokenManager:
"""Token管理器负责获取和更新认证token"""
def __init__(self, api_client: WEEXApiClient, page: ChromiumPage):
self.api_client = api_client
self.page = page
def get_token(self) -> bool:
"""从浏览器中获取U-TOKEN"""
tab = self.page.new_tab()
tab.listen.start("/user/security/getLanguageType")
try:
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
tab.get(url=Config.TRADING_URL)
res = tab.listen.wait(timeout=5)
if res.request.headers.get("U-TOKEN"):
headers = res.request.headers
self.api_client.update_headers(headers)
logger.success("成功获取token")
return True
except Exception as e:
logger.warning(f"获取token尝试{attempt + 1}失败: {e}")
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
logger.error("获取token失败已重试3次仍未成功")
return False
finally:
tab.close()
# ==================== 交易执行器 ====================
class TradingExecutor:
"""交易执行器:负责执行交易操作"""
def __init__(self, page: ChromiumPage, api_client: WEEXApiClient):
self.page = page
self.api_client = api_client
def navigate_to_trading_page(self) -> bool:
"""导航到交易页面(选择市价)"""
try:
self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click()
time.sleep(1)
return True
except Exception as e:
logger.error(f"导航到交易页面失败: {e}")
return False
def close_all_positions(self) -> bool:
"""平仓所有持仓(闪电平仓)"""
try:
self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').click(by_js=True)
time.sleep(3)
logger.success("成功执行平仓操作")
return True
except Exception as e:
logger.error(f"平仓失败: {e}")
return False
def open_long(self, amount: float) -> bool:
"""开多"""
try:
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
time.sleep(1)
self.page.ele('x://*[normalize-space(text()) ="买入开多"]').click(by_js=True)
logger.success(f"成功开多,金额:{amount}")
return True
except Exception as e:
logger.error(f"开多失败: {e}")
return False
def open_short(self, amount: float) -> bool:
"""开空"""
try:
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
time.sleep(1)
self.page.ele('x://*[normalize-space(text()) ="卖出开空"]').click(by_js=True)
logger.success(f"成功开空,金额:{amount}")
return True
except Exception as e:
logger.error(f"开空失败: {e}")
return False
def execute_trade(self, direction: str, current_position: int, amount: float) -> bool:
"""
执行交易操作
Args:
direction: 交易方向 "long""short"
current_position: 当前持仓状态 -1:空, 0:无, 1:多
amount: 交易金额
Returns:
是否成功如果已持有相同方向仓位返回True但不执行操作
"""
# 检查是否已经持有相同方向的仓位
if (direction == "long" and current_position == PositionManager.POSITION_LONG) or \
(direction == "short" and current_position == PositionManager.POSITION_SHORT):
logger.info(f"已持有{direction}方向仓位,无需重复开仓")
return True # 返回True表示"状态正常",虽然没有执行交易
if not self.navigate_to_trading_page():
return False
try:
# 开多
if direction == "long":
if current_position == 0:
logger.info(f"{datetime.datetime.now()},开多,金额:{amount}")
return self.open_long(amount)
elif current_position == -1:
logger.info(f"{datetime.datetime.now()},反手平空做多,金额:{amount}")
if self.close_all_positions():
time.sleep(1)
return self.open_long(amount)
# 开空
elif direction == "short":
if current_position == 0:
logger.info(f"{datetime.datetime.now()},开空,金额:{amount}")
return self.open_short(amount)
elif current_position == 1:
logger.info(f"{datetime.datetime.now()},反手平多做空,金额:{amount}")
if self.close_all_positions():
time.sleep(1)
return self.open_short(amount)
return False
except Exception as e:
logger.error(f"执行交易失败: {e}")
return False
# ==================== 持仓管理器 ====================
class PositionManager:
"""持仓管理器:负责持仓状态管理和止损"""
# 持仓状态常量
POSITION_SHORT = -1 # 做空
POSITION_NONE = 0 # 无持仓
POSITION_LONG = 1 # 做多
def __init__(self, trading_executor: TradingExecutor):
self.trading_executor = trading_executor
self.current_position: int = self.POSITION_NONE
self.position_data: Optional[List] = None
def update_position(self, position_data: Optional[List]) -> None:
"""
更新持仓状态
Args:
position_data: 持仓数据列表
"""
self.position_data = position_data # 修复统一使用position_data
if not position_data:
self.current_position = self.POSITION_NONE
return
position_data.reverse()
start = 0
start1 = 0
for _, i in enumerate(position_data):
direction = i.get("legacyOrderDirection")
if direction == "CLOSE_SHORT":
start = 0
elif direction == "CLOSE_LONG":
start1 = 0
elif direction == "OPEN_SHORT":
start -= 1
elif direction == "OPEN_LONG":
start1 += 1
# direction = position_data.get("legacyOrderDirection")
if start1:
self.current_position = self.POSITION_LONG
elif start:
self.current_position = self.POSITION_SHORT
else:
self.current_position = self.POSITION_NONE
def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool:
"""
检查止损条件
Args:
kline_1: 倒数第二根K线
kline_2: 倒数第一根K线
Returns:
是否执行了止损
"""
try:
# 持多仓,连续两根阴线,平多
if self.current_position == self.POSITION_LONG:
if (KlineAnalyzer.is_bearish(kline_1) and
KlineAnalyzer.is_bearish(kline_2)):
logger.success(
f"{datetime.datetime.now()},止损信号:连续两根阴线,平多"
)
if self.trading_executor.close_all_positions():
self.current_position = self.POSITION_NONE
return True
# 持空仓,连续两根阳线,平空
elif self.current_position == self.POSITION_SHORT:
if (KlineAnalyzer.is_bullish(kline_1) and
KlineAnalyzer.is_bullish(kline_2)):
logger.success(
f"{datetime.datetime.now()},止损信号:连续两根阳线,平空"
)
if self.trading_executor.close_all_positions():
self.current_position = self.POSITION_NONE
return True
except Exception as e:
logger.error(f"止损检查失败: {e}")
return False
# ==================== 消息发送器 ====================
class MessageSender:
"""消息发送器:负责发送钉钉消息"""
@staticmethod
def send_dingtalk_message(message_content: str, is_error: bool = False) -> None:
"""
发送钉钉消息
Args:
message_content: 消息内容
is_error: 是否为错误消息(错误消息会发送多次)
"""
if is_error:
prefix = "❌weex"
count = 15
else:
prefix = "🔔weex"
count = 1
for _ in range(count):
send_dingtalk_message(f"{prefix}{message_content}")
@staticmethod
def format_position_message(position_data: Optional[List],
current_position: int,
current_price: float,
available_balance: float) -> str:
"""
格式化持仓信息消息
Args:
position_data: 持仓数据列表
current_position: 当前持仓方向 (-1:空, 0:无, 1:多)
current_price: 当前价格
available_balance: 可用余额
"""
# 根据current_position确定持仓方向
if current_position == PositionManager.POSITION_LONG:
direction = ""
elif current_position == PositionManager.POSITION_SHORT:
direction = ""
else:
direction = ""
# 如果没有持仓,直接返回无持仓消息
if current_position == PositionManager.POSITION_NONE:
return (
"**【WEEX ETHUSDT 永续持仓监控】**\n\n"
f"**持仓方向**:无\n"
f"**当前现价**{current_price:.2f} USDT\n"
f"**账户可用余额**{available_balance:.2f} USDT"
)
# 如果有持仓尝试从position_data中提取持仓信息
if position_data and isinstance(position_data, list) and len(position_data) > 0:
try:
# 从列表中查找最新的开仓记录
open_order = None
total_fill_size = 0.0
total_fill_value = 0.0
for order in reversed(position_data): # 从最新到最旧遍历
legacy_direction = order.get("legacyOrderDirection", "")
# 查找对应方向的开仓记录
if current_position == PositionManager.POSITION_LONG and legacy_direction == "OPEN_LONG":
fill_size = float(order.get('fillSize', 0))
fill_value = float(order.get('fillValue', 0))
if fill_size > 0:
total_fill_size += fill_size
total_fill_value += fill_value
if open_order is None:
open_order = order
elif current_position == PositionManager.POSITION_SHORT and legacy_direction == "OPEN_SHORT":
fill_size = float(order.get('fillSize', 0))
fill_value = float(order.get('fillValue', 0))
if fill_size > 0:
total_fill_size += fill_size
total_fill_value += fill_value
if open_order is None:
open_order = order
# 如果遇到平仓记录,停止累计
if (current_position == PositionManager.POSITION_LONG and legacy_direction == "CLOSE_LONG") or \
(current_position == PositionManager.POSITION_SHORT and legacy_direction == "CLOSE_SHORT"):
break
if total_fill_size > 0:
# 计算开仓均价
open_avg_price = total_fill_value / total_fill_size if total_fill_size > 0 else 0
# 计算浮动盈亏
if direction == "":
unrealized_pnl = total_fill_size * (current_price - open_avg_price)
pnl_rate = (current_price - open_avg_price) / open_avg_price * 100 if open_avg_price > 0 else 0
else: # 空
unrealized_pnl = total_fill_size * (open_avg_price - current_price)
pnl_rate = (open_avg_price - current_price) / open_avg_price * 100 if open_avg_price > 0 else 0
pnl_str = f"{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)"
current_value = total_fill_size * current_price
return (
"**【WEEX ETHUSDT 永续持仓监控】**\n\n"
f"**持仓方向**{direction}\n"
f"**当前现价**{current_price:.2f} USDT\n"
f"**开仓均价**{open_avg_price:.2f} USDT\n"
f"**持仓数量(eth)**{total_fill_size:.3f} ETH\n"
f"**持仓数量(usdt)**{total_fill_value / 100:.2f} USDT\n"
f"**名义价值**{current_value:.2f} USDT\n"
f"**浮动盈亏**{pnl_str}\n"
f"**账户可用余额**{available_balance:.2f} USDT"
)
except Exception as e:
logger.warning(f"解析持仓数据失败: {e}")
# 如果无法解析持仓数据,至少显示持仓方向
return (
"**【WEEX ETHUSDT 永续持仓监控】**\n\n"
f"**持仓方向**{direction}\n"
f"**当前现价**{current_price:.2f} USDT\n"
f"**账户可用余额**{available_balance:.2f} USDT"
)
# ==================== 时间工具类 ====================
class TimeUtils:
"""时间工具类"""
@staticmethod
def get_current_kline_timestamp() -> int:
"""
获取当前K线的时间戳30分钟K线的整点或30分时刻毫秒
Returns:
时间戳(毫秒)
"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的整点或30分时刻
if current_datetime.minute < 30:
target_datetime = current_datetime.replace(
minute=0, second=0, microsecond=0
)
else:
target_datetime = current_datetime.replace(
minute=30, second=0, microsecond=0
)
return int(target_datetime.timestamp()) * 1000
@staticmethod
def get_formatted_time() -> str:
"""获取格式化的当前时间字符串"""
timestamp = time.time()
local_time = time.localtime(timestamp)
return time.strftime("%Y-%m-%d %H:%M:%S", local_time)
@staticmethod
def get_progress_bar_value() -> int:
"""获取进度条的当前值0-29"""
current_time = time.localtime()
current_minute = current_time.tm_min
return current_minute if current_minute < 30 else current_minute - 30
# ==================== 主交易类 ====================
class WeexTransaction:
"""WEEX自动交易主类"""
def __init__(self, tge_id):
# 配置
self.tge_id = tge_id
self.tge_headers = {
"Authorization": Config.TGE_AUTHORIZATION,
"Content-Type": "application/json"
}
# 组件初始化
self.browser_manager = BrowserManager(tge_id, Config.TGE_URL, self.tge_headers)
self.api_client = WEEXApiClient()
self.position_manager = None # 需要在浏览器初始化后创建
self.trading_executor = None # 需要在浏览器初始化后创建
self.token_manager = None # 需要在浏览器初始化后创建
# 状态
self.pbar: Optional[tqdm] = None
self.last_kline_timestamp: Optional[int] = None
self.kline_1: Optional[Dict] = None
self.kline_2: Optional[Dict] = None
self.kline_3: Optional[Dict] = None
def initialize(self) -> bool:
"""初始化所有组件"""
# 打开浏览器
if not self.browser_manager.openBrowser():
logger.error("打开浏览器失败")
MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True)
return False
# 接管浏览器
if not self.browser_manager.take_over_browser():
logger.error("接管浏览器失败")
MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True)
return False
# 关闭多余标签页
self.browser_manager.close_extra_tabs()
# 初始化需要page的组件
page = self.browser_manager.page
self.trading_executor = TradingExecutor(page, self.api_client)
self.position_manager = PositionManager(self.trading_executor)
self.token_manager = TokenManager(self.api_client, page)
# 打开交易页面
page.get(url=Config.TRADING_URL)
# 初始化时先获取一次token
if not self.token_manager.get_token():
logger.warning("初始化时获取token失败将在获取K线数据时重试")
# 初始化进度条
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)
logger.success("系统初始化完成")
return True
def update_progress_bar(self) -> None:
"""更新进度条"""
if self.pbar:
self.pbar.n = TimeUtils.get_progress_bar_value()
self.pbar.refresh()
def fetch_and_update_kline(self) -> bool:
"""获取并更新K线数据"""
# 先获取token获取K线数据需要token
if not self.token_manager.get_token():
logger.warning("获取token失败无法获取K线数据")
return False
kline_data = self.api_client.get_kline_data()
if not kline_data:
logger.warning("获取K线数据失败")
MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True)
return False
# 排序并取最后三根
sorted_data = sorted(kline_data, key=lambda x: x["id"])
if len(sorted_data) < 3:
logger.error("K线数据不足3根")
return False
self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:]
# 检查是否是新K线
current_kline_id = self.kline_3["id"]
current_timestamp = TimeUtils.get_current_kline_timestamp()
# 判断K线时间是否匹配当前时间并且是新K线
if current_kline_id != current_timestamp:
return False # K线时间不匹配
if self.last_kline_timestamp == current_timestamp:
return False # 已经处理过这个K线
self.last_kline_timestamp = current_timestamp
return True
def sync_position_status(self) -> bool:
"""同步持仓状态"""
success, position_data = self.api_client.get_position_status()
if not success:
logger.error("获取持仓状态失败")
MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True)
return False
self.position_manager.update_position(position_data)
return True
def process_trading_logic(self) -> None:
"""处理交易逻辑"""
page = self.browser_manager.page
# 刷新token因为可能已经过期但这不是必须的因为获取K线时已经获取过了
# 如果token有效这里会快速返回如果无效会重新获取
self.token_manager.get_token()
# 刷新页面
page.get(url=Config.TRADING_URL)
# 同步持仓状态
if not self.sync_position_status():
return
# 检查止损
try:
self.position_manager.check_stop_loss(self.kline_1, self.kline_2)
except Exception as e:
logger.error(f"止损检查出错: {e}")
MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True)
return
# 检查交易信号
direction, signal_type = KlineAnalyzer.check_engulfing_signal(
prev=self.kline_1,
curr=self.kline_2
)
if direction:
# 检查是否已经持有相同方向的仓位
current_pos = self.position_manager.current_position
if (direction == "long" and current_pos == PositionManager.POSITION_LONG) or \
(direction == "short" and current_pos == PositionManager.POSITION_SHORT):
logger.info(f"已持有{direction}方向仓位,继续持仓,不执行新开仓操作")
# 继续持仓,不发送开仓信息,直接返回继续执行后续的持仓信息发送
else:
# 需要执行交易(新开仓或反手)
# 获取可用余额
balance = self.api_client.get_available_balance()
if balance is None:
logger.error("获取可用余额失败")
MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True)
return
amount = int(balance / 100)
# 执行交易
trade_executed = self.trading_executor.execute_trade(
direction=direction,
current_position=self.position_manager.current_position,
amount=amount
)
if not trade_executed:
MessageSender.send_dingtalk_message(
f"交易执行失败,方向:{direction}",
is_error=True
)
return
# 更新持仓状态
if direction == "long":
self.position_manager.current_position = PositionManager.POSITION_LONG
elif direction == "short":
self.position_manager.current_position = PositionManager.POSITION_SHORT
# 发送交易消息(只有真正执行了开仓或反手操作才发送)
MessageSender.send_dingtalk_message(
f"信号:{direction},开仓金额:{amount:.2f}",
is_error=False
)
# 再次同步持仓状态(交易后)
if not self.sync_position_status():
return
# 发送持仓信息
balance = self.api_client.get_available_balance()
current_price = float(self.kline_3["close"])
if balance is not None:
position_info = self.position_manager.position_data
current_pos = self.position_manager.current_position
msg = MessageSender.format_position_message(
position_info,
current_pos,
current_price,
balance
)
MessageSender.send_dingtalk_message(msg, is_error=False)
def run(self) -> None:
"""主运行循环"""
# 初始化
if not self.initialize():
return
logger.info("系统初始化完成,开始运行...")
# 主循环
while True:
try:
# 更新进度条
self.update_progress_bar()
# 获取当前K线时间戳
current_timestamp = TimeUtils.get_current_kline_timestamp()
# 检查是否已经处理过这个K线
if self.last_kline_timestamp == current_timestamp:
time.sleep(10)
continue
# 获取并更新K线数据
if not self.fetch_and_update_kline():
time.sleep(10)
continue
# 处理交易逻辑
self.process_trading_logic()
# 重置进度条
if self.pbar:
self.pbar.reset()
# 等待一段时间
time.sleep(5)
except KeyboardInterrupt:
logger.info("收到中断信号,正在退出...")
break
except Exception as e:
logger.error(f"运行出错: {e}")
MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True)
time.sleep(10)
def action(self) -> None:
"""兼容旧接口的方法名"""
self.run()
if __name__ == '__main__':
transaction = WeexTransaction(tge_id="86837a981aba4576be6916a0ef6ad785")
transaction.action()