diff --git a/1111 b/1111
index d1ec074..337f7ae 100644
--- a/1111
+++ b/1111
@@ -23,31 +23,7 @@ Abc12345678
yx20250715@gmail.com
-3. 更多交易信号
-包住形态(原策略)
-锤子线和上吊线
-早晨之星和黄昏之星
-乌云盖顶和刺透形态
-三只乌鸦和三白兵
-移动平均线金叉死叉
-RSI超买超卖
-布林带突破
-
-4. 风险管理
-止损止盈:可配置的百分比止损止盈
-时间止损:持仓超过24小时自动平仓
-凯利公式改进版:仓位管理
-最大回撤控制
-手续费考虑:0.05%交易手续费
-申请API原因:我计划通过API实现自动化量化交易策略,以减少人工干预,提高交易效率并减少人为错误。通过API,我能够实时获取市场数据、执行交易指令、监控账户状态,并根据市场变化自动调整策略。
-申请API具体方式用途:我将使用API来获取实时的市场行情数据,执行各种量化策略(如网格交易、动量策略、均值回归等),并进行账户管理。API将自动监控并调整我的交易参数,比如仓位、止损、止盈等,同时还需要定期回测策略和优化模型。
-申请API主要交易什么币种:我主要通过API进行ETH和BTC的合约交易,还有一起其他的主流币
-申请API操作频率:预计API的调用频率将根据市场波动而变化,通常情况下,我的API请求频率大约在每秒5-10次,有时会根据策略的调整需要进行更频繁的请求。
-申请API大概体量:预计每日API请求量为5,000 - 15,000次,具体取决于市场波动和交易策略的复杂度。月度交易量预估为50,000 - 200,000 ETH/BTC。
-申请API对接后预估增长目标:通过API对接,我预计能够实现量化策略的高效执行和自动化交易,减少人工干预和错误。目标是在对接后的3个月内,将交易频率提高50%,并将月盈利提高20%至30%。
-
-
-45.84
-4998.6
\ No newline at end of file
+555.73
+2278
\ No newline at end of file
diff --git a/bitmart/交易,一直加仓,拉平开仓价位.py b/bitmart/交易,一直加仓,拉平开仓价位.py
deleted file mode 100644
index 7507fab..0000000
--- a/bitmart/交易,一直加仓,拉平开仓价位.py
+++ /dev/null
@@ -1,986 +0,0 @@
-"""
-BitMart 被动做市/高频刷单策略
-核心逻辑:在盘口两侧不断挂单,赚取价差+返佣
-使用浏览器自动化下单,获取高返佣
-"""
-
-import time
-from loguru import logger
-from threading import Lock
-from dataclasses import dataclass
-from bitmart.api_contract import APIContract
-from typing import Optional, Dict, List, Tuple
-from DrissionPage import ChromiumPage, ChromiumOptions
-
-from bit_tools import openBrowser
-
-
-# ================================================================
-# 📊 配置类
-# ================================================================
-
-@dataclass
-class MarketMakingConfig:
- bit_id: str = "f2320f57e24c45529a009e1541e25961"
- """做市策略配置"""
- # API配置(仅用于查询,不下单)
- api_key: str = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
- secret_key: str = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
- memo: str = "合约交易"
- contract_symbol: str = "ETHUSDT"
-
- # 浏览器配置
- tge_id: int = 196495 # TGE浏览器ID
- tge_url: str = "http://127.0.0.1:50326"
- tge_headers: Dict = None
- trading_url: str = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"
-
- # 做市参数
- spread_percent: float = 0.01 # 价差百分比(0.01% = 买一卖一之间)
- order_size_usdt: float = 10.0 # 每单金额(USDT)
- max_position_usdt: float = 100.0 # 最大持仓金额(USDT)
-
- # 订单管理
- order_refresh_interval: float = 2.0 # 订单刷新间隔(秒)
- order_timeout: float = 60.0 # 订单超时时间(秒),超时后撤单重新挂
-
- # 风险控制
- max_daily_loss: float = 50.0 # 每日最大亏损(USDT)
- max_daily_trades: int = 1000 # 每日最大交易次数
-
- # 杠杆和模式
- leverage: str = "30" # 杠杆倍数
- open_type: str = "cross" # 全仓模式
-
- def __post_init__(self):
- """初始化TGE headers"""
- if self.tge_headers is None:
- self.tge_headers = {
- "Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
- "Content-Type": "application/json"
- }
-
-
-# ================================================================
-# 📊 订单簿数据结构
-# ================================================================
-
-@dataclass
-class OrderBook:
- """订单簿数据"""
- bids: List[Tuple[float, float]] # [(价格, 数量), ...] 买盘,价格从高到低
- asks: List[Tuple[float, float]] # [(价格, 数量), ...] 卖盘,价格从低到高
- timestamp: float
-
- @property
- def best_bid(self) -> Optional[float]:
- """买一价"""
- return self.bids[0][0] if self.bids else None
-
- @property
- def best_ask(self) -> Optional[float]:
- """卖一价"""
- return self.asks[0][0] if self.asks else None
-
- @property
- def spread(self) -> Optional[float]:
- """价差"""
- if self.best_bid and self.best_ask:
- return self.best_ask - self.best_bid
- return None
-
- @property
- def mid_price(self) -> Optional[float]:
- """中间价"""
- if self.best_bid and self.best_ask:
- return (self.best_bid + self.best_ask) / 2
- return None
-
-
-@dataclass
-class PendingOrder:
- """pending订单信息"""
- order_id: str
- side: str # "buy" or "sell"
- price: float
- size: float
- create_time: float
- status: str # "pending", "filled", "cancelled"
-
-
-# ================================================================
-# 📊 浏览器管理器
-# ================================================================
-
-class BrowserManager:
- """浏览器管理器:负责浏览器的启动、接管和标签页管理"""
-
- def __init__(self, config: MarketMakingConfig, bit_id="f2320f57e24c45529a009e1541e25961"):
- self.bit_id = "f2320f57e24c45529a009e1541e25961"
- self.config = config
- self.tge_port: Optional[int] = None
- self.page: Optional[ChromiumPage] = None
-
- def open_browser(self) -> bool:
- """打开浏览器并获取端口"""
- try:
- bit_port = openBrowser(id=self.bit_id)
- co = ChromiumOptions()
- co.set_local_port(port=bit_port)
- self.page = ChromiumPage(addr_or_opts=co)
- return True
- except Exception as e:
- logger.error(f"打开浏览器失败: {e}")
- return False
-
- def take_over_browser(self) -> bool:
- """接管浏览器"""
- if not self.tge_port:
- logger.error("浏览器端口未设置")
- return False
-
- try:
- co = ChromiumOptions()
- co.set_local_port(self.tge_port)
- self.page = ChromiumPage(addr_or_opts=co)
- self.page.set.window.max()
- logger.success("成功接管浏览器")
- return True
- except Exception as e:
- logger.error(f"接管浏览器失败: {e}")
- return False
-
- def close_extra_tabs(self) -> bool:
- """关闭多余的标签页,只保留第一个"""
- if not self.page:
- return False
-
- try:
- tabs = self.page.get_tabs()
- closed_count = 0
- for idx, tab in enumerate(tabs):
- if idx == 0:
- continue
- tab.close()
- closed_count += 1
- if closed_count > 0:
- logger.info(f"已关闭{closed_count}个多余标签页")
- return True
- except Exception as e:
- logger.warning(f"关闭多余标签页失败: {e}")
- return False
-
-
-# ================================================================
-# 📊 浏览器交易执行器
-# ================================================================
-
-class BrowserTradingExecutor:
- """浏览器交易执行器:通过浏览器自动化下单(获取高返佣)"""
-
- def __init__(self, page: ChromiumPage):
- self.page = page
-
- def click_safe(self, xpath: str, sleep: float = 0.5) -> bool:
- """安全点击"""
- try:
- ele = self.page.ele(xpath)
- if not ele:
- return False
- ele.scroll.to_see(center=True)
- time.sleep(sleep)
- ele.click()
- return True
- except Exception as e:
- logger.error(f"点击失败 {xpath}: {e}")
- return False
-
- def 开单(self, marketPriceLongOrder: int = 0, limitPriceShortOrder: int = 0,
- size: Optional[float] = None, price: Optional[float] = None) -> bool:
-
- size = 0.1
- """
- 开单操作(通过浏览器自动化,获取高返佣)
-
- Args:
- marketPriceLongOrder: 市价最多或者做空,1是最多,-1是做空
- limitPriceShortOrder: 限价最多或者做空,1是最多,-1是做空
- size: 数量
- price: 价格(限价单需要)
-
- Returns:
- 是否成功
- """
- try:
- # 市价单
- if marketPriceLongOrder == -1:
- # 市价做空
- if not self.click_safe('x://button[normalize-space(text()) ="市价"]'):
- return False
- self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
- if not self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'):
- return False
- logger.success(f"市价做空成功: {size}")
- return True
-
- elif marketPriceLongOrder == 1:
- # 市价做多
- if not self.click_safe('x://button[normalize-space(text()) ="市价"]'):
- return False
- self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
- if not self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'):
- return False
- logger.success(f"市价做多成功: {size}")
- return True
-
- # 限价单
- if limitPriceShortOrder == -1:
- # 限价做空
- if not self.click_safe('x://button[normalize-space(text()) ="限价"]'):
- return False
- self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
- time.sleep(1)
- self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
- if not self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'):
- return False
- logger.success(f"限价做空成功: {size} @ {price}")
- return True
-
- elif limitPriceShortOrder == 1:
- # 限价做多
- if not self.click_safe('x://button[normalize-space(text()) ="限价"]'):
- return False
- self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
- time.sleep(1)
- self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
- if not self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'):
- return False
- logger.success(f"限价做多成功: {size} @ {price}")
- return True
-
- return False
- except Exception as e:
- logger.error(f"开单异常: {e}")
- return False
-
- def 平仓(self) -> bool:
- """市价平仓"""
- try:
- if self.click_safe('x://span[normalize-space(text()) ="市价"]'):
- logger.success("平仓成功")
- return True
- return False
- except Exception as e:
- logger.error(f"平仓异常: {e}")
- return False
-
- def place_limit_order(self, side: str, price: float, size: float) -> bool:
- """
- 下限价单(通过浏览器)
-
- Args:
- side: "buy" 或 "sell"
- price: 价格
- size: 数量(张数)
-
- Returns:
- 是否成功
- """
- try:
- # size已经是张数,直接使用
- if side == "buy":
- # 限价做多
- return self.开单(limitPriceShortOrder=1, size=size, price=price)
- else:
- # 限价做空
- return self.开单(limitPriceShortOrder=-1, size=size, price=price)
- except Exception as e:
- logger.error(f"限价下单异常: {e}")
- return False
-
-
-# ================================================================
-# 📊 BitMart API 封装(仅用于查询,不下单)
-# ================================================================
-
-class BitMartMarketMakerAPI:
- """BitMart做市API封装(仅用于查询,不下单)"""
-
- def __init__(self, config: MarketMakingConfig):
- self.config = config
- self.contractAPI = APIContract(
- config.api_key,
- config.secret_key,
- config.memo,
- timeout=(5, 15)
- )
-
- def get_order_book(self, depth: int = 20) -> Optional[OrderBook]:
- """
- 获取订单簿
-
- Args:
- depth: 深度数量(可能不使用)
-
- Returns:
- OrderBook对象或None
- """
- try:
- # BitMart合约API获取深度数据
- # 根据错误信息,get_depth()不接受size参数
- # 尝试不同的调用方式
- try:
- # 方法1:不传深度参数,使用默认值(最可能的方式)
- response = self.contractAPI.get_depth(
- contract_symbol=self.config.contract_symbol
- )[0]
- except TypeError as e1:
- try:
- # 方法2:尝试使用 limit 参数
- response = self.contractAPI.get_depth(
- contract_symbol=self.config.contract_symbol,
- limit=depth
- )[0]
- except TypeError as e2:
- try:
- # 方法3:尝试使用 depth 参数
- response = self.contractAPI.get_depth(
- contract_symbol=self.config.contract_symbol,
- depth=depth
- )[0]
- except TypeError as e3:
- logger.error(f"get_depth()方法调用失败,尝试的参数方式都失败: {e1}, {e2}, {e3}")
- return None
-
- if response.get('code') == 1000:
- data = response.get('data', {})
- # BitMart返回格式可能是不同的,需要根据实际调整
- bids = []
- asks = []
-
- if isinstance(data, dict):
- bids_raw = data.get('bids', [])
- asks_raw = data.get('asks', [])
-
- # 处理不同格式
- for b in bids_raw:
- if isinstance(b, (list, tuple)) and len(b) >= 2:
- bids.append((float(b[0]), float(b[1])))
- elif isinstance(b, dict):
- bids.append((float(b.get('price', 0)), float(b.get('size', 0))))
-
- for a in asks_raw:
- if isinstance(a, (list, tuple)) and len(a) >= 2:
- asks.append((float(a[0]), float(a[1])))
- elif isinstance(a, dict):
- asks.append((float(a.get('price', 0)), float(a.get('size', 0))))
-
- # 买盘按价格从高到低排序,卖盘按价格从低到高排序
- bids.sort(key=lambda x: x[0], reverse=True)
- asks.sort(key=lambda x: x[0])
-
- if bids and asks:
- return OrderBook(
- bids=bids,
- asks=asks,
- timestamp=time.time()
- )
- return None
- except Exception as e:
- logger.error(f"获取订单簿异常: {e}")
- # 如果获取订单簿失败,尝试使用最新价格作为备用方案
- logger.warning("尝试使用最新价格作为备用方案")
- current_price = self.get_current_price()
- if current_price:
- # 使用当前价格和价差百分比计算买一卖一
- spread_amount = current_price * self.config.spread_percent / 100
- bids = [(current_price - spread_amount / 2, 1.0)]
- asks = [(current_price + spread_amount / 2, 1.0)]
- return OrderBook(
- bids=bids,
- asks=asks,
- timestamp=time.time()
- )
- return None
-
- def get_current_price(self) -> Optional[float]:
- """获取当前最新价格"""
- try:
- end_time = int(time.time())
- response = self.contractAPI.get_kline(
- contract_symbol=self.config.contract_symbol,
- step=1, # 1分钟
- start_time=end_time - 60,
- end_time=end_time
- )[0]
-
- if response.get('code') == 1000:
- data = response.get('data', [])
- if data:
- return float(data[-1]["close_price"])
- return None
- except Exception as e:
- logger.error(f"获取价格异常: {e}")
- return None
-
- def get_available_balance(self) -> Optional[float]:
- """获取合约账户可用USDT余额"""
- try:
- response = self.contractAPI.get_assets_detail()[0]
- if response.get('code') == 1000:
- data = response['data']
- if isinstance(data, dict):
- return float(data.get('available_balance', 0))
- elif isinstance(data, list):
- for asset in data:
- if asset.get('currency') == 'USDT':
- return float(asset.get('available_balance', 0))
- return None
- except Exception as e:
- logger.error(f"余额查询异常: {e}")
- return None
-
- def get_position(self) -> Optional[Dict]:
- """获取当前持仓"""
- try:
- response = self.contractAPI.get_position(
- contract_symbol=self.config.contract_symbol
- )[0]
-
- if response.get('code') == 1000:
- positions = response.get('data', [])
- if positions:
- return positions[0]
- return None
- return None
- except Exception as e:
- logger.error(f"持仓查询异常: {e}")
- return None
-
- def set_leverage(self) -> bool:
- """设置杠杆和全仓模式"""
- try:
- response = self.contractAPI.post_submit_leverage(
- contract_symbol=self.config.contract_symbol,
- leverage=self.config.leverage,
- open_type=self.config.open_type
- )[0]
-
- if response.get('code') == 1000:
- logger.success(f"全仓模式 + {self.config.leverage}x 杠杆设置成功")
- return True
- else:
- logger.error(f"杠杆设置失败: {response}")
- return False
- except Exception as e:
- logger.error(f"设置杠杆异常: {e}")
- return False
-
- # ============== 新增:撤单、平仓 ==============
- def get_open_orders(self) -> List[Dict]:
- """获取当前所有挂单"""
- try:
- resp = self.contractAPI.get_open_order(
- contract_symbol=self.config.contract_symbol
- )[0]
- if resp.get("code") == 1000:
- data = resp.get("data", [])
- return data if isinstance(data, list) else []
- return []
- except Exception as e:
- logger.error(f"查询挂单异常: {e}")
- return []
-
- def cancel_order(self, order_id: str) -> bool:
- """撤销单个挂单"""
- try:
- resp = self.contractAPI.post_cancel_order(
- contract_symbol=self.config.contract_symbol,
- order_id=order_id
- )[0]
- if resp.get("code") == 1000:
- logger.success(f"撤单成功: {order_id}")
- return True
- logger.error(f"撤单失败: {resp}")
- return False
- except Exception as e:
- logger.error(f"撤单异常: {e}")
- return False
-
- def cancel_all_orders(self) -> None:
- """撤销所有挂单(无精确超时信息时,直接全撤)"""
- open_orders = self.get_open_orders()
- for od in open_orders:
- oid = str(od.get("order_id") or od.get("id") or "")
- if oid:
- self.cancel_order(oid)
-
- def close_position(self) -> bool:
- """
- 使用API平仓(市价/近似市价)
- 逻辑:查询当前持仓,根据方向下相反方向的平仓单
- """
- try:
- position = self.get_position()
- if not position:
- logger.info("无持仓,无需平仓")
- return True
-
- position_type = int(position.get("position_type", 0)) # 1=多, 2=空
- current_amount = float(position.get("current_amount", 0))
- if current_amount <= 0:
- logger.info("持仓数量为0,无需平仓")
- return True
-
- # 获取现价作为平仓价格参考
- current_price = self.get_current_price()
- if not current_price:
- logger.error("无法获取现价,平仓失败")
- return False
-
- # BitMart合约订单类型:3=平多(限价),4=平空(限价)
- if position_type == 1:
- order_type = 3 # 平多
- elif position_type == 2:
- order_type = 4 # 平空
- else:
- logger.error(f"未知持仓方向: {position_type}")
- return False
-
- # 下平仓单
- resp = self.contractAPI.post_submit_order(
- contract_symbol=self.config.contract_symbol,
- type=order_type,
- price=str(current_price),
- size=str(current_amount)
- )[0]
-
- if resp.get("code") == 1000:
- logger.success(f"API平仓成功,方向={position_type}, 数量={current_amount}, 价格={current_price}")
- return True
-
- logger.error(f"API平仓失败: {resp}")
- return False
- except Exception as e:
- logger.error(f"API平仓异常: {e}")
- return False
-
-
-# ================================================================
-# 📊 做市策略核心
-# ================================================================
-
-class MarketMakingStrategy:
- """被动做市策略(使用浏览器自动化下单,获取高返佣)"""
-
- def __init__(self, config: MarketMakingConfig, bit_id=None):
- self.bit_id = bit_id
- self.config = config
- self.api = BitMartMarketMakerAPI(config) # 仅用于查询
-
- # 浏览器管理
- self.browser_manager = BrowserManager(config)
- self.trading_executor: Optional[BrowserTradingExecutor] = None
-
- # 订单管理(使用时间戳作为订单ID,因为浏览器下单无法直接获取订单ID)
- self.pending_orders: Dict[str, PendingOrder] = {}
- self.order_lock = Lock()
-
- # 统计
- self.daily_trades = 0
- self.daily_profit = 0.0
- self.total_trades = 0
- self.total_profit = 0.0
-
- # 运行状态
- self.running = False
- self.last_order_refresh = 0.0
-
- # 初始化浏览器和杠杆
- if not self._initialize_browser():
- raise Exception("浏览器初始化失败")
- self.api.set_leverage()
-
- def _initialize_browser(self) -> bool:
- """初始化浏览器"""
- try:
- # 打开浏览器
- if not self.browser_manager.open_browser():
- logger.error("打开浏览器失败")
- return False
-
- # # 接管浏览器
- # if not self.browser_manager.take_over_browser():
- # logger.error("接管浏览器失败")
- # return False
-
- # 关闭多余标签页
- self.browser_manager.close_extra_tabs()
-
- # 打开交易页面
- self.browser_manager.page.get(self.config.trading_url)
- time.sleep(2) # 等待页面加载
-
- # 初始化交易执行器
- self.trading_executor = BrowserTradingExecutor(self.browser_manager.page)
-
- logger.success("浏览器初始化完成")
- return True
- except Exception as e:
- logger.error(f"浏览器初始化异常: {e}")
- return False
-
- def calculate_order_prices(self, order_book: OrderBook) -> Tuple[Optional[float], Optional[float]]:
- """
- 计算挂单价格
-
- Args:
- order_book: 订单簿
-
- Returns:
- (buy_price, sell_price)
- """
- if not order_book.mid_price or not order_book.best_bid or not order_book.best_ask:
- return None, None
-
- mid = order_book.mid_price
- spread_amount = mid * self.config.spread_percent / 100
-
- # 买单价格:中间价 - 价差的一半,但不能低于买一
- buy_price = mid - spread_amount / 2
- buy_price = min(buy_price, order_book.best_bid * 0.9999) # 略低于买一,确保能成交
-
- # 卖单价格:中间价 + 价差的一半,但不能高于卖一
- sell_price = mid + spread_amount / 2
- sell_price = max(sell_price, order_book.best_ask * 1.0001) # 略高于卖一,确保能成交
-
- # 确保价差合理
- if sell_price <= buy_price:
- # 如果价差太小,使用买一卖一价格
- buy_price = order_book.best_bid * 0.9999
- sell_price = order_book.best_ask * 1.0001
-
- return buy_price, sell_price
-
- def should_refresh_orders(self) -> bool:
- """判断是否需要刷新订单"""
- now = time.time()
- if now - self.last_order_refresh >= self.config.order_refresh_interval:
- return True
- return False
-
- def cancel_stale_orders(self):
- """撤销超时订单(使用API撤单)"""
- now = time.time()
- to_cancel = []
-
- with self.order_lock:
- for order_id, order in self.pending_orders.items():
- if order.status == "pending":
- if now - order.create_time > self.config.order_timeout:
- to_cancel.append(order_id)
-
- if not to_cancel:
- return
-
- logger.info(f"发现{len(to_cancel)}个超时订单,尝试API撤单")
- try:
- # 先通过API获取真实挂单列表并撤单
- self.api.cancel_all_orders()
- # 本地状态同步
- with self.order_lock:
- for order_id in to_cancel:
- if order_id in self.pending_orders:
- self.pending_orders[order_id].status = "cancelled"
- except Exception as e:
- logger.error(f"API撤单失败: {e}")
-
- def update_pending_orders(self):
- """更新挂单状态(通过持仓变化判断订单是否成交)"""
- try:
- # 获取当前持仓
- current_position = self.api.get_position()
- current_position_type = 0
- current_position_amount = 0.0
-
- if current_position:
- current_position_type = int(current_position.get('position_type', 0))
- current_position_amount = abs(float(current_position.get('current_amount', 0)))
-
- with self.order_lock:
- # 检查挂单是否成交(通过持仓变化判断)
- for order_id, order in list(self.pending_orders.items()):
- if order.status == "pending":
- # 检查订单是否超时
- if time.time() - order.create_time > self.config.order_timeout:
- # 订单超时,标记为取消
- order.status = "cancelled"
- logger.info(f"订单超时: {order_id} {order.side} @ {order.price}")
- continue
- except Exception as e:
- logger.error(f"更新挂单状态异常: {e}")
-
- def _place_counter_order(self, filled_order: PendingOrder):
- """
- 订单成交后,在另一侧挂单
-
- Args:
- filled_order: 已成交的订单
- """
- # 等待一小段时间,确保订单状态更新
- time.sleep(0.1)
-
- order_book = self.api.get_order_book()
- if not order_book:
- logger.warning("无法获取订单簿,无法挂反向单")
- return
-
- # 计算反向订单价格
- buy_price, sell_price = self.calculate_order_prices(order_book)
-
- if filled_order.side == "buy":
- # 买单成交,挂卖单
- if sell_price and self.trading_executor:
- contract_size = self.config.order_size_usdt / sell_price / 0.01
- if contract_size < 1:
- contract_size = 1
-
- if self.trading_executor.place_limit_order("sell", sell_price, contract_size):
- order_id = f"sell_{int(time.time() * 1000)}"
- with self.order_lock:
- self.pending_orders[order_id] = PendingOrder(
- order_id=order_id,
- side="sell",
- price=sell_price,
- size=self.config.order_size_usdt,
- create_time=time.time(),
- status="pending"
- )
- logger.info(f"买单成交后挂卖单: {sell_price}, 订单ID: {order_id}")
- else:
- logger.warning("买单成交后挂卖单失败")
- else:
- # 卖单成交,挂买单平空或开多
- if buy_price and self.trading_executor:
- contract_size = self.config.order_size_usdt / buy_price / 0.01
- if contract_size < 1:
- contract_size = 1
-
- if self.trading_executor.place_limit_order("buy", buy_price, contract_size):
- order_id = f"buy_{int(time.time() * 1000)}"
- with self.order_lock:
- self.pending_orders[order_id] = PendingOrder(
- order_id=order_id,
- side="buy",
- price=buy_price,
- size=self.config.order_size_usdt,
- create_time=time.time(),
- status="pending"
- )
- logger.info(f"卖单成交后挂买单: {buy_price}, 订单ID: {order_id}")
- else:
- logger.warning("卖单成交后挂买单失败")
-
- def place_market_making_orders(self):
- """放置做市订单"""
- # 获取订单簿
- order_book = self.api.get_order_book()
- if not order_book or not order_book.mid_price:
- logger.warning("无法获取订单簿")
- return
-
- # 检查持仓
- position = self.api.get_position()
- position_value = 0.0
- if position:
- current_price = order_book.mid_price
- position_amount = abs(float(position.get('current_amount', 0)))
- # 计算持仓价值(USDT)
- position_value = position_amount * current_price
-
- # 如果持仓超过限制,只挂反向单
- if position_value >= self.config.max_position_usdt:
- logger.warning(f"持仓超过限制: {position_value} USDT,只挂反向单")
- # 只挂反向单平仓
- if position:
- position_type = int(position.get('position_type', 0))
- if position_type == 1: # 多仓
- # 挂卖单平多
- _, sell_price = self.calculate_order_prices(order_book)
- if sell_price and self.trading_executor:
- contract_size = self.config.order_size_usdt / sell_price / 0.01
- if contract_size < 1:
- contract_size = 1
- self.trading_executor.place_limit_order("sell", sell_price, contract_size)
- elif position_type == 2: # 空仓
- # 挂买单平空
- buy_price, _ = self.calculate_order_prices(order_book)
- if buy_price and self.trading_executor:
- contract_size = self.config.order_size_usdt / buy_price / 0.01
- if contract_size < 1:
- contract_size = 1
- self.trading_executor.place_limit_order("buy", buy_price, contract_size)
- return
-
- # 计算挂单价格
- buy_price, sell_price = self.calculate_order_prices(order_book)
-
- if not buy_price or not sell_price:
- return
-
- # 检查当前挂单数量
- with self.order_lock:
- pending_buy_count = sum(1 for o in self.pending_orders.values()
- if o.side == "buy" and o.status == "pending")
- pending_sell_count = sum(1 for o in self.pending_orders.values()
- if o.side == "sell" and o.status == "pending")
-
- # 如果两侧都有挂单,不重复挂
- if pending_buy_count > 0 and pending_sell_count > 0:
- return
-
- # 挂买单(通过浏览器)
- if pending_buy_count == 0:
- # 计算张数(根据合约规格调整)
- # 假设页面输入框单位是张数,需要将USDT金额转换为张数
- # size_usdt / price = ETH数量,再除以合约面值得到张数
- contract_size = self.config.order_size_usdt / buy_price / 0.01
- if contract_size < 1:
- contract_size = 1
-
- if self.trading_executor and self.trading_executor.place_limit_order("buy", buy_price, contract_size):
- # 使用时间戳作为订单ID
- order_id = f"buy_{int(time.time() * 1000)}"
- with self.order_lock:
- self.pending_orders[order_id] = PendingOrder(
- order_id=order_id,
- side="buy",
- price=buy_price,
- size=self.config.order_size_usdt,
- create_time=time.time(),
- status="pending"
- )
- logger.info(f"挂买单成功: {buy_price}, 订单ID: {order_id}")
- else:
- logger.warning("挂买单失败")
-
- # 挂卖单(通过浏览器)
- if pending_sell_count == 0:
- # 计算张数
- contract_size = self.config.order_size_usdt / sell_price / 0.01
- if contract_size < 1:
- contract_size = 1
-
- if self.trading_executor and self.trading_executor.place_limit_order("sell", sell_price, contract_size):
- # 使用时间戳作为订单ID
- order_id = f"sell_{int(time.time() * 1000)}"
- with self.order_lock:
- self.pending_orders[order_id] = PendingOrder(
- order_id=order_id,
- side="sell",
- price=sell_price,
- size=self.config.order_size_usdt,
- create_time=time.time(),
- status="pending"
- )
- logger.info(f"挂卖单成功: {sell_price}, 订单ID: {order_id}")
- else:
- logger.warning("挂卖单失败")
-
- self.last_order_refresh = time.time()
-
- def check_risk_limits(self) -> bool:
- """检查风险限制"""
- # 检查每日交易次数
- if self.daily_trades >= self.config.max_daily_trades:
- logger.warning(f"达到每日最大交易次数: {self.daily_trades}")
- return False
-
- # 检查每日亏损
- if self.daily_profit <= -self.config.max_daily_loss:
- logger.error(f"达到每日最大亏损: {self.daily_profit}")
- # send_dingtalk_message(f"做市策略达到每日最大亏损: {self.daily_profit} USDT", error=True)
- return False
-
- return True
-
- def run(self):
- """主运行循环"""
- self.running = True
- logger.info("做市策略启动")
-
- while self.running:
- try:
- # 检查风险限制
- if not self.check_risk_limits():
- logger.error("风险限制触发,停止策略")
- break
-
- # 撤销超时订单
- self.cancel_stale_orders()
-
- # 更新挂单状态
- self.update_pending_orders()
-
- # 刷新订单
- if self.should_refresh_orders():
- self.place_market_making_orders()
-
- # 短暂休眠
- time.sleep(0.5)
-
- except KeyboardInterrupt:
- logger.info("收到中断信号,停止策略")
- break
- except Exception as e:
- logger.error(f"策略运行异常: {e}")
- time.sleep(1)
-
- # 清理:刷新页面,手动撤销挂单
- logger.info("清理挂单...使用API撤单")
- try:
- self.api.cancel_all_orders()
- with self.order_lock:
- for order_id in list(self.pending_orders.keys()):
- if self.pending_orders[order_id].status == "pending":
- self.pending_orders[order_id].status = "cancelled"
- except Exception as e:
- logger.error(f"清理挂单失败: {e}")
-
- logger.info("做市策略已停止")
-
- def stop(self):
- """停止策略"""
- self.running = False
-
-
-# ================================================================
-# 🚀 主程序
-# ================================================================
-
-if __name__ == '__main__':
- config = MarketMakingConfig(
- contract_symbol="ETHUSDT",
- bit_id="f2320f57e24c45529a009e1541e25961", # TGE浏览器ID
- trading_url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT",
- spread_percent=0.01, # 0.01%价差
- order_size_usdt=0.1, # 每单10 USDT
- max_position_usdt=3.0, # 最大持仓100 USDT
- order_refresh_interval=2.0, # 2秒刷新一次
- order_timeout=60.0, # 60秒超时
- max_daily_loss=50.0, # 每日最大亏损50 USDT
- max_daily_trades=1000, # 每日最大1000笔
- leverage="35",
- open_type="cross"
- )
-
- strategy = MarketMakingStrategy(config)
-
- try:
- strategy.run()
- except Exception as e:
- logger.error(f"程序异常: {e}")
- # send_dingtalk_message(f"做市策略异常: {e}", error=True)
-
-# 9359,53
-# 14.35
diff --git a/bitmart/优化拿仓版本.py b/bitmart/优化拿仓版本.py
deleted file mode 100644
index 489fcbb..0000000
--- a/bitmart/优化拿仓版本.py
+++ /dev/null
@@ -1,458 +0,0 @@
-"""
-BitMart 被动做市/高频刷单策略 (修复版 V2)
-修复内容:
-1. 修正 get_order_book 中解析深度数据的方式,由字典键名访问改为列表索引访问 (['price'] -> [0])
-"""
-
-import time
-import requests
-from typing import Optional, Dict, List, Tuple
-from dataclasses import dataclass
-from loguru import logger
-from threading import Lock
-
-from DrissionPage import ChromiumPage, ChromiumOptions
-from bitmart.api_contract import APIContract
-
-
-# ================================================================
-# 📊 配置类
-# ================================================================
-
-@dataclass
-class MarketMakingConfig:
- """做市策略配置"""
- # API配置(仅用于查询,不下单)
- api_key: str = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
- secret_key: str = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
- memo: str = "合约交易"
- contract_symbol: str = "ETHUSDT"
-
- # 浏览器配置
- tge_id: int = 196495 # TGE浏览器ID
- tge_url: str = "http://127.0.0.1:50326"
- tge_headers: Dict = None
- trading_url: str = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"
-
- # 做市基础参数
- spread_percent: float = 0.04 # 基础价差 (0.04% 约为 $1左右 on ETH)
- order_size_usdt: float = 10.0 # 每单金额(USDT)
- max_position_usdt: float = 100.0 # 最大持仓金额(USDT)
-
- # 🚀 高级策略参数
- # 库存倾斜:每持有100U,价格偏移多少。正数表示持有多单时价格下调(利于卖出,不利于买入)
- inventory_skew_factor: float = 0.0005
- # 价格容忍度:只有当(目标价 - 当前挂单价) / 目标价 > 0.05% 时才改单,避免频繁操作
- price_tolerance: float = 0.0005
-
- # 风险控制
- max_daily_loss: float = 50.0
- leverage: str = "30"
- open_type: str = "cross"
-
- def __post_init__(self):
- """初始化TGE headers"""
- if self.tge_headers is None:
- self.tge_headers = {
- "Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
- "Content-Type": "application/json"
- }
-
-
-# ================================================================
-# 📊 订单簿数据结构
-# ================================================================
-
-@dataclass
-class OrderBook:
- """订单簿数据"""
- bids: List[Tuple[float, float]] # [(价格, 数量), ...]
- asks: List[Tuple[float, float]] # [(价格, 数量), ...]
- timestamp: float
-
- @property
- def mid_price(self) -> Optional[float]:
- """中间价"""
- if self.bids and self.asks:
- return (self.bids[0][0] + self.asks[0][0]) / 2
- return None
-
-
-# ================================================================
-# 📊 浏览器管理器
-# ================================================================
-
-class BrowserManager:
- """浏览器管理器:负责浏览器的启动、接管和标签页管理"""
-
- def __init__(self, config: MarketMakingConfig):
- self.config = config
- self.tge_port: Optional[int] = None
- self.page: Optional[ChromiumPage] = None
-
- def open_browser(self) -> bool:
- """打开浏览器并获取端口"""
- try:
- response = requests.post(
- f"{self.config.tge_url}/api/browser/start",
- json={"envId": self.config.tge_id},
- headers=self.config.tge_headers,
- timeout=10
- )
- data = response.json()
- if "data" in data and "port" in data["data"]:
- self.tge_port = data["data"]["port"]
- logger.success(f"成功打开浏览器,端口:{self.tge_port}")
- return True
- else:
- logger.error(f"打开浏览器响应异常: {data}")
- return False
- except Exception as e:
- logger.error(f"打开浏览器失败: {e}")
- return False
-
- def take_over_browser(self) -> bool:
- """接管浏览器"""
- if not self.tge_port:
- logger.error("浏览器端口未设置")
- return False
-
- try:
- co = ChromiumOptions()
- co.set_local_port(self.tge_port)
- self.page = ChromiumPage(addr_or_opts=co)
- logger.success("成功接管浏览器")
- return True
- except Exception as e:
- logger.error(f"接管浏览器失败: {e}")
- return False
-
- def close_extra_tabs(self) -> bool:
- """关闭多余的标签页,只保留第一个"""
- if not self.page:
- return False
- try:
- tabs = self.page.get_tabs()
- for idx, tab in enumerate(tabs):
- if idx == 0: continue
- tab.close()
- return True
- except Exception as e:
- logger.warning(f"关闭多余标签页失败: {e}")
- return False
-
-
-# ================================================================
-# 📊 浏览器交易执行器
-# ================================================================
-
-class BrowserTradingExecutor:
- """浏览器交易执行器:通过浏览器自动化下单(获取高返佣)"""
-
- def __init__(self, page: ChromiumPage):
- self.page = page
-
- def click_safe(self, xpath: str, sleep: float = 0.5) -> bool:
- """安全点击"""
- try:
- ele = self.page.ele(xpath)
- if not ele:
- return False
- ele.scroll.to_see(center=True)
- time.sleep(sleep)
- ele.click()
- return True
- except Exception as e:
- logger.error(f"点击失败 {xpath}: {e}")
- return False
-
- def 开单(self, marketPriceLongOrder: int = 0, limitPriceShortOrder: int = 0,
- size: Optional[float] = None, price: Optional[float] = None) -> bool:
- """开单操作"""
- size = 0.1
-
- try:
- # 市价单 (代码略)
- if marketPriceLongOrder == -1: pass
- elif marketPriceLongOrder == 1: pass
-
- # 限价单
- if limitPriceShortOrder == -1:
- # 限价做空
- if not self.click_safe('x://button[normalize-space(text()) ="限价"]'): return False
- self.page.ele('x://*[ @id="price_0"]').input(vals=price, clear=True)
- time.sleep(0.2)
- self.page.ele('x://*[ @id="size_0"]').input(vals=size, clear=True)
- if not self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'): return False
- logger.success(f"浏览器下单: 限价做空 {size} @ {price}")
- return True
-
- elif limitPriceShortOrder == 1:
- # 限价做多
- if not self.click_safe('x://button[normalize-space(text()) ="限价"]'): return False
- self.page.ele('x://*[ @id="price_0"]').input(vals=price, clear=True)
- time.sleep(0.2)
- self.page.ele('x://*[ @id="size_0"]').input(vals=size, clear=True)
- if not self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'): return False
- logger.success(f"浏览器下单: 限价做多 {size} @ {price}")
- return True
-
- return False
- except Exception as e:
- logger.error(f"开单异常: {e}")
- return False
-
- def place_limit_order(self, side: str, price: float, size: float) -> bool:
- """统一接口"""
- if side == "buy":
- return self.开单(limitPriceShortOrder=1, size=size, price=price)
- else:
- return self.开单(limitPriceShortOrder=-1, size=size, price=price)
-
-
-# ================================================================
-# 📊 BitMart API 封装 (修复 get_order_book)
-# ================================================================
-
-class BitMartMarketMakerAPI:
- """BitMart做市API封装(仅用于查询,不下单)"""
-
- def __init__(self, config: MarketMakingConfig):
- self.config = config
- self.contractAPI = APIContract(
- config.api_key,
- config.secret_key,
- config.memo,
- timeout=(5, 15)
- )
-
- def get_order_book(self) -> Optional[OrderBook]:
- try:
- # 移除不支持的 limit 参数
- response = self.contractAPI.get_depth(contract_symbol=self.config.contract_symbol)[0]
-
- if response.get('code') == 1000:
- data = response.get('data', {})
- bids = []
- asks = []
- # 解析数据
- if isinstance(data, dict):
- bids_raw = data.get('bids', [])
- asks_raw = data.get('asks', [])
-
- # 修复:b 是列表 [price, size],不是字典
- for b in bids_raw[:10]:
- # b[0] 是价格, b[1] 是数量
- bids.append((float(b[0]), float(b[1])))
-
- for a in asks_raw[:10]:
- # a[0] 是价格, a[1] 是数量
- asks.append((float(a[0]), float(a[1])))
-
- if bids and asks:
- return OrderBook(bids=bids, asks=asks, timestamp=time.time())
- else:
- logger.warning(f"获取深度失败: {response}")
- return None
- except Exception as e:
- logger.error(f"获取订单簿异常: {e}")
- return None
-
- def get_position_net(self) -> float:
- """获取净持仓 (多为正,空为负)"""
- try:
- response = self.contractAPI.get_position(contract_symbol=self.config.contract_symbol)[0]
- if response.get('code') == 1000:
- data = response.get('data', [])
- if data:
- pos = data[0]
- current_amount = float(pos.get('current_amount', 0))
- position_type = int(pos.get('position_type', 0)) # 1多 2空
- if position_type == 1: return current_amount
- if position_type == 2: return -current_amount
- return 0.0
- except Exception as e:
- logger.error(f"持仓查询异常: {e}")
- return 0.0
-
- def get_open_orders(self) -> List[Dict]:
- """获取当前挂单"""
- try:
- resp = self.contractAPI.get_open_order(contract_symbol=self.config.contract_symbol)[0]
- if resp.get("code") == 1000:
- return resp.get("data", [])
- return []
- except Exception as e:
- logger.error(f"查询挂单异常: {e}")
- return []
-
- def cancel_order(self, order_id: str) -> bool:
- """API撤单"""
- try:
- resp = self.contractAPI.post_cancel_order(contract_symbol=self.config.contract_symbol, order_id=order_id)[0]
- return resp.get("code") == 1000
- except Exception as e:
- logger.error(f"API撤单异常: {e}")
- return False
-
- def set_leverage(self):
- try:
- self.contractAPI.post_submit_leverage(contract_symbol=self.config.contract_symbol, leverage=self.config.leverage, open_type=self.config.open_type)
- except:
- pass
-
-
-# ================================================================
-# 🧠 策略核心
-# ================================================================
-
-class MarketMakingStrategy:
- """优化版被动做市策略"""
-
- def __init__(self, config: MarketMakingConfig):
- self.config = config
- self.api = BitMartMarketMakerAPI(config)
- self.browser_manager = BrowserManager(config)
- self.trading_executor: Optional[BrowserTradingExecutor] = None
- self.running = False
-
- # 初始化流程
- if not self._initialize_browser():
- raise Exception("浏览器初始化失败")
- self.api.set_leverage()
-
- def _initialize_browser(self) -> bool:
- try:
- if not self.browser_manager.open_browser(): return False
- if not self.browser_manager.take_over_browser(): return False
- self.browser_manager.close_extra_tabs()
-
- # 访问交易页
- logger.info(f"正在访问交易页: {self.config.trading_url}")
- self.browser_manager.page.get(self.config.trading_url)
- time.sleep(3)
-
- self.trading_executor = BrowserTradingExecutor(self.browser_manager.page)
- logger.success("浏览器环境就绪")
- return True
- except Exception as e:
- logger.error(f"浏览器初始化异常: {e}")
- return False
-
- def calculate_target_prices(self, mid_price: float, net_position: float) -> Tuple[float, float]:
- """核心算法:计算考虑了库存倾斜的目标买卖价"""
- # 1. 基础价差的一半
- half_spread = mid_price * (self.config.spread_percent / 100) / 2
-
- # 2. 库存倾斜调整
- skew_adjust = net_position * self.config.inventory_skew_factor * mid_price
-
- quote_mid = mid_price - skew_adjust
-
- target_bid = quote_mid - half_spread
- target_ask = quote_mid + half_spread
-
- # 3. 价格修正 (防止穿仓)
- if target_ask <= target_bid:
- target_ask = target_bid + mid_price * 0.0001
-
- return round(target_bid, 2), round(target_ask, 2)
-
- def reconcile_orders(self, target_bid: float, target_ask: float):
- """调节逻辑:对比API实际挂单 vs 目标价格"""
- open_orders = self.api.get_open_orders()
-
- current_bids = []
- current_asks = []
-
- for o in open_orders:
- side = o.get('side')
- # 兼容API返回
- side_str = str(side).lower()
- if side_str == '1' or 'buy' in side_str:
- current_bids.append(o)
- elif side_str == '2' or 'sell' in side_str:
- current_asks.append(o)
-
- # --- 调节买单 ---
- valid_bid_exists = False
- for order in current_bids:
- price = float(order.get('price', 0))
- diff_pct = abs(price - target_bid) / target_bid
-
- if diff_pct < self.config.price_tolerance:
- valid_bid_exists = True
- else:
- logger.info(f"♻️ 买单价格偏离 (现{price} vs 标{target_bid}),撤单")
- self.api.cancel_order(order.get('order_id') or order.get('id'))
-
- if not valid_bid_exists:
- # 计算张数
- size_contract = self.config.order_size_usdt / target_bid / 0.01
- size_contract = max(1, int(size_contract))
- logger.info(f"➕ 补买单: {target_bid} (数量:{size_contract})")
- self.trading_executor.place_limit_order("buy", target_bid, size_contract)
-
- # --- 调节卖单 ---
- valid_ask_exists = False
- for order in current_asks:
- price = float(order.get('price', 0))
- diff_pct = abs(price - target_ask) / target_ask
-
- if diff_pct < self.config.price_tolerance:
- valid_ask_exists = True
- else:
- logger.info(f"♻️ 卖单价格偏离 (现{price} vs 标{target_ask}),撤单")
- self.api.cancel_order(order.get('order_id') or order.get('id'))
-
- if not valid_ask_exists:
- size_contract = self.config.order_size_usdt / target_ask / 0.01
- size_contract = max(1, int(size_contract))
- logger.info(f"➕ 补卖单: {target_ask} (数量:{size_contract})")
- self.trading_executor.place_limit_order("sell", target_ask, size_contract)
-
- def run(self):
- self.running = True
- logger.info("🚀 策略已启动")
-
- while self.running:
- try:
- # 1. 获取市场数据
- ob = self.api.get_order_book()
- if not ob:
- time.sleep(1)
- continue
- mid_price = ob.mid_price
-
- # 2. 获取持仓
- net_position = self.api.get_position_net()
-
- # 3. 计算目标价
- t_bid, t_ask = self.calculate_target_prices(mid_price, net_position)
- logger.info(f"Mid:{mid_price:.2f} | Pos:{net_position} | Target Bid:{t_bid} Ask:{t_ask}")
-
- # 4. 调节挂单
- self.reconcile_orders(t_bid, t_ask)
-
- # 5. 循环间隔
- time.sleep(3)
-
- except KeyboardInterrupt:
- logger.warning("停止策略")
- break
- except Exception as e:
- logger.error(f"Loop Exception: {e}")
- time.sleep(2)
-
-if __name__ == '__main__':
- config = MarketMakingConfig(
- contract_symbol="ETHUSDT",
- spread_percent=0.04,
- order_size_usdt=0.1,
- max_position_usdt=50.0,
- inventory_skew_factor=0.0005,
- price_tolerance=0.0005
- )
-
- strategy = MarketMakingStrategy(config)
- strategy.run()
diff --git a/bitmart/回测.py b/bitmart/回测.py
deleted file mode 100644
index f709dce..0000000
--- a/bitmart/回测.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import time
-import csv
-
-import loguru
-from bitmart.api_contract import APIContract
-
-# ------------------ 配置 ------------------
-START_YEAR = 2025
-CONTRACT_SYMBOL = "ETHUSDT"
-STEP = 3 # K 线周期,单位分钟
-CSV_FILE = f"kline_{STEP}.csv"
-
-memo = "合约交易"
-api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
-secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
-
-contractAPI = APIContract(api_key, secret_key, memo, timeout=(5, 15))
-
-# ------------------ 时间戳 ------------------
-start_of_year = int(time.mktime((START_YEAR, 1, 1, 0, 0, 0, 0, 0, 0)))
-current_time = int(time.time())
-
-# ------------------ 抓取数据 ------------------
-all_data = []
-existing_ids = set()
-start_time = start_of_year
-
-# 每次请求时间长度 = step * 500 条 K 线
-request_interval_ms = STEP * 60 * 500
-
-while start_time < current_time:
- end_time = min(start_time + request_interval_ms, current_time)
- loguru.logger.info(f"抓取时间段: {start_time} ~ {end_time}")
-
- try:
- response = contractAPI.get_kline(
- contract_symbol=CONTRACT_SYMBOL,
- step=STEP,
- start_time=start_time,
- end_time=end_time
- )[0]["data"]
-
- formatted = []
- for k in response:
- print(k)
- k_id = int(k["timestamp"])
- if k_id in existing_ids:
- continue
- existing_ids.add(k_id)
- formatted.append({
- 'id': int(k["timestamp"]),
- 'open': float(k["open_price"]),
- 'high': float(k["high_price"]),
- 'low': float(k["low_price"]),
- 'close': float(k["close_price"]),
- 'volume': float(k["volume"])
- })
-
- formatted.sort(key=lambda x: x['id'])
- all_data.extend(formatted)
-
- if len(response) < 500:
- start_time = end_time
- else:
- start_time = formatted[-1]['id'] + 1
-
- except Exception as e:
- print(f"请求出错: {e},等待 60 秒后重试")
- time.sleep(60)
-
- time.sleep(0.2) # 控制速率,保证 <= 每 2 秒 12 次
-
-# ------------------ 保存 CSV ------------------
-csv_columns = ['id', 'open', 'high', 'low', 'close', 'volume']
-try:
- with open(CSV_FILE, 'w', newline='') as csvfile:
- writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
- writer.writeheader()
- for data in all_data:
- writer.writerow(data)
- print(f"数据已保存到 {CSV_FILE},共 {len(all_data)} 条")
-except IOError:
- print("I/O error")
diff --git a/bitmart/回测图表.png b/bitmart/回测图表.png
deleted file mode 100644
index d3df2fd..0000000
Binary files a/bitmart/回测图表.png and /dev/null differ
diff --git a/bitmart/回测图表_交互式.html b/bitmart/回测图表_交互式.html
deleted file mode 100644
index b67088e..0000000
--- a/bitmart/回测图表_交互式.html
+++ /dev/null
@@ -1,3888 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/bitmart/均线回归.py b/bitmart/均线回归.py
deleted file mode 100644
index 9616cce..0000000
--- a/bitmart/均线回归.py
+++ /dev/null
@@ -1,866 +0,0 @@
-import os
-import time
-import uuid
-import datetime
-from dataclasses import dataclass
-
-from tqdm import tqdm
-from loguru import logger
-
-from bitmart.api_contract import APIContract
-from bitmart.lib.cloud_exceptions import APIException
-
-from 交易.tools import send_dingtalk_message
-
-
-@dataclass
-class StrategyConfig:
- # =============================
- # 1m | ETH 永续 | 控止损≤5/日
- # =============================
-
- # ===== 合约 =====
- contract_symbol: str = "ETHUSDT"
- open_type: str = "cross"
- leverage: str = "30"
-
- # ===== K线与指标 =====
- step_min: int = 1
- lookback_min: int = 240
- ema_len: int = 36
- atr_len: int = 14
-
- # =========================================================
- # ✅ 自动阈值:ATR/Price 分位数基准(更稳,不被短时噪声带跑)
- # =========================================================
- vol_baseline_window: int = 60
- vol_baseline_quantile: float = 0.65
- vol_scale_min: float = 0.80
- vol_scale_max: float = 1.60
-
- # ✅ baseline 每 60 秒刷新一次(体感更明显、也省CPU)
- base_ratio_refresh_sec: int = 60
-
- # =========================================================
- # ✅ 动态 floor(方案一)
- # floor = clamp(min, base_k * base_ratio, max)
- # 目的:跟着典型波动变,过滤小噪声;tp/sl 也随环境自适应
- # =========================================================
- # entry_dev_floor 动态
- entry_dev_floor_min: float = 0.0012 # 0.12%
- entry_dev_floor_max: float = 0.0030 # 0.30%(可按你偏好调)
- entry_dev_floor_base_k: float = 1.10 # entry_floor = 1.10 * base_ratio
-
- # tp_floor 动态
- tp_floor_min: float = 0.0006 # 0.06%
- tp_floor_max: float = 0.0020 # 0.20%
- tp_floor_base_k: float = 0.55 # tp_floor = 0.55 * base_ratio(止盈别太大,1m回归更实际)
-
- # sl_floor 动态
- sl_floor_min: float = 0.0018 # 0.18%
- sl_floor_max: float = 0.0060 # 0.60%
- sl_floor_base_k: float = 1.35 # sl_floor = 1.35 * base_ratio(ETH 1m 插针多,止损下限可更稳)
-
- # =========================================================
- # ✅ 动态阈值倍率(仍然保留你原来思路)
- # =========================================================
- entry_k: float = 1.45
- tp_k: float = 0.65
- sl_k: float = 1.05
-
- # ===== 时间/冷却 =====
- max_hold_sec: int = 75
- cooldown_sec_after_exit: int = 20
-
- # ===== 下单/仓位 =====
- risk_percent: float = 0.004
- min_size: int = 1
- max_size: int = 5000
-
- # ===== 日内风控 =====
- daily_loss_limit: float = 0.02
- daily_profit_cap: float = 0.01
-
- # ===== 危险模式过滤 =====
- atr_ratio_kill: float = 0.0038
- big_body_kill: float = 0.010
-
- # ===== 轮询节奏 =====
- klines_refresh_sec: int = 10
- tick_refresh_sec: int = 1
- status_notify_sec: int = 60
-
- # =========================================================
- # ✅ 止损后同向入场加门槛(但不禁止同向重入)
- # =========================================================
- reentry_penalty_mult: float = 1.55
- reentry_penalty_max_sec: int = 180
- reset_band_k: float = 0.45
- reset_band_floor: float = 0.0006
-
- # =========================================================
- # ✅ 止损后同方向 SL 放宽幅度与"止损时 vol_scale"联动
- # =========================================================
- post_sl_sl_max_sec: int = 90
- post_sl_mult_min: float = 1.02
- post_sl_mult_max: float = 1.16
- post_sl_vol_alpha: float = 0.20
-
-
-class BitmartFuturesMeanReversionBot:
- def __init__(self, cfg: StrategyConfig):
- self.cfg = cfg
-
- self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
- self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
- self.memo = "合约交易"
-
- if not self.api_key or not self.secret_key:
- raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)")
-
- self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
-
- # 持仓状态: -1 空, 0 无, 1 多
- self.pos = 0
- self.entry_price = None
- self.entry_ts = None
- self.last_exit_ts = 0
-
- # 日内权益基准
- self.day_start_equity = None
- self.trading_enabled = True
- self.day_tag = datetime.date.today()
-
- # 缓存
- self._klines_cache = None
- self._klines_cache_ts = 0
- self._last_status_notify_ts = 0
-
- # ✅ base_ratio 缓存
- self._base_ratio_cached = 0.0015 # 初始化默认值 0.15%
- self._base_ratio_ts = 0.0
-
- # ✅ 止损后"同向入场加门槛"状态
- self.last_sl_dir = 0 # 1=多止损,-1=空止损,0=无
- self.last_sl_ts = 0.0
-
- # ✅ 止损后"同方向 SL 联动放宽"状态
- self.post_sl_dir = 0
- self.post_sl_ts = 0.0
- self.post_sl_vol_scale = 1.0 # 记录止损时的 vol_scale
-
- self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90)
-
- logger.info(f"初始化完成,基准波动率默认值: {self._base_ratio_cached * 100:.4f}%")
-
- # ----------------- 通用工具 -----------------
- def ding(self, msg, error=False):
- prefix = "❌bitmart:" if error else "🔔bitmart:"
- if error:
- for _ in range(3):
- send_dingtalk_message(f"{prefix}{msg}")
- else:
- send_dingtalk_message(f"{prefix}{msg}")
-
- def set_leverage(self) -> bool:
- try:
- resp = self.contractAPI.post_submit_leverage(
- contract_symbol=self.cfg.contract_symbol,
- leverage=self.cfg.leverage,
- open_type=self.cfg.open_type
- )[0]
- if resp.get("code") == 1000:
- logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x")
- return True
- logger.error(f"设置杠杆失败: {resp}")
- self.ding(f"设置杠杆失败: {resp}", error=True)
- return False
- except Exception as e:
- logger.error(f"设置杠杆异常: {e}")
- self.ding(f"设置杠杆异常: {e}", error=True)
- return False
-
- # ----------------- 行情/指标 -----------------
- def get_klines_cached(self):
- now = time.time()
- if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec:
- return self._klines_cache
-
- kl = self.get_klines()
- if kl:
- self._klines_cache = kl
- self._klines_cache_ts = now
- return self._klines_cache
-
- def get_klines(self):
- try:
- end_time = int(time.time())
- start_time = end_time - 60 * self.cfg.lookback_min
-
- resp = self.contractAPI.get_kline(
- contract_symbol=self.cfg.contract_symbol,
- step=self.cfg.step_min,
- start_time=start_time,
- end_time=end_time
- )[0]
-
- if resp.get("code") != 1000:
- logger.error(f"获取K线失败: {resp}")
- return None
-
- data = resp.get("data", [])
- formatted = []
- for k in data:
- formatted.append({
- "id": int(k["timestamp"]),
- "open": float(k["open_price"]),
- "high": float(k["high_price"]),
- "low": float(k["low_price"]),
- "close": float(k["close_price"]),
- })
- formatted.sort(key=lambda x: x["id"])
- return formatted
- except Exception as e:
- logger.error(f"获取K线异常: {e}")
- self.ding(f"获取K线异常: {e}", error=True)
- return None
-
- def get_last_price(self, fallback_close: float) -> float:
- try:
- if hasattr(self.contractAPI, "get_contract_details"):
- r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0]
- d = r.get("data") if isinstance(r, dict) else None
- if isinstance(d, dict):
- for key in ("last_price", "mark_price", "index_price"):
- if key in d and d[key] is not None:
- return float(d[key])
-
- if hasattr(self.contractAPI, "get_ticker"):
- r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0]
- d = r.get("data") if isinstance(r, dict) else None
- if isinstance(d, dict):
- for key in ("last_price", "price", "last", "close"):
- if key in d and d[key] is not None:
- return float(d[key])
- except Exception:
- pass
-
- return float(fallback_close)
-
- @staticmethod
- def ema(values, n: int) -> float:
- k = 2 / (n + 1)
- e = values[0]
- for v in values[1:]:
- e = v * k + e * (1 - k)
- return e
-
- @staticmethod
- def atr(klines, n: int) -> float:
- if len(klines) < n + 1:
- return 0.0
- trs = []
- for i in range(-n, 0):
- cur = klines[i]
- prev = klines[i - 1]
- tr = max(
- cur["high"] - cur["low"],
- abs(cur["high"] - prev["close"]),
- abs(cur["low"] - prev["close"]),
- )
- trs.append(tr)
- return sum(trs) / len(trs)
-
- def is_danger_market(self, klines, price: float) -> bool:
- last = klines[-1]
- body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0
- if body >= self.cfg.big_body_kill:
- return True
-
- a = self.atr(klines, self.cfg.atr_len)
- atr_ratio = (a / price) if price > 0 else 0.0
- if atr_ratio >= self.cfg.atr_ratio_kill:
- return True
-
- return False
-
- def atr_ratio_baseline(self, klines) -> float:
- """简化版ATR基准计算"""
- window = min(self.cfg.vol_baseline_window, len(klines) - self.cfg.atr_len - 1)
- if window <= 10: # 数据太少
- logger.warning(f"数据不足计算基准: {len(klines)}根K线")
- return 0.0
-
- ratios = []
-
- # 简化计算:每隔3根K线计算一个ATR比率(减少计算量)
- step = 3
- for i in range(-window, 0, step):
- if len(klines) + i < self.cfg.atr_len + 1:
- continue
-
- # 计算当前位置的ATR
- start_idx = len(klines) + i - self.cfg.atr_len
- end_idx = len(klines) + i
-
- if start_idx < 0 or end_idx <= start_idx:
- continue
-
- sub_klines = klines[start_idx:end_idx]
-
- # 确保有足够数据计算ATR
- if len(sub_klines) >= self.cfg.atr_len + 1:
- a = self.atr(sub_klines, self.cfg.atr_len)
- price = klines[end_idx - 1]["close"]
-
- if a > 0 and price > 0:
- ratio = a / price
- if 0.0001 < ratio < 0.01: # 过滤异常值
- ratios.append(ratio)
-
- if len(ratios) < 5: # 样本太少
- # 尝试直接使用整个数据计算一个ATR比率
- a = self.atr(klines[-60:], self.cfg.atr_len) # 使用最近60根K线
- price = klines[-1]["close"]
- if a > 0 and price > 0:
- baseline = a / price
- logger.debug(f"使用全量数据计算基准: {baseline * 100:.4f}%")
- return baseline
- else:
- return 0.0
-
- # 计算分位数
- ratios.sort()
- idx = min(len(ratios) - 1,
- max(0, int(self.cfg.vol_baseline_quantile * (len(ratios) - 1))))
- baseline = ratios[idx]
-
- logger.debug(f"基准计算: 样本数={len(ratios)}, 基准={baseline * 100:.4f}%, "
- f"范围=[{ratios[0] * 100:.4f}%, {ratios[-1] * 100:.4f}%]")
-
- return baseline
-
- def get_base_ratio_cached(self, klines) -> float:
- """获取缓存的基准波动率,定期刷新"""
- now = time.time()
- refresh_sec = self.cfg.base_ratio_refresh_sec
-
- if (self._base_ratio_cached is None or
- (now - self._base_ratio_ts) >= refresh_sec):
-
- # 使用简单版本的基准计算
- baseline = self.atr_ratio_baseline(klines)
-
- if baseline > 0.0001: # 大于0.01%才认为是有效值
- self._base_ratio_cached = baseline
- self._base_ratio_ts = now
- logger.info(f"基准波动率更新: {baseline * 100:.4f}%")
- else:
- # 使用基于价格的动态默认值
- current_price = klines[-1]["close"] if klines else 3000
- # ETH价格越高,基准波动率越小(百分比)
- if current_price > 4000:
- default_baseline = 0.0010 # 0.10%
- elif current_price > 3500:
- default_baseline = 0.0012 # 0.12%
- elif current_price > 3000:
- default_baseline = 0.0015 # 0.15%
- elif current_price > 2500:
- default_baseline = 0.0018 # 0.18%
- else:
- default_baseline = 0.0020 # 0.20%
-
- self._base_ratio_cached = default_baseline
- self._base_ratio_ts = now
- logger.warning(f"使用价格动态默认基准: {default_baseline * 100:.4f}% "
- f"(价格=${current_price:.0f})")
-
- return self._base_ratio_cached
-
- @staticmethod
- def _clamp(x: float, lo: float, hi: float) -> float:
- """限制数值在指定范围内"""
- return max(lo, min(hi, x))
-
- def dynamic_thresholds(self, atr_ratio: float, base_ratio: float):
- """
- ✅ entry/tp/sl 全部动态(修复版):
- - vol_scale:atr_ratio/base_ratio 限幅
- - floor:方案一 (floor = clamp(min, k*base_ratio, max))
- - 最终阈值:max(floor, k * vol_scale * atr_ratio)
- """
- # 1) 检查输入有效性
- if atr_ratio <= 0:
- logger.warning(f"ATR比率异常: {atr_ratio}")
- atr_ratio = 0.001 # 默认值 0.1%
-
- # 2) 如果base_ratio太小或无效,使用调整后的atr_ratio
- if base_ratio < 0.0005: # 小于0.05%视为无效
- base_ratio = max(0.001, atr_ratio * 1.2) # 比当前ATR比率稍大
- logger.debug(f"基准太小,使用调整后的atr_ratio: {base_ratio * 100:.4f}%")
-
- # 3) vol_scale计算
- if base_ratio > 0:
- raw_scale = atr_ratio / base_ratio
- vol_scale = self._clamp(raw_scale, self.cfg.vol_scale_min, self.cfg.vol_scale_max)
- logger.debug(
- f"vol_scale: {raw_scale:.2f} → {vol_scale:.2f} (atr={atr_ratio * 100:.3f}%, base={base_ratio * 100:.3f}%)")
- else:
- vol_scale = 1.0
- logger.warning(f"基准无效,使用默认vol_scale=1.0")
-
- # 4) 动态floor计算
- # Entry floor
- entry_floor_raw = self.cfg.entry_dev_floor_base_k * base_ratio
- entry_floor = self._clamp(
- entry_floor_raw,
- self.cfg.entry_dev_floor_min,
- self.cfg.entry_dev_floor_max,
- )
-
- # TP floor
- tp_floor_raw = self.cfg.tp_floor_base_k * base_ratio
- tp_floor = self._clamp(
- tp_floor_raw,
- self.cfg.tp_floor_min,
- self.cfg.tp_floor_max,
- )
-
- # SL floor
- sl_floor_raw = self.cfg.sl_floor_base_k * base_ratio
- sl_floor = self._clamp(
- sl_floor_raw,
- self.cfg.sl_floor_min,
- self.cfg.sl_floor_max,
- )
-
- # 5) 最终阈值计算
- entry_dev_atr_part = self.cfg.entry_k * vol_scale * atr_ratio
- entry_dev = max(entry_floor, entry_dev_atr_part)
-
- tp_atr_part = self.cfg.tp_k * vol_scale * atr_ratio
- tp = max(tp_floor, tp_atr_part)
-
- sl_atr_part = self.cfg.sl_k * vol_scale * atr_ratio
- sl = max(sl_floor, sl_atr_part)
-
- # 6) 确保entry_dev不会太小
- entry_dev = max(entry_dev, self.cfg.entry_dev_floor_min)
-
- # 7) 输出详细信息
- logger.info(
- f"动态阈值: atr={atr_ratio * 100:.4f}%, base={base_ratio * 100:.4f}%, "
- f"vol_scale={vol_scale:.2f}, floor={entry_floor * 100:.4f}%, "
- f"atr_part={entry_dev_atr_part * 100:.4f}%, 最终entry_dev={entry_dev * 100:.4f}%"
- )
-
- return entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor
-
- # ----------------- 账户/仓位 -----------------
- def get_assets_available(self) -> float:
- try:
- resp = self.contractAPI.get_assets_detail()[0]
- if resp.get("code") != 1000:
- return 0.0
- data = resp.get("data")
- if isinstance(data, dict):
- return float(data.get("available_balance", 0))
- if isinstance(data, list):
- for asset in data:
- if asset.get("currency") == "USDT":
- return float(asset.get("available_balance", 0))
- return 0.0
- except Exception as e:
- logger.error(f"余额查询异常: {e}")
- return 0.0
-
- def get_position_status(self) -> bool:
- try:
- resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0]
- if resp.get("code") != 1000:
- return False
-
- positions = resp.get("data", [])
- if not positions:
- self.pos = 0
- return True
-
- p = positions[0]
- self.pos = 1 if p["position_type"] == 1 else -1
- return True
- except Exception as e:
- logger.error(f"持仓查询异常: {e}")
- self.ding(f"持仓查询异常: {e}", error=True)
- return False
-
- def get_equity_proxy(self) -> float:
- return self.get_assets_available()
-
- def refresh_daily_baseline(self):
- today = datetime.date.today()
- if today != self.day_tag:
- self.day_tag = today
- self.day_start_equity = None
- self.trading_enabled = True
- self.ding(f"新的一天({today}):重置日内风控基准")
-
- def risk_kill_switch(self):
- self.refresh_daily_baseline()
- equity = self.get_equity_proxy()
- if equity <= 0:
- return
-
- if self.day_start_equity is None:
- self.day_start_equity = equity
- logger.info(f"日内权益基准设定:{equity:.2f} USDT")
- return
-
- pnl = (equity - self.day_start_equity) / self.day_start_equity
- if pnl <= -self.cfg.daily_loss_limit:
- self.trading_enabled = False
- self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True)
-
- if pnl >= self.cfg.daily_profit_cap:
- self.trading_enabled = False
- self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机")
-
- # ----------------- 下单 -----------------
- def calculate_size(self, price: float) -> int:
- bal = self.get_assets_available()
- if bal < 10:
- return 0
-
- margin = bal * self.cfg.risk_percent
- lev = int(self.cfg.leverage)
-
- # ⚠️ 沿用你的原假设:1张≈0.001ETH
- size = int((margin * lev) / (price * 0.001))
- size = max(self.cfg.min_size, size)
- size = min(self.cfg.max_size, size)
- return size
-
- def place_market_order(self, side: int, size: int) -> bool:
- if size <= 0:
- return False
-
- client_order_id = f"mr_{int(time.time())}_{uuid.uuid4().hex[:8]}"
- try:
- resp = self.contractAPI.post_submit_order(
- contract_symbol=self.cfg.contract_symbol,
- client_order_id=client_order_id,
- side=side,
- mode=1,
- type="market",
- leverage=self.cfg.leverage,
- open_type=self.cfg.open_type,
- size=size
- )[0]
-
- logger.info(f"order_resp: {resp}")
-
- if resp.get("code") == 1000:
- return True
-
- self.ding(f"下单失败: {resp}", error=True)
- return False
-
- except APIException as e:
- logger.error(f"API下单异常: {e}")
- self.ding(f"API下单异常: {e}", error=True)
- return False
-
- except Exception as e:
- logger.error(f"下单未知异常: {e}")
- self.ding(f"下单未知异常: {e}", error=True)
- return False
-
- def close_position_all(self):
- if self.pos == 1:
- ok = self.place_market_order(3, 999999)
- if ok:
- self.pos = 0
- elif self.pos == -1:
- ok = self.place_market_order(2, 999999)
- if ok:
- self.pos = 0
-
- # ----------------- 止损后机制 -----------------
- def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool:
- """检查是否需要应用重新入场惩罚"""
- if self.last_sl_dir == 0:
- return False
-
- if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec:
- self.last_sl_dir = 0
- return False
-
- reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev)
- if abs(dev) <= reset_band:
- self.last_sl_dir = 0
- return False
-
- return True
-
- def _post_sl_dynamic_mult(self) -> float:
- """计算止损后SL放宽倍数"""
- if self.post_sl_dir == 0:
- return 1.0
-
- if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec:
- self.post_sl_dir = 0
- self.post_sl_vol_scale = 1.0
- return 1.0
-
- raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0)
- raw = max(1.0, raw) # 不缩小止损,只放宽
- return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw))
-
- # ----------------- 交易逻辑 -----------------
- def in_cooldown(self) -> bool:
- """检查是否在冷却期内"""
- return (time.time() - self.last_exit_ts) < self.cfg.cooldown_sec_after_exit
-
- def maybe_enter(self, price: float, ema_value: float, entry_dev: float):
- """检查并执行入场"""
- if self.pos != 0:
- return
- if self.in_cooldown():
- return
-
- dev = (price - ema_value) / ema_value if ema_value else 0.0
- size = self.calculate_size(price)
- if size <= 0:
- return
-
- penalty_active = self._reentry_penalty_active(dev, entry_dev)
-
- long_th = -entry_dev
- short_th = entry_dev
-
- if penalty_active:
- if self.last_sl_dir == 1:
- long_th = -entry_dev * self.cfg.reentry_penalty_mult
- logger.info(
- f"多头止损后惩罚生效: 入场阈值从 {long_th * 100:.3f}% 调整为 {(-entry_dev * self.cfg.reentry_penalty_mult) * 100:.3f}%")
- elif self.last_sl_dir == -1:
- short_th = entry_dev * self.cfg.reentry_penalty_mult
- logger.info(
- f"空头止损后惩罚生效: 入场阈值从 {short_th * 100:.3f}% 调整为 {(entry_dev * self.cfg.reentry_penalty_mult) * 100:.3f}%")
-
- logger.info(
- f"入场检查: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% "
- f"(entry_dev={entry_dev * 100:.3f}%, long_th={long_th * 100:.3f}%, short_th={short_th * 100:.3f}%) "
- f"size={size}, penalty={penalty_active}, last_sl_dir={self.last_sl_dir}"
- )
-
- if dev <= long_th:
- if self.place_market_order(1, size):
- self.pos = 1
- self.entry_price = price
- self.entry_ts = time.time()
- self.ding(f"✅开多:dev={dev * 100:.3f}% size={size} entry={price:.2f}")
-
- elif dev >= short_th:
- if self.place_market_order(4, size):
- self.pos = -1
- self.entry_price = price
- self.entry_ts = time.time()
- self.ding(f"✅开空:dev={dev * 100:.3f}% size={size} entry={price:.2f}")
-
- def maybe_exit(self, price: float, tp: float, sl: float, vol_scale: float):
- """检查并执行出场"""
- if self.pos == 0 or self.entry_price is None or self.entry_ts is None:
- return
-
- hold = time.time() - self.entry_ts
-
- if self.pos == 1:
- pnl = (price - self.entry_price) / self.entry_price
- else:
- pnl = (self.entry_price - price) / self.entry_price
-
- sl_mult = 1.0
- if self.post_sl_dir == self.pos and self.post_sl_dir != 0:
- sl_mult = self._post_sl_dynamic_mult()
- effective_sl = sl * sl_mult
-
- if pnl >= tp:
- self.close_position_all()
- self.ding(f"🎯止盈:pnl={pnl * 100:.3f}% price={price:.2f} tp={tp * 100:.3f}%")
- self.entry_price, self.entry_ts = None, None
- self.last_exit_ts = time.time()
-
- elif pnl <= -effective_sl:
- sl_dir = self.pos
-
- self.close_position_all()
- self.ding(
- f"🛑止损:pnl={pnl * 100:.3f}% price={price:.2f} "
- f"sl={sl * 100:.3f}% effective_sl={effective_sl * 100:.3f}%(×{sl_mult:.2f})",
- error=True
- )
-
- self.last_sl_dir = sl_dir
- self.last_sl_ts = time.time()
-
- self.post_sl_dir = sl_dir
- self.post_sl_ts = time.time()
- self.post_sl_vol_scale = float(vol_scale)
-
- self.entry_price, self.entry_ts = None, None
- self.last_exit_ts = time.time()
-
- elif hold >= self.cfg.max_hold_sec:
- self.close_position_all()
- self.ding(f"⏱超时:hold={int(hold)}s pnl={pnl * 100:.3f}% price={price:.2f}")
- self.entry_price, self.entry_ts = None, None
- self.last_exit_ts = time.time()
-
- def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float,
- atr_ratio: float, base_ratio: float, vol_scale: float,
- entry_dev: float, tp: float, sl: float,
- entry_floor: float, tp_floor: float, sl_floor: float):
- """限频状态通知"""
- now = time.time()
- if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec:
- return
- self._last_status_notify_ts = now
-
- direction_str = "多" if self.pos == 1 else ("空" if self.pos == -1 else "无")
- penalty_active = self._reentry_penalty_active(dev, entry_dev)
-
- sl_mult = 1.0
- if self.pos != 0 and self.post_sl_dir == self.pos:
- sl_mult = self._post_sl_dynamic_mult()
-
- base_age = int(now - self._base_ratio_ts) if self._base_ratio_ts else -1
-
- msg = (
- f"【BitMart {self.cfg.contract_symbol}|1m均值回归(动态阈值)】\n"
- f"📊 状态:{direction_str}\n"
- f"💰 现价:{price:.2f} | EMA{self.cfg.ema_len}:{ema_value:.2f}\n"
- f"📈 偏离:{dev * 100:.3f}% (入场阈值:±{entry_dev * 100:.3f}%)\n"
- f"🌊 波动率:ATR比={atr_ratio * 100:.3f}% | 基准={base_ratio * 100:.3f}% | 缩放={vol_scale:.2f}\n"
- f"🎯 动态Floor:入场={entry_floor * 100:.3f}% | 止盈={tp_floor * 100:.3f}% | 止损={sl_floor * 100:.3f}%\n"
- f"💰 止盈/止损:{tp * 100:.3f}% / {sl * 100:.3f}% (盈亏比:{tp / sl:.2f})\n"
- f"🔄 基准刷新:{self.cfg.base_ratio_refresh_sec}s (已过={base_age}s)\n"
- f"⚠️ 止损同向加门槛:{'开启' if penalty_active else '关闭'} (方向={self.last_sl_dir})\n"
- f"💳 可用余额:{bal:.2f} USDT | 杠杆:{self.cfg.leverage}x\n"
- f"⏱️ 持仓限制:{self.cfg.max_hold_sec}s | 冷却:{self.cfg.cooldown_sec_after_exit}s"
- )
- self.ding(msg)
-
- def action(self):
- """主循环"""
- if not self.set_leverage():
- self.ding("杠杆设置失败,停止运行", error=True)
- return
-
- while True:
- now_dt = datetime.datetime.now()
- self.pbar.n = now_dt.second
- self.pbar.refresh()
-
- # 1. 获取K线数据
- klines = self.get_klines_cached()
- if not klines or len(klines) < (self.cfg.ema_len + 5):
- logger.warning("K线数据不足,等待...")
- time.sleep(1)
- continue
-
- # 2. 计算技术指标
- last_k = klines[-1]
- closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]]
- ema_value = self.ema(closes, self.cfg.ema_len)
-
- price = self.get_last_price(fallback_close=float(last_k["close"]))
- dev = (price - ema_value) / ema_value if ema_value else 0.0
-
- # 3. 计算波动率相关指标
- a = self.atr(klines, self.cfg.atr_len)
- atr_ratio = (a / price) if price > 0 else 0.0
-
- base_ratio = self.get_base_ratio_cached(klines)
-
- # 4. 计算动态阈值
- entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor = self.dynamic_thresholds(
- atr_ratio, base_ratio
- )
-
- # 记录调试信息
- logger.debug(
- f"循环数据: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}%, "
- f"atr_ratio={atr_ratio * 100:.3f}%, base_ratio={base_ratio * 100:.3f}%, "
- f"entry_dev={entry_dev * 100:.3f}%"
- )
-
- # 5. 风控检查
- self.risk_kill_switch()
-
- # 6. 获取持仓状态
- if not self.get_position_status():
- time.sleep(1)
- continue
-
- # 7. 检查交易是否启用
- if not self.trading_enabled:
- if self.pos != 0:
- self.close_position_all()
- logger.warning("交易被禁用(风控触发),等待...")
- time.sleep(5)
- continue
-
- # 8. 检查危险市场
- if self.is_danger_market(klines, price):
- logger.warning("危险模式:高波动/大实体K,暂停开仓")
- self.maybe_exit(price, tp, sl, vol_scale)
- time.sleep(self.cfg.tick_refresh_sec)
- continue
-
- # 9. 执行交易逻辑
- self.maybe_exit(price, tp, sl, vol_scale)
- self.maybe_enter(price, ema_value, entry_dev)
-
- # 10. 状态通知
- bal = self.get_assets_available()
- self.notify_status_throttled(
- price, ema_value, dev, bal,
- atr_ratio, base_ratio, vol_scale,
- entry_dev, tp, sl,
- entry_floor, tp_floor, sl_floor
- )
-
- time.sleep(self.cfg.tick_refresh_sec)
-
-
-if __name__ == "__main__":
- """
- Windows PowerShell:
- setx BITMART_API_KEY "你的key"
- setx BITMART_SECRET_KEY "你的secret"
- setx BITMART_MEMO "合约交易"
- 重新打开终端再运行。
-
- Linux/macOS:
- export BITMART_API_KEY="你的key"
- export BITMART_SECRET_KEY="你的secret"
- export BITMART_MEMO "合约交易"
- """
- cfg = StrategyConfig()
- bot = BitmartFuturesMeanReversionBot(cfg)
-
- # 设置日志级别为INFO以便查看详细计算过程
- logger.remove()
- logger.add(lambda msg: tqdm.write(msg, end=""), level="INFO")
-
- try:
- bot.action()
- except KeyboardInterrupt:
- logger.info("程序被用户中断")
- bot.ding("🤖 策略已手动停止")
- except Exception as e:
- logger.error(f"程序异常退出: {e}")
- bot.ding(f"❌ 策略异常退出: {e}", error=True)
- raise
-
-# 目前动态计算阀值的速度是多少
\ No newline at end of file
diff --git a/bitmart/均线自动化开单.py b/bitmart/均线自动化开单.py
deleted file mode 100644
index 3e896f5..0000000
--- a/bitmart/均线自动化开单.py
+++ /dev/null
@@ -1,1165 +0,0 @@
-import time
-import datetime
-from typing import Tuple, Optional, List
-from dataclasses import dataclass, field
-
-from tqdm import tqdm
-from loguru import logger
-from bitmart.api_contract import APIContract
-from DrissionPage import ChromiumPage, ChromiumOptions
-
-from bit_tools import openBrowser
-from 交易.tools import send_dingtalk_message
-
-
-@dataclass
-class StrategyConfig:
- """针对90%返佣优化的均值回归策略配置"""
-
- # =============================
- # ETH 永续 | 1分钟K线 | 90%返佣优化
- # =============================
-
- # ===== 合约配置 =====
- contract_symbol: str = "ETHUSDT"
- open_type: str = "cross"
- leverage: str = "30"
-
- # ===== K线与指标 =====
- step_min: int = 1
- lookback_min: int = 240
- ema_len: int = 36
- atr_len: int = 14
-
- # ===== 动态阈值基准 =====
- vol_baseline_window: int = 60
- vol_baseline_quantile: float = 0.65
- vol_scale_min: float = 0.80
- vol_scale_max: float = 1.60
- base_ratio_refresh_sec: int = 60
-
- # ===== 动态floor配置(针对低手续费优化) =====
- entry_dev_floor_min: float = 0.0008 # 0.08%(原0.12%)
- entry_dev_floor_max: float = 0.0020 # 0.20%(原0.30%)
- entry_dev_floor_base_k: float = 1.10
-
- tp_floor_min: float = 0.0004 # 0.04%(原0.06%)
- tp_floor_max: float = 0.0015 # 0.15%(原0.20%)
- tp_floor_base_k: float = 0.55
-
- sl_floor_min: float = 0.0012 # 0.12%(原0.18%)
- sl_floor_max: float = 0.0040 # 0.40%(原0.60%)
- sl_floor_base_k: float = 1.35
-
- # ===== 阈值倍率(针对低手续费优化) =====
- entry_k: float = 1.30 # 原1.45
- tp_k: float = 0.55 # 原0.65
- sl_k: float = 0.90 # 原1.05
-
- # ===== 时间/冷却(降低以增加频率) =====
- max_hold_sec: int = 60 # 原75秒
- cooldown_sec_after_exit: int = 5 # 原20秒
- opposite_direction_cooldown: int = 30 # 平仓后同向冷却30秒
-
- # ===== 仓位管理 =====
- fixed_margin: float = 10.0 # 固定保证金
- min_size: int = 1
- max_size: int = 10
- max_daily_trades: int = 50 # 每日最大交易次数
-
- # ===== 日内风控 =====
- daily_loss_limit: float = 0.02
- daily_profit_cap: float = 0.01
-
- # ===== 危险模式过滤 =====
- atr_ratio_kill: float = 0.0038
- big_body_kill: float = 0.010
-
- # ===== 轮询节奏 =====
- klines_refresh_sec: int = 10
- tick_refresh_sec: int = 1
- status_notify_sec: int = 60
-
- # ===== 止损后机制 =====
- reentry_penalty_mult: float = 1.55
- reentry_penalty_max_sec: int = 180
- reset_band_k: float = 0.45
- reset_band_floor: float = 0.0006
-
- post_sl_sl_max_sec: int = 90
- post_sl_mult_min: float = 1.02
- post_sl_mult_max: float = 1.16
- post_sl_vol_alpha: float = 0.20
-
- # ===== 正常平仓条件(针对低手续费优化) =====
- normal_exit_threshold: float = 0.0002 # 0.02%(原0.03%)
- normal_exit_min_profit: float = 0.0001 # 0.01%(原0.02%)
-
- # ===== 90%返佣配置 =====
- platform_fee_rate: float = 0.0005 # 平台手续费:开仓价值的万分之五
- rebate_rate: float = 0.90 # 返佣比例:90%
-
- # ===== 新增:趋势过滤 =====
- use_trend_filter: bool = True
- trend_ema_len: int = 50
- trend_threshold: float = 0.0005 # 0.05%
-
- # ===== 新增:成交量确认 =====
- require_volume_confirmation: bool = True
- volume_ma_len: int = 20
- volume_threshold: float = 1.2
-
- # ===== 新增:连续亏损控制 =====
- max_consecutive_losses: int = 3
- recovery_wait_sec: int = 60
-
-
-class BitmartFuturesMeanReversionBot:
- def __init__(self, cfg: StrategyConfig, bit_id=None):
- self.bit_id = bit_id
- self.page: Optional[ChromiumPage] = None
- self.cfg = cfg
-
- self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
- self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
- self.memo = "合约交易"
-
- if not self.api_key or not self.secret_key:
- raise RuntimeError("请先设置API密钥")
-
- self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
-
- # 持仓状态
- self.pos = 0 # -1 空, 0 无, 1 多
- self.entry_price: Optional[float] = None
- self.entry_ts: Optional[float] = None
- self.last_exit_ts = 0.0
-
- # 新增:记录上次平仓方向
- self.last_exit_direction = 0 # 1=多平, -1=空平
- self.last_exit_price = 0.0
-
- # 开仓信息
- self.entry_margin: Optional[float] = None
- self.entry_position_value: Optional[float] = None
-
- # 日内权益基准
- self.day_start_equity: Optional[float] = None
- self.trading_enabled = True
- self.day_tag = datetime.date.today()
- self.today_trade_count = 0
- self.consecutive_losses = 0
-
- # 缓存
- self._klines_cache: Optional[List] = None
- self._klines_cache_ts = 0.0
- self._last_status_notify_ts = 0.0
-
- # 基准波动率缓存
- self._base_ratio_cached = 0.0015
- self._base_ratio_ts = 0.0
-
- # 止损后机制状态
- self.last_sl_dir = 0
- self.last_sl_ts = 0.0
- self.post_sl_dir = 0
- self.post_sl_ts = 0.0
- self.post_sl_vol_scale = 1.0
-
- self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90)
-
- logger.info(f"初始化完成 - 合约:{cfg.contract_symbol}, 杠杆:{cfg.leverage}x, 返佣:{cfg.rebate_rate * 100}%")
-
- # ----------------- 工具函数 -----------------
- def ding(self, msg: str, error: bool = False):
- """发送钉钉通知"""
- prefix = "❌bitmart:" if error else "🔔bitmart:"
- if error:
- for _ in range(1):
- send_dingtalk_message(f"{prefix}{msg}")
- else:
- send_dingtalk_message(f"{prefix}{msg}")
-
- def set_leverage(self) -> bool:
- """设置杠杆"""
- try:
- resp = self.contractAPI.post_submit_leverage(
- contract_symbol=self.cfg.contract_symbol,
- leverage=self.cfg.leverage,
- open_type=self.cfg.open_type
- )[0]
- if resp.get("code") == 1000:
- logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x")
- return True
- logger.error(f"设置杠杆失败: {resp}")
- return False
- except Exception as e:
- logger.error(f"设置杠杆异常: {e}")
- return False
-
- # ----------------- 行情数据 -----------------
- def get_klines_cached(self) -> Optional[List]:
- """获取缓存的K线数据"""
- now = time.time()
- if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec:
- return self._klines_cache
-
- kl = self.get_klines()
- if kl:
- self._klines_cache = kl
- self._klines_cache_ts = now
- return self._klines_cache
-
- def get_klines(self) -> Optional[List]:
- """获取K线数据"""
- try:
- end_time = int(time.time())
- start_time = end_time - 60 * self.cfg.lookback_min
-
- resp = self.contractAPI.get_kline(
- contract_symbol=self.cfg.contract_symbol,
- step=self.cfg.step_min,
- start_time=start_time,
- end_time=end_time
- )[0]
-
- if resp.get("code") != 1000:
- logger.error(f"获取K线失败: {resp}")
- return None
-
- data = resp.get("data", [])
- formatted = []
- for k in data:
- formatted.append({
- "id": int(k["timestamp"]),
- "open": float(k["open_price"]),
- "high": float(k["high_price"]),
- "low": float(k["low_price"]),
- "close": float(k["close_price"]),
- })
- formatted.sort(key=lambda x: x["id"])
- return formatted
- except Exception as e:
- logger.error(f"获取K线异常: {e}")
- return None
-
- def get_last_price(self, fallback_close: float) -> float:
- """获取最新价格"""
- try:
- if hasattr(self.contractAPI, "get_contract_details"):
- r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0]
- d = r.get("data") if isinstance(r, dict) else None
- if isinstance(d, dict):
- for key in ("last_price", "mark_price", "index_price"):
- if key in d and d[key] is not None:
- return float(d[key])
-
- if hasattr(self.contractAPI, "get_ticker"):
- r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0]
- d = r.get("data") if isinstance(r, dict) else None
- if isinstance(d, dict):
- for key in ("last_price", "price", "last", "close"):
- if key in d and d[key] is not None:
- return float(d[key])
- except Exception:
- pass
-
- return float(fallback_close)
-
- @staticmethod
- def ema(values: List[float], n: int) -> float:
- """计算EMA"""
- k = 2 / (n + 1)
- e = values[0]
- for v in values[1:]:
- e = v * k + e * (1 - k)
- return e
-
- @staticmethod
- def atr(klines: List[dict], n: int) -> float:
- """计算ATR"""
- if len(klines) < n + 1:
- return 0.0
- trs = []
- for i in range(-n, 0):
- cur = klines[i]
- prev = klines[i - 1]
- tr = max(
- cur["high"] - cur["low"],
- abs(cur["high"] - prev["close"]),
- abs(cur["low"] - prev["close"]),
- )
- trs.append(tr)
- return sum(trs) / len(trs)
-
- # ----------------- 风控检查 -----------------
- def is_danger_market(self, klines: List[dict], price: float) -> bool:
- """检查是否危险市场"""
- last = klines[-1]
- body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0
- if body >= self.cfg.big_body_kill:
- logger.warning(f"大实体K线: {body * 100:.2f}%")
- return True
-
- a = self.atr(klines, self.cfg.atr_len)
- atr_ratio = (a / price) if price > 0 else 0.0
- if atr_ratio >= self.cfg.atr_ratio_kill:
- logger.warning(f"高ATR比率: {atr_ratio * 100:.2f}%")
- return True
-
- return False
-
- def check_trend_filter(self, price: float) -> bool:
- """趋势过滤器"""
- if not self.cfg.use_trend_filter:
- return True
-
- klines = self.get_klines_cached()
- if not klines or len(klines) < self.cfg.trend_ema_len:
- return True
-
- try:
- closes = [k["close"] for k in klines[-(self.cfg.trend_ema_len + 1):]]
- trend_ema = self.ema(closes, self.cfg.trend_ema_len)
- trend_dev = (price - trend_ema) / trend_ema if trend_ema else 0.0
-
- if abs(trend_dev) > self.cfg.trend_threshold:
- logger.debug(f"趋势过滤触发: 偏差={trend_dev * 100:.3f}%, 阈值={self.cfg.trend_threshold * 100:.3f}%")
- return False
-
- return True
- except Exception as e:
- logger.error(f"趋势过滤异常: {e}")
- return True
-
- # ----------------- 波动率计算 -----------------
- def atr_ratio_baseline(self, klines: List[dict]) -> float:
- """计算ATR比率基准"""
- window = min(self.cfg.vol_baseline_window, len(klines) - self.cfg.atr_len - 1)
- if window <= 10:
- logger.warning(f"数据不足计算基准: {len(klines)}根K线")
- return 0.0
-
- ratios = []
- step = 3
-
- for i in range(-window, 0, step):
- if len(klines) + i < self.cfg.atr_len + 1:
- continue
-
- start_idx = len(klines) + i - self.cfg.atr_len
- end_idx = len(klines) + i
-
- if start_idx < 0 or end_idx <= start_idx:
- continue
-
- sub_klines = klines[start_idx:end_idx]
- if len(sub_klines) >= self.cfg.atr_len + 1:
- a = self.atr(sub_klines, self.cfg.atr_len)
- price = klines[end_idx - 1]["close"]
- if a > 0 and price > 0:
- ratio = a / price
- if 0.0001 < ratio < 0.01:
- ratios.append(ratio)
-
- if len(ratios) < 5:
- a = self.atr(klines[-60:], self.cfg.atr_len)
- price = klines[-1]["close"]
- if a > 0 and price > 0:
- baseline = a / price
- logger.debug(f"使用全量数据计算基准: {baseline * 100:.4f}%")
- return baseline
- else:
- return 0.0
-
- ratios.sort()
- idx = min(len(ratios) - 1,
- max(0, int(self.cfg.vol_baseline_quantile * (len(ratios) - 1))))
- baseline = ratios[idx]
-
- logger.debug(f"基准计算: 样本数={len(ratios)}, 基准={baseline * 100:.4f}%")
- return baseline
-
- def get_base_ratio_cached(self, klines: List[dict]) -> float:
- """获取缓存的基准波动率"""
- now = time.time()
- refresh_sec = self.cfg.base_ratio_refresh_sec
-
- if (self._base_ratio_cached is None or
- (now - self._base_ratio_ts) >= refresh_sec):
-
- baseline = self.atr_ratio_baseline(klines)
-
- if baseline > 0.0001:
- self._base_ratio_cached = baseline
- self._base_ratio_ts = now
- logger.info(f"基准波动率更新: {baseline * 100:.4f}%")
- else:
- current_price = klines[-1]["close"] if klines else 3000
- if current_price > 4000:
- default_baseline = 0.0010
- elif current_price > 3500:
- default_baseline = 0.0012
- elif current_price > 3000:
- default_baseline = 0.0015
- elif current_price > 2500:
- default_baseline = 0.0018
- else:
- default_baseline = 0.0020
-
- self._base_ratio_cached = default_baseline
- self._base_ratio_ts = now
- logger.warning(f"使用价格动态默认基准: {default_baseline * 100:.4f}%")
-
- return self._base_ratio_cached
-
- @staticmethod
- def _clamp(x: float, lo: float, hi: float) -> float:
- """限制数值在指定范围内"""
- return max(lo, min(hi, x))
-
- # ----------------- 动态阈值计算 -----------------
- def dynamic_thresholds(self, atr_ratio: float, base_ratio: float) -> Tuple[
- float, float, float, float, float, float, float]:
- """计算动态阈值"""
- if atr_ratio <= 0:
- logger.warning(f"ATR比率异常: {atr_ratio}")
- atr_ratio = 0.001
-
- if base_ratio < 0.0005:
- base_ratio = max(0.001, atr_ratio * 1.2)
- logger.debug(f"基准太小,使用调整后的atr_ratio: {base_ratio * 100:.4f}%")
-
- # vol_scale计算
- if base_ratio > 0:
- raw_scale = atr_ratio / base_ratio
- vol_scale = self._clamp(raw_scale, self.cfg.vol_scale_min, self.cfg.vol_scale_max)
- else:
- vol_scale = 1.0
-
- # 动态floor计算
- entry_floor_raw = self.cfg.entry_dev_floor_base_k * base_ratio
- entry_floor = self._clamp(
- entry_floor_raw,
- self.cfg.entry_dev_floor_min,
- self.cfg.entry_dev_floor_max,
- )
-
- tp_floor_raw = self.cfg.tp_floor_base_k * base_ratio
- tp_floor = self._clamp(
- tp_floor_raw,
- self.cfg.tp_floor_min,
- self.cfg.tp_floor_max,
- )
-
- sl_floor_raw = self.cfg.sl_floor_base_k * base_ratio
- sl_floor = self._clamp(
- sl_floor_raw,
- self.cfg.sl_floor_min,
- self.cfg.sl_floor_max,
- )
-
- # 最终阈值
- entry_dev_atr_part = self.cfg.entry_k * vol_scale * atr_ratio
- entry_dev = max(entry_floor, entry_dev_atr_part)
-
- tp_atr_part = self.cfg.tp_k * vol_scale * atr_ratio
- tp = max(tp_floor, tp_atr_part)
-
- sl_atr_part = self.cfg.sl_k * vol_scale * atr_ratio
- sl = max(sl_floor, sl_atr_part)
-
- entry_dev = max(entry_dev, self.cfg.entry_dev_floor_min)
-
- logger.info(
- f"动态阈值: entry={entry_dev * 100:.4f}%, tp={tp * 100:.4f}%, sl={sl * 100:.4f}%, "
- f"vol_scale={vol_scale:.2f}"
- )
-
- return entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor
-
- # ----------------- 账户仓位管理 -----------------
- def get_assets_available(self) -> float:
- """获取可用余额"""
- try:
- resp = self.contractAPI.get_assets_detail()[0]
- if resp.get("code") != 1000:
- return 0.0
- data = resp.get("data")
- if isinstance(data, dict):
- return float(data.get("available_balance", 0))
- if isinstance(data, list):
- for asset in data:
- if asset.get("currency") == "USDT":
- return float(asset.get("available_balance", 0))
- return 0.0
- except Exception as e:
- logger.error(f"余额查询异常: {e}")
- return 0.0
-
- def get_position_status(self) -> bool:
- """获取持仓状态"""
- try:
- resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0]
- if resp.get("code") != 1000:
- return False
-
- positions = resp.get("data", [])
- if not positions:
- self.pos = 0
- return True
-
- p = positions[0]
- self.pos = 1 if p["position_type"] == 1 else -1
- return True
- except Exception as e:
- logger.error(f"持仓查询异常: {e}")
- return False
-
- def get_equity_proxy(self) -> float:
- """获取权益"""
- return self.get_assets_available()
-
- def refresh_daily_baseline(self):
- """刷新日内基准"""
- today = datetime.date.today()
- if today != self.day_tag:
- self.day_tag = today
- self.day_start_equity = None
- self.trading_enabled = True
- self.today_trade_count = 0
- self.consecutive_losses = 0
- logger.info(f"新的一天({today}):重置日内风控基准")
-
- def risk_kill_switch(self):
- """风险控制开关"""
- self.refresh_daily_baseline()
- equity = self.get_equity_proxy()
- if equity <= 0:
- return
-
- if self.day_start_equity is None:
- self.day_start_equity = equity
- logger.info(f"日内权益基准设定:{equity:.2f} USDT")
- return
-
- pnl = (equity - self.day_start_equity) / self.day_start_equity
- if pnl <= -self.cfg.daily_loss_limit:
- self.trading_enabled = False
- self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True)
-
- if pnl >= self.cfg.daily_profit_cap:
- self.trading_enabled = False
- self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机")
-
- def calculate_size(self, price: float) -> int:
- """计算仓位大小"""
- bal = self.get_assets_available()
- if bal < self.cfg.fixed_margin:
- logger.warning(f"余额不足:{bal:.2f} USDT < {self.cfg.fixed_margin} USDT")
- return 0
-
- margin = self.cfg.fixed_margin
- lev = int(self.cfg.leverage)
- size = int((margin * lev) / (price * 0.001))
- size = max(self.cfg.min_size, size)
- size = min(self.cfg.max_size, size)
-
- logger.debug(f"计算仓位:保证金={margin}u, 杠杆={lev}x, size={size}")
- return size
-
- # ----------------- 浏览器操作 -----------------
- def openBrowser(self) -> bool:
- """打开浏览器"""
- try:
- bit_port = openBrowser(id=self.bit_id)
- co = ChromiumOptions()
- co.set_local_port(port=bit_port)
- self.page = ChromiumPage(addr_or_opts=co)
- return True
- except Exception as e:
- logger.error(f"打开浏览器失败: {e}")
- return False
-
- def click_safe(self, xpath: str, sleep: float = 0.5) -> bool:
- """安全点击"""
- try:
- ele = self.page.ele(xpath)
- if not ele:
- return False
- ele.scroll.to_see(center=True)
- time.sleep(sleep)
- ele.click()
- return True
- except Exception as e:
- logger.error(f"点击失败: {e}")
- return False
-
- # ----------------- 订单执行 -----------------
- def place_market_order(self, side: int, size: int) -> bool:
- """下市价单"""
- if size <= 0:
- return False
- size = 10
- try:
- # 确保size在合理范围内
- size = max(self.cfg.min_size, min(self.cfg.max_size, size))
-
- # 开多单
- if side == 1:
- self.click_safe('x://button[normalize-space(text()) ="市价"]')
- self.page.ele('x://*[@id="size_0"]').input(str(size), clear=True)
- self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
- logger.info(f"✅ 开多单: size={size}")
- return True
-
- # 开空单
- elif side == 4:
- self.click_safe('x://button[normalize-space(text()) ="市价"]')
- self.page.ele('x://*[@id="size_0"]').input(str(size), clear=True)
- self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
- logger.info(f"✅ 开空单: size={size}")
- return True
-
- # 平多单
- elif side == 2:
- self.click_safe('x://span[normalize-space(text()) ="市价"]')
- time.sleep(0.3)
- self.page.ele('x://*[@id="size_0"]').input(str(size), clear=True)
- time.sleep(0.3)
- self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
- logger.info(f"✅ 平多单: size={size}")
- return True
-
- # 平空单
- elif side == 3:
- self.click_safe('x://span[normalize-space(text()) ="市价"]')
- time.sleep(0.3)
- self.page.ele('x://*[@id="size_0"]').input(str(size), clear=True)
- time.sleep(0.3)
- self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
- logger.info(f"✅ 平空单: size={size}")
- return True
-
- else:
- logger.error(f"未知的订单方向: side={side}")
- return False
-
- except Exception as e:
- logger.error(f"下单异常: {e}")
- return False
-
- def calculate_fee(self, position_value: float) -> float:
- """计算手续费(考虑90%返佣)"""
- platform_fee = position_value * self.cfg.platform_fee_rate
- actual_fee = platform_fee * (1 - self.cfg.rebate_rate)
- return actual_fee
-
- def calculate_net_pnl(self, price: float) -> Tuple[float, float, float]:
- """计算净盈亏"""
- if self.pos == 0 or self.entry_price is None or self.entry_margin is None or self.entry_position_value is None:
- return 0.0, 0.0, 0.0
-
- leverage = int(self.cfg.leverage)
-
- # 计算价格变动比例
- if self.pos == 1:
- price_change_ratio = (price - self.entry_price) / self.entry_price
- else:
- price_change_ratio = (self.entry_price - price) / self.entry_price
-
- # 计算毛盈亏比例(考虑杠杆)
- gross_pnl_ratio = leverage * price_change_ratio
-
- # 计算手续费
- entry_fee = self.calculate_fee(self.entry_position_value)
- exit_position_value = self.entry_position_value
- exit_fee = self.calculate_fee(exit_position_value)
- total_fee = entry_fee + exit_fee
-
- # 手续费相对于保证金的比率
- fee_ratio = total_fee / self.entry_margin
-
- # 净盈亏 = 毛盈亏 - 手续费比率
- net_pnl_ratio = gross_pnl_ratio - fee_ratio
-
- return net_pnl_ratio, total_fee, gross_pnl_ratio
-
- def close_position_all(self) -> bool:
- """平掉所有持仓"""
- if self.pos == 0:
- logger.info("当前无持仓,无需平仓")
- return True
-
- max_retries = 3
- retry_delay = 1.0
-
- for attempt in range(1, max_retries + 1):
- logger.info(f"平仓尝试 {attempt}/{max_retries}...")
- old_pos = self.pos
-
- # 执行平仓操作
- if self.pos == 1:
- ok = self.place_market_order(2, 999999)
- if not ok:
- logger.warning(f"平多单操作失败 (尝试 {attempt}/{max_retries})")
- if attempt < max_retries:
- time.sleep(retry_delay)
- continue
- else:
- self.ding(f"平多单失败:已重试{max_retries}次仍失败", error=True)
- return False
- elif self.pos == -1:
- ok = self.place_market_order(3, 999999)
- if not ok:
- logger.warning(f"平空单操作失败 (尝试 {attempt}/{max_retries})")
- if attempt < max_retries:
- time.sleep(retry_delay)
- continue
- else:
- self.ding(f"平空单失败:已重试{max_retries}次仍失败", error=True)
- return False
- else:
- return True
-
- # 等待订单执行
- time.sleep(1.5)
-
- # 验证是否平仓成功
- verify_success = self._verify_position_closed(old_pos)
- if verify_success:
- self.pos = 0
- logger.success(f"✅ 平仓成功 (尝试 {attempt}/{max_retries})")
- return True
- else:
- logger.warning(f"平仓验证失败 (尝试 {attempt}/{max_retries})")
- if attempt < max_retries:
- time.sleep(retry_delay)
- self.get_position_status()
- else:
- self.ding(f"平仓失败:已重试{max_retries}次", error=True)
- return False
-
- return False
-
- def _verify_position_closed(self, expected_old_pos: int) -> bool:
- """验证持仓是否已平仓"""
- try:
- resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0]
-
- if resp.get("code") != 1000:
- logger.warning(f"查询持仓状态失败: {resp}")
- return False
-
- positions = resp.get("data", [])
- if not positions or len(positions) == 0:
- logger.info("✅ SDK验证:持仓已平仓")
- return True
-
- for p in positions:
- position_type = p.get("position_type", 0)
- current_pos = 1 if position_type == 1 else -1
- if current_pos == expected_old_pos:
- logger.warning(f"⚠️ SDK验证:持仓仍存在")
- return False
-
- logger.info("✅ SDK验证:持仓已平仓或已改变")
- return True
-
- except Exception as e:
- logger.error(f"验证持仓状态异常: {e}")
- return False
-
- # ----------------- 止损后机制 -----------------
- def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool:
- """检查是否需要应用重新入场惩罚"""
- if self.last_sl_dir == 0:
- return False
-
- if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec:
- self.last_sl_dir = 0
- return False
-
- reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev)
- if abs(dev) <= reset_band:
- self.last_sl_dir = 0
- return False
-
- return True
-
- def _post_sl_dynamic_mult(self) -> float:
- """计算止损后SL放宽倍数"""
- if self.post_sl_dir == 0:
- return 1.0
-
- if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec:
- self.post_sl_dir = 0
- self.post_sl_vol_scale = 1.0
- return 1.0
-
- raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0)
- raw = max(1.0, raw)
- return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw))
-
- # ----------------- 交易逻辑(核心修复) -----------------
- def in_cooldown(self) -> bool:
- """检查是否在冷却期内"""
- return (time.time() - self.last_exit_ts) < self.cfg.cooldown_sec_after_exit
-
- def maybe_enter(self, price: float, ema_value: float, entry_dev: float):
- """
- 检查并执行入场 - 修复版
- 修复问题:避免平仓后立即开反向仓位
- """
- # 基础检查
- if self.pos != 0:
- return
-
- if self.in_cooldown():
- return
-
- if not self.trading_enabled:
- return
-
- # 连续亏损控制
- if self.consecutive_losses >= self.cfg.max_consecutive_losses:
- logger.warning(f"连续亏损{self.consecutive_losses}次,暂停交易{self.cfg.recovery_wait_sec}秒")
- time.sleep(self.cfg.recovery_wait_sec)
- return
-
- # 日内交易次数限制
- if self.today_trade_count >= self.cfg.max_daily_trades:
- logger.info(f"达到日交易次数上限: {self.today_trade_count}")
- return
-
- # 趋势过滤
- if not self.check_trend_filter(price):
- return
-
- dev = (price - ema_value) / ema_value if ema_value else 0.0
- size = self.calculate_size(price)
- if size <= 0:
- return
-
- # 方向过滤:避免平仓后立即反向开仓
- if self.last_exit_direction != 0:
- time_since_exit = time.time() - self.last_exit_ts
-
- # 如果是平多后(做空方向),且在冷却期内,避免立即开多
- if (self.last_exit_direction == 1 and # 上次是多平
- time_since_exit < self.cfg.opposite_direction_cooldown and
- dev <= 0): # 当前适合开多
- logger.info(f"平多后{int(time_since_exit)}秒内,避免开多")
- return
-
- # 如果是平空后(做多方向),且在冷却期内,避免立即开空
- if (self.last_exit_direction == -1 and # 上次是空平
- time_since_exit < self.cfg.opposite_direction_cooldown and
- dev >= 0): # 当前适合开空
- logger.info(f"平空后{int(time_since_exit)}秒内,避免开空")
- return
-
- penalty_active = self._reentry_penalty_active(dev, entry_dev)
- long_th = -entry_dev
- short_th = entry_dev
-
- if penalty_active:
- if self.last_sl_dir == 1:
- long_th = -entry_dev * self.cfg.reentry_penalty_mult
- elif self.last_sl_dir == -1:
- short_th = entry_dev * self.cfg.reentry_penalty_mult
-
- logger.info(
- f"入场检查: 偏离={dev * 100:.3f}%, 阈值=[{long_th * 100:.3f}%, {short_th * 100:.3f}%], "
- f"size={size}, 连续亏损={self.consecutive_losses}"
- )
-
- # 开多条件:价格显著低于EMA
- if dev <= long_th:
- if self.place_market_order(1, size):
- self.pos = 1
- self.entry_price = price
- self.entry_ts = time.time()
- self.entry_margin = self.cfg.fixed_margin
- self.entry_position_value = self.entry_margin * int(self.cfg.leverage)
- self.ding(f"✅开多:偏离={dev * 100:.3f}%, 价格={price:.2f}")
-
- # 开空条件:价格显著高于EMA
- elif dev >= short_th:
- if self.place_market_order(4, size):
- self.pos = -1
- self.entry_price = price
- self.entry_ts = time.time()
- self.entry_margin = self.cfg.fixed_margin
- self.entry_position_value = self.entry_margin * int(self.cfg.leverage)
- self.ding(f"✅开空:偏离={dev * 100:.3f}%, 价格={price:.2f}")
-
- def maybe_exit(self, price: float, ema_value: float, tp: float, sl: float, vol_scale: float):
- """
- 检查并执行出场
- 记录平仓方向,用于后续入场过滤
- """
- if self.pos == 0 or self.entry_price is None or self.entry_ts is None:
- return
-
- hold = time.time() - self.entry_ts
- net_pnl, total_fee, gross_pnl = self.calculate_net_pnl(price)
- dev = (price - ema_value) / ema_value if ema_value else 0.0
-
- # 计算止损倍数
- sl_mult = 1.0
- if self.post_sl_dir == self.pos and self.post_sl_dir != 0:
- sl_mult = self._post_sl_dynamic_mult()
- effective_sl = sl * sl_mult
-
- # 记录平仓前的方向
- old_pos = self.pos
-
- # 条件1:正常平仓(价格回归EMA)
- if abs(dev) <= self.cfg.normal_exit_threshold:
- if net_pnl >= self.cfg.normal_exit_min_profit: # 净盈利达到最小要求
- if self.close_position_all():
- self._record_exit(price, old_pos, net_pnl, total_fee, gross_pnl, "正常平仓")
- return
- else:
- logger.error("正常平仓失败")
- return
-
- # 条件2:止盈
- if gross_pnl >= tp:
- if net_pnl > 0: # 扣除手续费后仍有盈利
- if self.close_position_all():
- self._record_exit(price, old_pos, net_pnl, total_fee, gross_pnl, "止盈")
- return
- else:
- logger.error("止盈平仓失败")
- return
-
- # 条件3:止损
- if gross_pnl <= -effective_sl:
- if self.close_position_all():
- self._record_exit(price, old_pos, net_pnl, total_fee, gross_pnl, "止损")
-
- # 记录止损状态
- sl_dir = old_pos
- self.last_sl_dir = sl_dir
- self.last_sl_ts = time.time()
- self.post_sl_dir = sl_dir
- self.post_sl_ts = time.time()
- self.post_sl_vol_scale = float(vol_scale)
- return
- else:
- logger.error("止损平仓失败")
- return
-
- # 条件4:超时平仓
- if hold >= self.cfg.max_hold_sec:
- if net_pnl > 0: # 扣除手续费后仍有盈利
- if self.close_position_all():
- self._record_exit(price, old_pos, net_pnl, total_fee, gross_pnl, "超时平仓")
- return
- else:
- logger.error("超时平仓失败")
- return
- else:
- logger.debug(f"超时但净盈亏为负,继续持有")
-
- def _record_exit(self, price: float, old_pos: int, net_pnl: float,
- total_fee: float, gross_pnl: float, exit_type: str):
- """记录平仓信息"""
- self.entry_price = None
- self.entry_ts = None
- self.entry_margin = None
- self.entry_position_value = None
- self.last_exit_ts = time.time()
- self.last_exit_direction = old_pos
- self.last_exit_price = price
- self.today_trade_count += 1
-
- # 更新连续亏损计数
- if net_pnl < 0:
- self.consecutive_losses += 1
- else:
- self.consecutive_losses = 0
-
- self.ding(
- f"📊{exit_type}:方向={'多' if old_pos == 1 else '空'}, "
- f"毛盈亏={gross_pnl * 100:.3f}%, 净盈亏={net_pnl * 100:.3f}%, "
- f"手续费={total_fee:.4f}u, 价格={price:.2f}"
- )
-
- # ----------------- 状态通知 -----------------
- def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float,
- atr_ratio: float, base_ratio: float, vol_scale: float,
- entry_dev: float, tp: float, sl: float,
- entry_floor: float, tp_floor: float, sl_floor: float):
- """限频状态通知"""
- now = time.time()
- if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec:
- return
- self._last_status_notify_ts = now
-
- direction_str = "多" if self.pos == 1 else ("空" if self.pos == -1 else "无")
-
- # 计算当前盈亏
- current_net_pnl, current_fee, current_gross_pnl = 0.0, 0.0, 0.0
- if self.pos != 0 and self.entry_price:
- current_net_pnl, current_fee, current_gross_pnl = self.calculate_net_pnl(price)
-
- msg = (
- f"【BitMart {self.cfg.contract_symbol}|1m均值回归(90%返佣优化)】\n"
- f"📊 状态:{direction_str} | 今日交易:{self.today_trade_count}/{self.cfg.max_daily_trades}\n"
- f"💰 现价:{price:.2f} | EMA{self.cfg.ema_len}:{ema_value:.2f} | 偏离:{dev * 100:.3f}%\n"
- f"🎯 阈值:入场±{entry_dev * 100:.3f}% | 止盈{tp * 100:.3f}% | 止损{sl * 100:.3f}%\n"
- f"🌊 波动率:ATR={atr_ratio * 100:.3f}% | 基准={base_ratio * 100:.3f}% | 缩放={vol_scale:.2f}\n"
- f"💳 余额:{bal:.2f} USDT | 杠杆:{self.cfg.leverage}x | 保证金:{self.cfg.fixed_margin}u\n"
- )
-
- if self.pos != 0:
- msg += (
- f"📈 当前盈亏:毛={current_gross_pnl * 100:.3f}% | 净={current_net_pnl * 100:.3f}% | "
- f"手续费={current_fee:.4f}u\n"
- f"⚠️ 连续亏损:{self.consecutive_losses}次\n"
- )
-
- self.ding(msg)
-
- # ----------------- 主循环 -----------------
- def action(self):
- """主循环"""
- if not self.set_leverage():
- self.ding("杠杆设置失败,停止运行", error=True)
- return
-
- # 打开浏览器
- if not self.openBrowser():
- self.ding("打开 TGE 失败!", error=True)
- return
- logger.info("TGE 端口获取成功")
-
- self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
- time.sleep(3) # 等待页面加载
-
- logger.info("策略启动 - 90%返佣优化版")
- logger.info("策略特点:低阈值、高频次、严格方向过滤")
-
- while True:
- now_dt = datetime.datetime.now()
- self.pbar.n = now_dt.second
- self.pbar.refresh()
-
- # 1. 获取K线数据
- klines = self.get_klines_cached()
- if not klines or len(klines) < (self.cfg.ema_len + 5):
- logger.warning("K线数据不足,等待...")
- time.sleep(1)
- continue
-
- # 2. 计算技术指标
- last_k = klines[-1]
- closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]]
- ema_value = self.ema(closes, self.cfg.ema_len)
- price = self.get_last_price(fallback_close=float(last_k["close"]))
- dev = (price - ema_value) / ema_value if ema_value else 0.0
-
- # 3. 计算波动率指标
- a = self.atr(klines, self.cfg.atr_len)
- atr_ratio = (a / price) if price > 0 else 0.0
- base_ratio = self.get_base_ratio_cached(klines)
-
- # 4. 计算动态阈值
- entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor = self.dynamic_thresholds(
- atr_ratio, base_ratio
- )
-
- # 5. 风控检查
- self.risk_kill_switch()
-
- # 6. 获取持仓状态
- if not self.get_position_status():
- time.sleep(1)
- continue
-
- # 7. 检查交易是否启用
- if not self.trading_enabled:
- if self.pos != 0:
- if self.close_position_all():
- logger.info("交易被禁用,已平仓")
- else:
- logger.error("交易被禁用,但平仓失败")
- logger.warning("交易被禁用,等待...")
- time.sleep(5)
- continue
-
- # 8. 检查危险市场
- if self.is_danger_market(klines, price):
- logger.warning("危险模式,暂停开仓")
- self.maybe_exit(price, ema_value, tp, sl, vol_scale)
- time.sleep(self.cfg.tick_refresh_sec)
- continue
-
- # 9. 执行交易逻辑
- # 先检查平仓
- self.maybe_exit(price, ema_value, tp, sl, vol_scale)
- # 再检查开仓
- self.maybe_enter(price, ema_value, entry_dev)
-
- # 10. 状态通知
- bal = self.get_assets_available()
- self.notify_status_throttled(
- price, ema_value, dev, bal,
- atr_ratio, base_ratio, vol_scale,
- entry_dev, tp, sl,
- entry_floor, tp_floor, sl_floor
- )
-
- time.sleep(self.cfg.tick_refresh_sec)
-
-
-if __name__ == "__main__":
- """
- Windows PowerShell:
- setx BITMART_API_KEY "你的key"
- setx BITMART_SECRET_KEY "你的secret"
- setx BITMART_MEMO "合约交易"
- 重新打开终端再运行。
-
- Linux/macOS:
- export BITMART_API_KEY="你的key"
- export BITMART_SECRET_KEY="你的secret"
- export BITMART_MEMO "合约交易"
- """
- cfg = StrategyConfig()
- bot = BitmartFuturesMeanReversionBot(cfg, bit_id="f2320f57e24c45529a009e1541e25961")
-
- # 设置日志级别
- logger.remove()
- logger.add(lambda msg: tqdm.write(msg, end=""), level="INFO")
-
- logger.info("""
- ====================================================
- 90%返佣优化策略说明:
-
- 策略特点:
- 1. 针对90%返佣优化,阈值降低30-50%
- 2. 增加方向过滤,避免平仓后立即反向开仓
- 3. 冷却时间从20秒减至5秒,提高交易频率
- 4. 添加连续亏损控制和趋势过滤
- 5. 日内交易次数限制:50次
-
- 核心修复:
- 1. 避免平仓后立即反向开仓(原策略最大问题)
- 2. 修复平仓后可能立即开反向仓位的问题
- 3. 增加对同方向冷却的控制
- ====================================================
- """)
-
- try:
- bot.action()
- except KeyboardInterrupt:
- logger.info("程序被用户中断")
- bot.ding("🤖 策略已手动停止")
- except Exception as e:
- logger.error(f"程序异常退出: {e}")
- bot.ding(f"❌ 策略异常退出: {e}", error=True)
- raise
diff --git a/bitmart/抓取数据_30分钟.py b/bitmart/抓取数据_30分钟.py
deleted file mode 100644
index b1c52f2..0000000
--- a/bitmart/抓取数据_30分钟.py
+++ /dev/null
@@ -1,169 +0,0 @@
-"""
-BitMart 15分钟K线数据抓取脚本
-从 BitMart API 获取15分钟K线数据并存储到数据库
-"""
-
-import time
-from loguru import logger
-from bitmart.api_contract import APIContract
-from models.bitmart_15 import BitMart15
-
-
-class BitMartDataCollector:
- def __init__(self):
- self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
- self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
- self.memo = "数据抓取"
- self.contract_symbol = "ETHUSDT"
- self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
-
- def get_klines(self, start_time=None, end_time=None, limit=200):
- """
- 获取K线数据
- :param start_time: 开始时间戳(秒级)
- :param end_time: 结束时间戳(秒级)
- :param limit: 获取数量限制
- :return: K线数据列表
- """
- try:
- if not end_time:
- end_time = int(time.time())
- if not start_time:
- start_time = end_time - 3600 * 24 * 1 # 默认获取最近7天
-
- response = self.contractAPI.get_kline(
- contract_symbol=self.contract_symbol,
- step=15, # 15分钟
- start_time=start_time,
- end_time=end_time
- )[0]
-
- if response['code'] != 1000:
- logger.error(f"获取K线失败: {response}")
- return []
-
- klines = response.get('data', [])
- formatted = []
- for k in klines:
- # BitMart API 返回的时间戳是秒级,需要转换为毫秒级
- # 根据 bitmart/框架.py 中的使用方式,API返回的是秒级时间戳
- timestamp_ms = int(k["timestamp"]) * 1000
-
- formatted.append({
- 'id': timestamp_ms,
- 'open': float(k["open_price"]),
- 'high': float(k["high_price"]),
- 'low': float(k["low_price"]),
- 'close': float(k["close_price"])
- })
-
- # 按时间戳排序
- formatted.sort(key=lambda x: x['id'])
- return formatted
- except Exception as e:
- logger.error(f"获取K线异常: {e}")
- return []
-
- def save_klines(self, klines):
- """
- 保存K线数据到数据库
- :param klines: K线数据列表
- :return: 保存的数量
- """
- saved_count = 0
- for kline in klines:
- try:
- BitMart15.get_or_create(
- id=kline['id'],
- defaults={
- 'open': kline['open'],
- 'high': kline['high'],
- 'low': kline['low'],
- 'close': kline['close'],
- }
- )
- saved_count += 1
- except Exception as e:
- logger.error(f"保存K线数据失败 {kline['id']}: {e}")
-
- return saved_count
-
- def collect_historical_data(self, start_date=None, days=None):
- """
- 抓取历史数据(从指定日期到现在)
- :param start_date: 起始日期字符串,格式 'YYYY-MM-DD',如 '2025-01-01'
- :param days: 如果不指定 start_date,则抓取最近多少天的数据
- """
- import datetime
-
- now = int(time.time())
-
- if start_date:
- # 解析起始日期
- start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
- target_start_time = int(start_dt.timestamp())
- logger.info(f"开始抓取 BitMart {self.contract_symbol} 从 {start_date} 到现在的15分钟K线数据")
- elif days:
- target_start_time = now - 3600 * 24 * days
- logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 {days} 天的15分钟K线数据")
- else:
- target_start_time = now - 3600 * 24 * 30 # 默认30天
- logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 30 天的15分钟K线数据")
-
- # 分批获取,每次获取5天的数据(15分钟K线数据量较大)
- batch_days = 5
- total_saved = 0
- fail_count = 0
- max_fail = 3 # 连续失败超过3次则停止
-
- current_end = now
- while current_end > target_start_time:
- current_start = max(current_end - 3600 * 24 * batch_days, target_start_time)
-
- logger.info(f"抓取时间段: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start))} "
- f"到 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end))}")
-
- klines = self.get_klines(start_time=current_start, end_time=current_end)
- if klines:
- saved = self.save_klines(klines)
- total_saved += saved
- logger.info(f"本批次保存 {saved} 条数据,累计 {total_saved} 条")
- fail_count = 0 # 重置失败计数
- else:
- fail_count += 1
- logger.warning(f"本批次未获取到数据 (连续失败 {fail_count} 次)")
- if fail_count >= max_fail:
- logger.error(f"连续 {max_fail} 次获取数据失败,可能已达到 API 历史数据限制,停止抓取")
- break
-
- current_end = current_start
- time.sleep(1) # 避免请求过快
-
- logger.success(f"数据抓取完成,共保存 {total_saved} 条K线数据")
-
- def collect_realtime_data(self):
- """
- 实时抓取最新数据(用于定时任务)
- """
- logger.info("开始抓取 BitMart 最新15分钟K线数据")
-
- # 获取最近1小时的数据(确保能获取到最新的K线)
- end_time = int(time.time())
- start_time = end_time - 3600 * 2 # 最近2小时
-
- klines = self.get_klines(start_time=start_time, end_time=end_time)
- if klines:
- saved = self.save_klines(klines)
- logger.success(f"保存 {saved} 条最新K线数据")
- else:
- logger.warning("未获取到最新数据")
-
-
-if __name__ == '__main__':
- collector = BitMartDataCollector()
-
- # 抓取从 2025-01-01 到现在的15分钟K线历史数据
- collector.collect_historical_data(start_date='2025-01-01')
-
- # 如果需要实时抓取,可以取消下面的注释
- # collector.collect_realtime_data()
diff --git a/mexc/30分钟.py b/mexc/30分钟.py
index 217b511..7c6fa4c 100644
--- a/mexc/30分钟.py
+++ b/mexc/30分钟.py
@@ -41,6 +41,57 @@ from models.mexc import Mexc30
# ========================= 工具函数 =========================
+# 交易对的最小价格单位(tick size)配置
+# 格式:交易对符号 -> 最小单位
+TICK_SIZE_MAP = {
+ 'SOLUSDT': 0.01,
+ 'BTCUSDT': 0.1,
+ 'ETHUSDT': 0.01,
+ 'BNBUSDT': 0.01,
+ # 可以根据需要添加更多交易对
+}
+
+# 默认最小单位(如果交易对不在配置中)
+DEFAULT_TICK_SIZE = 0.01
+
+
+def get_tick_size(symbol: str) -> float:
+ """
+ 获取交易对的最小价格单位
+ :param symbol: 交易对符号,如 'SOLUSDT'
+ :return: 最小单位,如 0.01
+ """
+ return TICK_SIZE_MAP.get(symbol.upper(), DEFAULT_TICK_SIZE)
+
+
+def adjust_price_for_trade(price: float, direction: str, symbol: str = 'SOLUSDT') -> float:
+ """
+ 根据交易方向调整价格,考虑买卖价差
+
+ 买入(开多/平空):价格 + tick_size
+ 卖出(开空/平多):价格 - tick_size
+
+ :param price: 原始价格(K线开盘价或收盘价)
+ :param direction: 交易方向,'long' 表示买入,'short' 表示卖出
+ :param symbol: 交易对符号,用于获取最小单位
+ :return: 调整后的价格
+ """
+ tick_size = get_tick_size(symbol)
+
+ if direction == 'long':
+ # 买入:价格 + tick_size
+ adjusted = price + tick_size
+ elif direction == 'short':
+ # 卖出:价格 - tick_size
+ adjusted = price - tick_size
+ else:
+ # 未知方向,返回原价
+ adjusted = price
+
+ # 确保价格不会为负
+ return max(adjusted, tick_size)
+
+
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
@@ -109,7 +160,7 @@ def get_data_by_date(model, date_str: str):
# ========================= 回测逻辑 =========================
-def backtest_15m_trend_optimized(dates: List[str]):
+def backtest_15m_trend_optimized(dates: List[str], symbol: str = 'SOLUSDT'):
"""
回测策略逻辑:
1. 开仓条件(信号出现时,下一根K线开盘价开仓):
@@ -182,7 +233,9 @@ def backtest_15m_trend_optimized(dates: List[str]):
if current_position is None:
if direction:
# 信号出现(prev和curr形成信号),在下一根K线(next_bar)的开盘价开仓
- entry_price = float(next_bar['open'])
+ # 应用买卖价差:买入(开多)时价格+0.01,卖出(开空)时价格-0.01
+ raw_price = float(next_bar['open'])
+ entry_price = adjust_price_for_trade(raw_price, direction, symbol)
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
@@ -192,7 +245,7 @@ def backtest_15m_trend_optimized(dates: List[str]):
}
consecutive_opposite_count = 0 # 重置连续反色计数
stats[signal_key]['count'] += 1
- logger.debug(f"开仓: {stats[signal_key]['name']} {'做多' if direction == 'long' else '做空'} @ {entry_price:.2f}")
+ logger.debug(f"开仓: {stats[signal_key]['name']} {'做多' if direction == 'long' else '做空'} @ {entry_price:.2f} (原始价格: {raw_price:.2f})")
idx += 1
continue
@@ -203,7 +256,11 @@ def backtest_15m_trend_optimized(dates: List[str]):
# 1. 反向信号 -> 下一根K线开盘价平仓并反手开仓
# 策略:遇到反向信号(如持有多单时遇到阴包阳),平仓并反手开仓
if direction and direction != pos_dir:
- exit_price = float(next_bar['open'])
+ # 平仓:持有多单时卖出(价格-0.01),持有空单时买入(价格+0.01)
+ raw_exit_price = float(next_bar['open'])
+ exit_direction = 'short' if pos_dir == 'long' else 'long' # 平仓方向与持仓方向相反
+ exit_price = adjust_price_for_trade(raw_exit_price, exit_direction, symbol)
+
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
@@ -218,17 +275,19 @@ def backtest_15m_trend_optimized(dates: List[str]):
stats[pos_sig_key]['total_profit'] += diff
if diff > 0: stats[pos_sig_key]['wins'] += 1
- # 反手开仓(下一根K线开盘价)
+ # 反手开仓(下一根K线开盘价,应用买卖价差)
+ raw_entry_price = float(next_bar['open'])
+ entry_price = adjust_price_for_trade(raw_entry_price, direction, symbol)
current_position = {
'direction': direction,
'signal': stats[signal_key]['name'],
'signal_key': signal_key,
- 'entry_price': exit_price,
+ 'entry_price': entry_price,
'entry_time': next_bar['id']
}
consecutive_opposite_count = 0 # 重置连续反色计数
stats[signal_key]['count'] += 1
- logger.debug(f"反向信号反手: 平{'做多' if pos_dir == 'long' else '做空'} @ {exit_price:.2f}, 开{'做多' if direction == 'long' else '做空'}")
+ logger.debug(f"反向信号反手: 平{'做多' if pos_dir == 'long' else '做空'} @ {exit_price:.2f} (原始: {raw_exit_price:.2f}), 开{'做多' if direction == 'long' else '做空'} @ {entry_price:.2f} (原始: {raw_entry_price:.2f})")
idx += 1
continue
@@ -240,7 +299,9 @@ def backtest_15m_trend_optimized(dates: List[str]):
# 如果已经连续两根阴线,下一根K线开盘价平仓
if consecutive_opposite_count >= 2:
logger.debug(f"平仓: 做多遇到连续两根阴线")
- exit_price = float(next_bar['open'])
+ # 平多单:卖出,价格 - 0.01
+ raw_exit_price = float(next_bar['open'])
+ exit_price = adjust_price_for_trade(raw_exit_price, 'short', symbol)
diff = exit_price - current_position['entry_price']
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
@@ -268,7 +329,9 @@ def backtest_15m_trend_optimized(dates: List[str]):
# 如果已经连续两根阳线,下一根K线开盘价平仓
if consecutive_opposite_count >= 2:
logger.debug(f"平仓: 做空遇到连续两根阳线")
- exit_price = float(next_bar['open'])
+ # 平空单:买入,价格 + 0.01
+ raw_exit_price = float(next_bar['open'])
+ exit_price = adjust_price_for_trade(raw_exit_price, 'long', symbol)
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
@@ -305,8 +368,12 @@ def backtest_15m_trend_optimized(dates: List[str]):
# 尾仓:最后一根收盘价平仓
if current_position:
last = all_data[-1]
- exit_price = float(last['close'])
pos_dir = current_position['direction']
+ # 平仓:持有多单时卖出(价格-0.01),持有空单时买入(价格+0.01)
+ raw_exit_price = float(last['close'])
+ exit_direction = 'short' if pos_dir == 'long' else 'long' # 平仓方向与持仓方向相反
+ exit_price = adjust_price_for_trade(raw_exit_price, exit_direction, symbol)
+
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
@@ -346,7 +413,9 @@ if __name__ == '__main__':
print(dates)
# dates = [f"2025-09-{i}" for i in range(1, 32)]
- trades, stats = backtest_15m_trend_optimized(dates)
+ # 指定交易对符号,用于获取正确的最小价格单位
+ symbol = 'SOLUSDT'
+ trades, stats = backtest_15m_trend_optimized(dates, symbol=symbol)
logger.info("===== 每笔交易详情 =====")