Files
lm_code/交易/bitmart_交易.py
Administrator ef95188675 fwefwf
2025-12-12 10:03:30 +08:00

459 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import asyncio
import datetime
from tqdm import *
from loguru import *
from DrissionPage import *
from curl_cffi import requests
from telethon import TelegramClient
from 交易.tools import send_dingtalk_message
def is_bullish(c):
    """Return True when candle *c* closed above its open (a bullish candle).

    Args:
        c: Mapping with ``'open'`` and ``'close'`` entries; each value must be
            convertible with ``float()``.

    Returns:
        bool: ``True`` if close > open. A candle whose close equals its open
        is not bullish.

    Raises:
        KeyError: If ``'open'`` or ``'close'`` is missing.
        ValueError: If a value cannot be converted to ``float``.
    """
    return float(c['close']) > float(c['open'])
def is_bearish(c):
    """Return True when candle *c* closed below its open (a bearish candle).

    Args:
        c: Mapping with ``'open'`` and ``'close'`` entries; each value must be
            convertible with ``float()``.

    Returns:
        bool: ``True`` if close < open. A candle whose close equals its open
        is not bearish.

    Raises:
        KeyError: If ``'open'`` or ``'close'`` is missing.
        ValueError: If a value cannot be converted to ``float``.
    """
    return float(c['close']) < float(c['open'])
class WeexTransaction:
    """Trade the BitMart ETHUSDT futures page through a locally managed browser.

    The bot starts a browser profile through a local HTTP service, attaches to
    it with DrissionPage, and then loops forever: shortly after each half-hour
    boundary it downloads recent candles, refreshes the login cookies and the
    current position, and clicks the page's buy/sell/close buttons according
    to a two-candle engulfing signal.
    """

    # Minutes of the hour in which a freshly closed candle is processed.
    _TRIGGER_MINUTES = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34)
    # Seconds to pause before retrying a failed step of the main loop.
    _RETRY_DELAY = 1
    # Seconds to wait for the position request to be captured by the listener.
    _LISTEN_TIMEOUT = 30

    def __init__(self, tge_id):
        """Store the browser profile id and create the HTTP session.

        Args:
            tge_id: Id of the browser environment to start via the local service.
        """
        self.tge_port = None  # debugging port reported by the local browser service
        self.tge_id = tge_id  # browser environment id
        self.tge_url = "http://127.0.0.1:50326"  # local browser service URL
        # NOTE(security): bearer token is hard-coded; consider loading it from
        # configuration or the environment instead of source control.
        self.tge_headers = {
            "Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
            "Content-Type": "application/json"
        }
        self.page = None  # browser object
        self.mn_tab = None  # trading tab, opened in action()
        self.start = 0  # position state: -1 short, 0 flat, 1 long
        self.kline_1 = None  # oldest of the last three candles
        self.kline_2 = None  # middle of the last three candles
        self.kline_3 = None  # newest of the last three candles
        self.direction = None  # current signal: "long", "short" or None
        self.pbar = None  # progress bar object
        self.headers = None  # request headers captured from the browser
        self.cookies = {}  # cookies captured from the browser
        self.session = requests.Session()  # HTTP session for API calls

    async def to_do_tg(self, message_content):
        """Send *message_content* to a fixed Telegram chat.

        Args:
            message_content: Text to send.

        Returns:
            bool: ``True`` if the message was sent, ``False`` on any error.
        """
        # ========== configuration ==========
        # NOTE(security): API credentials and proxy login are hard-coded;
        # consider moving them out of the source file.
        API_ID = 2040
        API_HASH = "b18441a1ff607e10a989891a5462e627"
        SESSION_FILE = "../telegram/8619211027341"  # saved login session
        # ===================================
        PROXY = {
            'proxy_type': "socks5",
            'addr': "202.155.144.102",
            'port': 31102,
            'username': "SyNuejCtrQ",
            'password': "MH8ioL7EXf"
        }
        client = None
        try:
            client = TelegramClient(SESSION_FILE, API_ID, API_HASH, proxy=PROXY)
            # Logs in; the first run asks for phone number and code interactively.
            await client.start()
            bot = await client.get_entity("ergggreef")
            await client.send_message(bot, message_content)
            return True
        except Exception as exc:
            logger.error(f"发送Telegram消息失败:{exc}")
            return False
        finally:
            # Always release the connection and the session file; otherwise
            # every call leaks a client.
            if client is not None:
                try:
                    await client.disconnect()
                except Exception as exc:
                    logger.warning(f"断开Telegram连接失败:{exc}")

    def send_dingtalk_message(self, message_content):
        """Synchronously send *message_content* via :meth:`to_do_tg`.

        Returns:
            bool: Result of :meth:`to_do_tg`.
        """
        return asyncio.run(self.to_do_tg(message_content))

    def openBrowser(self, ):
        """Start the browser environment and remember its debugging port.

        Returns:
            bool: ``True`` if the service returned a port, ``False`` otherwise.
        """
        try:
            response = requests.post(
                f"{self.tge_url}/api/browser/start",
                json={"envId": self.tge_id},
                headers=self.tge_headers
            )
            self.tge_port = response.json()["data"]["port"]
            return True
        except Exception as exc:
            logger.error(f"启动浏览器环境失败:{exc}")
            return False

    def take_over_browser(self):
        """Attach DrissionPage to the started browser and maximise its window.

        Returns:
            bool: ``True`` on success, ``False`` on any error.
        """
        try:
            co = ChromiumOptions()
            co.set_local_port(self.tge_port)
            self.page = ChromiumPage(addr_or_opts=co)
            self.page.set.window.max()
            return True
        except Exception as exc:
            logger.error(f"接管浏览器出错:{exc}")
            return False

    def is_bullish(self, c):
        """Return True when candle *c* closed above its open."""
        return float(c['close']) > float(c['open'])

    def is_bearish(self, c):
        """Return True when candle *c* closed below its open."""
        return float(c['close']) < float(c['open'])

    def check_signal(self, prev, curr):
        """Classify two consecutive candles as an engulfing signal.

        - bearish *prev* followed by bullish *curr* whose close reaches
          *prev*'s open -> long
        - bullish *prev* followed by bearish *curr* whose close reaches
          *prev*'s open -> short

        Prices are truncated with ``int()`` before the engulfing comparison,
        exactly as the original implementation did.

        Args:
            prev: Earlier candle mapping with ``'open'`` and ``'close'``.
            curr: Later candle mapping with ``'open'`` and ``'close'``.

        Returns:
            tuple: ``(direction, signal_key)`` where direction is ``"long"``,
            ``"short"`` or ``None`` and signal_key names the pattern or is
            ``None``.
        """
        p_open, p_close = float(prev['open']), float(prev['close'])
        c_open, c_close = float(curr['open']), float(curr['close'])
        prev_up, prev_down = p_close > p_open, p_close < p_open
        curr_up, curr_down = c_close > c_open, c_close < c_open
        if curr_up and prev_down and int(c_close) >= int(p_open):
            return "long", "bear_bull_engulf"
        if curr_down and prev_up and int(c_close) <= int(p_open):
            return "short", "bull_bear_engulf"
        return None, None

    def get_price(self):
        """Download recent candles from the BitMart kline endpoint.

        Makes up to three attempts. Each attempt builds its own result list so
        a failure half-way through parsing can never leak partial or
        duplicated rows into the returned data.

        Returns:
            list: Dicts with ``id`` (timestamp - 1), ``open``, ``high``,
            ``low`` and ``close``; an empty list if every attempt failed.
        """
        headers = {
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'no-cache',
            'origin': 'https://derivatives.bitmart.com',
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'referer': 'https://derivatives.bitmart.com/',
            'sec-ch-ua': '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-site',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
        }
        for attempt in range(3):
            logger.info(f"获取最新数据:{attempt + 1}次。。。")
            params = {
                'unit': '30',
                'resolution': 'M',
                'contractID': '2',
                'offset': '340',
                'endTime': str(int(time.time())),  # refreshed on every attempt
            }
            try:
                response = self.session.get(
                    'https://contract-v2.bitmart.com/v1/ifcontract/quote/kline',
                    params=params, headers=headers, timeout=5)
                return [
                    {
                        'id': int(row["timestamp"]) - 1,
                        'open': float(row["open"]),
                        'high': float(row["high"]),
                        'low': float(row["low"]),
                        'close': float(row["close"]),
                    }
                    for row in response.json()["data"]
                ]
            except Exception as exc:
                logger.warning(f"获取K线数据失败:{exc}")
                time.sleep(1)
        return []

    def _close_at_market(self):
        """Scroll the market-close control into view and click it."""
        locator = 'x:(//span[normalize-space(text()) = "市价"])'
        self.mn_tab.ele(locator).scroll.to_see(center=True)
        time.sleep(1)
        self.mn_tab.ele(locator).click()

    def to_do_page(self):
        """Place the order for ``self.direction`` on the trading tab.

        Selects the market-order tab, enters 1/100 of the available balance as
        the size, then opens a position (when flat) or closes and reverses
        (when holding the opposite side). Updates ``self.start`` to the new
        state. Does nothing further if the balance cannot be fetched or the
        current position already matches the signal.
        """
        self.mn_tab.ele('x:(//button[normalize-space(text()) = "市价"])').click()
        time.sleep(1)
        num = self.get_num()
        if not num:
            logger.error("获取可用余额失败!!!")
            return
        logger.info("获取可用余额成功!!!")
        self.mn_tab.ele('x://*[@id="size_0"]').input(float(num) / 100)
        time.sleep(1)
        if self.direction == "long" and not self.start:
            logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
            self.mn_tab.ele('x://span[normalize-space(text()) = "买入/做多"]').click()
            self.start = 1
        elif self.direction == "short" and not self.start:
            logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
            self.mn_tab.ele('x://span[normalize-space(text()) = "卖出/做空"]').click()
            self.start = -1
        elif self.direction == "long" and self.start == -1:
            logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
            self._close_at_market()
            time.sleep(3)  # give the close order time before reversing
            self.mn_tab.ele('x:(//span[normalize-space(text()) = "买入/做多"])').click()
            self.start = 1
        elif self.direction == "short" and self.start == 1:
            logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
            self._close_at_market()
            time.sleep(3)  # give the close order time before reversing
            self.mn_tab.ele('x://span[normalize-space(text()) = "卖出/做空"]').click()
            self.start = -1

    def get_now_time(self):
        """Return the local time floored to the last :00 or :30 as a Unix timestamp.

        Returns:
            int: Timestamp of the most recent half-hour boundary.
        """
        now = datetime.datetime.fromtimestamp(time.time())
        boundary_minute = 0 if now.minute < 30 else 30
        boundary = now.replace(minute=boundary_minute, second=0, microsecond=0)
        return int(boundary.timestamp())

    def close_extra_tabs_in_browser(self):
        """Close every browser tab except the first one.

        Returns:
            bool: ``True`` if all extra tabs were closed, ``False`` on error.
        """
        try:
            for extra_tab in self.page.get_tabs()[1:]:
                extra_tab.close()
            return True
        except Exception as exc:
            logger.warning(f"关闭多余标签页出错:{exc}")
            return False

    def get_num(self):
        """Fetch the available balance of the first futures account.

        Makes up to three attempts, one second apart.

        Returns:
            The ``available_balance`` value from the API response, or ``False``
            if every attempt failed.
        """
        params = {
            'account_type': '3',
            'isServerCal': '1',
        }
        for _ in range(3):
            try:
                response = self.session.get(
                    'https://derivatives.bitmart.com/gw-api/contract-tiger/forward/v1/ifcontract/accounts',
                    params=params,
                )
                return response.json()["data"]["accounts"][0]["available_balance"]
            except Exception as exc:
                logger.warning(f"获取可用余额出错:{exc}")
                time.sleep(1)
        return False

    def get_token(self):
        """Capture login headers and cookies from the browser's position request.

        Opens a temporary tab, loads the futures page and waits for the
        ``userPositions`` request. The captured headers and cookies are stored
        on the instance; they are copied into ``self.session`` only when an
        ``accessKey`` cookie is present. The temporary tab is always closed.

        Returns:
            bool: ``True`` if any cookies were captured, ``False`` otherwise.
        """
        tab = self.page.new_tab()
        try:
            tab.listen.start("derivatives.bitmart.com/gw-api/contract-tiger/forward/v1/ifcontract/userPositions")
            for _ in range(3):
                self.cookies = {}
                try:
                    tab.get(url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
                    # Bounded wait: without a timeout a missing request (e.g.
                    # logged-out page) would block the whole bot forever.
                    packet = tab.listen.wait(timeout=self._LISTEN_TIMEOUT)
                    if not packet:
                        logger.warning("等待持仓请求超时")
                        continue
                    self.headers = packet.request.headers
                    for cookie in packet.request.cookies:
                        self.cookies[cookie["name"]] = cookie["value"]
                    if self.cookies.get('accessKey'):
                        self.session.cookies.update(self.cookies)
                        self.session.headers.update(self.headers)
                    if self.cookies:
                        break
                except Exception as exc:
                    logger.warning(f"获取token出错:{exc}")
                    time.sleep(1)
        finally:
            tab.close()
        return bool(self.cookies)

    def get_now_time1(self):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))

    def get_position_status(self):
        """Refresh ``self.start`` from the open-positions endpoint.

        Sets ``self.start`` to 0 when there is no position, 1 when the first
        position has ``position_type`` 1 and -1 when it is 2. Makes up to
        three attempts; a response that cannot be parsed resets ``self.start``
        to 0 and triggers another attempt.

        Returns:
            bool: ``True`` if a response was parsed, ``False`` otherwise.
        """
        params = {
            'status': '1',
        }
        for _ in range(3):
            try:
                response = self.session.get(
                    'https://derivatives.bitmart.com/gw-api/contract-tiger/forward/v1/ifcontract/userPositions',
                    params=params,
                    impersonate="chrome120"
                )
                try:
                    resp_data = response.json()
                    positions = resp_data.get("data", {}).get("positions", [])
                    if not positions:
                        self.start = 0
                    else:
                        pos_type = positions[0].get("position_type")
                        if pos_type == 1:
                            self.start = 1
                        elif pos_type == 2:
                            self.start = -1
                    return True
                except (KeyError, IndexError, AttributeError, TypeError, ValueError) as e:
                    # JSON decoding failed or the payload had an unexpected shape.
                    logger.error(f"持仓数据解析错误: {e}")
                    self.start = 0
            except Exception as exc:
                logger.warning(f"获取持仓信息出错:{exc}")
                time.sleep(1)
        return False

    def action(self):
        """Run the trading loop; returns only if browser start-up fails.

        Each half-hour candle is acted on at most once: after a cycle finishes
        without an order error, the loop idles until the next boundary instead
        of repeating the same orders and notifications for the rest of the
        trigger window.
        """
        # Start the browser environment and obtain its port.
        if self.openBrowser():
            logger.info("获取打开比特成功,成功获取端口!!!")
        else:
            logger.error("打开比特失败!!!")
            return
        # Attach to the running browser.
        if self.take_over_browser():
            logger.info("接管比特浏览器成功!!!")
        else:
            logger.error("接管浏览器失败!!!")
            return
        if self.close_extra_tabs_in_browser():
            logger.info('关闭多余标签页成功!!!')
        else:
            logger.info('关闭多余标签页失败!!!')
        self.mn_tab = self.page.new_tab(url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
        self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)
        last_handled_candle = None  # boundary timestamp of the last completed cycle
        while True:
            current_minute = time.localtime().tm_min
            # Progress within the current half hour.
            self.pbar.n = current_minute % 30
            self.pbar.refresh()
            if current_minute not in self._TRIGGER_MINUTES:
                time.sleep(10)
                continue
            candle_start = self.get_now_time()
            if candle_start == last_handled_candle:
                # This candle was already processed; wait for the next one.
                time.sleep(10)
                continue
            new_price_datas = self.get_price()
            if not new_price_datas:
                logger.info("获取最新价格有问题!!!")
                send_dingtalk_message(message_content=f"bitmart{self.get_now_time()},获取最新价格有问题!!!")
                time.sleep(self._RETRY_DELAY)
                continue
            ordered = sorted(new_price_datas, key=lambda x: x["id"])
            if len(ordered) < 3:
                # Not enough candles to evaluate the pattern; retry shortly.
                time.sleep(self._RETRY_DELAY)
                continue
            self.kline_1, self.kline_2, self.kline_3 = ordered[-3:]
            # The newest candle must be the one that starts at the current boundary.
            if candle_start != int(self.kline_3["id"]):
                time.sleep(self._RETRY_DELAY)
                continue
            logger.success("获取最新交易价格成功!!!")
            send_dingtalk_message("获取最新交易价格成功!!!")
            if self.get_token():
                logger.info("获取token成功!!!")
            else:
                logger.info("获取token失败")
                send_dingtalk_message(message_content=f"bitmart{self.get_now_time()}获取token失败")
                time.sleep(self._RETRY_DELAY)
                continue
            if self.get_position_status():
                logger.info("获取仓位信息成功!!!")
            else:
                logger.info("获取仓位信息失败!!!")
                send_dingtalk_message(message_content=f"bitmart{self.get_now_time()},获取仓位信息失败!!!")
                time.sleep(self._RETRY_DELAY)
                continue
            # Exit rule: two consecutive candles against the open position.
            if self.start == 1:
                if self.is_bearish(self.kline_1) and self.is_bearish(self.kline_2):
                    logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
                    self._close_at_market()
                    self.start = 0
            elif self.start == -1:
                if self.is_bullish(self.kline_1) and self.is_bullish(self.kline_2):
                    logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
                    self._close_at_market()
                    self.start = 0
            self.direction, _ = self.check_signal(prev=self.kline_1, curr=self.kline_2)
            cycle_ok = True
            if self.direction:
                try:
                    self.to_do_page()
                except Exception as e:
                    cycle_ok = False  # leave the candle unhandled so it is retried
                    send_dingtalk_message(
                        message_content=f"bitmart:{self.get_now_time()},购买操作失败,{e}")
            self.pbar.reset()
            # NOTE(review): in the reviewed source every label in this message
            # was an empty string (and the nested double quotes only parse on
            # Python 3.12+). The labels below are replacements -- confirm the
            # intended wording.
            if self.start == 0:
                position_label = "无"
            elif self.start == 1:
                position_label = "做多"
            else:
                position_label = "做空"
            if not self.direction:
                signal_label = "无"
            elif self.direction == "long":
                signal_label = "做多"
            else:
                signal_label = "做空"
            send_dingtalk_message(
                message_content=
                f"bitmart{datetime.datetime.now()}"
                f"目前有持仓:{position_label}"
                f"当前信号:{signal_label}"
            )
            if cycle_ok:
                last_handled_candle = candle_start
            else:
                time.sleep(self._RETRY_DELAY)
if __name__ == '__main__':
    # Script entry point: run the trading loop for browser environment 196495.
    WeexTransaction(
        tge_id=196495,
    ).action()