This commit is contained in:
Administrator
2025-12-15 15:56:42 +08:00
parent 2bb92f3f2d
commit 3944e6132b
6 changed files with 179 additions and 165 deletions

Binary file not shown.

View File

@@ -309,88 +309,93 @@ class WeexTransaction:
self.pbar = tqdm(total=30, desc="等待半小时周期", ncols=80)
while True:
now = time.localtime()
minute = now.tm_min
now = time.localtime()
minute = now.tm_min
# 更新进度条
self.pbar.n = minute if minute < 30 else minute - 30
self.pbar.refresh()
# 更新进度条
self.pbar.n = minute if minute < 30 else minute - 30
self.pbar.refresh()
# 必须是整点或半点及前 5 分钟
if minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, 35]:
time.sleep(8)
# 必须是整点或半点及前 5 分钟
if minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, 35]:
time.sleep(8)
return
# 时间重复跳过
if self.time_start == self.get_half_hour_timestamp():
time.sleep(5)
return
# ---- 获取 Token ----
if not self.get_token():
self.ding("获取 token 失败!", error=True)
return
logger.info("Token 获取成功")
# ---- 获取价格 ----
kdatas = self.get_price()
if not kdatas:
self.ding("获取价格失败!", error=True)
return
self.kline_1, self.kline_2, self.kline_3 = kdatas[-3:]
if int(self.kline_3["id"]) != self.get_half_hour_timestamp():
return
logger.success("K线获取成功")
self.time_start = self.get_half_hour_timestamp()
# ---- 刷新页面 ----
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
for i in range(3):
# ---- 获取仓位 ----
if not self.get_position_status():
self.ding("获取仓位失败!", error=True)
continue
# 时间重复跳过
if self.time_start == self.get_half_hour_timestamp():
time.sleep(5)
continue
if self.start:
break
# ---- 获取 Token ----
if not self.get_token():
self.ding("获取 token 失败!", error=True)
continue
logger.info("Token 获取成功")
# ---- 止损平仓 ----
try:
if self.start == 1 and is_bearish(self.kline_1) and is_bearish(self.kline_2):
self.ding("两根大阴线,平多")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
self.start = 0
# ---- 获取价格 ----
kdatas = self.get_price()
if not kdatas:
self.ding("获取价格失败!", error=True)
continue
elif self.start == -1 and is_bullish(self.kline_1) and is_bullish(self.kline_2):
self.ding("两根大阳线,平空")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
self.start = 0
except:
self.ding("止损平仓错误!", error=True)
# continue
return
self.kline_1, self.kline_2, self.kline_3 = kdatas[-3:]
if int(self.kline_3["id"]) != self.get_half_hour_timestamp():
continue
# ---- 生成新信号 ----
self.direction = self.check_signal(prev=self.kline_1, curr=self.kline_2)
logger.success("K线获取成功")
self.time_start = self.get_half_hour_timestamp()
# ---- 刷新页面 ----
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
for i in range(3):
# ---- 获取仓位 ----
if not self.get_position_status():
self.ding("获取仓位失败!", error=True)
continue
if self.start:
break
# ---- 止损平仓 ----
# ---- 执行交易 ----
if self.direction:
try:
if self.start == 1 and is_bearish(self.kline_1) and is_bearish(self.kline_2):
self.ding("两根大阴线,平多")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
self.start = 0
elif self.start == -1 and is_bullish(self.kline_1) and is_bullish(self.kline_2):
self.ding("两根大阳线,平空")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
self.start = 0
self.do_order()
except:
self.ding("止损平仓错误", error=True)
continue
self.ding("下单失败", error=True)
# ---- 生成新信号 ----
self.direction = self.check_signal(prev=self.kline_1, curr=self.kline_2)
# ---- 执行交易 ----
if self.direction:
try:
self.do_order()
except:
self.ding("下单失败!", error=True)
# ---- 周期结束消息 ----
self.pbar.reset()
self.ding(
f"持仓:{'' if self.start == 0 else ('' if self.start == 1 else '')}"
f"信号:{'' if not self.direction else ('' if self.direction == 'long' else '')}"
)
# ---- 周期结束消息 ----
self.pbar.reset()
self.ding(
f"持仓:{'' if self.start == 0 else ('' if self.start == 1 else '')}"
f"信号:{'' if not self.direction else ('' if self.direction == 'long' else '')}"
)
if __name__ == '__main__':
WeexTransaction(tge_id=196495).action()
while True:
try:
WeexTransaction(tge_id=196495).action()
except:
pass
time.sleep(30)

View File

@@ -322,101 +322,107 @@ class WeexTransaction:
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc进度条说明ncols长度
self.time_start = None # 时间状态 避免同一个时段,发生太多消息
while True:
# 获取当前时间
current_time = time.localtime()
current_minute = current_time.tm_min
if current_minute < 30:
self.pbar.n = current_minute
self.pbar.refresh()
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
# 获取当前时间
current_time = time.localtime()
current_minute = current_time.tm_min
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, ]: # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
time.sleep(10)
continue
if current_minute < 30:
self.pbar.n = current_minute
self.pbar.refresh()
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
if self.kline_3 and self.get_now_time() == self.kline_3["id"]:
continue
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, ]: # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
time.sleep(10)
return
new_price_datas = self.get_price()
if not new_price_datas:
logger.info("获取最新价格有问题!!!")
if self.kline_3 and self.get_now_time() == self.kline_3["id"]:
return
self.send_dingtalk_message(message_content=f"获取价格有问题!!!", type=0)
continue
new_price_datas = self.get_price()
if not new_price_datas:
logger.info("获取最新价格有问题!!!")
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
self.send_dingtalk_message(message_content=f"获取价格有问题!!!", type=0)
return
if self.time_start == self.get_now_time():
continue
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
self.time_start = self.get_now_time()
if self.time_start == self.get_now_time():
return
self.page.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT") # 打开网页
self.time_start = self.get_now_time()
if self.get_token(): # 获取token
logger.info("获取token成功!!!")
else:
logger.info("获取token失败")
self.send_dingtalk_message(message_content=f"获取token失败", type=0)
continue
self.page.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT") # 打开网页
if self.get_position_status():
logger.info("获取仓位信息成功!!!")
else:
logger.info("获取仓位信息失败!!!")
if self.get_token(): # 获取token
logger.info("获取token成功!!!")
else:
logger.info("获取token失败!!!")
self.send_dingtalk_message(message_content=f"获取token失败", type=0)
return
self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0)
continue
if self.get_position_status():
logger.info("获取仓位信息成功!!!")
else:
logger.info("获取仓位信息失败!!!")
self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0)
return
try:
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平多")
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平空")
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.start = 0
except:
self.send_dingtalk_message(message_content=f"止损平仓出错!!!", type=0)
return
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
if self.direction:
try:
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平多")
self.to_do_page()
except Exception as e:
self.send_dingtalk_message(message_content=f"购买操作失败,{e}", type=0)
return
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平空")
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.start = 0
except:
self.send_dingtalk_message(message_content=f"止损平仓出错!!!", type=0)
continue
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
if self.direction:
try:
self.to_do_page()
except Exception as e:
self.send_dingtalk_message(message_content=f"购买操作失败,{e}", type=0)
continue
self.pbar.reset() # 重置进度条
self.send_dingtalk_message(
message_content=
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)
self.pbar.reset() # 重置进度条
self.send_dingtalk_message(
message_content=
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)
if __name__ == '__main__':
WeexTransaction(
tge_id=191303,
).action()
while True:
try:
WeexTransaction(
tge_id=191303,
).action()
except:
pass
time.sleep(30)

View File

@@ -127,27 +127,30 @@ class WeexTransaction:
def get_price(self):
params = {
'unit': '30',
'resolution': 'M',
'contractID': '2',
'offset': '340',
'endTime': str(int(time.time())),
'contractId': '10000002',
'productCode': 'cmt_ethusdt',
'priceType': 'LAST_PRICE',
'klineType': 'MINUTE_30',
'limit': '300',
'timeZone': 'string',
'languageType': '1',
'sign': 'SIGN',
}
datas = []
for i in range(3):
logger.info(f"获取最新数据:{i + 1}次。。。")
try:
response = self.session.get('https://contract-v2.bitmart.com/v1/ifcontract/quote/kline', params=params,
timeout=5)
response = self.session.get('https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2',
params=params, )
for i in response.json()["data"]:
insert_data = {
'id': int(i["timestamp"]) - 1,
'open': float(i["open"]),
'high': float(i["high"]),
'low': float(i["low"]),
'close': float(i["close"])
'id': int(i[4]),
'open': float(i[3]),
'high': float(i[1]),
'low': float(i[2]),
'close': float(i[0])
}
datas.append(insert_data)

View File

@@ -295,7 +295,7 @@ class WeexTransaction:
continue
# 避免同一分钟重复操作
if self.kline_3 or self.last_action_time == self.kline_3["id"]:
if self.kline_3 and self.last_action_time == self.kline_3["id"]:
continue
# 获取新 token

View File

@@ -687,7 +687,7 @@ def run_work(x_token_info, xstart_info):
hub.to_do_tui() # 发推
# hub.FollowTwitterAccount() # 关注
hub.FollowTwitterAccount() # 关注
if __name__ == '__main__':