This commit is contained in:
27942
2025-12-23 11:12:32 +08:00
parent 5d01feb385
commit 49940862b1
12 changed files with 2365 additions and 1550 deletions

16
1111
View File

@@ -10,9 +10,15 @@ websea
账号yangyicheng6666@gmail.com
密码Wg138333.
api交易
yto0mg6r05kkyolwefqw
db2e87093c120766bca60d2282y72760688
yx20250715@gmail.com
Abc12345678
3. 更多交易信号
包住形态(原策略)
锤子线和上吊线
@@ -28,4 +34,12 @@ RSI超买超卖
时间止损持仓超过24小时自动平仓
凯利公式改进版:仓位管理
最大回撤控制
手续费考虑0.05%交易手续费
手续费考虑0.05%交易手续费
申请API原因我计划通过API实现自动化量化交易策略以减少人工干预提高交易效率并减少人为错误。通过API我能够实时获取市场数据、执行交易指令、监控账户状态并根据市场变化自动调整策略。
申请API具体方式用途我将使用API来获取实时的市场行情数据执行各种量化策略如网格交易、动量策略、均值回归等并进行账户管理。API将自动监控并调整我的交易参数比如仓位、止损、止盈等同时还需要定期回测策略和优化模型。
申请API主要交易什么币种我主要通过API进行ETH和BTC的合约交易还有一起其他的主流币
申请API操作频率预计API的调用频率将根据市场波动而变化通常情况下我的API请求频率大约在每秒5-10次有时会根据策略的调整需要进行更频繁的请求。
申请API大概体量预计每日API请求量为5,000 - 15,000次具体取决于市场波动和交易策略的复杂度。月度交易量预估为50,000 - 200,000 ETH/BTC。
申请API对接后预估增长目标通过API对接我预计能够实现量化策略的高效执行和自动化交易减少人工干预和错误。目标是在对接后的3个月内将交易频率提高50%并将月盈利提高20%至30%。

View File

@@ -19,7 +19,8 @@ def createBrowser(
port=None,
proxyUserName=None,
proxyPassword=None,
name='google'
name='google',
proxyType="socks5"
): # 创建或者更新窗口,指纹参数 browserFingerPrint 如没有特定需求,只需要指定下内核即可,如果需要更详细的参数,请参考文档
json_data = {
"groupId": groupId, # 分组id
@@ -27,7 +28,7 @@ def createBrowser(
'remark': '', # 备注
'proxyMethod': 1, # 代理方式 2自定义 3 提取IP
# 代理类型 ['noproxy', 'http', 'https', 'socks5', 'ssh']
'proxyType': 'socks5',
'proxyType': proxyType,
'host': host, # 代理主机EE
'port': port, # 代理端口
'proxyUserName': proxyUserName, # 代理账号
@@ -209,28 +210,27 @@ if __name__ == '__main__':
# proxyPassword=ips_info.password
# )
# fz_datas = get_group_lists()
# # fz_datas['推特']
#
# for i in range(10):
# for i in get_browser_lists_Browser(id=fz_datas['推特'], page=i):
# x_start_info = Xstart.get_or_none(
# Xstart.bit_id == i["id"]
# )
#
# if not x_start_info:
# deleteBrowser(id=i["id"])
#
# continue
#
# if x_start_info.start:
# continue
#
# deleteBrowser(id=i["id"])
#
# # x_start_info.delete_instance()
# x_start_info.bit_id = None
# x_start_info.save()
fz_datas = get_group_lists()
# fz_datas['推特']
for i in range(10):
for i in get_browser_lists_Browser(id=fz_datas['推特'], page=i):
x_start_info = Xstart.get_or_none(
Xstart.bit_id == i["id"]
)
if not x_start_info:
deleteBrowser(id=i["id"])
continue
if x_start_info.start:
continue
deleteBrowser(id=i["id"])
x_start_info.bit_id = None
x_start_info.save()
# for i in Xstart.select():
# res = browser_detail(id=i.bit_id)
@@ -243,20 +243,20 @@ if __name__ == '__main__':
# print(browser_detail(id="532651f5330e4caa917e644f9b676b"))
# # 批量修改代理
# for i in Xstart.select():
# for i in Xstart.select().where(Xstart.start == 0):
# update_proxy_Browser(id=i.bit_id, proxyType="http", host="104.168.59.92", port=random.randint(20001, 25000), )
fz_datas = get_group_lists()
print(fz_datas)
bit_id_list = []
for i in XToken.select().where(XToken.account_start == 2):
sql_info = Xstart.get_or_none(
Xstart.x_id == i.id
)
bit_id_list.append(sql_info.bit_id)
print(len(bit_id_list))
print(bit_id_list)
print(group_update(fz_datas["西班牙语"], bit_id_list))
# fz_datas = get_group_lists()
# print(fz_datas)
# bit_id_list = []
# for i in XToken.select().where(XToken.account_start == 2):
# sql_info = Xstart.get_or_none(
# Xstart.x_id == i.id
# )
#
# bit_id_list.append(sql_info.bit_id)
#
# print(len(bit_id_list))
# print(bit_id_list)
#
# print(group_update(fz_datas["西班牙语"], bit_id_list))

291
bitmart/api高频交易.py Normal file
View File

@@ -0,0 +1,291 @@
import time
import uuid
import datetime
from tqdm import tqdm
from loguru import logger
from bitmart.api_contract import APIContract
from bitmart.lib.cloud_exceptions import APIException
from 交易.tools import send_dingtalk_message
class BitmartFuturesTransaction:
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=30, desc="等待K线", ncols=80)
self.last_kline_time = None
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式(你的“成本开仓”需求)
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
def is_bullish(self, c):
return float(c['close']) > float(c['open'])
def is_bearish(self, c):
return float(c['close']) < float(c['open'])
def is_trending(self, klines):
"""判断是否为单边行情通过布林带或RSI"""
close_prices = [kline['close'] for kline in klines]
rsi_value = self.calculate_rsi(close_prices, 14) # 使用14期的RSI
if rsi_value > 70 or rsi_value < 30:
return True # 单边行情
return False # 震荡行情
def calculate_rsi(self, prices, period=14):
"""计算RSI指标"""
deltas = [prices[i] - prices[i - 1] for i in range(1, len(prices))]
gains = [delta if delta > 0 else 0 for delta in deltas]
losses = [-delta if delta < 0 else 0 for delta in deltas]
avg_gain = sum(gains[:period]) / period
avg_loss = sum(losses[:period]) / period
rs = avg_gain / avg_loss if avg_loss != 0 else 0
rsi = 100 - (100 / (1 + rs))
return rsi
def get_klines(self):
"""获取最近3根30分钟K线step=30"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新3根
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=30, # 30分钟
start_time=end_time - 3600 * 10, # 取最近10小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted[-3:] # 最近3根: kline_1 (最老), kline_2, kline_3 (最新)
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(error=True, msg="获取K线异常")
return None
def get_current_price(self):
"""获取当前最新价格,用于计算张数"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 10, # 取最近10小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][0]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def calculate_size(self):
"""计算开仓张数使用可用余额的1%作为保证金"""
balance = self.get_available_balance()
self.balance = balance
if not balance or balance < 10:
logger.warning("余额不足,无法开仓")
return 0
price = self.get_current_price()
if not price:
price = 3000 # 保守估计避免size过大
leverage = int(self.leverage)
margin = balance * self.risk_percent # 使用1%余额
# ETHUSDT 1张 ≈ 0.001 ETH
size = int((margin * leverage) / (price * 0.001))
size = max(1, size)
logger.info(f"余额 {balance:.2f} USDT → 使用 {margin:.2f} USDT (1%) → 开仓 {size} 张 (价格≈{price})")
return size
def place_market_order(self, side: int, size: int):
if size <= 0:
return False
client_order_id = f"auto_{int(time.time())}_{uuid.uuid4().hex[:8]}"
try:
response = self.contractAPI.post_submit_order(
contract_symbol=self.contract_symbol,
client_order_id=client_order_id,
side=side,
mode=1,
type='market',
leverage=self.leverage,
open_type=self.open_type,
size=size
)[0]
if response['code'] == 1000:
logger.success(
f"下单成功: {'开多' if side in [1] else '开空' if side in [4] else '平多' if side in [3] else '平空'} {size}")
return True
else:
logger.error(f"下单失败: {response}")
return False
except APIException as e:
logger.error(f"API下单异常: {e}")
return False
def check_signal(self, prev, curr):
"""简化英戈尔夫形态"""
if self.is_bullish(curr) and self.is_bearish(prev) and float(curr['close']) >= float(prev['open']):
return "long"
if self.is_bearish(curr) and self.is_bullish(prev) and float(curr['close']) <= float(prev['open']):
return "short"
return None
def execute_trade(self):
"""执行交易逻辑,根据市场状态切换策略"""
klines = self.get_klines()
if not klines or len(klines) < 3:
return
if self.is_trending(klines): # 单边行情
self.direction = self.check_signal(klines[1], klines[2])
if self.direction:
logger.success(f"检测到{self.direction}信号准备开仓用余额1%")
self.execute_trade() # 执行趋势跟随交易
else: # 震荡行情
self.execute_grid_trade() # 执行网格交易策略
def execute_grid_trade(self):
"""网格交易策略"""
logger.info("开始网格交易")
# 获取当前价格
current_price = self.get_current_price()
if not current_price:
logger.error("无法获取当前价格,网格交易无法执行")
return
# 假设的网格区间(可以根据需要调整)
grid_step = 10 # 每次10USDT为一个网格
grid_size = 1 # 每次开仓数量单位ETH
# 计算上网格和下网格价格
lower_price = current_price - grid_step # 下网格价格
upper_price = current_price + grid_step # 上网格价格
# 生成买卖网格订单
try:
# 设置买单
buy_order = self.place_market_order(side=1, size=grid_size) # 开多
if buy_order:
logger.info(f"已成功设置买单,买入价格:{lower_price},数量:{grid_size} ETH")
# 设置卖单
sell_order = self.place_market_order(side=4, size=grid_size) # 开空
if sell_order:
logger.info(f"已成功设置卖单,卖出价格:{upper_price},数量:{grid_size} ETH")
except Exception as e:
logger.error(f"网格交易下单失败: {e}")
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def action(self):
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
while True:
current_minute = datetime.datetime.now().minute
if current_minute < 30:
self.pbar.n = current_minute
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
self.execute_trade()
time.sleep(2) # 高频交易,减少等待时间
if __name__ == '__main__':
BitmartFuturesTransaction().action()

View File

@@ -1,19 +0,0 @@
============================================================
ETHUSDT <20><><EFBFBD>ײ<EFBFBD><D7B2><EFBFBD><EFBFBD>ܽ<EFBFBD>
============================================================
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>: <20><><EFBFBD>ƸǶ<C6B8> & <20><>͸<EFBFBD><CDB8>̬
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>: 1<><31><EFBFBD><EFBFBD>K<EFBFBD><4B>
<EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD><EFBFBD>: 2025<32><35>12<31><32>
<EFBFBD><EFBFBD>ʼ<EFBFBD>ʽ<EFBFBD>: $10000.00
<EFBFBD><EFBFBD>λ<EFBFBD><EFBFBD>С: 0.1 ETH
ֹ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>: 2.0%
ֹӯ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>: 3.0%
<EFBFBD>ܽ<EFBFBD><EFBFBD>״<EFBFBD><EFBFBD><EFBFBD>: 35
ӯ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>: 15 <20><>
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>: 20 <20><>
ʤ<EFBFBD><EFBFBD>: 42.86%
<EFBFBD><EFBFBD>ӯ<EFBFBD><EFBFBD>: $14.84
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʽ<EFBFBD>: $10014.84
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>: 0.15%

View File

@@ -1,448 +0,0 @@
import time
import csv
from datetime import datetime
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Tuple
# ------------------ 配置 ------------------
START_YEAR = 2025
CONTRACT_SYMBOL = "ETHUSDT"
STEP = 3 # K 线周期,单位分钟
CSV_FILE = f"kline_{STEP}.csv"
# ------------------ 策略参数 ------------------
class TradingStrategy:
def __init__(self, initial_balance: float = 10000):
self.positions = [] # 存储持仓信息
self.trades = [] # 存储交易记录
self.initial_balance = initial_balance
self.current_balance = initial_balance
self.position_size = 0.1 # 每次开仓数量ETH
self.stop_loss_pct = 0.02 # 止损比例 2%
self.take_profit_pct = 0.03 # 止盈比例 3%
self.max_positions = 1 # 最大同时持仓数
def detect_dark_cloud_cover(self, df: pd.DataFrame, i: int) -> bool:
"""检测乌云盖顶形态(看跌)"""
if i < 1:
return False
prev_candle = df.iloc[i - 1]
curr_candle = df.iloc[i]
# 前一根是阳线
if prev_candle['close'] <= prev_candle['open']:
return False
# 当前是阴线
if curr_candle['close'] >= curr_candle['open']:
return False
# 当前开盘价高于前一根最高价
if curr_candle['open'] <= prev_candle['high']:
return False
# 当前收盘价低于前一根实体的50%以下
prev_body_mid = (prev_candle['open'] + prev_candle['close']) / 2
if curr_candle['close'] >= prev_body_mid:
return False
# 可选:添加成交量确认
# if curr_candle['volume'] < prev_candle['volume']:
# return False
return True
def detect_piercing_pattern(self, df: pd.DataFrame, i: int) -> bool:
"""检测刺透形态(看涨)"""
if i < 1:
return False
prev_candle = df.iloc[i - 1]
curr_candle = df.iloc[i]
# 前一根是阴线
if prev_candle['close'] >= prev_candle['open']:
return False
# 当前是阳线
if curr_candle['close'] <= curr_candle['open']:
return False
# 当前开盘价低于前一根最低价
if curr_candle['open'] >= prev_candle['low']:
return False
# 当前收盘价高于前一根实体的50%以上
prev_body_mid = (prev_candle['open'] + prev_candle['close']) / 2
if curr_candle['close'] <= prev_body_mid:
return False
# 可选:添加成交量确认
# if curr_candle['volume'] < prev_candle['volume']:
# return False
return True
def open_position(self, direction: str, price: float, timestamp: int, reason: str):
"""开仓"""
# 检查是否达到最大持仓限制
if len(self.positions) >= self.max_positions:
return False
position = {
'direction': direction, # 'long' 或 'short'
'open_price': price,
'open_time': timestamp,
'open_reason': reason,
'size': self.position_size,
'status': 'open',
'stop_loss': self.calculate_stop_loss(direction, price),
'take_profit': self.calculate_take_profit(direction, price)
}
self.positions.append(position)
print(f"\n📈 开仓信号 @ {datetime.fromtimestamp(timestamp)}")
print(f" 方向: {'做多' if direction == 'long' else '做空'}")
print(f" 价格: ${price:.2f}")
print(f" 数量: {self.position_size} ETH")
print(f" 止损: ${position['stop_loss']:.2f}")
print(f" 止盈: ${position['take_profit']:.2f}")
print(f" 原因: {reason}")
return True
def calculate_stop_loss(self, direction: str, price: float) -> float:
"""计算止损价"""
if direction == 'long':
return price * (1 - self.stop_loss_pct)
else:
return price * (1 + self.stop_loss_pct)
def calculate_take_profit(self, direction: str, price: float) -> float:
"""计算止盈价"""
if direction == 'long':
return price * (1 + self.take_profit_pct)
else:
return price * (1 - self.take_profit_pct)
def close_position(self, position_idx: int, price: float, timestamp: int, reason: str):
"""平仓"""
position = self.positions[position_idx]
# 计算盈亏
if position['direction'] == 'long':
pnl_pct = (price - position['open_price']) / position['open_price']
else:
pnl_pct = (position['open_price'] - price) / position['open_price']
pnl_amount = self.position_size * position['open_price'] * pnl_pct
# 更新持仓状态
position.update({
'close_price': price,
'close_time': timestamp,
'close_reason': reason,
'pnl_pct': pnl_pct * 100, # 百分比
'pnl_amount': pnl_amount,
'status': 'closed'
})
# 更新余额
self.current_balance += pnl_amount
print(f"\n📉 平仓信号 @ {datetime.fromtimestamp(timestamp)}")
print(f" 方向: {'平多单' if position['direction'] == 'long' else '平空单'}")
print(f" 开仓价: ${position['open_price']:.2f}")
print(f" 平仓价: ${price:.2f}")
print(f" 盈亏: {pnl_pct * 100:.2f}% (${pnl_amount:.2f})")
print(f" 原因: {reason}")
print(f" 当前余额: ${self.current_balance:.2f}")
# 记录交易
trade_record = position.copy()
trade_record['duration'] = timestamp - position['open_time']
trade_record['duration_minutes'] = trade_record['duration'] / 60
self.trades.append(trade_record)
# 从持仓列表中移除已平仓的仓位
self.positions.pop(position_idx)
return True
def check_stop_loss_take_profit(self, df: pd.DataFrame, i: int):
"""检查止损止盈"""
current_price = df.iloc[i]['close']
current_time = df.iloc[i]['id']
current_high = df.iloc[i]['high']
current_low = df.iloc[i]['low']
positions_to_close = []
for idx, position in enumerate(self.positions):
if position['status'] != 'open':
continue
close_reason = None
close_price = current_price
if position['direction'] == 'long':
# 多头止损检查:最低价是否触及止损
if current_low <= position['stop_loss']:
close_reason = "止损触发"
close_price = position['stop_loss'] # 使用止损价
# 多头止盈检查:最高价是否触及止盈
elif current_high >= position['take_profit']:
close_reason = "止盈触发"
close_price = position['take_profit'] # 使用止盈价
else:
# 空头止损检查:最高价是否触及止损
if current_high >= position['stop_loss']:
close_reason = "止损触发"
close_price = position['stop_loss'] # 使用止损价
# 空头止盈检查:最低价是否触及止盈
elif current_low <= position['take_profit']:
close_reason = "止盈触发"
close_price = position['take_profit'] # 使用止盈价
if close_reason:
positions_to_close.append((idx, close_price, close_reason))
# 按索引从大到小平仓,避免索引错乱
positions_to_close.sort(reverse=True)
for idx, close_price, close_reason in positions_to_close:
self.close_position(idx, close_price, current_time, close_reason)
def analyze_trades(self):
"""分析交易结果"""
if not self.trades:
return {
'total_trades': 0,
'winning_trades': 0,
'losing_trades': 0,
'win_rate': 0,
'total_pnl': 0,
'avg_pnl': 0,
'max_win': 0,
'max_loss': 0,
'profit_factor': 0
}
total_trades = len(self.trades)
winning_trades = [t for t in self.trades if t['pnl_amount'] > 0]
losing_trades = [t for t in self.trades if t['pnl_amount'] < 0]
total_pnl = sum(t['pnl_amount'] for t in self.trades)
total_win = sum(t['pnl_amount'] for t in winning_trades)
total_loss = abs(sum(t['pnl_amount'] for t in losing_trades))
win_rate = len(winning_trades) / total_trades * 100 if total_trades > 0 else 0
avg_pnl = total_pnl / total_trades if total_trades > 0 else 0
max_win = max(t['pnl_amount'] for t in winning_trades) if winning_trades else 0
max_loss = min(t['pnl_amount'] for t in losing_trades) if losing_trades else 0
profit_factor = total_win / total_loss if total_loss > 0 else float('inf')
return {
'total_trades': total_trades,
'winning_trades': len(winning_trades),
'losing_trades': len(losing_trades),
'win_rate': win_rate,
'total_pnl': total_pnl,
'avg_pnl': avg_pnl,
'max_win': max_win,
'max_loss': max_loss,
'profit_factor': profit_factor,
'final_balance': self.current_balance,
'total_return': ((self.current_balance - self.initial_balance) / self.initial_balance * 100)
}
# ------------------ 主程序 ------------------
def main():
print("=" * 60)
print("ETHUSDT 交易策略 - 乌云盖顶 & 刺透形态")
print(f"数据周期: {STEP}分钟K线")
print(f"分析时间: 2025年12月")
print("=" * 60)
# 1. 从CSV文件读取数据
print(f"\n📥 正在从 {CSV_FILE} 读取数据...")
try:
df = pd.read_csv(CSV_FILE)
print(f"成功读取 {len(df)} 条K线数据")
# 转换时间戳为datetime
df['datetime'] = pd.to_datetime(df['id'], unit='s')
df.set_index('datetime', inplace=True)
# 按时间排序
df = df.sort_index()
except FileNotFoundError:
print(f"❌ 错误: 文件 {CSV_FILE} 不存在")
return
except Exception as e:
print(f"❌ 读取文件时出错: {e}")
return
# 2. 过滤出12月份数据
dec_2025_start = pd.Timestamp('2025-12-01')
dec_2025_end = pd.Timestamp('2025-12-31 23:59:59')
# 确保数据在指定范围内
mask = (df.index >= dec_2025_start) & (df.index <= dec_2025_end)
dec_df = df.loc[mask].copy()
if len(dec_df) == 0:
print("❌ 未找到2025年12月的数据")
print(f"数据时间范围: {df.index[0]}{df.index[-1]}")
return
print(f"\n📊 12月份数据: {len(dec_df)} 条K线")
print(f"时间范围: {dec_df.index[0]}{dec_df.index[-1]}")
# 显示数据预览
print(f"\n数据预览:")
print(dec_df[['open', 'high', 'low', 'close', 'volume']].head())
# 3. 初始化策略
strategy = TradingStrategy(initial_balance=10000)
# 4. 运行策略
print("\n" + "=" * 60)
print("开始执行交易策略...")
print("=" * 60)
for i in range(1, len(dec_df)):
current_time = int(dec_df.iloc[i]['id'])
current_price = dec_df.iloc[i]['close']
# 首先检查止损止盈
strategy.check_stop_loss_take_profit(dec_df, i)
# 如果有持仓,跳过新信号(单次只持有一个仓位)
if len(strategy.positions) >= strategy.max_positions:
continue
# 检测形态
dark_cloud = strategy.detect_dark_cloud_cover(dec_df, i)
piercing = strategy.detect_piercing_pattern(dec_df, i)
# 处理信号
if dark_cloud:
# 乌云盖顶 - 看跌信号,开空仓
strategy.open_position('short', current_price, current_time, "乌云盖顶形态")
elif piercing:
# 刺透形态 - 看涨信号,开多仓
strategy.open_position('long', current_price, current_time, "刺透形态")
# 5. 强制平掉所有未平仓
print("\n" + "=" * 60)
print("强制平仓所有持仓...")
print("=" * 60)
if strategy.positions:
last_price = dec_df.iloc[-1]['close']
last_time = dec_df.iloc[-1]['id']
# 按索引从大到小平仓
for idx in range(len(strategy.positions) - 1, -1, -1):
strategy.close_position(idx, last_price, last_time, "策略结束强制平仓")
else:
print("没有需要平仓的持仓")
# 6. 生成交易报告
print("\n" + "=" * 60)
print("📊 交易汇总报告")
print("=" * 60)
analysis = strategy.analyze_trades()
if strategy.trades:
print(f"总交易次数: {analysis['total_trades']}")
print(f"盈利交易: {analysis['winning_trades']}")
print(f"亏损交易: {analysis['losing_trades']}")
print(f"胜率: {analysis['win_rate']:.2f}%")
print(f"总盈亏: ${analysis['total_pnl']:.2f}")
print(f"平均每笔盈亏: ${analysis['avg_pnl']:.2f}")
print(f"最大盈利: ${analysis['max_win']:.2f}")
print(f"最大亏损: ${analysis['max_loss']:.2f}")
print(f"盈利因子: {analysis['profit_factor']:.2f}")
print(f"初始资金: ${strategy.initial_balance:.2f}")
print(f"最终资金: ${analysis['final_balance']:.2f}")
print(f"总收益率: {analysis['total_return']:.2f}%")
# 打印每笔交易详情
print("\n" + "-" * 60)
print("详细交易记录:")
print("-" * 60)
for i, trade in enumerate(strategy.trades, 1):
print(f"\n交易 #{i}:")
print(f" 方向: {'做多' if trade['direction'] == 'long' else '做空'}")
print(f" 开仓时间: {datetime.fromtimestamp(trade['open_time'])}")
print(f" 开仓价格: ${trade['open_price']:.2f}")
print(f" 平仓时间: {datetime.fromtimestamp(trade['close_time'])}")
print(f" 平仓价格: ${trade['close_price']:.2f}")
print(f" 持仓时间: {trade['duration_minutes']:.1f} 分钟")
print(f" 盈亏: ${trade['pnl_amount']:.2f} ({trade['pnl_pct']:.2f}%)")
print(f" 原因: {trade['open_reason']} -> {trade['close_reason']}")
else:
print("本月无交易记录")
# 7. 保存交易记录到CSV
if strategy.trades:
trades_df = pd.DataFrame(strategy.trades)
# 格式化时间列
trades_df['open_datetime'] = trades_df['open_time'].apply(lambda x: datetime.fromtimestamp(x))
trades_df['close_datetime'] = trades_df['close_time'].apply(lambda x: datetime.fromtimestamp(x))
# 选择要保存的列
columns_to_save = [
'open_datetime', 'close_datetime', 'direction', 'open_price',
'close_price', 'size', 'pnl_amount', 'pnl_pct', 'duration_minutes',
'open_reason', 'close_reason'
]
trades_csv = f"eth_trades_{START_YEAR}_12.csv"
trades_df[columns_to_save].to_csv(trades_csv, index=False)
print(f"\n✅ 交易记录已保存到: {trades_csv}")
# 8. 保存策略参数和结果
with open(f"strategy_summary_{START_YEAR}_12.txt", 'w') as f:
f.write("=" * 60 + "\n")
f.write("ETHUSDT 交易策略总结\n")
f.write("=" * 60 + "\n\n")
f.write(f"策略名称: 乌云盖顶 & 刺透形态\n")
f.write(f"数据周期: {STEP}分钟K线\n")
f.write(f"分析时间: 2025年12月\n")
f.write(f"初始资金: ${strategy.initial_balance:.2f}\n")
f.write(f"仓位大小: {strategy.position_size} ETH\n")
f.write(f"止损比例: {strategy.stop_loss_pct * 100}%\n")
f.write(f"止盈比例: {strategy.take_profit_pct * 100}%\n\n")
if strategy.trades:
f.write(f"总交易次数: {analysis['total_trades']}\n")
f.write(f"盈利交易: {analysis['winning_trades']}\n")
f.write(f"亏损交易: {analysis['losing_trades']}\n")
f.write(f"胜率: {analysis['win_rate']:.2f}%\n")
f.write(f"总盈亏: ${analysis['total_pnl']:.2f}\n")
f.write(f"最终资金: ${analysis['final_balance']:.2f}\n")
f.write(f"总收益率: {analysis['total_return']:.2f}%\n")
else:
f.write("本月无交易记录\n")
print(f"\n✅ 策略总结已保存到: strategy_summary_{START_YEAR}_12.txt")
print("\n" + "=" * 60)
print("策略执行完成!")
print("=" * 60)
if __name__ == "__main__":
main()

View File

@@ -1,768 +0,0 @@
"""
量化交易回测系统 - EMA交叉策略双EMA/三EMA
"""
import csv
import datetime
import numpy as np
from typing import List, Dict, Optional, Tuple
# ========================= EMA计算函数 =========================
def calculate_ema(prices: List[float], period: int) -> List[Optional[float]]:
"""
计算指数移动平均线(EMA)
Args:
prices: 价格列表(通常是收盘价)
period: EMA周期
Returns:
EMA值列表前period-1个为None
"""
if len(prices) < period:
return [None] * len(prices)
ema_values = [None] * (period - 1)
# 计算初始SMA作为EMA的起点
sma = sum(prices[:period]) / period
# EMA计算公式EMA_today = (Price_today * (2/(period+1))) + (EMA_yesterday * (1 - (2/(period+1))))
multiplier = 2 / (period + 1)
# 第一个EMA值
ema = sma
ema_values.append(ema)
# 计算后续EMA值
for price in prices[period:]:
ema = (price * multiplier) + (ema * (1 - multiplier))
ema_values.append(ema)
return ema_values
# ========================= 策略核心函数 =========================
def check_ema_cross(ema_fast: List[Optional[float]],
ema_slow: List[Optional[float]],
idx: int) -> Optional[str]:
"""
检查EMA金叉/死叉
Args:
ema_fast: 快线EMA值列表
ema_slow: 慢线EMA值列表
idx: 当前K线索引
Returns:
"golden" - 金叉(做多信号)
"dead" - 死叉(做空信号)
None - 无交叉
"""
if idx < 1:
return None
# 确保有足够的数据
if ema_fast[idx] is None or ema_fast[idx - 1] is None:
return None
if ema_slow[idx] is None or ema_slow[idx - 1] is None:
return None
# 前一根K线快线在慢线下方当前K线快线上穿慢线 -> 金叉
if ema_fast[idx - 1] < ema_slow[idx - 1] and ema_fast[idx] > ema_slow[idx]:
return "golden"
# 前一根K线快线在慢线上方当前K线快线下穿慢线 -> 死叉
if ema_fast[idx - 1] > ema_slow[idx - 1] and ema_fast[idx] < ema_slow[idx]:
return "dead"
return None
def check_triple_ema_cross(ema_fast: List[Optional[float]],
ema_mid: List[Optional[float]],
ema_slow: List[Optional[float]],
idx: int) -> Optional[str]:
"""
检查三EMA交叉更稳定的信号
规则:快线 > 中线 > 慢线 -> 多头排列 -> 做多
快线 < 中线 < 慢线 -> 空头排列 -> 做空
Args:
ema_fast: 快线如EMA7
ema_mid: 中线如EMA14
ema_slow: 慢线如EMA30
idx: 当前K线索引
Returns:
"golden" - 金叉多头排列
"dead" - 死叉空头排列
None - 无明确信号
"""
if idx < 1:
return None
# 确保有足够的数据
if any(ema[idx] is None or ema[idx - 1] is None for ema in [ema_fast, ema_mid, ema_slow]):
return None
# 检查是否形成多头排列EMA快 > 中 > 慢)
current_fast = ema_fast[idx]
current_mid = ema_mid[idx]
current_slow = ema_slow[idx]
prev_fast = ema_fast[idx - 1]
prev_mid = ema_mid[idx - 1]
prev_slow = ema_slow[idx - 1]
# 多头排列条件:快线 > 中线 > 慢线
is_golden_triple = current_fast > current_mid > current_slow
was_not_golden = not (prev_fast > prev_mid > prev_slow)
# 空头排列条件:快线 < 中线 < 慢线
is_dead_triple = current_fast < current_mid < current_slow
was_not_dead = not (prev_fast < prev_mid < prev_slow)
if is_golden_triple and was_not_golden:
return "golden"
elif is_dead_triple and was_not_dead:
return "dead"
return None
# ========================= 回测引擎 =========================
class EMABacktester:
"""EMA交叉策略回测器"""
def __init__(self,
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9, # 用于MACD信号线
use_triple_ema: bool = False,
mid_period: int = 14, # 三EMA时的中间周期
use_macd_confirmation: bool = False,
stop_loss_pct: float = 0.02, # 2%止损
take_profit_pct: float = 0.05, # 5%止盈
trailing_stop_pct: float = 0.03): # 3%移动止损
self.fast_period = fast_period
self.slow_period = slow_period
self.signal_period = signal_period
self.use_triple_ema = use_triple_ema
self.mid_period = mid_period
self.use_macd_confirmation = use_macd_confirmation
self.stop_loss_pct = stop_loss_pct
self.take_profit_pct = take_profit_pct
self.trailing_stop_pct = trailing_stop_pct
self.stats = {
'golden_cross': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '金叉做多'},
'dead_cross': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '死叉做空'},
}
def calculate_macd(self, prices: List[float]) -> Tuple[List[Optional[float]],
List[Optional[float]],
List[Optional[float]]]:
"""
计算MACD指标
Returns: (MACD线, 信号线, 柱状图)
"""
# 计算快慢EMA
ema_fast = calculate_ema(prices, self.fast_period)
ema_slow = calculate_ema(prices, self.slow_period)
# 计算MACD线 = EMA快线 - EMA慢线
macd_line = []
for i in range(len(prices)):
if ema_fast[i] is not None and ema_slow[i] is not None:
macd_line.append(ema_fast[i] - ema_slow[i])
else:
macd_line.append(None)
# 计算信号线MACD的EMA
signal_line = calculate_ema([x for x in macd_line if x is not None] if any(macd_line) else [],
self.signal_period)
# 补全None值
signal_line_extended = [None] * (len(prices) - len(signal_line)) + signal_line if len(signal_line) < len(
prices) else signal_line
# 计算柱状图
histogram = []
for i in range(len(prices)):
if macd_line[i] is not None and signal_line_extended[i] is not None:
histogram.append(macd_line[i] - signal_line_extended[i])
else:
histogram.append(None)
return macd_line, signal_line_extended, histogram
def backtest(self, data: List[Dict]) -> Tuple[List[Dict], Dict]:
"""
执行EMA交叉策略回测
Args:
data: K线数据列表
Returns:
trades: 交易记录列表
stats: 统计数据
"""
# 提取收盘价
close_prices = [float(c['close']) for c in data]
# 计算EMA
ema_fast = calculate_ema(close_prices, self.fast_period)
ema_slow = calculate_ema(close_prices, self.slow_period)
# 如果需要三EMA计算中间EMA
ema_mid = None
if self.use_triple_ema:
ema_mid = calculate_ema(close_prices, self.mid_period)
# 如果需要MACD确认计算MACD
macd_line, signal_line, histogram = None, None, None
if self.use_macd_confirmation:
macd_line, signal_line, histogram = self.calculate_macd(close_prices)
trades: List[Dict] = []
current_position: Optional[Dict] = None
highest_price_since_entry = 0 # 用于移动止损
lowest_price_since_entry = float('inf') # 用于移动止损
# 遍历K线数据跳过前几个没有EMA值的
start_idx = max(self.fast_period, self.slow_period)
if self.use_triple_ema:
start_idx = max(start_idx, self.mid_period)
for idx in range(start_idx, len(data)):
current_bar = data[idx]
current_price = float(current_bar['close'])
open_price = float(current_bar['open'])
# ========== 信号检测 ==========
signal = None
# 基础双EMA交叉信号
if not self.use_triple_ema:
signal = check_ema_cross(ema_fast, ema_slow, idx)
# 三EMA排列信号
else:
signal = check_triple_ema_cross(ema_fast, ema_mid, ema_slow, idx)
# MACD确认可选
if signal and self.use_macd_confirmation:
macd_confirmed = False
if signal == "golden":
# 金叉确认MACD线在信号线上方且柱状图为正
if macd_line[idx] is not None and signal_line[idx] is not None:
macd_confirmed = (macd_line[idx] > signal_line[idx]) and (
histogram[idx] is not None and histogram[idx] > 0)
elif signal == "dead":
# 死叉确认MACD线在信号线下方且柱状图为负
if macd_line[idx] is not None and signal_line[idx] is not None:
macd_confirmed = (macd_line[idx] < signal_line[idx]) and (
histogram[idx] is not None and histogram[idx] < 0)
if not macd_confirmed:
signal = None
# ========== 空仓时开仓 ==========
if current_position is None and signal:
# 下一根K线开盘价入场
if idx + 1 < len(data):
entry_price = float(data[idx + 1]['open'])
if signal == "golden": # 做多
current_position = {
'direction': 'long',
'entry_price': entry_price,
'entry_time': data[idx + 1]['id'],
'entry_idx': idx + 1,
'signal': 'golden_cross',
'highest_price': entry_price, # 用于移动止损
'lowest_price': entry_price, # 用于空头的移动止损
}
self.stats['golden_cross']['count'] += 1
elif signal == "dead": # 做空
current_position = {
'direction': 'short',
'entry_price': entry_price,
'entry_time': data[idx + 1]['id'],
'entry_idx': idx + 1,
'signal': 'dead_cross',
'highest_price': entry_price,
'lowest_price': entry_price,
}
self.stats['dead_cross']['count'] += 1
# 跳过下一根因为已经在这根K线开盘入场
continue
# ========== 持仓时处理 ==========
if current_position:
pos_dir = current_position['direction']
entry_price = current_position['entry_price']
signal_key = current_position['signal']
# 更新最高/最低价(用于移动止损)
if pos_dir == 'long':
current_position['highest_price'] = max(current_position['highest_price'], current_price)
else: # short
current_position['lowest_price'] = min(current_position['lowest_price'], current_price)
# ========== 止损止盈检查 ==========
should_exit = False
exit_reason = ""
exit_price = current_price # 默认用收盘价平仓
# 固定止损
if pos_dir == 'long':
stop_loss_price = entry_price * (1 - self.stop_loss_pct)
if current_price <= stop_loss_price:
should_exit = True
exit_reason = "止损"
exit_price = stop_loss_price
# 固定止盈
take_profit_price = entry_price * (1 + self.take_profit_pct)
if current_price >= take_profit_price:
should_exit = True
exit_reason = "止盈"
exit_price = take_profit_price
# 移动止损
trailing_stop_price = current_position['highest_price'] * (1 - self.trailing_stop_pct)
if current_price <= trailing_stop_price:
should_exit = True
exit_reason = "移动止损"
exit_price = trailing_stop_price
else: # short
stop_loss_price = entry_price * (1 + self.stop_loss_pct)
if current_price >= stop_loss_price:
should_exit = True
exit_reason = "止损"
exit_price = stop_loss_price
# 固定止盈
take_profit_price = entry_price * (1 - self.take_profit_pct)
if current_price <= take_profit_price:
should_exit = True
exit_reason = "止盈"
exit_price = take_profit_price
# 移动止损
trailing_stop_price = current_position['lowest_price'] * (1 + self.trailing_stop_pct)
if current_price >= trailing_stop_price:
should_exit = True
exit_reason = "移动止损"
exit_price = trailing_stop_price
# ========== 反向信号检查 ==========
if signal and (
(signal == "dead" and pos_dir == "long") or
(signal == "golden" and pos_dir == "short")
):
should_exit = True
exit_reason = "反向信号"
# 反向信号用下一根开盘价平仓
if idx + 1 < len(data):
exit_price = float(data[idx + 1]['open'])
# ========== 执行平仓 ==========
if should_exit:
# 计算盈亏
if pos_dir == 'long':
diff = exit_price - entry_price
else: # short
diff = entry_price - exit_price
# 记录交易
trade = {
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time']),
'exit_time': datetime.datetime.fromtimestamp(current_bar['id']),
'signal': self.stats[signal_key]['name'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': entry_price,
'exit': exit_price,
'diff': diff,
'exit_reason': exit_reason,
'holding_bars': idx - current_position['entry_idx'] + 1,
}
trades.append(trade)
# 更新统计
self.stats[signal_key]['total_profit'] += diff
if diff > 0:
self.stats[signal_key]['wins'] += 1
# 平仓
current_position = None
# 如果是因为反向信号平仓,立即反手开仓
if exit_reason == "反向信号" and signal and idx + 1 < len(data):
if signal == "golden": # 反手做多
current_position = {
'direction': 'long',
'entry_price': exit_price, # 同价反手
'entry_time': data[idx + 1]['id'],
'entry_idx': idx + 1,
'signal': 'golden_cross',
'highest_price': exit_price,
'lowest_price': exit_price,
}
self.stats['golden_cross']['count'] += 1
elif signal == "dead": # 反手做空
current_position = {
'direction': 'short',
'entry_price': exit_price,
'entry_time': data[idx + 1]['id'],
'entry_idx': idx + 1,
'signal': 'dead_cross',
'highest_price': exit_price,
'lowest_price': exit_price,
}
self.stats['dead_cross']['count'] += 1
# 跳过下一根K线
continue
# ========== 尾仓处理 ==========
if current_position:
last_bar = data[-1]
exit_price = float(last_bar['close'])
pos_dir = current_position['direction']
entry_price = current_position['entry_price']
signal_key = current_position['signal']
diff = (exit_price - entry_price) if pos_dir == 'long' else (entry_price - exit_price)
trade = {
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time']),
'exit_time': datetime.datetime.fromtimestamp(last_bar['id']),
'signal': self.stats[signal_key]['name'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': entry_price,
'exit': exit_price,
'diff': diff,
'exit_reason': "尾仓平仓",
'holding_bars': len(data) - current_position['entry_idx'],
}
trades.append(trade)
self.stats[signal_key]['total_profit'] += diff
if diff > 0:
self.stats[signal_key]['wins'] += 1
return trades, self.stats
# ========================= 可视化分析 =========================
def analyze_trades(trades: List[Dict], stats: Dict) -> Dict:
"""深入分析交易结果"""
if not trades:
return {}
# 基础统计
total_trades = len(trades)
winning_trades = [t for t in trades if t['diff'] > 0]
losing_trades = [t for t in trades if t['diff'] <= 0]
win_rate = len(winning_trades) / total_trades * 100 if total_trades > 0 else 0
# 盈亏统计
total_profit = sum(t['diff'] for t in trades)
avg_profit_per_trade = total_profit / total_trades if total_trades > 0 else 0
# 胜率相关
avg_win = np.mean([t['diff'] for t in winning_trades]) if winning_trades else 0
avg_loss = np.mean([t['diff'] for t in losing_trades]) if losing_trades else 0
# 盈亏比
profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')
# 最大连续盈利/亏损
max_consecutive_wins = 0
max_consecutive_losses = 0
current_wins = 0
current_losses = 0
for trade in trades:
if trade['diff'] > 0:
current_wins += 1
current_losses = 0
max_consecutive_wins = max(max_consecutive_wins, current_wins)
else:
current_losses += 1
current_wins = 0
max_consecutive_losses = max(max_consecutive_losses, current_losses)
# 持仓时间分析
holding_bars = [t.get('holding_bars', 0) for t in trades]
avg_holding_bars = np.mean(holding_bars) if holding_bars else 0
# 按平仓原因分析
exit_reasons = {}
for trade in trades:
reason = trade.get('exit_reason', '未知')
exit_reasons[reason] = exit_reasons.get(reason, 0) + 1
return {
'total_trades': total_trades,
'win_rate': win_rate,
'total_profit': total_profit,
'avg_profit_per_trade': avg_profit_per_trade,
'avg_win': avg_win,
'avg_loss': avg_loss,
'profit_factor': profit_factor,
'max_consecutive_wins': max_consecutive_wins,
'max_consecutive_losses': max_consecutive_losses,
'avg_holding_bars': avg_holding_bars,
'exit_reasons': exit_reasons,
'winning_trades_count': len(winning_trades),
'losing_trades_count': len(losing_trades),
}
# ========================= 主程序 =========================
if __name__ == '__main__':
# 从CSV文件读取数据
csv_file = "kline_3.csv" # 请替换为你的CSV文件路径
read_data = []
try:
with open(csv_file, 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
read_data.append({
'id': int(row['id']),
'open': float(row['open']),
'high': float(row['high']),
'low': float(row['low']),
'close': float(row['close'])
})
print(f"成功读取 {len(read_data)} 条K线数据")
except FileNotFoundError:
print(f"文件 {csv_file} 未找到,请检查路径")
exit(1)
except Exception as e:
print(f"读取CSV文件时出错: {e}")
exit(1)
# 按时间排序
read_data.sort(key=lambda x: x['id'])
print("\n" + "=" * 60)
print("EMA交叉策略回测系统")
print("=" * 60)
# 策略参数选择
print("\n请选择EMA策略类型")
print("1. 双EMA交叉策略 (默认: EMA12/EMA26)")
print("2. 三EMA排列策略 (默认: EMA7/EMA14/EMA30)")
print("3. EMA+MACD双重确认策略")
choice = input("\n请输入选择 (1-3默认1): ").strip()
if choice == "2":
# 三EMA策略
backtester = EMABacktester(
fast_period=7,
slow_period=30,
use_triple_ema=True,
mid_period=14,
use_macd_confirmation=False,
stop_loss_pct=0.01, # 1.5%止损
take_profit_pct=0.4, # 4%止盈
trailing_stop_pct=0.04 # 2%移动止损
)
strategy_name = "三EMA排列策略(7/14/30)"
elif choice == "3":
# EMA+MACD策略
backtester = EMABacktester(
fast_period=12,
slow_period=26,
signal_period=9,
use_triple_ema=False,
use_macd_confirmation=True,
stop_loss_pct=0.1, # 1%止损
take_profit_pct=0.4, # 3%止盈
trailing_stop_pct=0.04 # 1.5%移动止损
)
strategy_name = "EMA+MACD双重确认策略(12/26/9)"
else:
# 默认双EMA策略
backtester = EMABacktester(
fast_period=12,
slow_period=26,
use_triple_ema=False,
use_macd_confirmation=False,
stop_loss_pct=0.02, # 2%止损
take_profit_pct=0.05, # 5%止盈
trailing_stop_pct=0.03 # 3%移动止损
)
strategy_name = "双EMA交叉策略(12/26)"
print(f"\n使用策略: {strategy_name}")
print("开始回测...")
# 执行回测
trades, stats = backtester.backtest(read_data)
# ========== 交易详情和盈利计算 ==========
print(f"\n{'=' * 60}")
print(f"回测结果 - {strategy_name}")
print(f"{'=' * 60}")
# 参数设定
contract_size = 10000 # 合约规模
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 平仓手续费率
total_points_profit = 0 # 累计点差
total_money_profit = 0 # 累计金额盈利
total_fee = 0 # 累计手续费
print(f"\n交易详情 (共{len(trades)}笔):")
print("-" * 100)
for i, t in enumerate(trades, 1):
entry = t['entry']
exit = t['exit']
direction = t['direction']
# 原始价差
point_diff = (exit - entry) if direction == '做多' else (entry - exit)
# 金额盈利
money_profit = point_diff / entry * contract_size
# 手续费
fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate)
# 净利润
net_profit = money_profit - fee
# 保存结果
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
# 输出交易详情
profit_color = "\033[92m" if net_profit > 0 else "\033[91m"
reset_color = "\033[0m"
print(f"{i:3d}. {t['entry_time'].strftime('%Y-%m-%d %H:%M')} -> "
f"{t['exit_time'].strftime('%Y-%m-%d %H:%M')} "
f"{direction}({t['signal']}) "
f"入={entry:.2f} 出={exit:.2f} "
f"{profit_color}净利={net_profit:+.2f}{reset_color} "
f"(持有:{t.get('holding_bars', '?')}根K线, 原因:{t.get('exit_reason', '未知')})")
# ========== 汇总统计 ==========
total_net_profit = total_money_profit - total_fee
print(f"\n{'=' * 60}")
print("汇总统计")
print(f"{'=' * 60}")
print(f"总交易笔数: {len(trades)}")
print(f"总点差: {total_points_profit:.2f}")
print(f"总原始盈利(未扣费): {total_money_profit:.2f}")
print(f"总手续费: {total_fee:.2f}")
print(f"总净利润: {total_net_profit:.2f}")
# 深入分析
analysis = analyze_trades(trades, stats)
if analysis:
print(f"\n策略分析:")
print(f"- 胜率: {analysis['win_rate']:.2f}%")
print(f"- 平均每笔盈利: {analysis['avg_profit_per_trade']:.2f}")
print(f"- 平均盈利: {analysis['avg_win']:.2f}")
print(f"- 平均亏损: {analysis['avg_loss']:.2f}")
print(f"- 盈亏比: {analysis['profit_factor']:.2f}")
print(f"- 最大连续盈利: {analysis['max_consecutive_wins']}")
print(f"- 最大连续亏损: {analysis['max_consecutive_losses']}")
print(f"- 平均持仓K线数: {analysis['avg_holding_bars']:.1f}")
print(f"\n平仓原因统计:")
for reason, count in analysis['exit_reasons'].items():
percentage = count / len(trades) * 100
print(f" - {reason}: {count} 笔 ({percentage:.1f}%)")
# ========== 信号统计 ==========
print(f"\n{'=' * 60}")
print("信号统计")
print(f"{'=' * 60}")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
if count > 0:
win_rate = (wins / count * 100)
avg_p = total_p / count
profit_color = "\033[92m" if total_p > 0 else "\033[91m"
reset_color = "\033[0m"
print(f"{name}:")
print(f" 信号次数: {count}")
print(f" 胜率: {win_rate:.2f}%")
print(f" 总价差: {profit_color}{total_p:.2f}{reset_color}")
print(f" 平均价差: {avg_p:.2f}")
# ========== 风险指标 ==========
if len(trades) > 1:
returns = [t['net_profit'] for t in trades]
# 夏普比率(简化版)
avg_return = np.mean(returns)
std_return = np.std(returns)
sharpe_ratio = avg_return / std_return if std_return > 0 else 0
# 最大回撤
cumulative_returns = np.cumsum(returns)
running_max = np.maximum.accumulate(cumulative_returns)
drawdown = cumulative_returns - running_max
max_drawdown = abs(np.min(drawdown)) if len(drawdown) > 0 else 0
print(f"\n{'=' * 60}")
print("风险指标")
print(f"{'=' * 60}")
print(f"夏普比率(简化): {sharpe_ratio:.4f}")
print(f"最大回撤: {max_drawdown:.2f}")
# 盈亏分布
print(f"\n盈亏分布:")
profit_ranges = {
"大亏 (< -100)": len([r for r in returns if r < -100]),
"中亏 (-100 ~ -50)": len([r for r in returns if -100 <= r < -50]),
"小亏 (-50 ~ 0)": len([r for r in returns if -50 <= r < 0]),
"小盈 (0 ~ 50)": len([r for r in returns if 0 <= r < 50]),
"中盈 (50 ~ 100)": len([r for r in returns if 50 <= r < 100]),
"大盈 (> 100)": len([r for r in returns if r >= 100]),
}
for range_name, count in profit_ranges.items():
if count > 0:
percentage = count / len(returns) * 100
print(f" {range_name}: {count} 笔 ({percentage:.1f}%)")
print(f"\n{'=' * 60}")
print("回测完成!")
print(f"{'=' * 60}")

View File

@@ -1,133 +0,0 @@
import csv
from datetime import datetime, timezone
import matplotlib.pyplot as plt
# ---------------- 配置 ----------------
CSV_FILE = "kline_3.csv" # CSV 文件路径
LEVERAGE = 100
CAPITAL = 10000
POSITION_RATIO = 0.01
FEE_RATE = 0.0005
# ---------------- 读取 CSV ----------------
data = []
with open(CSV_FILE, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
ts = int(row['id'])
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
if dt.year == 2025 and dt.month == 1:
data.append({
'time': dt,
'open': float(row['open']),
'high': float(row['high']),
'low': float(row['low']),
'close': float(row['close']),
})
# ---------------- 策略回测 ----------------
total_profit = 0
total_fee = 0
trades = [] # 保存每笔交易详情
for i in range(len(data) - 1):
k = data[i]
k_next = data[i + 1]
body = abs(k['close'] - k['open'])
upper_shadow = k['high'] - max(k['close'], k['open'])
lower_shadow = min(k['close'], k['open']) - k['low']
position_usdt = CAPITAL * POSITION_RATIO
leveraged_position = position_usdt * LEVERAGE
# ---------------- 锤子线 → 做多 ----------------
if body != 0 and lower_shadow >= 2 * body:
profit_raw = (k_next['close'] - k['close']) / k['close'] * leveraged_position
fee_open = leveraged_position * FEE_RATE
fee_close = leveraged_position * FEE_RATE
profit_net = profit_raw - fee_open - fee_close
total_profit += profit_raw
total_fee += fee_open + fee_close
trades.append({
'方向': '',
'开仓时间': k['time'].strftime("%Y-%m-%d %H:%M"),
'开仓价格': k['close'],
'平仓时间': k_next['time'].strftime("%Y-%m-%d %H:%M"),
'平仓价格': k_next['close'],
'本金': position_usdt,
'杠杆仓位': leveraged_position,
'开仓手续费': fee_open,
'平仓手续费': fee_close,
'原始盈亏': profit_raw,
'净盈亏': profit_net
})
# ---------------- 上吊线 → 做空 ----------------
elif body != 0 and upper_shadow >= 2 * body:
profit_raw = (k['close'] - k_next['close']) / k['close'] * leveraged_position
fee_open = leveraged_position * FEE_RATE
fee_close = leveraged_position * FEE_RATE
profit_net = profit_raw - fee_open - fee_close
total_profit += profit_raw
total_fee += fee_open + fee_close
trades.append({
'方向': '',
'开仓时间': k['time'].strftime("%Y-%m-%d %H:%M"),
'开仓价格': k['close'],
'平仓时间': k_next['time'].strftime("%Y-%m-%d %H:%M"),
'平仓价格': k_next['close'],
'本金': position_usdt,
'杠杆仓位': leveraged_position,
'开仓手续费': fee_open,
'平仓手续费': fee_close,
'原始盈亏': profit_raw,
'净盈亏': profit_net
})
# ---------------- 输出统计 ----------------
print(f"1月总原始盈亏: {total_profit:.2f} USDT")
print(f"1月总手续费: {total_fee:.2f} USDT")
print(f"1月净盈亏: {total_profit - total_fee:.2f} USDT")
print("\n交易明细前10笔示例:")
n = 0
for t in trades:
if t['原始盈亏'] > 10 or t['原始盈亏'] < -10:
print(f"{t['方向']}仓 | 开仓: {t['开仓时间']} @ {t['开仓价格']:.2f} | "
f"平仓: {t['平仓时间']} @ {t['平仓价格']:.2f} | "
f"本金: {t['本金']:.2f} | 杠杆仓位: {t['杠杆仓位']:.2f} | "
f"开仓手续费: {t['开仓手续费']:.2f} | 平仓手续费: {t['平仓手续费']:.2f} | "
f"原始盈亏: {t['原始盈亏']:.2f} | 净盈亏: {t['净盈亏']:.2f}")
n += t['原始盈亏']
print(n)
# # ---------------- 绘制 K 线图 + 交易点 ----------------
# times = [k['time'] for k in data]
# closes = [k['close'] for k in data]
#
# plt.figure(figsize=(16,6))
# plt.plot(times, closes, color='black', label='ETH 收盘价')
#
# for t in trades:
# open_time = datetime.strptime(t['开仓时间'], "%Y-%m-%d %H:%M")
# close_time = datetime.strptime(t['平仓时间'], "%Y-%m-%d %H:%M")
# if t['方向'] == '多':
# plt.scatter(open_time, t['开仓价格'], color='green', marker='^', s=100, label='多开' if '多开' not in plt.gca().get_legend_handles_labels()[1] else "")
# plt.scatter(close_time, t['平仓价格'], color='red', marker='v', s=100, label='多平' if '多平' not in plt.gca().get_legend_handles_labels()[1] else "")
# else:
# plt.scatter(open_time, t['开仓价格'], color='red', marker='v', s=100, label='空开' if '空开' not in plt.gca().get_legend_handles_labels()[1] else "")
# plt.scatter(close_time, t['平仓价格'], color='green', marker='^', s=100, label='空平' if '空平' not in plt.gca().get_legend_handles_labels()[1] else "")
#
# plt.xlabel('时间')
# plt.ylabel('价格(USDT)')
# plt.title('ETH 永续合约 1 月交易回测100倍杠杆')
# plt.legend()
# plt.grid(True)
# plt.gcf().autofmt_xdate()
# plt.show()

View File

@@ -18,7 +18,7 @@ class StrategyConfig:
# ===== 合约 =====
contract_symbol: str = "ETHUSDT"
open_type: str = "cross"
leverage: str = "2" # 1~2 更稳
leverage: str = "50" # 1~2 更稳
# ===== K线与指标 =====
step_min: int = 1
@@ -40,7 +40,7 @@ class StrategyConfig:
cooldown_sec_after_exit: int = 10 # 平仓后冷却10秒防抖
# ===== 下单/仓位 =====
risk_percent: float = 0.0015 # 每次用可用余额的0.15%作为保证金预算
risk_percent: float = 0.005 # 每次用可用余额的0.15%作为保证金预算
min_size: int = 1
max_size: int = 5000
@@ -530,3 +530,4 @@ if __name__ == "__main__":
bot.action()
# 9274.08
# 9260.59

762
test1.py
View File

@@ -1,92 +1,692 @@
import csv
from datetime import datetime, timezone
import matplotlib.pyplot as plt
import os
import time
import uuid
import datetime
from dataclasses import dataclass
# ---------------- 配置 ----------------
CSV_FILE = "bitmart/kline_3.csv" # 你的 CSV 文件路径
LEVERAGE = 100
CAPITAL = 10000
POSITION_RATIO = 0.01
FEE_RATE = 0.00015
from tqdm import tqdm
from loguru import logger
# ---------------- 读取 CSV ----------------
data = []
with open(CSV_FILE, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
ts = int(row['id'])
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
if dt.year == 2025 and dt.month == 1:
data.append({
'time': dt,
'open': float(row['open']),
'high': float(row['high']),
'low': float(row['low']),
'close': float(row['close']),
})
from bitmart.api_contract import APIContract
from bitmart.lib.cloud_exceptions import APIException
# ---------------- 识别交易信号 ----------------
trades = []
position_usdt = CAPITAL * POSITION_RATIO
leveraged_position = position_usdt * LEVERAGE
from 交易.tools import send_dingtalk_message
for i in range(len(data)-1):
k1 = data[i]
k2 = data[i+1]
# 刺透形态Piercing Line多头
if k1['close'] < k1['open'] and k2['close'] > k2['open']:
midpoint = (k1['open'] + k1['close']) / 2
if k2['open'] < k1['close'] and k2['close'] > midpoint:
trades.append({
'方向': '',
'开仓时间': k2['time'],
'开仓价格': k2['open'],
'平仓时间': k2['time'],
'平仓价格': k2['close']
})
@dataclass
class StrategyConfig:
# =============================
# 1m | ETH 永续 | 控止损≤5/日
# =============================
# 乌云盖顶Dark Cloud Cover空头
elif k1['close'] > k1['open'] and k2['close'] < k2['open']:
midpoint = (k1['open'] + k1['close']) / 2
if k2['open'] > k1['close'] and k2['close'] < midpoint:
trades.append({
'方向': '',
'开仓时间': k2['time'],
'开仓价格': k2['open'],
'平仓时间': k2['time'],
'平仓价格': k2['close']
})
# ===== 合约 =====
contract_symbol: str = "ETHUSDT"
open_type: str = "cross"
leverage: str = "30" # 50 -> 30显著降低1m噪声导致的连环止损与回撤波动
# ---------------- 绘制 K 线图 ----------------
plt.figure(figsize=(16,6))
# ===== K线与指标 =====
step_min: int = 1
lookback_min: int = 240
ema_len: int = 36 # 30 -> 36均值更稳信号更挑剔
atr_len: int = 14
times = [k['time'] for k in data]
opens = [k['open'] for k in data]
closes = [k['close'] for k in data]
highs = [k['high'] for k in data]
lows = [k['low'] for k in data]
# ===== 动态阈值基础(自适应行情)=====
entry_dev_floor: float = 0.0012 # 0.10% -> 0.12%:过滤小噪声进场
tp_floor: float = 0.0006 # 0.05% -> 0.06%:更接近“净盈利”
sl_floor: float = 0.0018 # 0.15% -> 0.18%ETH 1m插针多底线略放宽
# 绘制 K 线(用竖线表示最高最低价,用矩形表示开收盘价
for i in range(len(data)):
color = 'green' if closes[i] >= opens[i] else 'red'
plt.plot([times[i], times[i]], [lows[i], highs[i]], color='black') # 高低价
plt.plot([times[i]-0.0005, times[i]+0.0005], [opens[i], opens[i]], color=color, linewidth=5) # 开盘价
plt.plot([times[i]-0.0005, times[i]+0.0005], [closes[i], closes[i]], color=color, linewidth=5) # 收盘价
# 更挑剔、更少止损(进场更苛刻;止损不过度随波动放大
entry_k: float = 1.45 # 1.20 -> 1.45:减少进场频率
tp_k: float = 0.65 # 0.60 -> 0.65:略抬止盈
sl_k: float = 1.05 # 1.20 -> 1.05配合sl_floor避免高波动下止损无限变大
# ---------------- 标注交易信号 ----------------
for t in trades:
if t['方向'] == '':
plt.scatter(t['开仓时间'], t['开仓价格'], color='green', marker='^', s=100, label='多开' if '多开' not in plt.gca().get_legend_handles_labels()[1] else "")
plt.scatter(t['平仓时间'], t['平仓价格'], color='red', marker='v', s=100, label='多平' if '多平' not in plt.gca().get_legend_handles_labels()[1] else "")
else:
plt.scatter(t['开仓时间'], t['开仓价格'], color='red', marker='v', s=100, label='空开' if '空开' not in plt.gca().get_legend_handles_labels()[1] else "")
plt.scatter(t['平仓时间'], t['平仓价格'], color='green', marker='^', s=100, label='空平' if '空平' not in plt.gca().get_legend_handles_labels()[1] else "")
# ===== 时间/冷却 =====
max_hold_sec: int = 75 # 90/120 -> 751m回归不恋战
cooldown_sec_after_exit: int = 20 # 10 -> 20减少“刚出又进”连环单
plt.xlabel('时间')
plt.ylabel('价格(USDT)')
plt.title('ETH 永续合约 1 月交易回测(刺透 & 乌云形态)')
plt.legend()
plt.grid(True)
plt.gcf().autofmt_xdate()
plt.show()
# ===== 下单/仓位 =====
risk_percent: float = 0.004 # 0.005 -> 0.004再压一点波动更贴合止损≤5/日
min_size: int = 1
max_size: int = 5000
# ===== 日内风控 =====
daily_loss_limit: float = 0.02 # -2% 停机
daily_profit_cap: float = 0.01 # +1% 封顶停机
# ===== 危险模式过滤1m ETH 更敏感)=====
atr_ratio_kill: float = 0.0038 # 0.0045 -> 0.0038:更早暂停开仓
big_body_kill: float = 0.010 # 0.012 -> 0.010:更敏感
# ===== 轮询节奏 =====
klines_refresh_sec: int = 10
tick_refresh_sec: int = 1
status_notify_sec: int = 60
# =========================================================
# ✅ 止损后同向入场加门槛(但不禁止同向重入)
# =========================================================
reentry_penalty_mult: float = 1.55 # 同向入场门槛×1.55:大幅降低连环止损概率
reentry_penalty_max_sec: int = 180 # 罚时最长持续
reset_band_k: float = 0.45 # dev回到更靠近均值才解除罚则
reset_band_floor: float = 0.0006 # 最小复位带宽0.06%
# =========================================================
# ✅ 自动阈值ATR/Price 分位数基准(更稳,不被短时噪声带跑)
# =========================================================
vol_baseline_window: int = 120
vol_baseline_quantile: float = 0.65
vol_scale_min: float = 0.80
vol_scale_max: float = 1.60
# =========================================================
# ✅ 升级:止损后同方向 SL 放宽幅度与“止损时 vol_scale”联动
# =========================================================
post_sl_sl_max_sec: int = 90 # 只照顾“扫损后很快反弹”的窗口
post_sl_mult_min: float = 1.02
post_sl_mult_max: float = 1.16
post_sl_vol_alpha: float = 0.20 # mult = 1 + alpha*(vol_scale_at_sl - 1)
class BitmartFuturesMeanReversionBot:
def __init__(self, cfg: StrategyConfig):
self.cfg = cfg
# ✅ 只从环境变量读(请务必更换曾经硬编码泄露过的 key
self.api_key = os.getenv("BITMART_API_KEY", "").strip()
self.secret_key = os.getenv("BITMART_SECRET_KEY", "").strip()
self.memo = os.getenv("BITMART_MEMO", "合约交易").strip()
if not self.api_key or not self.secret_key:
raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)")
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
# 持仓状态: -1 空, 0 无, 1 多
self.pos = 0
self.entry_price = None
self.entry_ts = None
self.last_exit_ts = 0
# 日内权益基准
self.day_start_equity = None
self.trading_enabled = True
self.day_tag = datetime.date.today()
# 缓存
self._klines_cache = None
self._klines_cache_ts = 0
self._last_status_notify_ts = 0
# ✅ 止损后“同向入场加门槛”状态
self.last_sl_dir = 0 # 1=多止损,-1=空止损0=无
self.last_sl_ts = 0.0
# ✅ 止损后“同方向 SL 联动放宽”状态
self.post_sl_dir = 0
self.post_sl_ts = 0.0
self.post_sl_vol_scale = 1.0 # 记录止损时的 vol_scale
self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90)
# ----------------- 通用工具 -----------------
def ding(self, msg, error=False):
prefix = "❌bitmart" if error else "🔔bitmart"
if error:
for _ in range(3):
send_dingtalk_message(f"{prefix}{msg}")
else:
send_dingtalk_message(f"{prefix}{msg}")
def set_leverage(self) -> bool:
try:
resp = self.contractAPI.post_submit_leverage(
contract_symbol=self.cfg.contract_symbol,
leverage=self.cfg.leverage,
open_type=self.cfg.open_type
)[0]
if resp.get("code") == 1000:
logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x")
return True
logger.error(f"设置杠杆失败: {resp}")
self.ding(f"设置杠杆失败: {resp}", error=True)
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
self.ding(f"设置杠杆异常: {e}", error=True)
return False
# ----------------- 行情/指标 -----------------
def get_klines_cached(self):
now = time.time()
if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec:
return self._klines_cache
kl = self.get_klines()
if kl:
self._klines_cache = kl
self._klines_cache_ts = now
return self._klines_cache
def get_klines(self):
try:
end_time = int(time.time())
start_time = end_time - 60 * self.cfg.lookback_min
resp = self.contractAPI.get_kline(
contract_symbol=self.cfg.contract_symbol,
step=self.cfg.step_min,
start_time=start_time,
end_time=end_time
)[0]
if resp.get("code") != 1000:
logger.error(f"获取K线失败: {resp}")
return None
data = resp.get("data", [])
formatted = []
for k in data:
formatted.append({
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
})
formatted.sort(key=lambda x: x["id"])
return formatted
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(f"获取K线异常: {e}", error=True)
return None
def get_last_price(self, fallback_close: float) -> float:
"""
优先取更实时的最新价若SDK不支持/字段不同回退到K线close。
"""
try:
if hasattr(self.contractAPI, "get_contract_details"):
r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0]
d = r.get("data") if isinstance(r, dict) else None
if isinstance(d, dict):
for key in ("last_price", "mark_price", "index_price"):
if key in d and d[key] is not None:
return float(d[key])
if hasattr(self.contractAPI, "get_ticker"):
r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0]
d = r.get("data") if isinstance(r, dict) else None
if isinstance(d, dict):
for key in ("last_price", "price", "last", "close"):
if key in d and d[key] is not None:
return float(d[key])
except Exception:
pass
return float(fallback_close)
@staticmethod
def ema(values, n: int) -> float:
k = 2 / (n + 1)
e = values[0]
for v in values[1:]:
e = v * k + e * (1 - k)
return e
@staticmethod
def atr(klines, n: int) -> float:
if len(klines) < n + 1:
return 0.0
trs = []
for i in range(-n, 0):
cur = klines[i]
prev = klines[i - 1]
tr = max(
cur["high"] - cur["low"],
abs(cur["high"] - prev["close"]),
abs(cur["low"] - prev["close"]),
)
trs.append(tr)
return sum(trs) / len(trs)
def is_danger_market(self, klines, price: float) -> bool:
last = klines[-1]
body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0
if body >= self.cfg.big_body_kill:
return True
a = self.atr(klines, self.cfg.atr_len)
atr_ratio = (a / price) if price > 0 else 0.0
if atr_ratio >= self.cfg.atr_ratio_kill:
return True
return False
def atr_ratio_baseline(self, klines) -> float:
"""
自动阈值基准:最近 window 根的 atr_ratio 分布的 quantile 作为“典型波动”
"""
window = self.cfg.vol_baseline_window
if len(klines) < (window + self.cfg.atr_len + 5):
return 0.0
ratios = []
for i in range(-window, 0):
sub = klines[:i] if i != 0 else klines
a = self.atr(sub, self.cfg.atr_len)
p = sub[-1]["close"]
if p > 0 and a > 0:
ratios.append(a / p)
if not ratios:
return 0.0
ratios.sort()
q = max(0.0, min(1.0, self.cfg.vol_baseline_quantile))
idx = int(q * (len(ratios) - 1))
return ratios[idx]
def dynamic_thresholds(self, atr_ratio: float, base_ratio: float):
"""
动态阈值atr_ratio * vol_scale并带 floor
"""
if base_ratio <= 0:
vol_scale = 1.0
else:
raw = atr_ratio / base_ratio
vol_scale = max(self.cfg.vol_scale_min, min(self.cfg.vol_scale_max, raw))
entry_dev = max(self.cfg.entry_dev_floor, self.cfg.entry_k * vol_scale * atr_ratio)
tp = max(self.cfg.tp_floor, self.cfg.tp_k * vol_scale * atr_ratio)
sl = max(self.cfg.sl_floor, self.cfg.sl_k * vol_scale * atr_ratio)
return entry_dev, tp, sl, vol_scale
# ----------------- 账户/仓位 -----------------
def get_assets_available(self) -> float:
try:
resp = self.contractAPI.get_assets_detail()[0]
if resp.get("code") != 1000:
return 0.0
data = resp.get("data")
if isinstance(data, dict):
return float(data.get("available_balance", 0))
if isinstance(data, list):
for asset in data:
if asset.get("currency") == "USDT":
return float(asset.get("available_balance", 0))
return 0.0
except Exception as e:
logger.error(f"余额查询异常: {e}")
return 0.0
def get_position_status(self) -> bool:
try:
resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0]
if resp.get("code") != 1000:
return False
positions = resp.get("data", [])
if not positions:
self.pos = 0
return True
p = positions[0]
self.pos = 1 if p["position_type"] == 1 else -1
return True
except Exception as e:
logger.error(f"持仓查询异常: {e}")
self.ding(f"持仓查询异常: {e}", error=True)
return False
def get_equity_proxy(self) -> float:
return self.get_assets_available()
def refresh_daily_baseline(self):
today = datetime.date.today()
if today != self.day_tag:
self.day_tag = today
self.day_start_equity = None
self.trading_enabled = True
self.ding(f"新的一天({today}):重置日内风控基准")
def risk_kill_switch(self):
self.refresh_daily_baseline()
equity = self.get_equity_proxy()
if equity <= 0:
return
if self.day_start_equity is None:
self.day_start_equity = equity
logger.info(f"日内权益基准设定:{equity:.2f} USDT")
return
pnl = (equity - self.day_start_equity) / self.day_start_equity
if pnl <= -self.cfg.daily_loss_limit:
self.trading_enabled = False
self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True)
if pnl >= self.cfg.daily_profit_cap:
self.trading_enabled = False
self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机")
# ----------------- 下单 -----------------
def calculate_size(self, price: float) -> int:
"""
保守仓位估算:按 1张≈0.001ETH(沿用你原假设)
"""
bal = self.get_assets_available()
if bal < 10:
return 0
margin = bal * self.cfg.risk_percent
lev = int(self.cfg.leverage)
size = int((margin * lev) / (price * 0.001))
size = max(self.cfg.min_size, size)
size = min(self.cfg.max_size, size)
return size
def place_market_order(self, side: int, size: int) -> bool:
"""
side:
1 开多
2 平空
3 平多
4 开空
"""
if size <= 0:
return False
client_order_id = f"mr_{int(time.time())}_{uuid.uuid4().hex[:8]}"
try:
resp = self.contractAPI.post_submit_order(
contract_symbol=self.cfg.contract_symbol,
client_order_id=client_order_id,
side=side,
mode=1,
type="market",
leverage=self.cfg.leverage,
open_type=self.cfg.open_type,
size=size
)[0]
logger.info(f"order_resp: {resp}")
if resp.get("code") == 1000:
return True
self.ding(f"下单失败: {resp}", error=True)
return False
except APIException as e:
logger.error(f"API下单异常: {e}")
self.ding(f"API下单异常: {e}", error=True)
return False
except Exception as e:
logger.error(f"下单未知异常: {e}")
self.ding(f"下单未知异常: {e}", error=True)
return False
def close_position_all(self):
if self.pos == 1:
ok = self.place_market_order(3, 999999)
if ok:
self.pos = 0
elif self.pos == -1:
ok = self.place_market_order(2, 999999)
if ok:
self.pos = 0
# ----------------- 止损后机制(核心优化) -----------------
def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool:
"""
止损后同向入场加门槛:
- 只要 dev 还没有回到中性区,就对“上次止损方向”的同向入场门槛提高
- dev 回到 abs(dev) <= reset_band 后自动解除
- 超过 max_sec 自动解除(避免一直卡住)
"""
if self.last_sl_dir == 0:
return False
if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec:
self.last_sl_dir = 0
return False
reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev)
if abs(dev) <= reset_band:
self.last_sl_dir = 0
return False
return True
def _post_sl_dynamic_mult(self) -> float:
"""
止损后同方向 SL 放宽倍数与“止损时 vol_scale”联动
mult = 1 + alpha*(vol_scale_at_sl - 1)
并做上下限裁剪 + 有效期控制
"""
if self.post_sl_dir == 0:
return 1.0
if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec:
self.post_sl_dir = 0
self.post_sl_vol_scale = 1.0
return 1.0
raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0)
raw = max(1.0, raw) # 不缩小止损,只放宽
return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw))
# ----------------- 交易逻辑 -----------------
def in_cooldown(self) -> bool:
return (time.time() - self.last_exit_ts) < self.cfg.cooldown_sec_after_exit
def maybe_enter(self, price: float, ema_value: float, entry_dev: float):
if self.pos != 0:
return
if self.in_cooldown():
return
dev = (price - ema_value) / ema_value if ema_value else 0.0
size = self.calculate_size(price)
if size <= 0:
return
penalty_active = self._reentry_penalty_active(dev, entry_dev)
# 基础阈值
long_th = -entry_dev
short_th = entry_dev
# 若罚则生效:对“上次止损方向”的同向阈值提高
if penalty_active:
if self.last_sl_dir == 1:
long_th = -entry_dev * self.cfg.reentry_penalty_mult
elif self.last_sl_dir == -1:
short_th = entry_dev * self.cfg.reentry_penalty_mult
logger.info(
f"enter_check: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% "
f"(entry_dev={entry_dev * 100:.3f}%, long_th={long_th * 100:.3f}%, short_th={short_th * 100:.3f}%) "
f"size={size}, penalty={penalty_active}, last_sl_dir={self.last_sl_dir}"
)
if dev <= long_th:
if self.place_market_order(1, size): # 开多
self.pos = 1
self.entry_price = price
self.entry_ts = time.time()
self.ding(f"✅开多dev={dev * 100:.3f}% size={size} entry={price:.2f}")
elif dev >= short_th:
if self.place_market_order(4, size): # 开空
self.pos = -1
self.entry_price = price
self.entry_ts = time.time()
self.ding(f"✅开空dev={dev * 100:.3f}% size={size} entry={price:.2f}")
def maybe_exit(self, price: float, tp: float, sl: float, vol_scale: float):
if self.pos == 0 or self.entry_price is None or self.entry_ts is None:
return
hold = time.time() - self.entry_ts
if self.pos == 1:
pnl = (price - self.entry_price) / self.entry_price
else:
pnl = (self.entry_price - price) / self.entry_price
# ✅ 同方向止损后:在有效期内放宽 SL与止损时 vol_scale 联动)
sl_mult = 1.0
if self.post_sl_dir == self.pos and self.post_sl_dir != 0:
sl_mult = self._post_sl_dynamic_mult()
effective_sl = sl * sl_mult
if pnl >= tp:
self.close_position_all()
self.ding(f"🎯止盈pnl={pnl * 100:.3f}% price={price:.2f} tp={tp * 100:.3f}%")
self.entry_price, self.entry_ts = None, None
self.last_exit_ts = time.time()
elif pnl <= -effective_sl:
# 记录止损方向
sl_dir = self.pos # 1=多止损,-1=空止损
self.close_position_all()
self.ding(
f"🛑止损pnl={pnl * 100:.3f}% price={price:.2f} "
f"sl={sl * 100:.3f}% effective_sl={effective_sl * 100:.3f}%(×{sl_mult:.2f})",
error=True
)
# ✅ 开启:同向入场加门槛
self.last_sl_dir = sl_dir
self.last_sl_ts = time.time()
# ✅ 开启:同向 SL 联动放宽(记录止损时 vol_scale
self.post_sl_dir = sl_dir
self.post_sl_ts = time.time()
self.post_sl_vol_scale = float(vol_scale)
self.entry_price, self.entry_ts = None, None
self.last_exit_ts = time.time()
elif hold >= self.cfg.max_hold_sec:
self.close_position_all()
self.ding(f"⏱超时hold={int(hold)}s pnl={pnl * 100:.3f}% price={price:.2f}")
self.entry_price, self.entry_ts = None, None
self.last_exit_ts = time.time()
def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float,
atr_ratio: float, base_ratio: float, vol_scale: float,
entry_dev: float, tp: float, sl: float):
now = time.time()
if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec:
return
self._last_status_notify_ts = now
direction_str = "" if self.pos == 1 else ("" if self.pos == -1 else "")
penalty_active = self._reentry_penalty_active(dev, entry_dev)
sl_mult = 1.0
if self.pos != 0 and self.post_sl_dir == self.pos:
sl_mult = self._post_sl_dynamic_mult()
msg = (
f"【BitMart {self.cfg.contract_symbol}1m均值回归(自动阈值+止损智能)】\n"
f"方向:{direction_str}\n"
f"现价:{price:.2f}\n"
f"EMA{self.cfg.ema_len}{ema_value:.2f}\n"
f"dev{dev * 100:.3f}%entry_dev={entry_dev * 100:.3f}%\n"
f"ATR比{atr_ratio * 100:.3f}% 基准:{base_ratio * 100:.3f}% vol_scale={vol_scale:.2f}\n"
f"tp/sl{tp * 100:.3f}% / {sl * 100:.3f}%postSL×{sl_mult:.2f}, sl@scale={self.post_sl_vol_scale:.2f}\n"
f"止损同向加门槛:{'ON' if penalty_active else 'OFF'}last_sl_dir={self.last_sl_dir}\n"
f"可用余额:{bal:.2f} USDT 杠杆:{self.cfg.leverage}x\n"
f"超时:{self.cfg.max_hold_sec}s 冷却:{self.cfg.cooldown_sec_after_exit}s"
)
self.ding(msg)
def action(self):
if not self.set_leverage():
self.ding("杠杆设置失败,停止运行", error=True)
return
while True:
now_dt = datetime.datetime.now()
self.pbar.n = now_dt.second
self.pbar.refresh()
klines = self.get_klines_cached()
if not klines or len(klines) < (self.cfg.ema_len + 5):
time.sleep(1)
continue
last_k = klines[-1]
closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]]
ema_value = self.ema(closes, self.cfg.ema_len)
price = self.get_last_price(fallback_close=float(last_k["close"]))
dev = (price - ema_value) / ema_value if ema_value else 0.0
# 自动阈值
a = self.atr(klines, self.cfg.atr_len)
atr_ratio = (a / price) if price > 0 else 0.0
base_ratio = self.atr_ratio_baseline(klines)
entry_dev, tp, sl, vol_scale = self.dynamic_thresholds(atr_ratio, base_ratio)
# 日内风控
self.risk_kill_switch()
# 刷新仓位
if not self.get_position_status():
time.sleep(1)
continue
# 停机:平仓+不再开仓
if not self.trading_enabled:
if self.pos != 0:
self.close_position_all()
time.sleep(5)
continue
# 危险市场:不新开仓(允许已有仓按 tp/sl/超时 退出)
if self.is_danger_market(klines, price):
logger.warning("危险模式:高波动/大实体K暂停开仓")
self.maybe_exit(price, tp, sl, vol_scale)
time.sleep(self.cfg.tick_refresh_sec)
continue
# 先出场再入场
self.maybe_exit(price, tp, sl, vol_scale)
self.maybe_enter(price, ema_value, entry_dev)
# 状态通知(限频)
bal = self.get_assets_available()
self.notify_status_throttled(
price, ema_value, dev, bal,
atr_ratio, base_ratio, vol_scale,
entry_dev, tp, sl
)
time.sleep(self.cfg.tick_refresh_sec)
if __name__ == "__main__":
"""
Windows PowerShell:
setx BITMART_API_KEY "你的key"
setx BITMART_SECRET_KEY "你的secret"
setx BITMART_MEMO "合约交易"
重新打开终端再运行。
Linux/macOS:
export BITMART_API_KEY="你的key"
export BITMART_SECRET_KEY="你的secret"
export BITMART_MEMO="合约交易"
"""
cfg = StrategyConfig()
bot = BitmartFuturesMeanReversionBot(cfg)
bot.action()
# 9208.96

996
test2.py

File diff suppressed because it is too large Load Diff

390
websea/api交易.py Normal file
View File

@@ -0,0 +1,390 @@
import os
import time
import random
import string
import hashlib
import requests
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
# -------------------------
# Config
# -------------------------
@dataclass
class BotConfig:
symbol: str = "ETH-USDT"
quote_offset: float = 0.0006
refresh_threshold: float = 0.0005
amount: float = 1.0
lever_rate: int = 3
is_full: int = 1
depth_limit: int = 20
loop_interval: float = 2.0
dry_run: bool = True
max_net_pos_contracts: float = 5.0
min_spread_frac: float = 0.00015
max_spread_frac: float = 0.0030
close_threshold_frac: float = 0.8 # New: Fraction of max_net to trigger closing
close_offset: float = 0.001 # New: Offset for closing prices
# -------------------------
# Helper Functions
# -------------------------
def best_bid_ask(depth: Dict[str, Any]) -> Tuple[float, float]:
"""Extract best bid and ask from depth data."""
result = depth.get("result", {})
bids = result.get("bids", []) # Assuming list of dicts: [{'price': str, 'number': str}, ...], bids descending
asks = result.get("asks", []) # asks ascending
if not bids or not asks:
raise ValueError("Invalid depth data")
best_bid = float(bids[0]["price"])
best_ask = float(asks[0]["price"])
return best_bid, best_ask
def compute_net_position(pos_data: Dict[str, Any], symbol: str) -> float:
"""Compute net position contracts (long - short) for the symbol."""
result = pos_data.get("result", [])
net = 0.0
for p in result:
if p.get("symbol") == symbol:
side = p.get("side", "").lower() # Assuming 'buy' or 'long' for long, 'sell' or 'short' for short
amount = float(p.get("hold_vol", 0) or p.get("position", 0)) # Common field names
if "buy" in side or "long" in side:
net += amount
elif "sell" in side or "short" in side:
net -= amount
return net
def classify_current_orders(cur_data: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
"""Classify current orders into buy_orders and sell_orders."""
orders = cur_data.get("result", {}).get("list", [])
buy_orders = [o for o in orders if o.get("type", "").lower().startswith("buy")]
sell_orders = [o for o in orders if o.get("type", "").lower().startswith("sell")]
return buy_orders, sell_orders
# -------------------------
# Websea REST Client (optimized)
# -------------------------
class WebseaAPIError(RuntimeError):
def __init__(self, errno: int, errmsg: str, payload: Dict[str, Any]):
super().__init__(f"Websea API error: errno={errno} errmsg={errmsg}")
self.errno = errno
self.errmsg = errmsg
self.payload = payload
class WebseaClient:
def __init__(self, token: str, secret: str, base_url: str = "https://coapi.websea.com", timeout: int = 15):
self.token = token.strip()
self.secret = secret.strip()
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.session = requests.Session()
@staticmethod
def _nonce() -> str:
ts = str(int(time.time()))
rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))
return f"{ts}_{rand}"
@staticmethod
def _stringify_params(d: Optional[Dict[str, Any]]) -> Dict[str, str]:
"""Convert non-None values to str for consistent signing/parsing."""
if not d:
return {}
out: Dict[str, str] = {}
for k, v in d.items():
if v is None:
continue
if isinstance(v, bool):
out[k] = "1" if v else "0"
else:
out[k] = str(v)
return out
def _signature(self, nonce: str, params: Dict[str, str]) -> str:
tmp: List[str] = [self.token, self.secret, nonce]
for k, v in params.items():
tmp.append(f"{k}={v}")
tmp.sort()
payload = "".join(tmp)
return hashlib.sha1(payload.encode("utf-8")).hexdigest()
def _request(
self,
method: str,
path: str,
*,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None,
auth: bool = False,
) -> Dict[str, Any]:
method = method.upper()
url = f"{self.base_url}{path}"
q = self._stringify_params(params)
b = self._stringify_params(data)
sig_params: Dict[str, str] = {**q, **b}
headers: Dict[str, str] = {
"Content-Type": "application/x-www-form-urlencoded" if method in ("POST", "PUT", "DELETE") else "application/json"
}
if auth:
nonce = self._nonce()
signature = self._signature(nonce, sig_params)
headers.update({
"Token": self.token,
"Nonce": nonce,
"Signature": signature,
})
resp = self.session.request(
method=method,
url=url,
params=q if q and method == "GET" else None,
data=b if (method in ("POST", "PUT", "DELETE") and b) else None,
json=None if b else data, # Fallback if needed
timeout=self.timeout,
headers=headers,
)
try:
resp.raise_for_status()
except requests.HTTPError as e:
print(f"HTTP Error: {e} - Response: {resp.text}")
raise
try:
payload = resp.json()
except ValueError:
print(f"Invalid JSON: {resp.text}")
raise
if isinstance(payload, dict):
errno = payload.get("errno")
if errno not in (None, 0):
raise WebseaAPIError(int(errno), str(payload.get("errmsg")), payload)
return payload
def futures_depth(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
return self._request("GET", "/openApi/contract/depth", params={"symbol": symbol, "limit": limit}, auth=False)
def futures_24hr(self, symbol: str) -> Dict[str, Any]:
return self._request("GET", "/openApi/market/24hr", params={"symbol": symbol}, auth=False)
def positions(self, symbol: str, is_full: int = 1) -> Dict[str, Any]:
return self._request("GET", "/openApi/contract/position", params={"symbol": symbol, "is_full": is_full}, auth=True)
def current_orders(self, symbol: str, is_full: int = 1, limit: int = 100, direct: str = "prev") -> Dict[str, Any]:
return self._request("GET", "/openApi/contract/currentList",
params={"symbol": symbol, "is_full": is_full, "limit": limit, "direct": direct}, auth=True)
def add_order(
self,
*,
contract_type: str, # open/close
order_type: str, # buy-limit / sell-limit
symbol: str,
amount: float,
price: Optional[float] = None,
lever_rate: Optional[int] = None,
is_full: int = 1,
) -> Dict[str, Any]:
return self._request(
"POST",
"/openApi/contract/add",
data={
"contract_type": contract_type,
"type": order_type,
"symbol": symbol,
"amount": amount,
"price": price,
"lever_rate": lever_rate,
"is_full": is_full,
},
auth=True,
)
def cancel_orders(self, *, symbol: str, order_ids: List[str]) -> Dict[str, Any]:
return self._request(
"POST",
"/openApi/contract/cancel",
data={"symbol": symbol, "order_ids": ",".join(order_ids)},
auth=True,
)
def auth_ping(self, symbol: str, is_full: int) -> None:
"""Minimal auth test: fetch positions to confirm user recognition."""
_ = self.positions(symbol=symbol, is_full=is_full)
# -------------------------
# Bot loop
# -------------------------
def run_bot(client: WebseaClient, cfg: BotConfig) -> None:
last_mid: Optional[float] = None
print(f"[BOT] symbol={cfg.symbol} dry_run={cfg.dry_run} max_net={cfg.max_net_pos_contracts}")
while True:
try:
depth = client.futures_depth(cfg.symbol, limit=cfg.depth_limit)
bid, ask = best_bid_ask(depth)
mid = (bid + ask) / 2.0
spread_frac = (ask - bid) / mid
if spread_frac < cfg.min_spread_frac or spread_frac > cfg.max_spread_frac:
print(f"[SKIP] spread={spread_frac:.6f} bid={bid:.2f} ask={ask:.2f}")
time.sleep(cfg.loop_interval)
continue
pos = client.positions(symbol=cfg.symbol, is_full=cfg.is_full)
net = compute_net_position(pos, cfg.symbol)
cur = client.current_orders(symbol=cfg.symbol, is_full=cfg.is_full, limit=100)
buy_orders, sell_orders = classify_current_orders(cur)
buy_px = bid * (1.0 - cfg.quote_offset)
sell_px = ask * (1.0 + cfg.quote_offset)
need_refresh = last_mid is None or abs(mid - last_mid) / last_mid >= cfg.refresh_threshold
def far(orders: List[Dict[str, Any]], target: float) -> bool:
if not orders:
return True
prices = [float(o.get("price") or 0) for o in orders if o.get("price")]
if not prices:
return True
best = min(prices, key=lambda x: abs(x - target))
return abs(best - target) / target > (cfg.refresh_threshold * 0.8)
if far(buy_orders, buy_px) or far(sell_orders, sell_px):
need_refresh = True
allow_buy = net < cfg.max_net_pos_contracts
allow_sell = net > -cfg.max_net_pos_contracts
if need_refresh:
ids: List[str] = [str(o.get("order_id")) for o in (buy_orders + sell_orders) if o.get("order_id")]
if ids:
if cfg.dry_run:
print(f"[DRY] cancel {len(ids)} orders")
else:
client.cancel_orders(symbol=cfg.symbol, order_ids=ids)
print(f"[OK] canceled {len(ids)} orders")
rounded_buy_px = round(buy_px, 2)
rounded_sell_px = round(sell_px, 2)
if cfg.dry_run:
print(f"[DRY] mid={mid:.2f} net={net:.2f} buy@{rounded_buy_px:.2f} sell@{rounded_sell_px:.2f}")
else:
if allow_buy:
r1 = client.add_order(
contract_type="open",
order_type="buy-limit",
symbol=cfg.symbol,
amount=cfg.amount,
price=rounded_buy_px,
lever_rate=cfg.lever_rate,
is_full=cfg.is_full,
)
print("[OK] buy:", r1)
if allow_sell:
r2 = client.add_order(
contract_type="open",
order_type="sell-limit",
symbol=cfg.symbol,
amount=cfg.amount,
price=rounded_sell_px,
lever_rate=cfg.lever_rate,
is_full=cfg.is_full,
)
print("[OK] sell:", r2)
last_mid = mid
else:
print(f"[HOLD] mid={mid:.2f} spread={spread_frac:.6f} net={net:.2f} "
f"orders(buy={len(buy_orders)} sell={len(sell_orders)})")
# Enhanced closing logic if net is extreme
if abs(net) > cfg.max_net_pos_contracts * cfg.close_threshold_frac and not cfg.dry_run:
if net > 0:
close_type = "close"
close_order_type = "sell-limit"
close_px = mid * (1.0 - cfg.close_offset)
close_amt = net # Close all long
else:
close_type = "close"
close_order_type = "buy-limit"
close_px = mid * (1.0 + cfg.close_offset)
close_amt = abs(net) # Close all short
print(f"[WARN] Net position high ({net:.2f}), attempting to {close_type} with {close_order_type} amt={close_amt:.2f} @ {round(close_px, 2):.2f}")
r_close = client.add_order(
contract_type=close_type,
order_type=close_order_type,
symbol=cfg.symbol,
amount=close_amt,
price=round(close_px, 2),
lever_rate=cfg.lever_rate,
is_full=cfg.is_full,
)
print("[OK] close:", r_close)
except WebseaAPIError as e:
if e.errno == 20522:
print("[AUTH] User not recognized: Check Token/Secret or API permissions (enable Futures Trading).")
else:
print(f"[API] {e} payload={e.payload}")
except requests.HTTPError as e:
print(f"[HTTP] {e}")
except Exception as e:
print(f"[ERR] {e}")
time.sleep(cfg.loop_interval)
def env_bool(name: str, default: bool) -> bool:
v = os.environ.get(name)
if v is None:
return default
return v.strip().lower() in ("1", "true", "yes", "y", "on")
def main() -> None:
token = os.environ.get("WEBSEA_TOKEN", "")
secret = os.environ.get("WEBSEA_SECRET", "")
if not token or not secret:
raise SystemExit("Please set environment variables WEBSEA_TOKEN and WEBSEA_SECRET")
cfg = BotConfig(dry_run=env_bool("DRY_RUN", True))
client = WebseaClient(token=token, secret=secret)
# Public check
try:
t = client.futures_24hr(cfg.symbol)
print("[API] 24hr ok:", t)
except Exception as e:
print("[API CHECK FAILED]", e)
# Auth check
try:
client.auth_ping(cfg.symbol, cfg.is_full)
print("[AUTH] ok (user recognized)")
except WebseaAPIError as e:
print(f"[AUTH FAILED] errno={e.errno} errmsg={e.errmsg} payload={e.payload}")
print("Tips: Ensure API Key has Futures permissions enabled in Websea dashboard.")
raise
run_bot(client, cfg)
if __name__ == "__main__":
main()

View File

@@ -66,11 +66,12 @@ class Hub_Web:
bit_id = createBrowser(
name=self.x_info.user_name,
proxyType="http",
groupId=fz_datas['推特'],
host=self.ips_info.host,
port=int(self.ips_info.port),
proxyUserName=self.ips_info.username,
proxyPassword=self.ips_info.password,
host="104.168.59.92",
port=int(random.randint(20001, 25000)),
# proxyUserName=self.ips_info.username,
# proxyPassword=self.ips_info.password,
)
self.xstart_info.bit_id = bit_id