From 59fdd410379bc0baf7d573df7264858dffaa8d1e Mon Sep 17 00:00:00 2001 From: Administrator Date: Tue, 9 Dec 2025 15:11:18 +0800 Subject: [PATCH] fwefwf --- bitmart/main.py | 417 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 417 insertions(+) create mode 100644 bitmart/main.py diff --git a/bitmart/main.py b/bitmart/main.py new file mode 100644 index 0000000..b4977b7 --- /dev/null +++ b/bitmart/main.py @@ -0,0 +1,417 @@ +import re +import json +import hmac +import time +import base64 +import hashlib +import datetime +import requests + +from tqdm import * +from loguru import * +from DrissionPage import * +from bs4 import BeautifulSoup + + +def is_bullish(c): # 阳线 + return float(c['close']) > float(c['open']) + + +def is_bearish(c): # 阴线 + return float(c['close']) < float(c['open']) + + +class WeexTransaction: + def __init__(self, tge_id): + self.tge_port = None # tge浏览器使用端口 + self.tge_id = tge_id # tge id + self.tge_url = "http://127.0.0.1:50326" # tge本地服务url + self.tge_headers = { + "Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91", + "Content-Type": "application/json" + } + + # 替换为你自己的钉钉机器人 Webhook 地址 + self.webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125" + # 替换为你自己的钉钉机器人秘钥 + self.secret = "SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998" + + self.page = None # 浏览器对象 + + self.start = 0 # 持仓状态 -1:做空,0:维持仓,1:做多 + self.kline_1 = None # 0:跌,1:涨 + self.kline_2 = None # 0:跌,1:涨 + + self.direction = None # 信号类型 + + self.pbar = None # 进度条对象 + + def get_signature(self, timestamp): + # 将时间戳和密钥拼接 + string_to_sign = f'{timestamp}\n{self.secret}' + string_to_sign = string_to_sign.encode('utf-8') + # 使用 HMAC-SHA256 算法进行签名 + hmac_code = hmac.new(self.secret.encode('utf-8'), string_to_sign, digestmod=hashlib.sha256).digest() + # 对签名结果进行 Base64 编码 + sign = base64.b64encode(hmac_code).decode('utf-8') + return sign + + # def send_dingtalk_message(self, message_content): + # # 获取当前时间戳(毫秒) + # timestamp = str(round(time.time() * 1000)) + # # 生成签名 + # sign = self.get_signature(timestamp, ) + # # 拼接带有签名信息的完整 Webhook URL + # full_url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}" + # + # # 定义消息内容 + # message = { + # "msgtype": "text", + # "text": { + # "content": message_content + # } + # } + # + # # 设置请求头 + # headers = { + # "Content-Type": "application/json" + # } + # + # try: + # # 发送 POST 请求 + # response = requests.post(full_url, headers=headers, data=json.dumps(message)) + # + # except requests.RequestException as e: + # print(f"请求发生错误: {e}") + + def send_dingtalk_message(self, message_content): + + pass + + # url = "http://8.137.99.82:9005/api/send_click?token=fegergauiernguie&phone=8613661496481" + # + # res = requests.post( + # url=url, + # json={ + # "phone": "8613661496481", + # "bot_name": "ergggreef", + # "datas": [ + # {"send_message": [message_content], "click_button": [""], }, + # ] + # + # } + # ) + # + # print(res.json()) + + def openBrowser(self, ): # 直接指定ID打开窗口,也可以使用 createBrowser 方法返回的ID + + try: + + response = requests.post( + f"{self.tge_url}/api/browser/start", + json={"envId": self.tge_id}, + headers=self.tge_headers + ) + + self.tge_port = response.json()["data"]["port"] + + return True + except: + return False + + def take_over_browser(self): + try: + co = ChromiumOptions() + co.set_local_port(self.tge_port) + + self.page = ChromiumPage(addr_or_opts=co) + + self.page.set.window.max() + + return True + except: + return False + + def is_bullish(self, c): # 阳线 + return float(c['close']) > float(c['open']) + + def is_bearish(self, c): # 阴线 + return float(c['close']) < float(c['open']) + + def check_signal(self, prev, curr): + """ + 包住形态信号判定(仅15分钟K线): + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + """ + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + # 前跌后涨包住 -> 做多 + if is_bullish(curr) and is_bearish(prev) and int(c_open) <= int(p_close) and int(c_close) >= int(p_open): + return "long", "bear_bull_engulf" + + # 前涨后跌包住 -> 做空 + if is_bearish(curr) and is_bullish(prev) and int(c_open) >= int(p_close) and int(c_close) <= int(p_open): + return "short", "bull_bear_engulf" + + return None, None + + def get_price(self): + + for i in range(3): + try: + logger.info(f"获取最新数据:{i + 1}次。。。") + self.mn_tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT") + res = self.mn_tab.listen.wait(timeout=25) # 等待并获取一个数据包 + + datas = [] + if res: + + for data in res.response.body["result"]['data']: + insert_data = { + 'id': int(data["id"]), + 'open': float(data["open"]), + 'high': float(data["high"]), + 'low': float(data["low"]), + 'close': float(data["close"]) + } + + datas.append(insert_data) + + return datas + except: + pass + + return False + + def remove_tags_and_spaces(self, html): + # 创建 BeautifulSoup 对象 + soup = BeautifulSoup(html, 'html.parser') + # 获取去除标签后的文本 + text = soup.get_text() + # 去除所有空格 + text = text.replace(" ", "") + return text + + def to_do_page(self): + # self.page.get("https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT") + + self.mn_tab.ele('x://*[contains(text(), "市价")]', timeout=15).click() + time.sleep(1) + + self.mn_tab.ele('x://*[@id="amountInput"]').input(float(self.get_num()) / 100) + time.sleep(1) + + if self.direction == "long" and not self.start: + logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开多") + self.send_dingtalk_message( + message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开多") + self.mn_tab.ele('x://*[contains(text(), "买入/做多")]').click() + self.start = 1 + elif self.direction == "short" and not self.start: + logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开空") + self.send_dingtalk_message( + message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开空") + self.mn_tab.ele('x://*[contains(text(), "卖出/做空")]').click() + self.start = -1 + elif self.direction == "long" and self.start == -1: + logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平空做多") + self.send_dingtalk_message( + message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平空做多") + self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True) + self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click() + time.sleep(3) + self.mn_tab.ele('x://*[contains(text(), "买入/做多")]').click() + self.start = 1 + elif self.direction == "short" and self.start == 1: + logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平多做空") + self.send_dingtalk_message( + message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平多做空") + self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True) + self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click() + time.sleep(3) + self.mn_tab.ele('x://*[contains(text(), "卖出/做空")]').click() + self.start = -1 + + def get_text(self, target_text): + # 去除目标文本中的空白字符 + cleaned_target_text = re.sub(r'\s', '', target_text) + + # 创建 BeautifulSoup 对象 + soup = BeautifulSoup(self.mn_tab.html, 'html.parser') + + # 遍历所有标签的文本内容 + for tag in soup.find_all(): + tag_text = tag.get_text() + # 去除标签文本中的空白字符 + cleaned_tag_text = re.sub(r'\s', '', tag_text) + if cleaned_target_text in cleaned_tag_text: + return True + else: + return False + + def get_now_time(self): + # 获取当前时间戳 + current_timestamp = time.time() + # 将当前时间戳转换为 datetime 对象 + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + # 计算距离当前时间最近的整点或 30 分时刻 + if current_datetime.minute < 30: + target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) + else: + target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0) + + # 将目标 datetime 对象转换为时间戳 + target_timestamp = target_datetime.timestamp() + + return int(target_timestamp) + + def close_extra_tabs_in_browser(self): + + try: + for _, i in enumerate(self.page.get_tabs()): + if _ == 0: + continue + + i.close() + + return True + except: + pass + + return False + + def get_num(self): + + num_tab = self.page.new_tab() + num_tab.listen.start("capi.websea.com/webApi/asset/account") + + for i in range(3): + try: + logger.info(f"获取最新数据:{i + 1}次。。。") + num_tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT") + res = num_tab.listen.wait(timeout=15) # 等待并获取一个数据包 + + if res: + num_tab.close() + return res.response.body["result"]["availableUsd"] + except: + pass + + num_tab.close() + + return False + + def action(self): + # 获取比特端口 + if self.openBrowser(): + logger.info("获取打开比特成功,成功获取端口!!!") + else: + logger.error("打开比特失败!!!") + return + + # 接管浏览器 + if self.take_over_browser(): + logger.info("接管比特浏览器成功!!!") + else: + logger.error("接管浏览器失败!!!") + return + + if self.close_extra_tabs_in_browser(): + logger.info('关闭多余标签页成功!!!') + else: + logger.info('关闭多余标签页失败!!!') + + self.mn_tab = self.page.new_tab() + self.mn_tab.listen.start("capi.websea.com/webApi/market/getKline") + logger.success("浏览器开启抓包模式。。。") + + self.mn_tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT") # 打开网页 + + self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc:进度条说明,ncols:长度 + + while True: + # 获取当前时间 + current_time = time.localtime() + current_minute = current_time.tm_min + + if current_minute < 30: + self.pbar.n = current_minute + self.pbar.refresh() + else: + self.pbar.n = current_minute - 30 + self.pbar.refresh() + + if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, ]: # 判断是否是 新的30分钟了 + # if current_minute not in range(60): # 判断是否是 新的30分钟了 + time.sleep(10) + continue + + new_price_datas = self.get_price() + if new_price_datas: + logger.success("获取最新交易价格成功!!!") + else: + logger.info("获取最新价格有问题!!!") + continue + + new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"]) + self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:] + + # 判断抓取的数据是否正确 + if self.get_now_time() != self.kline_3["id"]: + continue + + time.sleep(15) + + if self.get_text(target_text="全部平仓ETHUSDT永续多全仓100X持仓量"): + self.start = 1 + elif self.get_text(target_text="全部平仓ETHUSDT永续空全仓100X持仓量"): + self.start = -1 + else: + self.start = 0 + + if self.start == 1: + if is_bearish(self.kline_1) and is_bearish(self.kline_2): + logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平多") + self.send_dingtalk_message( + message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平多") + self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True) + self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click() + self.start = 0 + elif self.start == -1: + if is_bullish(self.kline_1) and is_bullish(self.kline_2): + logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平空") + self.send_dingtalk_message( + message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平空") + + self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True) + self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click() + self.start = 0 + + self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号 + + if self.direction: + try: + self.to_do_page() + except Exception as e: + self.send_dingtalk_message( + message_content=f"{datetime.datetime.now()},{e}") + + self.pbar.reset() # 重置进度条 + self.send_dingtalk_message( + message_content= + f"{datetime.datetime.now()}," + f"目前有持仓:{"无" if self.start == 0 else ("多" if self.start == 1 else "空")}," + f"当前信号:{"无" if not self.direction else ("多" if self.direction == "long" else "空")}" + ) + + +if __name__ == '__main__': + WeexTransaction( + tge_id=191303, + ).action() + + # //*[contains(text(), '特定文本')]