Files
lm_code/交易/websea-结构优化.py
2025-12-19 10:35:02 +08:00

739 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WebSea ETH-USDT 永续合约自动交易系统
优化版本:改进代码结构、模块化和可读性
"""
import time
import datetime
from typing import Optional, Dict, List, Tuple
from tqdm import tqdm
from loguru import logger
from DrissionPage import ChromiumOptions, ChromiumPage
from curl_cffi import requests
from 交易.tools import send_dingtalk_message
class KlineAnalyzer:
    """Candlestick helpers: classify single candles and detect engulfing signals."""

    @staticmethod
    def is_bullish(candle: Dict) -> bool:
        """Return True when the candle closed strictly above its open."""
        close_price = float(candle['close'])
        open_price = float(candle['open'])
        return close_price > open_price

    @staticmethod
    def is_bearish(candle: Dict) -> bool:
        """Return True when the candle closed strictly below its open."""
        close_price = float(candle['close'])
        open_price = float(candle['open'])
        return close_price < open_price

    @staticmethod
    def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]:
        """
        Detect an engulfing pattern between two consecutive candles.

        The original notes say this is meant for 30-minute candles only; the
        code itself does not check the period.

        - bearish ``prev`` followed by a bullish ``curr`` whose close reaches
          ``prev``'s open -> go long
        - bullish ``prev`` followed by a bearish ``curr`` whose close reaches
          ``prev``'s open -> go short

        Args:
            prev: The earlier candle; needs 'open' and 'close'.
            curr: The later candle; needs 'open' and 'close'.

        Returns:
            (direction, signal_type): direction is "long" or "short" and
            signal_type names the pattern; (None, None) when nothing matches.
        """
        rising = KlineAnalyzer.is_bullish
        falling = KlineAnalyzer.is_bearish

        curr_open, curr_close = float(curr['open']), float(curr['close'])
        prev_open = float(prev['open'])

        # Prices are truncated with int() before comparing, so the fractional
        # part of both prices is ignored by the "engulfs" test.
        long_setup = (
            rising(curr)
            and falling(prev)
            and int(curr_close) >= int(prev_open)
        )
        if long_setup:
            return "long", "bear_bull_engulf"

        short_setup = (
            falling(curr)
            and rising(prev)
            and int(curr_close) <= int(prev_open)
        )
        if short_setup:
            return "short", "bull_bear_engulf"

        return None, None
class BrowserManager:
    """Starts a browser through a local HTTP service, attaches to it and tidies its tabs."""

    def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict):
        """
        Store the connection settings; nothing is contacted yet.

        Args:
            tge_id: Environment id sent as ``envId`` when starting the browser.
            tge_url: Base URL of the browser-start HTTP service.
            tge_headers: HTTP headers sent with the start request.
        """
        self.tge_id = tge_id
        self.tge_url = tge_url
        self.tge_headers = tge_headers
        # Filled in by open_browser() / take_over_browser() respectively.
        self.tge_port: Optional[int] = None
        self.page: Optional[ChromiumPage] = None

    def open_browser(self) -> bool:
        """
        Ask the service to start the browser and remember the returned port.

        Returns:
            True when a port was read from ``data.port`` of the JSON reply,
            False when the request or the parsing raised.
        """
        try:
            reply = requests.post(
                f"{self.tge_url}/api/browser/start",
                json={"envId": self.tge_id},
                headers=self.tge_headers,
                timeout=10
            )
            payload = reply.json()
            self.tge_port = payload["data"]["port"]
        except Exception as e:
            logger.error(f"打开浏览器失败: {e}")
            return False
        return True

    def take_over_browser(self) -> bool:
        """
        Attach a ChromiumPage to the port obtained by open_browser().

        Returns:
            True once the page is attached and its window maximised; False
            when no port is known or attaching raised.
        """
        if not self.tge_port:
            logger.error("浏览器端口未设置")
            return False

        try:
            options = ChromiumOptions()
            options.set_local_port(self.tge_port)
            self.page = ChromiumPage(addr_or_opts=options)
            self.page.set.window.max()
        except Exception as e:
            logger.error(f"接管浏览器失败: {e}")
            return False
        return True

    def close_extra_tabs(self) -> bool:
        """
        Close every tab except the first one returned by ``get_tabs()``.

        Returns:
            True when all closes went through, False when there is no page or
            a call raised.
        """
        if not self.page:
            return False

        try:
            open_tabs = self.page.get_tabs()
            for position, tab in enumerate(open_tabs):
                # Position 0 is the tab that is kept.
                if position > 0:
                    tab.close()
        except Exception as e:
            logger.warning(f"关闭多余标签页失败: {e}")
            return False
        return True
class ApiClient:
    """Thin client for the WebSea web API endpoints used by this script."""

    def __init__(self):
        """Create the HTTP session; cookies and headers are added later."""
        self.session = requests.Session()
        self.headers: Dict = {}
        self.cookies: Dict = {}

    def update_session(self, cookies: Dict, headers: Dict) -> None:
        """Merge the given cookies and headers into the session."""
        self.session.cookies.update(cookies)
        self.session.headers.update(headers)

    def get_kline_data(self, symbol: str = 'ETH-USDT', period: str = '30min',
                       days: int = 10) -> List[Dict]:
        """
        Fetch candles covering the last ``days`` days.

        Args:
            symbol: Trading pair symbol.
            period: Candle period.
            days: How many days back the requested window starts.

        Returns:
            A list of dicts with 'id' (int) and 'open'/'high'/'low'/'close'
            (float), in the order the API returned them; an empty list when
            all three attempts failed.
        """
        now_ts = int(time.time())
        start_ts = int(time.time() - 86400 * days)
        query = {
            'symbol': symbol,
            'period': period,
            'start': str(start_ts),
            'end': str(now_ts),
        }

        max_attempts = 3
        for attempt in range(max_attempts):
            logger.info(f"获取最新K线数据{attempt + 1}次尝试...")
            try:
                reply = self.session.get(
                    'https://capi.websea.com/webApi/market/getKline',
                    params=query,
                    timeout=5
                )
                rows = reply.json()["result"]['data']
                return [
                    {
                        'id': int(row["id"]),
                        'open': float(row["open"]),
                        'high': float(row["high"]),
                        'low': float(row["low"]),
                        'close': float(row["close"])
                    }
                    for row in rows
                ]
            except Exception as e:
                logger.warning(f"获取K线数据失败: {e}")
                # Pause between attempts, but not after the last one.
                if attempt + 1 < max_attempts:
                    time.sleep(1)
        return []

    def get_available_balance(self) -> Optional[float]:
        """
        Read ``result.available`` from the account endpoint.

        Returns:
            The balance as a float, or None when all three attempts failed.
        """
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                reply = self.session.get(
                    'https://capi.websea.com/webApi/asset/account',
                    timeout=5
                )
                payload = reply.json()
                return float(payload["result"]["available"])
            except Exception as e:
                logger.warning(f"获取余额失败: {e}")
                if attempt + 1 < max_attempts:
                    time.sleep(1)
        return None

    def get_position_status(self) -> Tuple[bool, Optional[Dict]]:
        """
        Query the currently held position.

        Returns:
            (success, position_data). On success position_data is a dict with
            "type" (0 when the API reports no position, otherwise the value
            of the first entry's ``type``, where 1 is long and 2 is short)
            and "position" (the first entry, or None). After three failed
            attempts the result is (False, None).
        """
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                payload = self.session.get(
                    'https://capi.websea.com/webApi/entrust/holdPosition',
                    timeout=5
                ).json()
                if not payload["result"]:
                    return True, {"type": 0, "position": None}

                # Only the first reported position is considered.
                first = payload["result"][0]
                return True, {
                    "type": first["type"],  # 1: long, 2: short
                    "position": first
                }
            except Exception as e:
                logger.warning(f"获取持仓状态失败: {e}")
                if attempt + 1 < max_attempts:
                    time.sleep(1)
        return False, None
class TokenManager:
    """Obtains the auth token and cookies from the browser and hands them to the API client."""

    def __init__(self, api_client: ApiClient, page: ChromiumPage):
        """
        Args:
            api_client: Client whose session receives the cookies and token.
            page: Attached browser page used to open the capture tab.
        """
        self.api_client = api_client
        self.page = page

    def get_token(self) -> bool:
        """
        Capture one request made by the futures page and copy its credentials.

        A new tab is opened, the listener waits for a request whose URL
        matches "api.websea.com/webApi/zendesk/url", and that request's
        cookies plus its "token" header (if present) are pushed into the API
        client's session. The tab is always closed again, including when a
        step fails, so that failed attempts do not leave tabs behind.

        Returns:
            True when a non-empty 'shareToken' cookie was captured and the
            session was updated; False otherwise or when any step raised.
        """
        tab = None
        try:
            tab = self.page.new_tab()
            tab.listen.start("api.websea.com/webApi/zendesk/url")
            tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT")
            # NOTE(review): wait() is called without a timeout here; confirm
            # whether it can block indefinitely if the request never fires.
            res = tab.listen.wait()

            # Cookies whose name starts with ':' are skipped.
            cookies = {
                cookie["name"]: cookie["value"]
                for cookie in res.request.cookies
                if not cookie["name"].startswith(':')
            }

            # Only the "token" request header is forwarded.
            headers = {}
            if "token" in res.request.headers:
                headers["token"] = res.request.headers["token"]

            if not cookies.get('shareToken'):
                logger.warning("未获取到shareToken")
                return False

            self.api_client.update_session(cookies, headers)
            return True
        except Exception as e:
            logger.error(f"获取token失败: {e}")
            return False
        finally:
            # Close the capture tab on every path; a failure to close must
            # not mask the result computed above.
            if tab is not None:
                try:
                    tab.close()
                except Exception as close_error:
                    logger.warning(f"关闭token标签页失败: {close_error}")
class TradingExecutor:
    """Carries out trades by driving the trading page in the browser."""

    def __init__(self, page: ChromiumPage, api_client: ApiClient):
        """
        Args:
            page: Attached browser page showing the trading UI.
            api_client: API client; stored but not used by the methods below.
        """
        self.page = page
        self.api_client = api_client

    def navigate_to_trading_page(self) -> bool:
        """
        Click the element labelled "市价" (waits up to 15 s for it).

        Returns:
            True when the click went through, False when it raised.
        """
        try:
            self.page.ele('x://*[normalize-space(text())= "市价"]', timeout=15).click()
            time.sleep(1)
            return True
        except Exception as e:
            logger.error(f"导航到交易页面失败: {e}")
            return False

    def close_all_positions(self) -> bool:
        """
        Scroll to and click the "市价全平" (close all at market) element.

        Returns:
            True when the UI actions completed without raising, False
            otherwise. True does not by itself confirm that the exchange
            closed the position.
        """
        try:
            self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
            time.sleep(1)
            self.page.actions.click('x://*[normalize-space(text())= "市价全平"]')
            time.sleep(3)
            return True
        except Exception as e:
            logger.error(f"平仓失败: {e}")
            return False

    def open_long(self, amount: float) -> bool:
        """
        Type ``amount`` into the amount input and click "买入/做多".

        Returns:
            True when the UI actions completed without raising.
        """
        try:
            self.page.ele('x://*[@id="amountInput"]').input(amount)
            time.sleep(1)
            self.page.actions.click('x://*[normalize-space(text())= "买入/做多"]')
            return True
        except Exception as e:
            logger.error(f"开多失败: {e}")
            return False

    def open_short(self, amount: float) -> bool:
        """
        Type ``amount`` into the amount input and click "卖出/做空".

        Returns:
            True when the UI actions completed without raising.
        """
        try:
            self.page.ele('x://*[@id="amountInput"]').input(amount)
            time.sleep(1)
            self.page.actions.click('x://*[normalize-space(text())= "卖出/做空"]')
            return True
        except Exception as e:
            logger.error(f"开空失败: {e}")
            return False

    def execute_trade(self, direction: str, current_position: int, amount: float) -> bool:
        """
        Open a position in ``direction``, reversing an opposite one first.

        The result of every UI step is propagated: if closing the opposite
        position fails, no new position is opened, and a failed open makes
        this method return False instead of reporting success.

        Args:
            direction: "long" or "short".
            current_position: Current holding, -1: short, 0: none, 1: long.
            amount: Amount typed into the order form.

        Returns:
            True when every step that was attempted succeeded, or when there
            was nothing to do (already holding ``direction`` or an unknown
            direction); False otherwise.
        """
        if not self.navigate_to_trading_page():
            return False

        try:
            if direction == "long":
                if current_position == 0:
                    logger.success(f"{datetime.datetime.now()},开多,金额:{amount}")
                    return self.open_long(amount)
                if current_position == -1:
                    logger.success(f"{datetime.datetime.now()},反手平空做多,金额:{amount}")
                    if not self.close_all_positions():
                        # Do not stack a long on top of a short that is still open.
                        logger.error("反手平仓失败,取消开仓")
                        return False
                    return self.open_long(amount)
            elif direction == "short":
                if current_position == 0:
                    logger.success(f"{datetime.datetime.now()},开空,金额:{amount}")
                    return self.open_short(amount)
                if current_position == 1:
                    logger.success(f"{datetime.datetime.now()},反手平多做空,金额:{amount}")
                    if not self.close_all_positions():
                        # Do not stack a short on top of a long that is still open.
                        logger.error("反手平仓失败,取消开仓")
                        return False
                    return self.open_short(amount)
            # Nothing to do for this direction/position combination.
            return True
        except Exception as e:
            logger.error(f"执行交易失败: {e}")
            return False
class PositionManager:
    """Tracks the current position and applies the stop-loss rule."""

    def __init__(self, trading_executor: TradingExecutor):
        """
        Args:
            trading_executor: Executor used to close positions on stop-loss.
        """
        self.trading_executor = trading_executor
        self.current_position: int = 0  # -1: short, 0: none, 1: long

    def update_position(self, position_type: int) -> None:
        """Overwrite the tracked position (-1: short, 0: none, 1: long)."""
        self.current_position = position_type

    def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool:
        """
        Close the position when both candles moved against it.

        A long is closed after two bearish candles, a short after two bullish
        candles. The tracked position is reset to 0 only when the executor
        reports that closing succeeded; on a failed close the tracked
        position is left unchanged.

        Args:
            kline_1: Per the original notes, the second-to-last candle.
            kline_2: Per the original notes, the last candle.

        Returns:
            True when a stop-loss was triggered and the close succeeded,
            False otherwise (including when an error occurred).
        """
        try:
            if self.current_position == 1:
                # Long position: two consecutive bearish candles.
                triggered = (KlineAnalyzer.is_bearish(kline_1) and
                             KlineAnalyzer.is_bearish(kline_2))
                message = f"{datetime.datetime.now()},止损信号:连续两根阴线,平多"
            elif self.current_position == -1:
                # Short position: two consecutive bullish candles.
                triggered = (KlineAnalyzer.is_bullish(kline_1) and
                             KlineAnalyzer.is_bullish(kline_2))
                message = f"{datetime.datetime.now()},止损信号:连续两根阳线,平空"
            else:
                return False

            if not triggered:
                return False

            logger.success(message)
            if not self.trading_executor.close_all_positions():
                # Keep the tracked position so later logic does not treat
                # the account as flat while the position may still be open.
                logger.error("止损平仓失败,保留当前持仓状态")
                return False

            self.current_position = 0
            return True
        except Exception as e:
            logger.error(f"止损检查失败: {e}")
        return False
class MessageSender:
    """Formats and sends DingTalk notifications."""

    @staticmethod
    def send_dingtalk_message(message_content: str, is_error: bool = False) -> None:
        """
        Send a DingTalk message with a "websea" prefix.

        Args:
            message_content: Text appended after the prefix.
            is_error: When True the error prefix is used and the message is
                sent 15 times; otherwise it is sent once.
        """
        if is_error:
            prefix = "❌websea"
            count = 15
        else:
            prefix = "🔔websea"
            count = 1

        for _ in range(count):
            # Resolves to the module-level helper, not to this static method.
            send_dingtalk_message(f"{prefix}{message_content}")

    @staticmethod
    def format_position_message(position_data: Optional[Dict],
                                available_balance: float) -> str:
        """
        Build the position summary text.

        Args:
            position_data: Dict with a "position" entry; that entry is read
                for 'type', 'number', 'numberConvert', 'numberConvertU',
                'openPriceAvg', 'markPrice', 'unProfitLoss' and 'profitRate'.
                None, or a missing/empty "position", means no position.
            available_balance: Available account balance.

        Returns:
            A multi-line summary; the short "no position" form when there is
            nothing held.
        """
        if position_data and position_data.get("position"):
            pos = position_data["position"]
            # type 1 is long, anything else is reported as short.
            direction = "多" if int(pos['type']) == 1 else "空"
            contracts = int(pos['number'])
            eth_amount = float(pos['numberConvert'])
            open_price = float(pos['openPriceAvg'])
            current_price = float(pos['markPrice'])
            unrealized_pnl = float(pos['unProfitLoss'])
            pnl_rate = float(pos['profitRate'])

            return (
                f"【WebSea ETH-USDT 永续持仓】\n"
                f"持仓方向:{direction}\n"
                f"持仓张数:{contracts}\n"
                f"持仓数量(eth){eth_amount:.3f} ETH\n"
                f"持仓数量(usdt){float(pos['numberConvertU']) / 100:.3f} USDT\n"
                f"开仓均价:{open_price:.2f} USDT\n"
                f"标记现价:{current_price:.2f} USDT\n"
                f"浮动盈亏:{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)\n"
                f"账户可用:{available_balance:.2f} USDT"
            )
        else:
            return (
                f"【WebSea ETH-USDT 永续持仓】\n"
                f"持仓方向:无\n"
                f"账户可用:{available_balance:.2f} USDT"
            )
class TimeUtils:
    """Clock helpers built around 30-minute candle boundaries."""

    @staticmethod
    def get_current_kline_timestamp() -> int:
        """
        Return the start of the current half-hour as a Unix timestamp.

        The current local time is floored to minute 0 or minute 30 (seconds
        and microseconds cleared) and converted back to an integer timestamp.
        """
        now = datetime.datetime.fromtimestamp(time.time())
        bucket_minute = 0 if now.minute < 30 else 30
        bucket_start = now.replace(minute=bucket_minute, second=0, microsecond=0)
        return int(bucket_start.timestamp())

    @staticmethod
    def get_formatted_time() -> str:
        """Return the current local time as "YYYY-MM-DD HH:MM:SS"."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))

    @staticmethod
    def get_progress_bar_value() -> int:
        """Return how many minutes (0-29) have passed in the current half-hour."""
        return time.localtime().tm_min % 30
class WeexTransaction:
    """Main driver of the WebSea automated trading script.

    Wires together the browser, API client, token capture, trade execution
    and position tracking, then polls for new candles in an endless loop.
    """

    # Maps the position ``type`` reported by the API (1: long, 2: short) to
    # the internal position code (1: long, -1: short). Anything else is 0.
    _POSITION_CODES = {1: 1, 2: -1}

    def __init__(self, tge_id: int):
        """
        Args:
            tge_id: Environment id passed to the browser-start service.
        """
        # Configuration
        self.tge_id = tge_id
        self.tge_url = "http://127.0.0.1:50326"
        # NOTE(review): this credential is hard-coded in source; consider
        # loading it from the environment or a config file instead.
        self.tge_headers = {
            "Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
            "Content-Type": "application/json"
        }
        self.trading_url = "https://www.websea.com/zh-CN/futures/ETH-USDT"

        # Components
        self.browser_manager = BrowserManager(self.tge_id, self.tge_url, self.tge_headers)
        self.api_client = ApiClient()
        # The next three need the browser page and are created in initialize().
        self.position_manager: Optional[PositionManager] = None
        self.trading_executor: Optional[TradingExecutor] = None
        self.token_manager: Optional[TokenManager] = None

        # State
        self.pbar: Optional[tqdm] = None
        self.last_kline_id: Optional[int] = None
        # The three most recent candles by id: kline_1 is the oldest of the
        # three and kline_3 the newest.
        self.kline_1: Optional[Dict] = None
        self.kline_2: Optional[Dict] = None
        self.kline_3: Optional[Dict] = None

    def initialize(self) -> bool:
        """
        Start and attach to the browser, then build the page-bound components.

        Returns:
            True when everything is ready; False (after sending an error
            notification) when the browser could not be started or attached.
        """
        # Start the browser
        if not self.browser_manager.open_browser():
            logger.error("打开浏览器失败")
            MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True)
            return False

        # Attach to it
        if not self.browser_manager.take_over_browser():
            logger.error("接管浏览器失败")
            MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True)
            return False

        # Best effort: the result is intentionally not checked.
        self.browser_manager.close_extra_tabs()

        # Components that need the page
        page = self.browser_manager.page
        self.trading_executor = TradingExecutor(page, self.api_client)
        self.position_manager = PositionManager(self.trading_executor)
        self.token_manager = TokenManager(self.api_client, page)

        # Open the trading page
        page.get(url=self.trading_url)

        # Progress bar over the 30 minutes of one candle
        self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)
        return True

    def update_progress_bar(self) -> None:
        """Set the progress bar to the minutes elapsed in the current half-hour."""
        if self.pbar:
            self.pbar.n = TimeUtils.get_progress_bar_value()
            self.pbar.refresh()

    def fetch_and_update_kline(self) -> bool:
        """
        Fetch candles and store the latest three.

        Returns:
            True when a candle with a new id arrived; False when fetching
            failed, fewer than three candles were returned, or the newest
            candle id is unchanged since the previous call.
        """
        kline_data = self.api_client.get_kline_data()
        if not kline_data:
            logger.warning("获取K线数据失败")
            MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True)
            return False

        # The unpacking below needs exactly three candles.
        if len(kline_data) < 3:
            logger.warning(f"K线数据不足3根: {len(kline_data)}")
            return False

        # Sort by id and keep the last three
        sorted_data = sorted(kline_data, key=lambda x: x["id"])
        self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:]

        # Only report a change when the newest candle id differs
        current_kline_id = self.kline_3["id"]
        if self.last_kline_id == current_kline_id:
            return False  # no new candle
        self.last_kline_id = current_kline_id
        return True

    def sync_position_status(self) -> bool:
        """
        Copy the position reported by the API into the position manager.

        The reported ``type`` is normalised with int() before comparison so
        that numeric strings are handled the same way as integers; values
        that cannot be converted are treated as "no position".

        Returns:
            True when the status was fetched, False (after sending an error
            notification) otherwise.
        """
        success, position_info = self.api_client.get_position_status()
        if not success:
            logger.error("获取持仓状态失败")
            MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True)
            return False

        if position_info:
            raw_type = position_info.get("type", 0)
            try:
                pos_type = int(raw_type)
            except (TypeError, ValueError):
                logger.warning(f"无法识别的持仓类型: {raw_type!r}")
                pos_type = 0
            self.position_manager.current_position = self._POSITION_CODES.get(pos_type, 0)
        return True

    def process_trading_logic(self) -> None:
        """
        Run one decision cycle for a newly seen candle.

        Steps: reload the trading page, refresh the token, sync the position,
        apply the stop-loss rule, evaluate the engulfing signal on
        kline_1/kline_2 and trade on it, then send a position summary. Any
        failed step sends an error notification and ends the cycle.
        """
        page = self.browser_manager.page

        # Reload the page
        page.get(url=self.trading_url)

        # Refresh token and cookies
        if not self.token_manager.get_token():
            logger.error("获取token失败")
            MessageSender.send_dingtalk_message("获取token失败", is_error=True)
            return

        # Sync position
        if not self.sync_position_status():
            return

        # Stop-loss check
        try:
            self.position_manager.check_stop_loss(self.kline_1, self.kline_2)
        except Exception as e:
            logger.error(f"止损检查出错: {e}")
            MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True)
            return

        # Signal check
        direction, signal_type = KlineAnalyzer.check_engulfing_signal(
            prev=self.kline_1,
            curr=self.kline_2
        )

        if direction:
            # Available balance
            balance = self.api_client.get_available_balance()
            if balance is None:
                logger.error("获取可用余额失败")
                MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True)
                return

            # The order amount is one hundredth of the available balance.
            amount = balance / 100

            # Execute
            if not self.trading_executor.execute_trade(
                direction=direction,
                current_position=self.position_manager.current_position,
                amount=amount
            ):
                MessageSender.send_dingtalk_message(
                    f"交易执行失败,方向:{direction}",
                    is_error=True
                )
                return

            # Update the tracked position
            if direction == "long":
                self.position_manager.current_position = 1
            elif direction == "short":
                self.position_manager.current_position = -1

            # Trade notification
            MessageSender.send_dingtalk_message(
                f"信号:{direction},开仓金额:{amount:.2f}",
                is_error=False
            )

        # Sync again after any trade
        if not self.sync_position_status():
            return

        # Position summary
        balance = self.api_client.get_available_balance()
        if balance is not None:
            success, position_info = self.api_client.get_position_status()
            if success:
                msg = MessageSender.format_position_message(position_info, balance)
                MessageSender.send_dingtalk_message(msg, is_error=False)

    def run(self) -> None:
        """
        Initialise and then loop until interrupted.

        Each iteration refreshes the progress bar, skips work while the
        stored newest candle id equals the current half-hour timestamp,
        otherwise fetches candles and runs the trading logic when a new one
        arrived. Unexpected errors are reported and the loop continues.
        """
        if not self.initialize():
            return

        logger.info("系统初始化完成,开始运行...")

        while True:
            try:
                self.update_progress_bar()

                current_timestamp = TimeUtils.get_current_kline_timestamp()

                # Already processed the candle of this half-hour: wait.
                if self.kline_3 and self.kline_3["id"] == current_timestamp:
                    time.sleep(10)
                    continue

                if not self.fetch_and_update_kline():
                    time.sleep(10)
                    continue

                self.process_trading_logic()

                if self.pbar:
                    self.pbar.reset()

                time.sleep(5)
            except KeyboardInterrupt:
                logger.info("收到中断信号,正在退出...")
                break
            except Exception as e:
                logger.error(f"运行出错: {e}")
                MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True)
                time.sleep(10)

    def action(self) -> None:
        """Alias of run(), kept for callers using the old method name."""
        self.run()
if __name__ == "__main__":
    # Script entry point: build the trader for environment 191303 and start
    # it through the backwards-compatible action() alias.
    transaction = WeexTransaction(tge_id=191303)
    transaction.action()