import time import asyncio import datetime from tqdm import * from loguru import * from DrissionPage import * from curl_cffi import requests from telethon import TelegramClient from 交易.tools import send_dingtalk_message def is_bullish(c): # 阳线 return float(c['close']) > float(c['open']) def is_bearish(c): # 阴线 return float(c['close']) < float(c['open']) class WeexTransaction: def __init__(self, tge_id): self.tge_port = None # tge浏览器使用端口 self.tge_id = tge_id # tge id self.tge_url = "http://127.0.0.1:50326" # tge本地服务url self.tge_headers = { "Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91", "Content-Type": "application/json" } # 替换为你自己的钉钉机器人 Webhook 地址 self.webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125" # 替换为你自己的钉钉机器人秘钥 self.secret = "SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998" self.page = None # 浏览器对象 self.start = 0 # 持仓状态 -1:做空,0:维持仓,1:做多 self.kline_1 = None # 0:跌,1:涨 self.kline_2 = None # 0:跌,1:涨 self.kline_1 = self.kline_2 = self.kline_3 = None self.direction = None # 信号类型 self.pbar = None # 进度条对象 self.session = requests.Session() # 接口请求对象 self.headers = None def get_now_time1(self): timestamp = time.time() local_time = time.localtime(timestamp) formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", local_time) return formatted_time def send_dingtalk_message(self, message_content, type=1): if type: send_dingtalk_message( message_content=f"🔔weex:" + message_content ) else: for i in range(15): send_dingtalk_message( message_content=f"❌weex:" + message_content ) def openBrowser(self, ): # 直接指定ID打开窗口,也可以使用 createBrowser 方法返回的ID try: response = requests.post( f"{self.tge_url}/api/browser/start", json={"envId": self.tge_id}, headers=self.tge_headers ) self.tge_port = response.json()["data"]["port"] return True except: return False def take_over_browser(self): try: co = ChromiumOptions() co.set_local_port(self.tge_port) self.page = ChromiumPage(addr_or_opts=co) self.page.set.window.max() return True except: return False def is_bullish(self, c): # 阳线 return float(c['close']) > float(c['open']) def is_bearish(self, c): # 阴线 return float(c['close']) < float(c['open']) def check_signal(self, prev, curr): """ 包住形态信号判定(仅15分钟K线): - 前跌后涨包住 -> 做多 - 前涨后跌包住 -> 做空 """ p_open, p_close = float(prev['open']), float(prev['close']) c_open, c_close = float(curr['open']), float(curr['close']) # 前跌后涨包住 -> 做多 if is_bullish(curr) and is_bearish(prev) and int(c_open) <= int(p_close) and int(c_close) >= int(p_open): return "long", "bear_bull_engulf" # 前涨后跌包住 -> 做空 if is_bearish(curr) and is_bullish(prev) and int(c_open) >= int(p_close) and int(c_close) <= int(p_open): return "short", "bull_bear_engulf" return None, None def get_price(self): params = { 'contractId': '10000002', 'productCode': 'cmt_ethusdt', 'priceType': 'LAST_PRICE', 'klineType': 'MINUTE_30', 'limit': '300', 'timeZone': 'string', 'languageType': '1', 'sign': 'SIGN', } datas = [] for i in range(3): logger.info(f"获取最新数据:{i + 1}次。。。") try: response = self.session.get('https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2', params=params, ) for i in response.json()["data"]["dataList"]: insert_data = { 'id': int(i[4]), 'open': float(i[3]), 'high': float(i[1]), 'low': float(i[2]), 'close': float(i[0]) } datas.append(insert_data) return datas except: time.sleep(1) return datas def to_do_page(self): # self.page.get("https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT") self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click() time.sleep(1) num = self.get_num() if num: logger.info("获取可用余额成功!!!") else: logger.error("获取可用余额失败!!!") self.send_dingtalk_message("获取可用余额失败!!!", type=0) return self.page.ele('x://input[@placeholder="请输入数量"]').input(float(num) / 100) time.sleep(1) if self.direction == "long" and not self.start: logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开多") self.send_dingtalk_message(f"信号:{self.direction},开多,开仓金额:{float(num) / 100}") self.page.ele('x://*[contains(text(), "买入开多")]').click() self.start = 1 elif self.direction == "short" and not self.start: logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开空") self.send_dingtalk_message(f"信号:{self.direction},开空,开仓金额:{float(num) / 100}") self.page.ele('x://*[contains(text(), "卖出开空")]').click() self.start = -1 elif self.direction == "long" and self.start == -1: logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平空做多") self.send_dingtalk_message(f"信号:{self.direction},反手平空做多,开仓金额:{float(num) / 100}") self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True) time.sleep(1) self.page.ele('x://*[contains(text(), "闪电平仓")]').click() time.sleep(3) self.page.ele('x://*[contains(text(), "买入开多")]').click() self.start = 1 elif self.direction == "short" and self.start == 1: logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平多做空") self.send_dingtalk_message(f"信号:{self.direction},反手平多做空,开仓金额:{float(num) / 100}") self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True) time.sleep(1) self.page.ele('x://*[contains(text(), "闪电平仓")]').click() time.sleep(3) self.page.ele('x://*[contains(text(), "卖出开空")]').click() self.start = -1 def get_now_time(self): # 获取当前时间戳 current_timestamp = time.time() # 将当前时间戳转换为 datetime 对象 current_datetime = datetime.datetime.fromtimestamp(current_timestamp) # 计算距离当前时间最近的整点或 30 分时刻 if current_datetime.minute < 30: target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) else: target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0) # 将目标 datetime 对象转换为时间戳 target_timestamp = target_datetime.timestamp() return int(target_timestamp) * 1000 def close_extra_tabs_in_browser(self): try: for _, i in enumerate(self.page.get_tabs()): if _ == 0: continue i.close() return True except: pass return False def get_num(self): for i in range(3): try: response = self.session.post( 'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new', ) return response.json()["data"]["newContract"]["balanceList"][0]["available"] except: time.sleep(1) return False def get_token(self): tab = self.page.new_tab() tab.listen.start("/user/security/getLanguageType") for i in range(3): tab.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") try: res = tab.listen.wait(timeout=5) if res.request.headers.get("U-TOKEN"): if not self.headers: self.session.headers = res.request.headers else: self.session.headers["U-TOKEN"] = res.request.headers["U-TOKEN"] tab.close() return True except: time.sleep(1) tab.close() return False def get_position_status(self): json_data = { 'filterContractIdList': [ 10000002, ], 'limit': 100, 'languageType': 0, 'sign': 'SIGN', 'timeZone': 'string', } for i in range(3): try: response = self.session.post( 'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderFillTransactionPage', json=json_data, ) datas = [] for i in response.json()["data"]["dataList"]: datas.append(i) if datas[0]["legacyOrderDirection"] == "OPEN_LONG": self.start = 1 elif datas[0]["legacyOrderDirection"] == "OPEN_SHORT": self.start = -1 else: self.start = 0 self.datas = datas[0] return True except: time.sleep(1) return False def action(self): # 获取比特端口 if self.openBrowser(): logger.info("获取打开比特成功,成功获取端口!!!") else: logger.error("打开比特失败!!!") return # 接管浏览器 if self.take_over_browser(): logger.info("接管比特浏览器成功!!!") else: logger.error("接管浏览器失败!!!") return if self.close_extra_tabs_in_browser(): logger.info('关闭多余标签页成功!!!') else: logger.info('关闭多余标签页失败!!!') self.page.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") # 打开网页 self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc:进度条说明,ncols:长度 self.time_start = None # 时间状态 避免同一个时段,发生太多消息 while True: # 获取当前时间 current_time = time.localtime() current_minute = current_time.tm_min if current_minute < 30: self.pbar.n = current_minute self.pbar.refresh() else: self.pbar.n = current_minute - 30 self.pbar.refresh() # if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, 35]: # 判断是否是 新的30分钟了 # # if current_minute not in range(60): # 判断是否是 新的30分钟了 # time.sleep(10) # continue if self.kline_3 and self.get_now_time() == self.kline_3["id"]: continue if self.get_token(): # 获取token logger.info("获取token成功!!!") else: logger.info("获取token失败!!!") self.send_dingtalk_message(message_content=f"获取token失败!!!", type=0) new_price_datas = self.get_price() if not new_price_datas: logger.info("获取最新价格有问题!!!") self.send_dingtalk_message(message_content=f"获取价格有问题!!!", type=0) continue new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"]) self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:] # 判断抓取的数据是否正确 if self.get_now_time() != self.kline_3["id"]: continue if self.time_start == self.get_now_time(): continue self.time_start = self.get_now_time() self.page.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") # 打开网页 if self.get_position_status(): logger.info("获取仓位信息成功!!!") else: logger.info("获取仓位信息失败!!!") self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0) continue try: if self.start == 1: if is_bearish(self.kline_1) and is_bearish(self.kline_2): logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平多") self.send_dingtalk_message( message_content=f"第一根信号:{self.kline_1},{self.kline_2},平多") self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True) time.sleep(1) self.page.ele('x://*[contains(text(), "闪电平仓")]').click() self.start = 0 elif self.start == -1: if is_bullish(self.kline_1) and is_bullish(self.kline_2): logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平空") self.send_dingtalk_message( message_content=f"第一根信号:{self.kline_1},{self.kline_2},平空") self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True) time.sleep(1) self.page.ele('x://*[contains(text(), "闪电平仓")]').click() self.start = 0 except: self.send_dingtalk_message(message_content=f"止损平仓出错!!!", type=0) continue self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号 if self.direction: try: self.to_do_page() except Exception as e: self.send_dingtalk_message(message_content=f"购买操作失败,{e}", type=0) continue self.pbar.reset() # 重置进度条 if self.get_position_status(): logger.info("获取仓位信息成功!!!") else: logger.info("获取仓位信息失败!!!") self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0) continue num = self.get_num() # 持仓方向,开仓价格,现价,持仓量,盈亏,当前价值 message_content = None current_price = float(self.kline_3["close"]) if self.start: # 1. 从 self.datas 中提取并转换关键数据 # 假设 self.datas 是你贴的那个JSON字典(如果是个列表,取最新一笔) data = self.datas if isinstance(data, list) and data: data = data[-1] # 如果是列表,取最新一笔成交 fill_size = float(self.datas['fillSize']) # 持仓量,单位:ETH fill_value = float(self.datas['fillValue']) # 成交名义价值 USDT open_avg_price = fill_value / fill_size # 开仓均价(本笔成交均价) position_side = self.datas['positionSide'] # "SHORT" 或 "LONG" # 2. 方向判断并设置 self.start(方便后续策略使用) if position_side == 'SHORT': direction = "空" self.start = -1 elif position_side == 'LONG': direction = "多" self.start = 1 else: direction = "无" self.start = 0 # 3. 当前价格(从你的K线数据) current_price = float(self.kline_3["close"]) # 4. 持仓量(假设当前持仓等于这笔成交量,如有加减仓后续可维护累计) current_amount = fill_size # 单位:ETH # 5. 计算浮动盈亏(USDT) if self.start == 1: # 多头 unrealized_pnl = current_amount * (current_price - open_avg_price) elif self.start == -1: # 空头 unrealized_pnl = current_amount * (open_avg_price - current_price) else: unrealized_pnl = 0.0 # 6. 收益率 if self.start != 0 and open_avg_price > 0: if self.start == 1: pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000 else: pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000 rate_str = f" ({pnl_rate:+.2f}%)" else: rate_str = "" pnl_str = f"{unrealized_pnl:+.2f} USDT" # 7. 当前持仓名义价值 current_value = current_amount * current_price # 8. 持仓量显示 amount_display = f"{current_amount:.3f} ETH" # 9. 组装消息(钉钉Markdown格式,更美观) message_content = ( "**【WEEX ETHUSDT 永续持仓监控】**\n\n" f"**持仓方向**:{direction}\n" f"**当前现价**:{current_price:.2f} USDT\n" f"**开仓均价**:{open_avg_price:.2f} USDT\n" f"**持仓数量(eth)**:{amount_display} eth\n" f"**持仓数量(usdt)**:{float(self.datas['fillValue']) / 100:.2f} usdt\n" f"**名义价值**:{current_value:.2f} USDT\n" f"**浮动盈亏**:{pnl_str}{rate_str}\n" f"**账户可用余额**:{float(num):.2f} USDT" ) else: message_content = ( "**【WEEX ETHUSDT 永续持仓监控】**\n\n" f"**持仓方向**:无\n" f"**当前现价**:{current_price:.2f} USDT\n" # f"**开仓均价**:{open_avg_price:.2f} USDT\n" # f"**持仓数量**:{amount_display}\n" # f"**名义价值**:{current_value:.2f} USDT\n" # f"**浮动盈亏**:{pnl_str}{rate_str}\n" f"**账户可用余额**:{float(num):.2f} USDT" ) # 10. 发送钉钉消息 self.send_dingtalk_message(message_content=message_content) if __name__ == '__main__': WeexTransaction( tge_id=146473, ).action()