This commit is contained in:
27942
2026-02-01 19:50:32 +08:00
parent c2e6d6d78d
commit b5d4f6dfdc
10 changed files with 82 additions and 7652 deletions

View File

@@ -1,986 +0,0 @@
"""
BitMart 被动做市/高频刷单策略
核心逻辑:在盘口两侧不断挂单,赚取价差+返佣
使用浏览器自动化下单,获取高返佣
"""
import time
from loguru import logger
from threading import Lock
from dataclasses import dataclass
from bitmart.api_contract import APIContract
from typing import Optional, Dict, List, Tuple
from DrissionPage import ChromiumPage, ChromiumOptions
from bit_tools import openBrowser
# ================================================================
# 📊 配置类
# ================================================================
@dataclass
class MarketMakingConfig:
bit_id: str = "f2320f57e24c45529a009e1541e25961"
"""做市策略配置"""
# API配置仅用于查询不下单
api_key: str = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
secret_key: str = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
memo: str = "合约交易"
contract_symbol: str = "ETHUSDT"
# 浏览器配置
tge_id: int = 196495 # TGE浏览器ID
tge_url: str = "http://127.0.0.1:50326"
tge_headers: Dict = None
trading_url: str = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"
# 做市参数
spread_percent: float = 0.01 # 价差百分比0.01% = 买一卖一之间)
order_size_usdt: float = 10.0 # 每单金额USDT
max_position_usdt: float = 100.0 # 最大持仓金额USDT
# 订单管理
order_refresh_interval: float = 2.0 # 订单刷新间隔(秒)
order_timeout: float = 60.0 # 订单超时时间(秒),超时后撤单重新挂
# 风险控制
max_daily_loss: float = 50.0 # 每日最大亏损USDT
max_daily_trades: int = 1000 # 每日最大交易次数
# 杠杆和模式
leverage: str = "30" # 杠杆倍数
open_type: str = "cross" # 全仓模式
def __post_init__(self):
"""初始化TGE headers"""
if self.tge_headers is None:
self.tge_headers = {
"Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
"Content-Type": "application/json"
}
# ================================================================
# 📊 订单簿数据结构
# ================================================================
@dataclass
class OrderBook:
"""订单簿数据"""
bids: List[Tuple[float, float]] # [(价格, 数量), ...] 买盘,价格从高到低
asks: List[Tuple[float, float]] # [(价格, 数量), ...] 卖盘,价格从低到高
timestamp: float
@property
def best_bid(self) -> Optional[float]:
"""买一价"""
return self.bids[0][0] if self.bids else None
@property
def best_ask(self) -> Optional[float]:
"""卖一价"""
return self.asks[0][0] if self.asks else None
@property
def spread(self) -> Optional[float]:
"""价差"""
if self.best_bid and self.best_ask:
return self.best_ask - self.best_bid
return None
@property
def mid_price(self) -> Optional[float]:
"""中间价"""
if self.best_bid and self.best_ask:
return (self.best_bid + self.best_ask) / 2
return None
@dataclass
class PendingOrder:
"""pending订单信息"""
order_id: str
side: str # "buy" or "sell"
price: float
size: float
create_time: float
status: str # "pending", "filled", "cancelled"
# ================================================================
# 📊 浏览器管理器
# ================================================================
class BrowserManager:
"""浏览器管理器:负责浏览器的启动、接管和标签页管理"""
def __init__(self, config: MarketMakingConfig, bit_id="f2320f57e24c45529a009e1541e25961"):
self.bit_id = "f2320f57e24c45529a009e1541e25961"
self.config = config
self.tge_port: Optional[int] = None
self.page: Optional[ChromiumPage] = None
def open_browser(self) -> bool:
"""打开浏览器并获取端口"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except Exception as e:
logger.error(f"打开浏览器失败: {e}")
return False
def take_over_browser(self) -> bool:
"""接管浏览器"""
if not self.tge_port:
logger.error("浏览器端口未设置")
return False
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
logger.success("成功接管浏览器")
return True
except Exception as e:
logger.error(f"接管浏览器失败: {e}")
return False
def close_extra_tabs(self) -> bool:
"""关闭多余的标签页,只保留第一个"""
if not self.page:
return False
try:
tabs = self.page.get_tabs()
closed_count = 0
for idx, tab in enumerate(tabs):
if idx == 0:
continue
tab.close()
closed_count += 1
if closed_count > 0:
logger.info(f"已关闭{closed_count}个多余标签页")
return True
except Exception as e:
logger.warning(f"关闭多余标签页失败: {e}")
return False
# ================================================================
# 📊 浏览器交易执行器
# ================================================================
class BrowserTradingExecutor:
"""浏览器交易执行器:通过浏览器自动化下单(获取高返佣)"""
def __init__(self, page: ChromiumPage):
self.page = page
def click_safe(self, xpath: str, sleep: float = 0.5) -> bool:
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click()
return True
except Exception as e:
logger.error(f"点击失败 {xpath}: {e}")
return False
def 开单(self, marketPriceLongOrder: int = 0, limitPriceShortOrder: int = 0,
size: Optional[float] = None, price: Optional[float] = None) -> bool:
size = 0.1
"""
开单操作(通过浏览器自动化,获取高返佣)
Args:
marketPriceLongOrder: 市价最多或者做空1是最多-1是做空
limitPriceShortOrder: 限价最多或者做空1是最多-1是做空
size: 数量
price: 价格(限价单需要)
Returns:
是否成功
"""
try:
# 市价单
if marketPriceLongOrder == -1:
# 市价做空
if not self.click_safe('x://button[normalize-space(text()) ="市价"]'):
return False
self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
if not self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'):
return False
logger.success(f"市价做空成功: {size}")
return True
elif marketPriceLongOrder == 1:
# 市价做多
if not self.click_safe('x://button[normalize-space(text()) ="市价"]'):
return False
self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
if not self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'):
return False
logger.success(f"市价做多成功: {size}")
return True
# 限价单
if limitPriceShortOrder == -1:
# 限价做空
if not self.click_safe('x://button[normalize-space(text()) ="限价"]'):
return False
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
if not self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'):
return False
logger.success(f"限价做空成功: {size} @ {price}")
return True
elif limitPriceShortOrder == 1:
# 限价做多
if not self.click_safe('x://button[normalize-space(text()) ="限价"]'):
return False
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
if not self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'):
return False
logger.success(f"限价做多成功: {size} @ {price}")
return True
return False
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def 平仓(self) -> bool:
"""市价平仓"""
try:
if self.click_safe('x://span[normalize-space(text()) ="市价"]'):
logger.success("平仓成功")
return True
return False
except Exception as e:
logger.error(f"平仓异常: {e}")
return False
def place_limit_order(self, side: str, price: float, size: float) -> bool:
"""
下限价单(通过浏览器)
Args:
side: "buy""sell"
price: 价格
size: 数量(张数)
Returns:
是否成功
"""
try:
# size已经是张数直接使用
if side == "buy":
# 限价做多
return self.开单(limitPriceShortOrder=1, size=size, price=price)
else:
# 限价做空
return self.开单(limitPriceShortOrder=-1, size=size, price=price)
except Exception as e:
logger.error(f"限价下单异常: {e}")
return False
# ================================================================
# 📊 BitMart API 封装(仅用于查询,不下单)
# ================================================================
class BitMartMarketMakerAPI:
"""BitMart做市API封装仅用于查询不下单"""
def __init__(self, config: MarketMakingConfig):
self.config = config
self.contractAPI = APIContract(
config.api_key,
config.secret_key,
config.memo,
timeout=(5, 15)
)
def get_order_book(self, depth: int = 20) -> Optional[OrderBook]:
"""
获取订单簿
Args:
depth: 深度数量(可能不使用)
Returns:
OrderBook对象或None
"""
try:
# BitMart合约API获取深度数据
# 根据错误信息get_depth()不接受size参数
# 尝试不同的调用方式
try:
# 方法1不传深度参数使用默认值最可能的方式
response = self.contractAPI.get_depth(
contract_symbol=self.config.contract_symbol
)[0]
except TypeError as e1:
try:
# 方法2尝试使用 limit 参数
response = self.contractAPI.get_depth(
contract_symbol=self.config.contract_symbol,
limit=depth
)[0]
except TypeError as e2:
try:
# 方法3尝试使用 depth 参数
response = self.contractAPI.get_depth(
contract_symbol=self.config.contract_symbol,
depth=depth
)[0]
except TypeError as e3:
logger.error(f"get_depth()方法调用失败,尝试的参数方式都失败: {e1}, {e2}, {e3}")
return None
if response.get('code') == 1000:
data = response.get('data', {})
# BitMart返回格式可能是不同的需要根据实际调整
bids = []
asks = []
if isinstance(data, dict):
bids_raw = data.get('bids', [])
asks_raw = data.get('asks', [])
# 处理不同格式
for b in bids_raw:
if isinstance(b, (list, tuple)) and len(b) >= 2:
bids.append((float(b[0]), float(b[1])))
elif isinstance(b, dict):
bids.append((float(b.get('price', 0)), float(b.get('size', 0))))
for a in asks_raw:
if isinstance(a, (list, tuple)) and len(a) >= 2:
asks.append((float(a[0]), float(a[1])))
elif isinstance(a, dict):
asks.append((float(a.get('price', 0)), float(a.get('size', 0))))
# 买盘按价格从高到低排序,卖盘按价格从低到高排序
bids.sort(key=lambda x: x[0], reverse=True)
asks.sort(key=lambda x: x[0])
if bids and asks:
return OrderBook(
bids=bids,
asks=asks,
timestamp=time.time()
)
return None
except Exception as e:
logger.error(f"获取订单簿异常: {e}")
# 如果获取订单簿失败,尝试使用最新价格作为备用方案
logger.warning("尝试使用最新价格作为备用方案")
current_price = self.get_current_price()
if current_price:
# 使用当前价格和价差百分比计算买一卖一
spread_amount = current_price * self.config.spread_percent / 100
bids = [(current_price - spread_amount / 2, 1.0)]
asks = [(current_price + spread_amount / 2, 1.0)]
return OrderBook(
bids=bids,
asks=asks,
timestamp=time.time()
)
return None
def get_current_price(self) -> Optional[float]:
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.config.contract_symbol,
step=1, # 1分钟
start_time=end_time - 60,
end_time=end_time
)[0]
if response.get('code') == 1000:
data = response.get('data', [])
if data:
return float(data[-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self) -> Optional[float]:
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response.get('code') == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position(self) -> Optional[Dict]:
"""获取当前持仓"""
try:
response = self.contractAPI.get_position(
contract_symbol=self.config.contract_symbol
)[0]
if response.get('code') == 1000:
positions = response.get('data', [])
if positions:
return positions[0]
return None
return None
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return None
def set_leverage(self) -> bool:
"""设置杠杆和全仓模式"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.config.contract_symbol,
leverage=self.config.leverage,
open_type=self.config.open_type
)[0]
if response.get('code') == 1000:
logger.success(f"全仓模式 + {self.config.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ============== 新增:撤单、平仓 ==============
def get_open_orders(self) -> List[Dict]:
"""获取当前所有挂单"""
try:
resp = self.contractAPI.get_open_order(
contract_symbol=self.config.contract_symbol
)[0]
if resp.get("code") == 1000:
data = resp.get("data", [])
return data if isinstance(data, list) else []
return []
except Exception as e:
logger.error(f"查询挂单异常: {e}")
return []
def cancel_order(self, order_id: str) -> bool:
"""撤销单个挂单"""
try:
resp = self.contractAPI.post_cancel_order(
contract_symbol=self.config.contract_symbol,
order_id=order_id
)[0]
if resp.get("code") == 1000:
logger.success(f"撤单成功: {order_id}")
return True
logger.error(f"撤单失败: {resp}")
return False
except Exception as e:
logger.error(f"撤单异常: {e}")
return False
def cancel_all_orders(self) -> None:
"""撤销所有挂单(无精确超时信息时,直接全撤)"""
open_orders = self.get_open_orders()
for od in open_orders:
oid = str(od.get("order_id") or od.get("id") or "")
if oid:
self.cancel_order(oid)
def close_position(self) -> bool:
"""
使用API平仓市价/近似市价)
逻辑:查询当前持仓,根据方向下相反方向的平仓单
"""
try:
position = self.get_position()
if not position:
logger.info("无持仓,无需平仓")
return True
position_type = int(position.get("position_type", 0)) # 1=多, 2=空
current_amount = float(position.get("current_amount", 0))
if current_amount <= 0:
logger.info("持仓数量为0无需平仓")
return True
# 获取现价作为平仓价格参考
current_price = self.get_current_price()
if not current_price:
logger.error("无法获取现价,平仓失败")
return False
# BitMart合约订单类型3=平多限价4=平空(限价)
if position_type == 1:
order_type = 3 # 平多
elif position_type == 2:
order_type = 4 # 平空
else:
logger.error(f"未知持仓方向: {position_type}")
return False
# 下平仓单
resp = self.contractAPI.post_submit_order(
contract_symbol=self.config.contract_symbol,
type=order_type,
price=str(current_price),
size=str(current_amount)
)[0]
if resp.get("code") == 1000:
logger.success(f"API平仓成功方向={position_type}, 数量={current_amount}, 价格={current_price}")
return True
logger.error(f"API平仓失败: {resp}")
return False
except Exception as e:
logger.error(f"API平仓异常: {e}")
return False
# ================================================================
# 📊 做市策略核心
# ================================================================
class MarketMakingStrategy:
"""被动做市策略(使用浏览器自动化下单,获取高返佣)"""
def __init__(self, config: MarketMakingConfig, bit_id=None):
self.bit_id = bit_id
self.config = config
self.api = BitMartMarketMakerAPI(config) # 仅用于查询
# 浏览器管理
self.browser_manager = BrowserManager(config)
self.trading_executor: Optional[BrowserTradingExecutor] = None
# 订单管理使用时间戳作为订单ID因为浏览器下单无法直接获取订单ID
self.pending_orders: Dict[str, PendingOrder] = {}
self.order_lock = Lock()
# 统计
self.daily_trades = 0
self.daily_profit = 0.0
self.total_trades = 0
self.total_profit = 0.0
# 运行状态
self.running = False
self.last_order_refresh = 0.0
# 初始化浏览器和杠杆
if not self._initialize_browser():
raise Exception("浏览器初始化失败")
self.api.set_leverage()
def _initialize_browser(self) -> bool:
"""初始化浏览器"""
try:
# 打开浏览器
if not self.browser_manager.open_browser():
logger.error("打开浏览器失败")
return False
# # 接管浏览器
# if not self.browser_manager.take_over_browser():
# logger.error("接管浏览器失败")
# return False
# 关闭多余标签页
self.browser_manager.close_extra_tabs()
# 打开交易页面
self.browser_manager.page.get(self.config.trading_url)
time.sleep(2) # 等待页面加载
# 初始化交易执行器
self.trading_executor = BrowserTradingExecutor(self.browser_manager.page)
logger.success("浏览器初始化完成")
return True
except Exception as e:
logger.error(f"浏览器初始化异常: {e}")
return False
def calculate_order_prices(self, order_book: OrderBook) -> Tuple[Optional[float], Optional[float]]:
"""
计算挂单价格
Args:
order_book: 订单簿
Returns:
(buy_price, sell_price)
"""
if not order_book.mid_price or not order_book.best_bid or not order_book.best_ask:
return None, None
mid = order_book.mid_price
spread_amount = mid * self.config.spread_percent / 100
# 买单价格:中间价 - 价差的一半,但不能低于买一
buy_price = mid - spread_amount / 2
buy_price = min(buy_price, order_book.best_bid * 0.9999) # 略低于买一,确保能成交
# 卖单价格:中间价 + 价差的一半,但不能高于卖一
sell_price = mid + spread_amount / 2
sell_price = max(sell_price, order_book.best_ask * 1.0001) # 略高于卖一,确保能成交
# 确保价差合理
if sell_price <= buy_price:
# 如果价差太小,使用买一卖一价格
buy_price = order_book.best_bid * 0.9999
sell_price = order_book.best_ask * 1.0001
return buy_price, sell_price
def should_refresh_orders(self) -> bool:
"""判断是否需要刷新订单"""
now = time.time()
if now - self.last_order_refresh >= self.config.order_refresh_interval:
return True
return False
def cancel_stale_orders(self):
"""撤销超时订单使用API撤单"""
now = time.time()
to_cancel = []
with self.order_lock:
for order_id, order in self.pending_orders.items():
if order.status == "pending":
if now - order.create_time > self.config.order_timeout:
to_cancel.append(order_id)
if not to_cancel:
return
logger.info(f"发现{len(to_cancel)}个超时订单尝试API撤单")
try:
# 先通过API获取真实挂单列表并撤单
self.api.cancel_all_orders()
# 本地状态同步
with self.order_lock:
for order_id in to_cancel:
if order_id in self.pending_orders:
self.pending_orders[order_id].status = "cancelled"
except Exception as e:
logger.error(f"API撤单失败: {e}")
def update_pending_orders(self):
"""更新挂单状态(通过持仓变化判断订单是否成交)"""
try:
# 获取当前持仓
current_position = self.api.get_position()
current_position_type = 0
current_position_amount = 0.0
if current_position:
current_position_type = int(current_position.get('position_type', 0))
current_position_amount = abs(float(current_position.get('current_amount', 0)))
with self.order_lock:
# 检查挂单是否成交(通过持仓变化判断)
for order_id, order in list(self.pending_orders.items()):
if order.status == "pending":
# 检查订单是否超时
if time.time() - order.create_time > self.config.order_timeout:
# 订单超时,标记为取消
order.status = "cancelled"
logger.info(f"订单超时: {order_id} {order.side} @ {order.price}")
continue
except Exception as e:
logger.error(f"更新挂单状态异常: {e}")
def _place_counter_order(self, filled_order: PendingOrder):
"""
订单成交后,在另一侧挂单
Args:
filled_order: 已成交的订单
"""
# 等待一小段时间,确保订单状态更新
time.sleep(0.1)
order_book = self.api.get_order_book()
if not order_book:
logger.warning("无法获取订单簿,无法挂反向单")
return
# 计算反向订单价格
buy_price, sell_price = self.calculate_order_prices(order_book)
if filled_order.side == "buy":
# 买单成交,挂卖单
if sell_price and self.trading_executor:
contract_size = self.config.order_size_usdt / sell_price / 0.01
if contract_size < 1:
contract_size = 1
if self.trading_executor.place_limit_order("sell", sell_price, contract_size):
order_id = f"sell_{int(time.time() * 1000)}"
with self.order_lock:
self.pending_orders[order_id] = PendingOrder(
order_id=order_id,
side="sell",
price=sell_price,
size=self.config.order_size_usdt,
create_time=time.time(),
status="pending"
)
logger.info(f"买单成交后挂卖单: {sell_price}, 订单ID: {order_id}")
else:
logger.warning("买单成交后挂卖单失败")
else:
# 卖单成交,挂买单平空或开多
if buy_price and self.trading_executor:
contract_size = self.config.order_size_usdt / buy_price / 0.01
if contract_size < 1:
contract_size = 1
if self.trading_executor.place_limit_order("buy", buy_price, contract_size):
order_id = f"buy_{int(time.time() * 1000)}"
with self.order_lock:
self.pending_orders[order_id] = PendingOrder(
order_id=order_id,
side="buy",
price=buy_price,
size=self.config.order_size_usdt,
create_time=time.time(),
status="pending"
)
logger.info(f"卖单成交后挂买单: {buy_price}, 订单ID: {order_id}")
else:
logger.warning("卖单成交后挂买单失败")
def place_market_making_orders(self):
"""放置做市订单"""
# 获取订单簿
order_book = self.api.get_order_book()
if not order_book or not order_book.mid_price:
logger.warning("无法获取订单簿")
return
# 检查持仓
position = self.api.get_position()
position_value = 0.0
if position:
current_price = order_book.mid_price
position_amount = abs(float(position.get('current_amount', 0)))
# 计算持仓价值USDT
position_value = position_amount * current_price
# 如果持仓超过限制,只挂反向单
if position_value >= self.config.max_position_usdt:
logger.warning(f"持仓超过限制: {position_value} USDT只挂反向单")
# 只挂反向单平仓
if position:
position_type = int(position.get('position_type', 0))
if position_type == 1: # 多仓
# 挂卖单平多
_, sell_price = self.calculate_order_prices(order_book)
if sell_price and self.trading_executor:
contract_size = self.config.order_size_usdt / sell_price / 0.01
if contract_size < 1:
contract_size = 1
self.trading_executor.place_limit_order("sell", sell_price, contract_size)
elif position_type == 2: # 空仓
# 挂买单平空
buy_price, _ = self.calculate_order_prices(order_book)
if buy_price and self.trading_executor:
contract_size = self.config.order_size_usdt / buy_price / 0.01
if contract_size < 1:
contract_size = 1
self.trading_executor.place_limit_order("buy", buy_price, contract_size)
return
# 计算挂单价格
buy_price, sell_price = self.calculate_order_prices(order_book)
if not buy_price or not sell_price:
return
# 检查当前挂单数量
with self.order_lock:
pending_buy_count = sum(1 for o in self.pending_orders.values()
if o.side == "buy" and o.status == "pending")
pending_sell_count = sum(1 for o in self.pending_orders.values()
if o.side == "sell" and o.status == "pending")
# 如果两侧都有挂单,不重复挂
if pending_buy_count > 0 and pending_sell_count > 0:
return
# 挂买单(通过浏览器)
if pending_buy_count == 0:
# 计算张数(根据合约规格调整)
# 假设页面输入框单位是张数需要将USDT金额转换为张数
# size_usdt / price = ETH数量再除以合约面值得到张数
contract_size = self.config.order_size_usdt / buy_price / 0.01
if contract_size < 1:
contract_size = 1
if self.trading_executor and self.trading_executor.place_limit_order("buy", buy_price, contract_size):
# 使用时间戳作为订单ID
order_id = f"buy_{int(time.time() * 1000)}"
with self.order_lock:
self.pending_orders[order_id] = PendingOrder(
order_id=order_id,
side="buy",
price=buy_price,
size=self.config.order_size_usdt,
create_time=time.time(),
status="pending"
)
logger.info(f"挂买单成功: {buy_price}, 订单ID: {order_id}")
else:
logger.warning("挂买单失败")
# 挂卖单(通过浏览器)
if pending_sell_count == 0:
# 计算张数
contract_size = self.config.order_size_usdt / sell_price / 0.01
if contract_size < 1:
contract_size = 1
if self.trading_executor and self.trading_executor.place_limit_order("sell", sell_price, contract_size):
# 使用时间戳作为订单ID
order_id = f"sell_{int(time.time() * 1000)}"
with self.order_lock:
self.pending_orders[order_id] = PendingOrder(
order_id=order_id,
side="sell",
price=sell_price,
size=self.config.order_size_usdt,
create_time=time.time(),
status="pending"
)
logger.info(f"挂卖单成功: {sell_price}, 订单ID: {order_id}")
else:
logger.warning("挂卖单失败")
self.last_order_refresh = time.time()
def check_risk_limits(self) -> bool:
"""检查风险限制"""
# 检查每日交易次数
if self.daily_trades >= self.config.max_daily_trades:
logger.warning(f"达到每日最大交易次数: {self.daily_trades}")
return False
# 检查每日亏损
if self.daily_profit <= -self.config.max_daily_loss:
logger.error(f"达到每日最大亏损: {self.daily_profit}")
# send_dingtalk_message(f"做市策略达到每日最大亏损: {self.daily_profit} USDT", error=True)
return False
return True
def run(self):
"""主运行循环"""
self.running = True
logger.info("做市策略启动")
while self.running:
try:
# 检查风险限制
if not self.check_risk_limits():
logger.error("风险限制触发,停止策略")
break
# 撤销超时订单
self.cancel_stale_orders()
# 更新挂单状态
self.update_pending_orders()
# 刷新订单
if self.should_refresh_orders():
self.place_market_making_orders()
# 短暂休眠
time.sleep(0.5)
except KeyboardInterrupt:
logger.info("收到中断信号,停止策略")
break
except Exception as e:
logger.error(f"策略运行异常: {e}")
time.sleep(1)
# 清理:刷新页面,手动撤销挂单
logger.info("清理挂单...使用API撤单")
try:
self.api.cancel_all_orders()
with self.order_lock:
for order_id in list(self.pending_orders.keys()):
if self.pending_orders[order_id].status == "pending":
self.pending_orders[order_id].status = "cancelled"
except Exception as e:
logger.error(f"清理挂单失败: {e}")
logger.info("做市策略已停止")
def stop(self):
"""停止策略"""
self.running = False
# ================================================================
# 🚀 主程序
# ================================================================
if __name__ == '__main__':
config = MarketMakingConfig(
contract_symbol="ETHUSDT",
bit_id="f2320f57e24c45529a009e1541e25961", # TGE浏览器ID
trading_url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT",
spread_percent=0.01, # 0.01%价差
order_size_usdt=0.1, # 每单10 USDT
max_position_usdt=3.0, # 最大持仓100 USDT
order_refresh_interval=2.0, # 2秒刷新一次
order_timeout=60.0, # 60秒超时
max_daily_loss=50.0, # 每日最大亏损50 USDT
max_daily_trades=1000, # 每日最大1000笔
leverage="35",
open_type="cross"
)
strategy = MarketMakingStrategy(config)
try:
strategy.run()
except Exception as e:
logger.error(f"程序异常: {e}")
# send_dingtalk_message(f"做市策略异常: {e}", error=True)
# 9359,53
# 14.35

View File

@@ -1,458 +0,0 @@
"""
BitMart 被动做市/高频刷单策略 (修复版 V2)
修复内容:
1. 修正 get_order_book 中解析深度数据的方式,由字典键名访问改为列表索引访问 (['price'] -> [0])
"""
import time
import requests
from typing import Optional, Dict, List, Tuple
from dataclasses import dataclass
from loguru import logger
from threading import Lock
from DrissionPage import ChromiumPage, ChromiumOptions
from bitmart.api_contract import APIContract
# ================================================================
# 📊 配置类
# ================================================================
@dataclass
class MarketMakingConfig:
"""做市策略配置"""
# API配置仅用于查询不下单
api_key: str = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
secret_key: str = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
memo: str = "合约交易"
contract_symbol: str = "ETHUSDT"
# 浏览器配置
tge_id: int = 196495 # TGE浏览器ID
tge_url: str = "http://127.0.0.1:50326"
tge_headers: Dict = None
trading_url: str = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"
# 做市基础参数
spread_percent: float = 0.04 # 基础价差 (0.04% 约为 $1左右 on ETH)
order_size_usdt: float = 10.0 # 每单金额USDT
max_position_usdt: float = 100.0 # 最大持仓金额USDT
# 🚀 高级策略参数
# 库存倾斜每持有100U价格偏移多少。正数表示持有多单时价格下调利于卖出不利于买入
inventory_skew_factor: float = 0.0005
# 价格容忍度:只有当(目标价 - 当前挂单价) / 目标价 > 0.05% 时才改单,避免频繁操作
price_tolerance: float = 0.0005
# 风险控制
max_daily_loss: float = 50.0
leverage: str = "30"
open_type: str = "cross"
def __post_init__(self):
"""初始化TGE headers"""
if self.tge_headers is None:
self.tge_headers = {
"Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
"Content-Type": "application/json"
}
# ================================================================
# 📊 订单簿数据结构
# ================================================================
@dataclass
class OrderBook:
"""订单簿数据"""
bids: List[Tuple[float, float]] # [(价格, 数量), ...]
asks: List[Tuple[float, float]] # [(价格, 数量), ...]
timestamp: float
@property
def mid_price(self) -> Optional[float]:
"""中间价"""
if self.bids and self.asks:
return (self.bids[0][0] + self.asks[0][0]) / 2
return None
# ================================================================
# 📊 浏览器管理器
# ================================================================
class BrowserManager:
"""浏览器管理器:负责浏览器的启动、接管和标签页管理"""
def __init__(self, config: MarketMakingConfig):
self.config = config
self.tge_port: Optional[int] = None
self.page: Optional[ChromiumPage] = None
def open_browser(self) -> bool:
"""打开浏览器并获取端口"""
try:
response = requests.post(
f"{self.config.tge_url}/api/browser/start",
json={"envId": self.config.tge_id},
headers=self.config.tge_headers,
timeout=10
)
data = response.json()
if "data" in data and "port" in data["data"]:
self.tge_port = data["data"]["port"]
logger.success(f"成功打开浏览器,端口:{self.tge_port}")
return True
else:
logger.error(f"打开浏览器响应异常: {data}")
return False
except Exception as e:
logger.error(f"打开浏览器失败: {e}")
return False
def take_over_browser(self) -> bool:
"""接管浏览器"""
if not self.tge_port:
logger.error("浏览器端口未设置")
return False
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
logger.success("成功接管浏览器")
return True
except Exception as e:
logger.error(f"接管浏览器失败: {e}")
return False
def close_extra_tabs(self) -> bool:
"""关闭多余的标签页,只保留第一个"""
if not self.page:
return False
try:
tabs = self.page.get_tabs()
for idx, tab in enumerate(tabs):
if idx == 0: continue
tab.close()
return True
except Exception as e:
logger.warning(f"关闭多余标签页失败: {e}")
return False
# ================================================================
# 📊 浏览器交易执行器
# ================================================================
class BrowserTradingExecutor:
"""浏览器交易执行器:通过浏览器自动化下单(获取高返佣)"""
def __init__(self, page: ChromiumPage):
self.page = page
def click_safe(self, xpath: str, sleep: float = 0.5) -> bool:
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click()
return True
except Exception as e:
logger.error(f"点击失败 {xpath}: {e}")
return False
def 开单(self, marketPriceLongOrder: int = 0, limitPriceShortOrder: int = 0,
size: Optional[float] = None, price: Optional[float] = None) -> bool:
"""开单操作"""
size = 0.1
try:
# 市价单 (代码略)
if marketPriceLongOrder == -1: pass
elif marketPriceLongOrder == 1: pass
# 限价单
if limitPriceShortOrder == -1:
# 限价做空
if not self.click_safe('x://button[normalize-space(text()) ="限价"]'): return False
self.page.ele('x://*[ @id="price_0"]').input(vals=price, clear=True)
time.sleep(0.2)
self.page.ele('x://*[ @id="size_0"]').input(vals=size, clear=True)
if not self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'): return False
logger.success(f"浏览器下单: 限价做空 {size} @ {price}")
return True
elif limitPriceShortOrder == 1:
# 限价做多
if not self.click_safe('x://button[normalize-space(text()) ="限价"]'): return False
self.page.ele('x://*[ @id="price_0"]').input(vals=price, clear=True)
time.sleep(0.2)
self.page.ele('x://*[ @id="size_0"]').input(vals=size, clear=True)
if not self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'): return False
logger.success(f"浏览器下单: 限价做多 {size} @ {price}")
return True
return False
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def place_limit_order(self, side: str, price: float, size: float) -> bool:
"""统一接口"""
if side == "buy":
return self.开单(limitPriceShortOrder=1, size=size, price=price)
else:
return self.开单(limitPriceShortOrder=-1, size=size, price=price)
# ================================================================
# 📊 BitMart API 封装 (修复 get_order_book)
# ================================================================
class BitMartMarketMakerAPI:
"""BitMart做市API封装仅用于查询不下单"""
def __init__(self, config: MarketMakingConfig):
self.config = config
self.contractAPI = APIContract(
config.api_key,
config.secret_key,
config.memo,
timeout=(5, 15)
)
def get_order_book(self) -> Optional[OrderBook]:
try:
# 移除不支持的 limit 参数
response = self.contractAPI.get_depth(contract_symbol=self.config.contract_symbol)[0]
if response.get('code') == 1000:
data = response.get('data', {})
bids = []
asks = []
# 解析数据
if isinstance(data, dict):
bids_raw = data.get('bids', [])
asks_raw = data.get('asks', [])
# 修复b 是列表 [price, size],不是字典
for b in bids_raw[:10]:
# b[0] 是价格, b[1] 是数量
bids.append((float(b[0]), float(b[1])))
for a in asks_raw[:10]:
# a[0] 是价格, a[1] 是数量
asks.append((float(a[0]), float(a[1])))
if bids and asks:
return OrderBook(bids=bids, asks=asks, timestamp=time.time())
else:
logger.warning(f"获取深度失败: {response}")
return None
except Exception as e:
logger.error(f"获取订单簿异常: {e}")
return None
def get_position_net(self) -> float:
"""获取净持仓 (多为正,空为负)"""
try:
response = self.contractAPI.get_position(contract_symbol=self.config.contract_symbol)[0]
if response.get('code') == 1000:
data = response.get('data', [])
if data:
pos = data[0]
current_amount = float(pos.get('current_amount', 0))
position_type = int(pos.get('position_type', 0)) # 1多 2空
if position_type == 1: return current_amount
if position_type == 2: return -current_amount
return 0.0
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return 0.0
def get_open_orders(self) -> List[Dict]:
"""获取当前挂单"""
try:
resp = self.contractAPI.get_open_order(contract_symbol=self.config.contract_symbol)[0]
if resp.get("code") == 1000:
return resp.get("data", [])
return []
except Exception as e:
logger.error(f"查询挂单异常: {e}")
return []
def cancel_order(self, order_id: str) -> bool:
"""API撤单"""
try:
resp = self.contractAPI.post_cancel_order(contract_symbol=self.config.contract_symbol, order_id=order_id)[0]
return resp.get("code") == 1000
except Exception as e:
logger.error(f"API撤单异常: {e}")
return False
def set_leverage(self):
try:
self.contractAPI.post_submit_leverage(contract_symbol=self.config.contract_symbol, leverage=self.config.leverage, open_type=self.config.open_type)
except:
pass
# ================================================================
# 🧠 策略核心
# ================================================================
class MarketMakingStrategy:
"""优化版被动做市策略"""
def __init__(self, config: MarketMakingConfig):
self.config = config
self.api = BitMartMarketMakerAPI(config)
self.browser_manager = BrowserManager(config)
self.trading_executor: Optional[BrowserTradingExecutor] = None
self.running = False
# 初始化流程
if not self._initialize_browser():
raise Exception("浏览器初始化失败")
self.api.set_leverage()
def _initialize_browser(self) -> bool:
try:
if not self.browser_manager.open_browser(): return False
if not self.browser_manager.take_over_browser(): return False
self.browser_manager.close_extra_tabs()
# 访问交易页
logger.info(f"正在访问交易页: {self.config.trading_url}")
self.browser_manager.page.get(self.config.trading_url)
time.sleep(3)
self.trading_executor = BrowserTradingExecutor(self.browser_manager.page)
logger.success("浏览器环境就绪")
return True
except Exception as e:
logger.error(f"浏览器初始化异常: {e}")
return False
def calculate_target_prices(self, mid_price: float, net_position: float) -> Tuple[float, float]:
"""核心算法:计算考虑了库存倾斜的目标买卖价"""
# 1. 基础价差的一半
half_spread = mid_price * (self.config.spread_percent / 100) / 2
# 2. 库存倾斜调整
skew_adjust = net_position * self.config.inventory_skew_factor * mid_price
quote_mid = mid_price - skew_adjust
target_bid = quote_mid - half_spread
target_ask = quote_mid + half_spread
# 3. 价格修正 (防止穿仓)
if target_ask <= target_bid:
target_ask = target_bid + mid_price * 0.0001
return round(target_bid, 2), round(target_ask, 2)
def reconcile_orders(self, target_bid: float, target_ask: float):
"""调节逻辑对比API实际挂单 vs 目标价格"""
open_orders = self.api.get_open_orders()
current_bids = []
current_asks = []
for o in open_orders:
side = o.get('side')
# 兼容API返回
side_str = str(side).lower()
if side_str == '1' or 'buy' in side_str:
current_bids.append(o)
elif side_str == '2' or 'sell' in side_str:
current_asks.append(o)
# --- 调节买单 ---
valid_bid_exists = False
for order in current_bids:
price = float(order.get('price', 0))
diff_pct = abs(price - target_bid) / target_bid
if diff_pct < self.config.price_tolerance:
valid_bid_exists = True
else:
logger.info(f"♻️ 买单价格偏离 (现{price} vs 标{target_bid}),撤单")
self.api.cancel_order(order.get('order_id') or order.get('id'))
if not valid_bid_exists:
# 计算张数
size_contract = self.config.order_size_usdt / target_bid / 0.01
size_contract = max(1, int(size_contract))
logger.info(f" 补买单: {target_bid} (数量:{size_contract})")
self.trading_executor.place_limit_order("buy", target_bid, size_contract)
# --- 调节卖单 ---
valid_ask_exists = False
for order in current_asks:
price = float(order.get('price', 0))
diff_pct = abs(price - target_ask) / target_ask
if diff_pct < self.config.price_tolerance:
valid_ask_exists = True
else:
logger.info(f"♻️ 卖单价格偏离 (现{price} vs 标{target_ask}),撤单")
self.api.cancel_order(order.get('order_id') or order.get('id'))
if not valid_ask_exists:
size_contract = self.config.order_size_usdt / target_ask / 0.01
size_contract = max(1, int(size_contract))
logger.info(f" 补卖单: {target_ask} (数量:{size_contract})")
self.trading_executor.place_limit_order("sell", target_ask, size_contract)
def run(self):
self.running = True
logger.info("🚀 策略已启动")
while self.running:
try:
# 1. 获取市场数据
ob = self.api.get_order_book()
if not ob:
time.sleep(1)
continue
mid_price = ob.mid_price
# 2. 获取持仓
net_position = self.api.get_position_net()
# 3. 计算目标价
t_bid, t_ask = self.calculate_target_prices(mid_price, net_position)
logger.info(f"Mid:{mid_price:.2f} | Pos:{net_position} | Target Bid:{t_bid} Ask:{t_ask}")
# 4. 调节挂单
self.reconcile_orders(t_bid, t_ask)
# 5. 循环间隔
time.sleep(3)
except KeyboardInterrupt:
logger.warning("停止策略")
break
except Exception as e:
logger.error(f"Loop Exception: {e}")
time.sleep(2)
if __name__ == '__main__':
config = MarketMakingConfig(
contract_symbol="ETHUSDT",
spread_percent=0.04,
order_size_usdt=0.1,
max_position_usdt=50.0,
inventory_skew_factor=0.0005,
price_tolerance=0.0005
)
strategy = MarketMakingStrategy(config)
strategy.run()

View File

@@ -1,83 +0,0 @@
import time
import csv
import loguru
from bitmart.api_contract import APIContract
# ------------------ 配置 ------------------
START_YEAR = 2025
CONTRACT_SYMBOL = "ETHUSDT"
STEP = 3 # K 线周期,单位分钟
CSV_FILE = f"kline_{STEP}.csv"
memo = "合约交易"
api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
contractAPI = APIContract(api_key, secret_key, memo, timeout=(5, 15))
# ------------------ 时间戳 ------------------
start_of_year = int(time.mktime((START_YEAR, 1, 1, 0, 0, 0, 0, 0, 0)))
current_time = int(time.time())
# ------------------ 抓取数据 ------------------
all_data = []
existing_ids = set()
start_time = start_of_year
# 每次请求时间长度 = step * 500 条 K 线
request_interval_ms = STEP * 60 * 500
while start_time < current_time:
end_time = min(start_time + request_interval_ms, current_time)
loguru.logger.info(f"抓取时间段: {start_time} ~ {end_time}")
try:
response = contractAPI.get_kline(
contract_symbol=CONTRACT_SYMBOL,
step=STEP,
start_time=start_time,
end_time=end_time
)[0]["data"]
formatted = []
for k in response:
print(k)
k_id = int(k["timestamp"])
if k_id in existing_ids:
continue
existing_ids.add(k_id)
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"]),
'volume': float(k["volume"])
})
formatted.sort(key=lambda x: x['id'])
all_data.extend(formatted)
if len(response) < 500:
start_time = end_time
else:
start_time = formatted[-1]['id'] + 1
except Exception as e:
print(f"请求出错: {e},等待 60 秒后重试")
time.sleep(60)
time.sleep(0.2) # 控制速率,保证 <= 每 2 秒 12 次
# ------------------ 保存 CSV ------------------
csv_columns = ['id', 'open', 'high', 'low', 'close', 'volume']
try:
with open(CSV_FILE, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
writer.writeheader()
for data in all_data:
writer.writerow(data)
print(f"数据已保存到 {CSV_FILE},共 {len(all_data)}")
except IOError:
print("I/O error")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 144 KiB

File diff suppressed because one or more lines are too long

View File

@@ -1,866 +0,0 @@
import os
import time
import uuid
import datetime
from dataclasses import dataclass
from tqdm import tqdm
from loguru import logger
from bitmart.api_contract import APIContract
from bitmart.lib.cloud_exceptions import APIException
from 交易.tools import send_dingtalk_message
@dataclass
class StrategyConfig:
# =============================
# 1m | ETH 永续 | 控止损≤5/日
# =============================
# ===== 合约 =====
contract_symbol: str = "ETHUSDT"
open_type: str = "cross"
leverage: str = "30"
# ===== K线与指标 =====
step_min: int = 1
lookback_min: int = 240
ema_len: int = 36
atr_len: int = 14
# =========================================================
# ✅ 自动阈值ATR/Price 分位数基准(更稳,不被短时噪声带跑)
# =========================================================
vol_baseline_window: int = 60
vol_baseline_quantile: float = 0.65
vol_scale_min: float = 0.80
vol_scale_max: float = 1.60
# ✅ baseline 每 60 秒刷新一次体感更明显、也省CPU
base_ratio_refresh_sec: int = 60
# =========================================================
# ✅ 动态 floor方案一
# floor = clamp(min, base_k * base_ratio, max)
# 目的跟着典型波动变过滤小噪声tp/sl 也随环境自适应
# =========================================================
# entry_dev_floor 动态
entry_dev_floor_min: float = 0.0012 # 0.12%
entry_dev_floor_max: float = 0.0030 # 0.30%(可按你偏好调)
entry_dev_floor_base_k: float = 1.10 # entry_floor = 1.10 * base_ratio
# tp_floor 动态
tp_floor_min: float = 0.0006 # 0.06%
tp_floor_max: float = 0.0020 # 0.20%
tp_floor_base_k: float = 0.55 # tp_floor = 0.55 * base_ratio止盈别太大1m回归更实际
# sl_floor 动态
sl_floor_min: float = 0.0018 # 0.18%
sl_floor_max: float = 0.0060 # 0.60%
sl_floor_base_k: float = 1.35 # sl_floor = 1.35 * base_ratioETH 1m 插针多,止损下限可更稳)
# =========================================================
# ✅ 动态阈值倍率(仍然保留你原来思路)
# =========================================================
entry_k: float = 1.45
tp_k: float = 0.65
sl_k: float = 1.05
# ===== 时间/冷却 =====
max_hold_sec: int = 75
cooldown_sec_after_exit: int = 20
# ===== 下单/仓位 =====
risk_percent: float = 0.004
min_size: int = 1
max_size: int = 5000
# ===== 日内风控 =====
daily_loss_limit: float = 0.02
daily_profit_cap: float = 0.01
# ===== 危险模式过滤 =====
atr_ratio_kill: float = 0.0038
big_body_kill: float = 0.010
# ===== 轮询节奏 =====
klines_refresh_sec: int = 10
tick_refresh_sec: int = 1
status_notify_sec: int = 60
# =========================================================
# ✅ 止损后同向入场加门槛(但不禁止同向重入)
# =========================================================
reentry_penalty_mult: float = 1.55
reentry_penalty_max_sec: int = 180
reset_band_k: float = 0.45
reset_band_floor: float = 0.0006
# =========================================================
# ✅ 止损后同方向 SL 放宽幅度与"止损时 vol_scale"联动
# =========================================================
post_sl_sl_max_sec: int = 90
post_sl_mult_min: float = 1.02
post_sl_mult_max: float = 1.16
post_sl_vol_alpha: float = 0.20
class BitmartFuturesMeanReversionBot:
def __init__(self, cfg: StrategyConfig):
self.cfg = cfg
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
if not self.api_key or not self.secret_key:
raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)")
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
# 持仓状态: -1 空, 0 无, 1 多
self.pos = 0
self.entry_price = None
self.entry_ts = None
self.last_exit_ts = 0
# 日内权益基准
self.day_start_equity = None
self.trading_enabled = True
self.day_tag = datetime.date.today()
# 缓存
self._klines_cache = None
self._klines_cache_ts = 0
self._last_status_notify_ts = 0
# ✅ base_ratio 缓存
self._base_ratio_cached = 0.0015 # 初始化默认值 0.15%
self._base_ratio_ts = 0.0
# ✅ 止损后"同向入场加门槛"状态
self.last_sl_dir = 0 # 1=多止损,-1=空止损0=无
self.last_sl_ts = 0.0
# ✅ 止损后"同方向 SL 联动放宽"状态
self.post_sl_dir = 0
self.post_sl_ts = 0.0
self.post_sl_vol_scale = 1.0 # 记录止损时的 vol_scale
self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90)
logger.info(f"初始化完成,基准波动率默认值: {self._base_ratio_cached * 100:.4f}%")
# ----------------- 通用工具 -----------------
def ding(self, msg, error=False):
prefix = "❌bitmart" if error else "🔔bitmart"
if error:
for _ in range(3):
send_dingtalk_message(f"{prefix}{msg}")
else:
send_dingtalk_message(f"{prefix}{msg}")
def set_leverage(self) -> bool:
try:
resp = self.contractAPI.post_submit_leverage(
contract_symbol=self.cfg.contract_symbol,
leverage=self.cfg.leverage,
open_type=self.cfg.open_type
)[0]
if resp.get("code") == 1000:
logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x")
return True
logger.error(f"设置杠杆失败: {resp}")
self.ding(f"设置杠杆失败: {resp}", error=True)
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
self.ding(f"设置杠杆异常: {e}", error=True)
return False
# ----------------- 行情/指标 -----------------
def get_klines_cached(self):
now = time.time()
if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec:
return self._klines_cache
kl = self.get_klines()
if kl:
self._klines_cache = kl
self._klines_cache_ts = now
return self._klines_cache
def get_klines(self):
try:
end_time = int(time.time())
start_time = end_time - 60 * self.cfg.lookback_min
resp = self.contractAPI.get_kline(
contract_symbol=self.cfg.contract_symbol,
step=self.cfg.step_min,
start_time=start_time,
end_time=end_time
)[0]
if resp.get("code") != 1000:
logger.error(f"获取K线失败: {resp}")
return None
data = resp.get("data", [])
formatted = []
for k in data:
formatted.append({
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
})
formatted.sort(key=lambda x: x["id"])
return formatted
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(f"获取K线异常: {e}", error=True)
return None
def get_last_price(self, fallback_close: float) -> float:
try:
if hasattr(self.contractAPI, "get_contract_details"):
r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0]
d = r.get("data") if isinstance(r, dict) else None
if isinstance(d, dict):
for key in ("last_price", "mark_price", "index_price"):
if key in d and d[key] is not None:
return float(d[key])
if hasattr(self.contractAPI, "get_ticker"):
r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0]
d = r.get("data") if isinstance(r, dict) else None
if isinstance(d, dict):
for key in ("last_price", "price", "last", "close"):
if key in d and d[key] is not None:
return float(d[key])
except Exception:
pass
return float(fallback_close)
@staticmethod
def ema(values, n: int) -> float:
k = 2 / (n + 1)
e = values[0]
for v in values[1:]:
e = v * k + e * (1 - k)
return e
@staticmethod
def atr(klines, n: int) -> float:
if len(klines) < n + 1:
return 0.0
trs = []
for i in range(-n, 0):
cur = klines[i]
prev = klines[i - 1]
tr = max(
cur["high"] - cur["low"],
abs(cur["high"] - prev["close"]),
abs(cur["low"] - prev["close"]),
)
trs.append(tr)
return sum(trs) / len(trs)
def is_danger_market(self, klines, price: float) -> bool:
last = klines[-1]
body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0
if body >= self.cfg.big_body_kill:
return True
a = self.atr(klines, self.cfg.atr_len)
atr_ratio = (a / price) if price > 0 else 0.0
if atr_ratio >= self.cfg.atr_ratio_kill:
return True
return False
def atr_ratio_baseline(self, klines) -> float:
"""简化版ATR基准计算"""
window = min(self.cfg.vol_baseline_window, len(klines) - self.cfg.atr_len - 1)
if window <= 10: # 数据太少
logger.warning(f"数据不足计算基准: {len(klines)}根K线")
return 0.0
ratios = []
# 简化计算每隔3根K线计算一个ATR比率减少计算量
step = 3
for i in range(-window, 0, step):
if len(klines) + i < self.cfg.atr_len + 1:
continue
# 计算当前位置的ATR
start_idx = len(klines) + i - self.cfg.atr_len
end_idx = len(klines) + i
if start_idx < 0 or end_idx <= start_idx:
continue
sub_klines = klines[start_idx:end_idx]
# 确保有足够数据计算ATR
if len(sub_klines) >= self.cfg.atr_len + 1:
a = self.atr(sub_klines, self.cfg.atr_len)
price = klines[end_idx - 1]["close"]
if a > 0 and price > 0:
ratio = a / price
if 0.0001 < ratio < 0.01: # 过滤异常值
ratios.append(ratio)
if len(ratios) < 5: # 样本太少
# 尝试直接使用整个数据计算一个ATR比率
a = self.atr(klines[-60:], self.cfg.atr_len) # 使用最近60根K线
price = klines[-1]["close"]
if a > 0 and price > 0:
baseline = a / price
logger.debug(f"使用全量数据计算基准: {baseline * 100:.4f}%")
return baseline
else:
return 0.0
# 计算分位数
ratios.sort()
idx = min(len(ratios) - 1,
max(0, int(self.cfg.vol_baseline_quantile * (len(ratios) - 1))))
baseline = ratios[idx]
logger.debug(f"基准计算: 样本数={len(ratios)}, 基准={baseline * 100:.4f}%, "
f"范围=[{ratios[0] * 100:.4f}%, {ratios[-1] * 100:.4f}%]")
return baseline
def get_base_ratio_cached(self, klines) -> float:
"""获取缓存的基准波动率,定期刷新"""
now = time.time()
refresh_sec = self.cfg.base_ratio_refresh_sec
if (self._base_ratio_cached is None or
(now - self._base_ratio_ts) >= refresh_sec):
# 使用简单版本的基准计算
baseline = self.atr_ratio_baseline(klines)
if baseline > 0.0001: # 大于0.01%才认为是有效值
self._base_ratio_cached = baseline
self._base_ratio_ts = now
logger.info(f"基准波动率更新: {baseline * 100:.4f}%")
else:
# 使用基于价格的动态默认值
current_price = klines[-1]["close"] if klines else 3000
# ETH价格越高基准波动率越小百分比
if current_price > 4000:
default_baseline = 0.0010 # 0.10%
elif current_price > 3500:
default_baseline = 0.0012 # 0.12%
elif current_price > 3000:
default_baseline = 0.0015 # 0.15%
elif current_price > 2500:
default_baseline = 0.0018 # 0.18%
else:
default_baseline = 0.0020 # 0.20%
self._base_ratio_cached = default_baseline
self._base_ratio_ts = now
logger.warning(f"使用价格动态默认基准: {default_baseline * 100:.4f}% "
f"(价格=${current_price:.0f})")
return self._base_ratio_cached
@staticmethod
def _clamp(x: float, lo: float, hi: float) -> float:
"""限制数值在指定范围内"""
return max(lo, min(hi, x))
def dynamic_thresholds(self, atr_ratio: float, base_ratio: float):
"""
✅ entry/tp/sl 全部动态(修复版):
- vol_scaleatr_ratio/base_ratio 限幅
- floor方案一 (floor = clamp(min, k*base_ratio, max))
- 最终阈值max(floor, k * vol_scale * atr_ratio)
"""
# 1) 检查输入有效性
if atr_ratio <= 0:
logger.warning(f"ATR比率异常: {atr_ratio}")
atr_ratio = 0.001 # 默认值 0.1%
# 2) 如果base_ratio太小或无效使用调整后的atr_ratio
if base_ratio < 0.0005: # 小于0.05%视为无效
base_ratio = max(0.001, atr_ratio * 1.2) # 比当前ATR比率稍大
logger.debug(f"基准太小使用调整后的atr_ratio: {base_ratio * 100:.4f}%")
# 3) vol_scale计算
if base_ratio > 0:
raw_scale = atr_ratio / base_ratio
vol_scale = self._clamp(raw_scale, self.cfg.vol_scale_min, self.cfg.vol_scale_max)
logger.debug(
f"vol_scale: {raw_scale:.2f}{vol_scale:.2f} (atr={atr_ratio * 100:.3f}%, base={base_ratio * 100:.3f}%)")
else:
vol_scale = 1.0
logger.warning(f"基准无效使用默认vol_scale=1.0")
# 4) 动态floor计算
# Entry floor
entry_floor_raw = self.cfg.entry_dev_floor_base_k * base_ratio
entry_floor = self._clamp(
entry_floor_raw,
self.cfg.entry_dev_floor_min,
self.cfg.entry_dev_floor_max,
)
# TP floor
tp_floor_raw = self.cfg.tp_floor_base_k * base_ratio
tp_floor = self._clamp(
tp_floor_raw,
self.cfg.tp_floor_min,
self.cfg.tp_floor_max,
)
# SL floor
sl_floor_raw = self.cfg.sl_floor_base_k * base_ratio
sl_floor = self._clamp(
sl_floor_raw,
self.cfg.sl_floor_min,
self.cfg.sl_floor_max,
)
# 5) 最终阈值计算
entry_dev_atr_part = self.cfg.entry_k * vol_scale * atr_ratio
entry_dev = max(entry_floor, entry_dev_atr_part)
tp_atr_part = self.cfg.tp_k * vol_scale * atr_ratio
tp = max(tp_floor, tp_atr_part)
sl_atr_part = self.cfg.sl_k * vol_scale * atr_ratio
sl = max(sl_floor, sl_atr_part)
# 6) 确保entry_dev不会太小
entry_dev = max(entry_dev, self.cfg.entry_dev_floor_min)
# 7) 输出详细信息
logger.info(
f"动态阈值: atr={atr_ratio * 100:.4f}%, base={base_ratio * 100:.4f}%, "
f"vol_scale={vol_scale:.2f}, floor={entry_floor * 100:.4f}%, "
f"atr_part={entry_dev_atr_part * 100:.4f}%, 最终entry_dev={entry_dev * 100:.4f}%"
)
return entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor
# ----------------- 账户/仓位 -----------------
def get_assets_available(self) -> float:
try:
resp = self.contractAPI.get_assets_detail()[0]
if resp.get("code") != 1000:
return 0.0
data = resp.get("data")
if isinstance(data, dict):
return float(data.get("available_balance", 0))
if isinstance(data, list):
for asset in data:
if asset.get("currency") == "USDT":
return float(asset.get("available_balance", 0))
return 0.0
except Exception as e:
logger.error(f"余额查询异常: {e}")
return 0.0
def get_position_status(self) -> bool:
try:
resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0]
if resp.get("code") != 1000:
return False
positions = resp.get("data", [])
if not positions:
self.pos = 0
return True
p = positions[0]
self.pos = 1 if p["position_type"] == 1 else -1
return True
except Exception as e:
logger.error(f"持仓查询异常: {e}")
self.ding(f"持仓查询异常: {e}", error=True)
return False
def get_equity_proxy(self) -> float:
return self.get_assets_available()
def refresh_daily_baseline(self):
today = datetime.date.today()
if today != self.day_tag:
self.day_tag = today
self.day_start_equity = None
self.trading_enabled = True
self.ding(f"新的一天({today}):重置日内风控基准")
def risk_kill_switch(self):
self.refresh_daily_baseline()
equity = self.get_equity_proxy()
if equity <= 0:
return
if self.day_start_equity is None:
self.day_start_equity = equity
logger.info(f"日内权益基准设定:{equity:.2f} USDT")
return
pnl = (equity - self.day_start_equity) / self.day_start_equity
if pnl <= -self.cfg.daily_loss_limit:
self.trading_enabled = False
self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True)
if pnl >= self.cfg.daily_profit_cap:
self.trading_enabled = False
self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机")
# ----------------- 下单 -----------------
def calculate_size(self, price: float) -> int:
bal = self.get_assets_available()
if bal < 10:
return 0
margin = bal * self.cfg.risk_percent
lev = int(self.cfg.leverage)
# ⚠️ 沿用你的原假设1张≈0.001ETH
size = int((margin * lev) / (price * 0.001))
size = max(self.cfg.min_size, size)
size = min(self.cfg.max_size, size)
return size
def place_market_order(self, side: int, size: int) -> bool:
if size <= 0:
return False
client_order_id = f"mr_{int(time.time())}_{uuid.uuid4().hex[:8]}"
try:
resp = self.contractAPI.post_submit_order(
contract_symbol=self.cfg.contract_symbol,
client_order_id=client_order_id,
side=side,
mode=1,
type="market",
leverage=self.cfg.leverage,
open_type=self.cfg.open_type,
size=size
)[0]
logger.info(f"order_resp: {resp}")
if resp.get("code") == 1000:
return True
self.ding(f"下单失败: {resp}", error=True)
return False
except APIException as e:
logger.error(f"API下单异常: {e}")
self.ding(f"API下单异常: {e}", error=True)
return False
except Exception as e:
logger.error(f"下单未知异常: {e}")
self.ding(f"下单未知异常: {e}", error=True)
return False
def close_position_all(self):
if self.pos == 1:
ok = self.place_market_order(3, 999999)
if ok:
self.pos = 0
elif self.pos == -1:
ok = self.place_market_order(2, 999999)
if ok:
self.pos = 0
# ----------------- 止损后机制 -----------------
def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool:
"""检查是否需要应用重新入场惩罚"""
if self.last_sl_dir == 0:
return False
if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec:
self.last_sl_dir = 0
return False
reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev)
if abs(dev) <= reset_band:
self.last_sl_dir = 0
return False
return True
def _post_sl_dynamic_mult(self) -> float:
"""计算止损后SL放宽倍数"""
if self.post_sl_dir == 0:
return 1.0
if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec:
self.post_sl_dir = 0
self.post_sl_vol_scale = 1.0
return 1.0
raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0)
raw = max(1.0, raw) # 不缩小止损,只放宽
return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw))
# ----------------- 交易逻辑 -----------------
def in_cooldown(self) -> bool:
"""检查是否在冷却期内"""
return (time.time() - self.last_exit_ts) < self.cfg.cooldown_sec_after_exit
def maybe_enter(self, price: float, ema_value: float, entry_dev: float):
"""检查并执行入场"""
if self.pos != 0:
return
if self.in_cooldown():
return
dev = (price - ema_value) / ema_value if ema_value else 0.0
size = self.calculate_size(price)
if size <= 0:
return
penalty_active = self._reentry_penalty_active(dev, entry_dev)
long_th = -entry_dev
short_th = entry_dev
if penalty_active:
if self.last_sl_dir == 1:
long_th = -entry_dev * self.cfg.reentry_penalty_mult
logger.info(
f"多头止损后惩罚生效: 入场阈值从 {long_th * 100:.3f}% 调整为 {(-entry_dev * self.cfg.reentry_penalty_mult) * 100:.3f}%")
elif self.last_sl_dir == -1:
short_th = entry_dev * self.cfg.reentry_penalty_mult
logger.info(
f"空头止损后惩罚生效: 入场阈值从 {short_th * 100:.3f}% 调整为 {(entry_dev * self.cfg.reentry_penalty_mult) * 100:.3f}%")
logger.info(
f"入场检查: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% "
f"(entry_dev={entry_dev * 100:.3f}%, long_th={long_th * 100:.3f}%, short_th={short_th * 100:.3f}%) "
f"size={size}, penalty={penalty_active}, last_sl_dir={self.last_sl_dir}"
)
if dev <= long_th:
if self.place_market_order(1, size):
self.pos = 1
self.entry_price = price
self.entry_ts = time.time()
self.ding(f"✅开多dev={dev * 100:.3f}% size={size} entry={price:.2f}")
elif dev >= short_th:
if self.place_market_order(4, size):
self.pos = -1
self.entry_price = price
self.entry_ts = time.time()
self.ding(f"✅开空dev={dev * 100:.3f}% size={size} entry={price:.2f}")
def maybe_exit(self, price: float, tp: float, sl: float, vol_scale: float):
"""检查并执行出场"""
if self.pos == 0 or self.entry_price is None or self.entry_ts is None:
return
hold = time.time() - self.entry_ts
if self.pos == 1:
pnl = (price - self.entry_price) / self.entry_price
else:
pnl = (self.entry_price - price) / self.entry_price
sl_mult = 1.0
if self.post_sl_dir == self.pos and self.post_sl_dir != 0:
sl_mult = self._post_sl_dynamic_mult()
effective_sl = sl * sl_mult
if pnl >= tp:
self.close_position_all()
self.ding(f"🎯止盈pnl={pnl * 100:.3f}% price={price:.2f} tp={tp * 100:.3f}%")
self.entry_price, self.entry_ts = None, None
self.last_exit_ts = time.time()
elif pnl <= -effective_sl:
sl_dir = self.pos
self.close_position_all()
self.ding(
f"🛑止损pnl={pnl * 100:.3f}% price={price:.2f} "
f"sl={sl * 100:.3f}% effective_sl={effective_sl * 100:.3f}%(×{sl_mult:.2f})",
error=True
)
self.last_sl_dir = sl_dir
self.last_sl_ts = time.time()
self.post_sl_dir = sl_dir
self.post_sl_ts = time.time()
self.post_sl_vol_scale = float(vol_scale)
self.entry_price, self.entry_ts = None, None
self.last_exit_ts = time.time()
elif hold >= self.cfg.max_hold_sec:
self.close_position_all()
self.ding(f"⏱超时hold={int(hold)}s pnl={pnl * 100:.3f}% price={price:.2f}")
self.entry_price, self.entry_ts = None, None
self.last_exit_ts = time.time()
def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float,
atr_ratio: float, base_ratio: float, vol_scale: float,
entry_dev: float, tp: float, sl: float,
entry_floor: float, tp_floor: float, sl_floor: float):
"""限频状态通知"""
now = time.time()
if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec:
return
self._last_status_notify_ts = now
direction_str = "" if self.pos == 1 else ("" if self.pos == -1 else "")
penalty_active = self._reentry_penalty_active(dev, entry_dev)
sl_mult = 1.0
if self.pos != 0 and self.post_sl_dir == self.pos:
sl_mult = self._post_sl_dynamic_mult()
base_age = int(now - self._base_ratio_ts) if self._base_ratio_ts else -1
msg = (
f"【BitMart {self.cfg.contract_symbol}1m均值回归(动态阈值)】\n"
f"📊 状态:{direction_str}\n"
f"💰 现价:{price:.2f} | EMA{self.cfg.ema_len}{ema_value:.2f}\n"
f"📈 偏离:{dev * 100:.3f}% (入场阈值:±{entry_dev * 100:.3f}%)\n"
f"🌊 波动率ATR比={atr_ratio * 100:.3f}% | 基准={base_ratio * 100:.3f}% | 缩放={vol_scale:.2f}\n"
f"🎯 动态Floor入场={entry_floor * 100:.3f}% | 止盈={tp_floor * 100:.3f}% | 止损={sl_floor * 100:.3f}%\n"
f"💰 止盈/止损:{tp * 100:.3f}% / {sl * 100:.3f}% (盈亏比:{tp / sl:.2f})\n"
f"🔄 基准刷新:{self.cfg.base_ratio_refresh_sec}s (已过={base_age}s)\n"
f"⚠️ 止损同向加门槛:{'开启' if penalty_active else '关闭'} (方向={self.last_sl_dir})\n"
f"💳 可用余额:{bal:.2f} USDT | 杠杆:{self.cfg.leverage}x\n"
f"⏱️ 持仓限制:{self.cfg.max_hold_sec}s | 冷却:{self.cfg.cooldown_sec_after_exit}s"
)
self.ding(msg)
def action(self):
"""主循环"""
if not self.set_leverage():
self.ding("杠杆设置失败,停止运行", error=True)
return
while True:
now_dt = datetime.datetime.now()
self.pbar.n = now_dt.second
self.pbar.refresh()
# 1. 获取K线数据
klines = self.get_klines_cached()
if not klines or len(klines) < (self.cfg.ema_len + 5):
logger.warning("K线数据不足等待...")
time.sleep(1)
continue
# 2. 计算技术指标
last_k = klines[-1]
closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]]
ema_value = self.ema(closes, self.cfg.ema_len)
price = self.get_last_price(fallback_close=float(last_k["close"]))
dev = (price - ema_value) / ema_value if ema_value else 0.0
# 3. 计算波动率相关指标
a = self.atr(klines, self.cfg.atr_len)
atr_ratio = (a / price) if price > 0 else 0.0
base_ratio = self.get_base_ratio_cached(klines)
# 4. 计算动态阈值
entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor = self.dynamic_thresholds(
atr_ratio, base_ratio
)
# 记录调试信息
logger.debug(
f"循环数据: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}%, "
f"atr_ratio={atr_ratio * 100:.3f}%, base_ratio={base_ratio * 100:.3f}%, "
f"entry_dev={entry_dev * 100:.3f}%"
)
# 5. 风控检查
self.risk_kill_switch()
# 6. 获取持仓状态
if not self.get_position_status():
time.sleep(1)
continue
# 7. 检查交易是否启用
if not self.trading_enabled:
if self.pos != 0:
self.close_position_all()
logger.warning("交易被禁用(风控触发),等待...")
time.sleep(5)
continue
# 8. 检查危险市场
if self.is_danger_market(klines, price):
logger.warning("危险模式:高波动/大实体K暂停开仓")
self.maybe_exit(price, tp, sl, vol_scale)
time.sleep(self.cfg.tick_refresh_sec)
continue
# 9. 执行交易逻辑
self.maybe_exit(price, tp, sl, vol_scale)
self.maybe_enter(price, ema_value, entry_dev)
# 10. 状态通知
bal = self.get_assets_available()
self.notify_status_throttled(
price, ema_value, dev, bal,
atr_ratio, base_ratio, vol_scale,
entry_dev, tp, sl,
entry_floor, tp_floor, sl_floor
)
time.sleep(self.cfg.tick_refresh_sec)
if __name__ == "__main__":
"""
Windows PowerShell:
setx BITMART_API_KEY "你的key"
setx BITMART_SECRET_KEY "你的secret"
setx BITMART_MEMO "合约交易"
重新打开终端再运行。
Linux/macOS:
export BITMART_API_KEY="你的key"
export BITMART_SECRET_KEY="你的secret"
export BITMART_MEMO "合约交易"
"""
cfg = StrategyConfig()
bot = BitmartFuturesMeanReversionBot(cfg)
# 设置日志级别为INFO以便查看详细计算过程
logger.remove()
logger.add(lambda msg: tqdm.write(msg, end=""), level="INFO")
try:
bot.action()
except KeyboardInterrupt:
logger.info("程序被用户中断")
bot.ding("🤖 策略已手动停止")
except Exception as e:
logger.error(f"程序异常退出: {e}")
bot.ding(f"❌ 策略异常退出: {e}", error=True)
raise
# 目前动态计算阀值的速度是多少

File diff suppressed because it is too large Load Diff

View File

@@ -1,169 +0,0 @@
"""
BitMart 15分钟K线数据抓取脚本
从 BitMart API 获取15分钟K线数据并存储到数据库
"""
import time
from loguru import logger
from bitmart.api_contract import APIContract
from models.bitmart_15 import BitMart15
class BitMartDataCollector:
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "数据抓取"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
def get_klines(self, start_time=None, end_time=None, limit=200):
"""
获取K线数据
:param start_time: 开始时间戳(秒级)
:param end_time: 结束时间戳(秒级)
:param limit: 获取数量限制
:return: K线数据列表
"""
try:
if not end_time:
end_time = int(time.time())
if not start_time:
start_time = end_time - 3600 * 24 * 1 # 默认获取最近7天
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=15, # 15分钟
start_time=start_time,
end_time=end_time
)[0]
if response['code'] != 1000:
logger.error(f"获取K线失败: {response}")
return []
klines = response.get('data', [])
formatted = []
for k in klines:
# BitMart API 返回的时间戳是秒级,需要转换为毫秒级
# 根据 bitmart/框架.py 中的使用方式API返回的是秒级时间戳
timestamp_ms = int(k["timestamp"]) * 1000
formatted.append({
'id': timestamp_ms,
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
# 按时间戳排序
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
logger.error(f"获取K线异常: {e}")
return []
def save_klines(self, klines):
"""
保存K线数据到数据库
:param klines: K线数据列表
:return: 保存的数量
"""
saved_count = 0
for kline in klines:
try:
BitMart15.get_or_create(
id=kline['id'],
defaults={
'open': kline['open'],
'high': kline['high'],
'low': kline['low'],
'close': kline['close'],
}
)
saved_count += 1
except Exception as e:
logger.error(f"保存K线数据失败 {kline['id']}: {e}")
return saved_count
def collect_historical_data(self, start_date=None, days=None):
"""
抓取历史数据(从指定日期到现在)
:param start_date: 起始日期字符串,格式 'YYYY-MM-DD',如 '2025-01-01'
:param days: 如果不指定 start_date则抓取最近多少天的数据
"""
import datetime
now = int(time.time())
if start_date:
# 解析起始日期
start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
target_start_time = int(start_dt.timestamp())
logger.info(f"开始抓取 BitMart {self.contract_symbol}{start_date} 到现在的15分钟K线数据")
elif days:
target_start_time = now - 3600 * 24 * days
logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 {days} 天的15分钟K线数据")
else:
target_start_time = now - 3600 * 24 * 30 # 默认30天
logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 30 天的15分钟K线数据")
# 分批获取每次获取5天的数据15分钟K线数据量较大
batch_days = 5
total_saved = 0
fail_count = 0
max_fail = 3 # 连续失败超过3次则停止
current_end = now
while current_end > target_start_time:
current_start = max(current_end - 3600 * 24 * batch_days, target_start_time)
logger.info(f"抓取时间段: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start))} "
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end))}")
klines = self.get_klines(start_time=current_start, end_time=current_end)
if klines:
saved = self.save_klines(klines)
total_saved += saved
logger.info(f"本批次保存 {saved} 条数据,累计 {total_saved}")
fail_count = 0 # 重置失败计数
else:
fail_count += 1
logger.warning(f"本批次未获取到数据 (连续失败 {fail_count} 次)")
if fail_count >= max_fail:
logger.error(f"连续 {max_fail} 次获取数据失败,可能已达到 API 历史数据限制,停止抓取")
break
current_end = current_start
time.sleep(1) # 避免请求过快
logger.success(f"数据抓取完成,共保存 {total_saved} 条K线数据")
def collect_realtime_data(self):
"""
实时抓取最新数据(用于定时任务)
"""
logger.info("开始抓取 BitMart 最新15分钟K线数据")
# 获取最近1小时的数据确保能获取到最新的K线
end_time = int(time.time())
start_time = end_time - 3600 * 2 # 最近2小时
klines = self.get_klines(start_time=start_time, end_time=end_time)
if klines:
saved = self.save_klines(klines)
logger.success(f"保存 {saved} 条最新K线数据")
else:
logger.warning("未获取到最新数据")
if __name__ == '__main__':
collector = BitMartDataCollector()
# 抓取从 2025-01-01 到现在的15分钟K线历史数据
collector.collect_historical_data(start_date='2025-01-01')
# 如果需要实时抓取,可以取消下面的注释
# collector.collect_realtime_data()