dededdew
This commit is contained in:
88
test1.py
88
test1.py
@@ -1,71 +1,27 @@
|
||||
import requests
|
||||
import time
|
||||
from DrissionPage import ChromiumPage, ChromiumOptions
|
||||
import datetime
|
||||
|
||||
# ========== 配置 ==========
|
||||
API_BASE_URL = "http://127.0.0.1:50326"
|
||||
# 获取当前时间戳
|
||||
current_timestamp = time.time()
|
||||
# 将当前时间戳转换为 datetime 对象
|
||||
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
# 计算距离当前时间最近的整点或 30 分时刻
|
||||
if current_datetime.minute < 30:
|
||||
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
|
||||
else:
|
||||
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
|
||||
|
||||
# # ========== 步骤 1: 创建浏览器环境 ==========
|
||||
# print("1. 创建浏览器环境...")
|
||||
# response = requests.post(
|
||||
# f"{API_BASE_URL}/api/browser/create",
|
||||
# headers=headers,
|
||||
# params = {
|
||||
# "current": 1,
|
||||
# "pageSize": 20,
|
||||
# "keyword": "test"
|
||||
# }
|
||||
# )
|
||||
#
|
||||
# env_id = response.json()["data"]["envId"]
|
||||
# print(f" 环境创建成功,ID: {env_id}")
|
||||
# 将目标 datetime 对象转换为时间戳
|
||||
target_timestamp = target_datetime.timestamp()
|
||||
|
||||
# ========== 步骤 2: 启动浏览器 ==========
|
||||
print("2. 启动浏览器...")
|
||||
response = requests.post(
|
||||
f"{API_BASE_URL}/api/browser/start",
|
||||
json={"envId": 146473},
|
||||
headers=headers
|
||||
)
|
||||
#
|
||||
# debug_port = response.json()["data"]["port"]
|
||||
# print(f" 浏览器已启动,调试端口: {debug_port}")
|
||||
#
|
||||
# # ========== 步骤 3: 连接 DrissionPage ==========
|
||||
# print("3. 连接 DrissionPage...")
|
||||
# time.sleep(3) # 等待浏览器完全启动
|
||||
#
|
||||
# # 方式1: 通过端口连接(推荐)
|
||||
# co = ChromiumOptions().set_local_port(debug_port)
|
||||
# page = ChromiumPage(addr_or_opts=co)
|
||||
#
|
||||
# # 方式2: 直接使用端口号
|
||||
# # from DrissionPage import Chromium
|
||||
# # page = Chromium(debug_port)
|
||||
#
|
||||
# # 方式3: 使用地址:端口
|
||||
# # page = Chromium(f'127.0.0.1:{debug_port}')
|
||||
#
|
||||
# # 方式4: 使用 WebSocket URL(需要先获取 ws_url)
|
||||
# # ws_url = response.json()["data"]["ws"]
|
||||
# # page = Chromium(ws_url)
|
||||
#
|
||||
# print(" DrissionPage 连接成功")
|
||||
#
|
||||
# # ========== 步骤 4: 执行自动化操作 ==========
|
||||
# print("4. 执行自动化操作...")
|
||||
#
|
||||
# # 访问网页
|
||||
# page.get('https://www.example.com')
|
||||
# print(f" 当前页面: {page.title}")
|
||||
#
|
||||
# # 更多操作示例
|
||||
# page.get_screenshot('screenshot.png') # 截图
|
||||
# print(" 已保存截图")
|
||||
#
|
||||
# print("\n完成!")
|
||||
print(f"当前时间戳: {current_timestamp}")
|
||||
print(f"目标时间戳: {target_timestamp}")
|
||||
|
||||
# 进行时间戳比对
|
||||
if current_timestamp == target_timestamp:
|
||||
print("当前时间就是目标时间。")
|
||||
elif current_timestamp < target_timestamp:
|
||||
print(f"当前时间早于目标时间,还需等待 {target_timestamp - current_timestamp} 秒。")
|
||||
else:
|
||||
print(f"当前时间晚于目标时间,已经过去了 {current_timestamp - target_timestamp} 秒。")
|
||||
|
||||
@@ -225,6 +225,23 @@ class WeexTransaction:
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_now_time(self):
|
||||
# 获取当前时间戳
|
||||
current_timestamp = time.time()
|
||||
# 将当前时间戳转换为 datetime 对象
|
||||
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
|
||||
|
||||
# 计算距离当前时间最近的整点或 30 分时刻
|
||||
if current_datetime.minute < 30:
|
||||
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
|
||||
else:
|
||||
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
|
||||
|
||||
# 将目标 datetime 对象转换为时间戳
|
||||
target_timestamp = target_datetime.timestamp()
|
||||
|
||||
return int(target_timestamp) * 1000
|
||||
|
||||
def action(self):
|
||||
|
||||
# 获取比特端口
|
||||
@@ -282,6 +299,10 @@ class WeexTransaction:
|
||||
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
|
||||
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
|
||||
|
||||
# 判断抓取的数据是否正确
|
||||
if self.get_now_time() != self.kline_3[id]:
|
||||
continue
|
||||
|
||||
time.sleep(15)
|
||||
if self.start == 1:
|
||||
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
|
||||
|
||||
Reference in New Issue
Block a user