This commit is contained in:
27942
2025-12-23 14:15:29 +08:00
parent 771cd5c608
commit 60de2d8300
11 changed files with 1159 additions and 293 deletions

View File

@@ -1,291 +0,0 @@
import time
import uuid
import datetime
from tqdm import tqdm
from loguru import logger
from bitmart.api_contract import APIContract
from bitmart.lib.cloud_exceptions import APIException
from 交易.tools import send_dingtalk_message
class BitmartFuturesTransaction:
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=30, desc="等待K线", ncols=80)
self.last_kline_time = None
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式(你的“成本开仓”需求)
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
def is_bullish(self, c):
return float(c['close']) > float(c['open'])
def is_bearish(self, c):
return float(c['close']) < float(c['open'])
def is_trending(self, klines):
"""判断是否为单边行情通过布林带或RSI"""
close_prices = [kline['close'] for kline in klines]
rsi_value = self.calculate_rsi(close_prices, 14) # 使用14期的RSI
if rsi_value > 70 or rsi_value < 30:
return True # 单边行情
return False # 震荡行情
def calculate_rsi(self, prices, period=14):
"""计算RSI指标"""
deltas = [prices[i] - prices[i - 1] for i in range(1, len(prices))]
gains = [delta if delta > 0 else 0 for delta in deltas]
losses = [-delta if delta < 0 else 0 for delta in deltas]
avg_gain = sum(gains[:period]) / period
avg_loss = sum(losses[:period]) / period
rs = avg_gain / avg_loss if avg_loss != 0 else 0
rsi = 100 - (100 / (1 + rs))
return rsi
def get_klines(self):
"""获取最近3根30分钟K线step=30"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新3根
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=30, # 30分钟
start_time=end_time - 3600 * 10, # 取最近10小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted # 最近3根: kline_1 (最老), kline_2, kline_3 (最新)
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(error=True, msg="获取K线异常")
return None
def get_current_price(self):
"""获取当前最新价格,用于计算张数"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 10, # 取最近10小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][0]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def calculate_size(self):
"""计算开仓张数使用可用余额的1%作为保证金"""
balance = self.get_available_balance()
self.balance = balance
if not balance or balance < 10:
logger.warning("余额不足,无法开仓")
return 0
price = self.get_current_price()
if not price:
price = 3000 # 保守估计避免size过大
leverage = int(self.leverage)
margin = balance * self.risk_percent # 使用1%余额
# ETHUSDT 1张 ≈ 0.001 ETH
size = int((margin * leverage) / (price * 0.001))
size = max(1, size)
logger.info(f"余额 {balance:.2f} USDT → 使用 {margin:.2f} USDT (1%) → 开仓 {size} 张 (价格≈{price})")
return size
def place_market_order(self, side: int, size: int):
if size <= 0:
return False
client_order_id = f"auto_{int(time.time())}_{uuid.uuid4().hex[:8]}"
try:
response = self.contractAPI.post_submit_order(
contract_symbol=self.contract_symbol,
client_order_id=client_order_id,
side=side,
mode=1,
type='market',
leverage=self.leverage,
open_type=self.open_type,
size=size
)[0]
if response['code'] == 1000:
logger.success(
f"下单成功: {'开多' if side in [1] else '开空' if side in [4] else '平多' if side in [3] else '平空'} {size}")
return True
else:
logger.error(f"下单失败: {response}")
return False
except APIException as e:
logger.error(f"API下单异常: {e}")
return False
def check_signal(self, prev, curr):
"""简化英戈尔夫形态"""
if self.is_bullish(curr) and self.is_bearish(prev) and float(curr['close']) >= float(prev['open']):
return "long"
if self.is_bearish(curr) and self.is_bullish(prev) and float(curr['close']) <= float(prev['open']):
return "short"
return None
def execute_trade(self):
"""执行交易逻辑,根据市场状态切换策略"""
klines = self.get_klines()
if not klines or len(klines) < 3:
return
if self.is_trending(klines): # 单边行情
self.direction = self.check_signal(klines[1], klines[2])
if self.direction:
logger.success(f"检测到{self.direction}信号准备开仓用余额1%")
self.execute_trade() # 执行趋势跟随交易
else: # 震荡行情
self.execute_grid_trade() # 执行网格交易策略
def execute_grid_trade(self):
"""网格交易策略"""
logger.info("开始网格交易")
# 获取当前价格
current_price = self.get_current_price()
if not current_price:
logger.error("无法获取当前价格,网格交易无法执行")
return
# 假设的网格区间(可以根据需要调整)
grid_step = 10 # 每次10USDT为一个网格
grid_size = 1 # 每次开仓数量单位ETH
# 计算上网格和下网格价格
lower_price = current_price - grid_step # 下网格价格
upper_price = current_price + grid_step # 上网格价格
# 生成买卖网格订单
try:
# 设置买单
buy_order = self.place_market_order(side=1, size=grid_size) # 开多
if buy_order:
logger.info(f"已成功设置买单,买入价格:{lower_price},数量:{grid_size} ETH")
# 设置卖单
sell_order = self.place_market_order(side=4, size=grid_size) # 开空
if sell_order:
logger.info(f"已成功设置卖单,卖出价格:{upper_price},数量:{grid_size} ETH")
except Exception as e:
logger.error(f"网格交易下单失败: {e}")
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def action(self):
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
while True:
current_minute = datetime.datetime.now().minute
if current_minute < 30:
self.pbar.n = current_minute
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
self.execute_trade()
time.sleep(2) # 高频交易,减少等待时间
if __name__ == '__main__':
BitmartFuturesTransaction().action()

Binary file not shown.

860
bitmart/交易.py Normal file
View File

@@ -0,0 +1,860 @@
"""
BitMart 被动做市/高频刷单策略
核心逻辑:在盘口两侧不断挂单,赚取价差+返佣
"""
import time
import requests
from loguru import logger
from threading import Lock
from dataclasses import dataclass
from typing import Optional, Dict, List, Tuple
from bitmart.api_contract import APIContract
from DrissionPage import ChromiumPage, ChromiumOptions
from 交易.tools import send_dingtalk_message
# ================================================================
# 📊 配置类
# ================================================================
@dataclass
class MarketMakingConfig:
"""做市策略配置"""
# API配置仅用于查询不下单
api_key: str = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
secret_key: str = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
memo: str = "合约交易"
contract_symbol: str = "ETHUSDT"
# 浏览器配置
tge_id: int = 196495 # TGE浏览器ID
tge_url: str = "http://127.0.0.1:50326"
tge_headers: Dict = None
trading_url: str = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"
# 做市参数
spread_percent: float = 0.01 # 价差百分比0.01% = 买一卖一之间)
order_size_usdt: float = 10.0 # 每单金额USDT
max_position_usdt: float = 100.0 # 最大持仓金额USDT
# 订单管理
order_refresh_interval: float = 2.0 # 订单刷新间隔(秒)
order_timeout: float = 60.0 # 订单超时时间(秒),超时后撤单重新挂
# 风险控制
max_daily_loss: float = 50.0 # 每日最大亏损USDT
max_daily_trades: int = 1000 # 每日最大交易次数
# 杠杆和模式
leverage: str = "100" # 杠杆倍数
open_type: str = "cross" # 全仓模式
def __post_init__(self):
"""初始化TGE headers"""
if self.tge_headers is None:
self.tge_headers = {
"Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
"Content-Type": "application/json"
}
# ================================================================
# 📊 订单簿数据结构
# ================================================================
@dataclass
class OrderBook:
"""订单簿数据"""
bids: List[Tuple[float, float]] # [(价格, 数量), ...] 买盘,价格从高到低
asks: List[Tuple[float, float]] # [(价格, 数量), ...] 卖盘,价格从低到高
timestamp: float
@property
def best_bid(self) -> Optional[float]:
"""买一价"""
return self.bids[0][0] if self.bids else None
@property
def best_ask(self) -> Optional[float]:
"""卖一价"""
return self.asks[0][0] if self.asks else None
@property
def spread(self) -> Optional[float]:
"""价差"""
if self.best_bid and self.best_ask:
return self.best_ask - self.best_bid
return None
@property
def mid_price(self) -> Optional[float]:
"""中间价"""
if self.best_bid and self.best_ask:
return (self.best_bid + self.best_ask) / 2
return None
@dataclass
class PendingOrder:
""" pending订单信息"""
order_id: str
side: str # "buy" or "sell"
price: float
size: float
create_time: float
status: str # "pending", "filled", "cancelled"
# ================================================================
# 📊 浏览器管理器
# ================================================================
class BrowserManager:
"""浏览器管理器:负责浏览器的启动、接管和标签页管理"""
def __init__(self, config: MarketMakingConfig):
self.config = config
self.tge_port: Optional[int] = None
self.page: Optional[ChromiumPage] = None
def open_browser(self) -> bool:
"""打开浏览器并获取端口"""
try:
response = requests.post(
f"{self.config.tge_url}/api/browser/start",
json={"envId": self.config.tge_id},
headers=self.config.tge_headers,
timeout=10
)
self.tge_port = response.json()["data"]["port"]
logger.success(f"成功打开浏览器,端口:{self.tge_port}")
return True
except Exception as e:
logger.error(f"打开浏览器失败: {e}")
return False
def take_over_browser(self) -> bool:
"""接管浏览器"""
if not self.tge_port:
logger.error("浏览器端口未设置")
return False
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
logger.success("成功接管浏览器")
return True
except Exception as e:
logger.error(f"接管浏览器失败: {e}")
return False
def close_extra_tabs(self) -> bool:
"""关闭多余的标签页,只保留第一个"""
if not self.page:
return False
try:
tabs = self.page.get_tabs()
closed_count = 0
for idx, tab in enumerate(tabs):
if idx == 0:
continue
tab.close()
closed_count += 1
if closed_count > 0:
logger.info(f"已关闭{closed_count}个多余标签页")
return True
except Exception as e:
logger.warning(f"关闭多余标签页失败: {e}")
return False
# ================================================================
# 📊 浏览器交易执行器
# ================================================================
class BrowserTradingExecutor:
"""浏览器交易执行器:通过浏览器自动化下单(获取高返佣)"""
def __init__(self, page: ChromiumPage):
self.page = page
def click_safe(self, xpath: str, sleep: float = 0.5) -> bool:
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click()
return True
except Exception as e:
logger.error(f"点击失败 {xpath}: {e}")
return False
def 开单(self, marketPriceLongOrder: int = 0, limitPriceShortOrder: int = 0,
size: Optional[float] = None, price: Optional[float] = None) -> bool:
"""
开单操作(通过浏览器自动化,获取高返佣)
Args:
marketPriceLongOrder: 市价最多或者做空1是最多-1是做空
limitPriceShortOrder: 限价最多或者做空1是最多-1是做空
size: 数量
price: 价格(限价单需要)
Returns:
是否成功
"""
try:
# 市价单
if marketPriceLongOrder == -1:
# 市价做空
if not self.click_safe('x://button[normalize-space(text()) ="市价"]'):
return False
self.page.ele('x://*[@id="size_0"]').input(size)
if not self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'):
return False
logger.success(f"市价做空成功: {size}")
return True
elif marketPriceLongOrder == 1:
# 市价做多
if not self.click_safe('x://button[normalize-space(text()) ="市价"]'):
return False
self.page.ele('x://*[@id="size_0"]').input(size)
if not self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'):
return False
logger.success(f"市价做多成功: {size}")
return True
# 限价单
if limitPriceShortOrder == -1:
# 限价做空
if not self.click_safe('x://button[normalize-space(text()) ="限价"]'):
return False
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(size)
if not self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'):
return False
logger.success(f"限价做空成功: {size} @ {price}")
return True
elif limitPriceShortOrder == 1:
# 限价做多
if not self.click_safe('x://button[normalize-space(text()) ="限价"]'):
return False
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(size)
if not self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'):
return False
logger.success(f"限价做多成功: {size} @ {price}")
return True
return False
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def 平仓(self) -> bool:
"""市价平仓"""
try:
if self.click_safe('x://span[normalize-space(text()) ="市价"]'):
logger.success("平仓成功")
return True
return False
except Exception as e:
logger.error(f"平仓异常: {e}")
return False
def place_limit_order(self, side: str, price: float, size: float) -> bool:
"""
下限价单(通过浏览器)
Args:
side: "buy""sell"
price: 价格
size: 数量(张数)
Returns:
是否成功
"""
try:
# size已经是张数直接使用
if side == "buy":
# 限价做多
return self.开单(limitPriceShortOrder=1, size=size, price=price)
else:
# 限价做空
return self.开单(limitPriceShortOrder=-1, size=size, price=price)
except Exception as e:
logger.error(f"限价下单异常: {e}")
return False
# ================================================================
# 📊 BitMart API 封装(仅用于查询,不下单)
# ================================================================
class BitMartMarketMakerAPI:
"""BitMart做市API封装"""
def __init__(self, config: MarketMakingConfig):
self.config = config
self.contractAPI = APIContract(
config.api_key,
config.secret_key,
config.memo,
timeout=(5, 15)
)
def get_order_book(self, depth: int = 20) -> Optional[OrderBook]:
"""
获取订单簿
Args:
depth: 深度数量
Returns:
OrderBook对象或None
"""
try:
# BitMart合约API获取深度数据
# 注意需要根据实际API方法调整可能是 get_depth 或 get_market_depth
response = self.contractAPI.get_depth(
contract_symbol=self.config.contract_symbol,
size=depth
)[0]
if response.get('code') == 1000:
data = response.get('data', {})
# BitMart返回格式可能是不同的需要根据实际调整
bids = []
asks = []
if isinstance(data, dict):
bids_raw = data.get('bids', [])
asks_raw = data.get('asks', [])
# 处理不同格式
for b in bids_raw:
if isinstance(b, (list, tuple)) and len(b) >= 2:
bids.append((float(b[0]), float(b[1])))
elif isinstance(b, dict):
bids.append((float(b.get('price', 0)), float(b.get('size', 0))))
for a in asks_raw:
if isinstance(a, (list, tuple)) and len(a) >= 2:
asks.append((float(a[0]), float(a[1])))
elif isinstance(a, dict):
asks.append((float(a.get('price', 0)), float(a.get('size', 0))))
# 买盘按价格从高到低排序,卖盘按价格从低到高排序
bids.sort(key=lambda x: x[0], reverse=True)
asks.sort(key=lambda x: x[0])
if bids and asks:
return OrderBook(
bids=bids,
asks=asks,
timestamp=time.time()
)
return None
except Exception as e:
logger.error(f"获取订单簿异常: {e}")
return None
def get_current_price(self) -> Optional[float]:
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.config.contract_symbol,
step=1, # 1分钟
start_time=end_time - 60,
end_time=end_time
)[0]
if response.get('code') == 1000:
data = response.get('data', [])
if data:
return float(data[-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self) -> Optional[float]:
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response.get('code') == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position(self) -> Optional[Dict]:
"""获取当前持仓"""
try:
response = self.contractAPI.get_position(
contract_symbol=self.config.contract_symbol
)[0]
if response.get('code') == 1000:
positions = response.get('data', [])
if positions:
return positions[0]
return None
return None
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return None
def set_leverage(self) -> bool:
"""设置杠杆和全仓模式"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.config.contract_symbol,
leverage=self.config.leverage,
open_type=self.config.open_type
)[0]
if response.get('code') == 1000:
logger.success(f"全仓模式 + {self.config.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# 注意下单操作已改为浏览器自动化这里不再提供API下单方法
# ================================================================
# 📊 做市策略核心
# ================================================================
class MarketMakingStrategy:
"""被动做市策略(使用浏览器自动化下单,获取高返佣)"""
def __init__(self, config: MarketMakingConfig):
self.config = config
self.api = BitMartMarketMakerAPI(config) # 仅用于查询
# 浏览器管理
self.browser_manager = BrowserManager(config)
self.trading_executor: Optional[BrowserTradingExecutor] = None
# 订单管理使用时间戳作为订单ID因为浏览器下单无法直接获取订单ID
self.pending_orders: Dict[str, PendingOrder] = {}
self.order_lock = Lock()
# 统计
self.daily_trades = 0
self.daily_profit = 0.0
self.total_trades = 0
self.total_profit = 0.0
# 运行状态
self.running = False
self.last_order_refresh = 0.0
# 初始化浏览器和杠杆
self._initialize_browser()
self.api.set_leverage()
def _initialize_browser(self) -> bool:
"""初始化浏览器"""
try:
# 打开浏览器
if not self.browser_manager.open_browser():
logger.error("打开浏览器失败")
return False
# 接管浏览器
if not self.browser_manager.take_over_browser():
logger.error("接管浏览器失败")
return False
# 关闭多余标签页
self.browser_manager.close_extra_tabs()
# 打开交易页面
self.browser_manager.page.get(self.config.trading_url)
time.sleep(2) # 等待页面加载
# 初始化交易执行器
self.trading_executor = BrowserTradingExecutor(self.browser_manager.page)
logger.success("浏览器初始化完成")
return True
except Exception as e:
logger.error(f"浏览器初始化异常: {e}")
return False
def calculate_order_prices(self, order_book: OrderBook) -> Tuple[Optional[float], Optional[float]]:
"""
计算挂单价格
Args:
order_book: 订单簿
Returns:
(buy_price, sell_price)
"""
if not order_book.mid_price or not order_book.best_bid or not order_book.best_ask:
return None, None
mid = order_book.mid_price
spread_amount = mid * self.config.spread_percent / 100
# 买单价格:中间价 - 价差的一半,但不能低于买一
buy_price = mid - spread_amount / 2
buy_price = min(buy_price, order_book.best_bid * 0.9999) # 略低于买一,确保能成交
# 卖单价格:中间价 + 价差的一半,但不能高于卖一
sell_price = mid + spread_amount / 2
sell_price = max(sell_price, order_book.best_ask * 1.0001) # 略高于卖一,确保能成交
# 确保价差合理
if sell_price <= buy_price:
# 如果价差太小,使用买一卖一价格
buy_price = order_book.best_bid * 0.9999
sell_price = order_book.best_ask * 1.0001
return buy_price, sell_price
def should_refresh_orders(self) -> bool:
"""判断是否需要刷新订单"""
now = time.time()
if now - self.last_order_refresh >= self.config.order_refresh_interval:
return True
return False
def cancel_stale_orders(self):
"""撤销超时订单(通过浏览器刷新页面,手动撤销)"""
now = time.time()
to_cancel = []
with self.order_lock:
for order_id, order in self.pending_orders.items():
if order.status == "pending":
if now - order.create_time > self.config.order_timeout:
to_cancel.append(order_id)
# 如果有超时订单,刷新页面(页面会自动显示最新挂单状态)
if to_cancel:
logger.info(f"发现{len(to_cancel)}个超时订单,刷新页面")
try:
self.browser_manager.page.refresh()
time.sleep(1)
# 注意:实际撤销操作需要在页面上手动点击,这里只是标记
with self.order_lock:
for order_id in to_cancel:
if order_id in self.pending_orders:
self.pending_orders[order_id].status = "cancelled"
except Exception as e:
logger.error(f"刷新页面失败: {e}")
def update_pending_orders(self):
"""更新挂单状态(通过持仓变化判断订单是否成交)"""
try:
# 获取当前持仓
current_position = self.api.get_position()
current_position_type = 0
current_position_amount = 0.0
if current_position:
current_position_type = int(current_position.get('position_type', 0))
current_position_amount = abs(float(current_position.get('current_amount', 0)))
with self.order_lock:
# 检查挂单是否成交(通过持仓变化判断)
for order_id, order in list(self.pending_orders.items()):
if order.status == "pending":
# 检查订单是否超时
if time.time() - order.create_time > self.config.order_timeout:
# 订单超时,标记为取消
order.status = "cancelled"
logger.info(f"订单超时: {order_id} {order.side} @ {order.price}")
continue
# 简单判断:如果持仓方向与订单方向一致,可能已成交
# 注意这个方法不够精确实际应该通过API查询挂单状态
# 但由于使用浏览器下单无法直接获取订单ID这里简化处理
# 建议:定期刷新页面,通过页面上的挂单列表判断
pass
except Exception as e:
logger.error(f"更新挂单状态异常: {e}")
def _place_counter_order(self, filled_order: PendingOrder):
"""
订单成交后,在另一侧挂单
Args:
filled_order: 已成交的订单
"""
# 等待一小段时间,确保订单状态更新
time.sleep(0.1)
order_book = self.api.get_order_book()
if not order_book:
logger.warning("无法获取订单簿,无法挂反向单")
return
# 计算反向订单价格
buy_price, sell_price = self.calculate_order_prices(order_book)
if filled_order.side == "buy":
# 买单成交,挂卖单
if sell_price and self.trading_executor:
contract_size = self.config.order_size_usdt / sell_price / 0.01
if contract_size < 1:
contract_size = 1
if self.trading_executor.place_limit_order("sell", sell_price, contract_size):
order_id = f"sell_{int(time.time() * 1000)}"
with self.order_lock:
self.pending_orders[order_id] = PendingOrder(
order_id=order_id,
side="sell",
price=sell_price,
size=self.config.order_size_usdt,
create_time=time.time(),
status="pending"
)
logger.info(f"买单成交后挂卖单: {sell_price}, 订单ID: {order_id}")
else:
logger.warning("买单成交后挂卖单失败")
else:
# 卖单成交,挂买单平空或开多
if buy_price and self.trading_executor:
contract_size = self.config.order_size_usdt / buy_price / 0.01
if contract_size < 1:
contract_size = 1
if self.trading_executor.place_limit_order("buy", buy_price, contract_size):
order_id = f"buy_{int(time.time() * 1000)}"
with self.order_lock:
self.pending_orders[order_id] = PendingOrder(
order_id=order_id,
side="buy",
price=buy_price,
size=self.config.order_size_usdt,
create_time=time.time(),
status="pending"
)
logger.info(f"卖单成交后挂买单: {buy_price}, 订单ID: {order_id}")
else:
logger.warning("卖单成交后挂买单失败")
def place_market_making_orders(self):
"""放置做市订单"""
# 获取订单簿
order_book = self.api.get_order_book()
if not order_book or not order_book.mid_price:
logger.warning("无法获取订单簿")
return
# 检查持仓
position = self.api.get_position()
position_value = 0.0
if position:
current_price = order_book.mid_price
position_value = abs(float(position.get('current_amount', 0)) * current_price)
# 如果持仓超过限制,只挂反向单
if position_value >= self.config.max_position_usdt:
logger.warning(f"持仓超过限制: {position_value} USDT")
# 只挂反向单平仓
if position and float(position.get('position_type', 0)) == 1: # 多仓
# 挂卖单平多
_, sell_price = self.calculate_order_prices(order_book)
if sell_price:
self.api.place_limit_order("sell", sell_price, self.config.order_size_usdt)
elif position and float(position.get('position_type', 0)) == 2: # 空仓
# 挂买单平空
buy_price, _ = self.calculate_order_prices(order_book)
if buy_price:
self.api.place_limit_order("buy", buy_price, self.config.order_size_usdt)
return
# 计算挂单价格
buy_price, sell_price = self.calculate_order_prices(order_book)
if not buy_price or not sell_price:
return
# 检查当前挂单数量
with self.order_lock:
pending_buy_count = sum(1 for o in self.pending_orders.values()
if o.side == "buy" and o.status == "pending")
pending_sell_count = sum(1 for o in self.pending_orders.values()
if o.side == "sell" and o.status == "pending")
# 如果两侧都有挂单,不重复挂
if pending_buy_count > 0 and pending_sell_count > 0:
return
# 挂买单(通过浏览器)
if pending_buy_count == 0:
# 计算张数(根据合约规格调整)
# 假设页面输入框单位是张数需要将USDT金额转换为张数
# size_usdt / price = ETH数量再除以合约面值得到张数
contract_size = self.config.order_size_usdt / buy_price / 0.01
if contract_size < 1:
contract_size = 1
if self.trading_executor and self.trading_executor.place_limit_order("buy", buy_price, contract_size):
# 使用时间戳作为订单ID
order_id = f"buy_{int(time.time() * 1000)}"
with self.order_lock:
self.pending_orders[order_id] = PendingOrder(
order_id=order_id,
side="buy",
price=buy_price,
size=self.config.order_size_usdt,
create_time=time.time(),
status="pending"
)
logger.info(f"挂买单成功: {buy_price}, 订单ID: {order_id}")
else:
logger.warning("挂买单失败")
# 挂卖单(通过浏览器)
if pending_sell_count == 0:
# 计算张数
contract_size = self.config.order_size_usdt / sell_price / 0.01
if contract_size < 1:
contract_size = 1
if self.trading_executor and self.trading_executor.place_limit_order("sell", sell_price, contract_size):
# 使用时间戳作为订单ID
order_id = f"sell_{int(time.time() * 1000)}"
with self.order_lock:
self.pending_orders[order_id] = PendingOrder(
order_id=order_id,
side="sell",
price=sell_price,
size=self.config.order_size_usdt,
create_time=time.time(),
status="pending"
)
logger.info(f"挂卖单成功: {sell_price}, 订单ID: {order_id}")
else:
logger.warning("挂卖单失败")
self.last_order_refresh = time.time()
def check_risk_limits(self) -> bool:
"""检查风险限制"""
# 检查每日交易次数
if self.daily_trades >= self.config.max_daily_trades:
logger.warning(f"达到每日最大交易次数: {self.daily_trades}")
return False
# 检查每日亏损
if self.daily_profit <= -self.config.max_daily_loss:
logger.error(f"达到每日最大亏损: {self.daily_profit}")
send_dingtalk_message(f"做市策略达到每日最大亏损: {self.daily_profit} USDT", error=True)
return False
return True
def run(self):
"""主运行循环"""
self.running = True
logger.info("做市策略启动")
while self.running:
try:
# 检查风险限制
if not self.check_risk_limits():
logger.error("风险限制触发,停止策略")
break
# 撤销超时订单
self.cancel_stale_orders()
# 更新挂单状态
self.update_pending_orders()
# 刷新订单
if self.should_refresh_orders():
self.place_market_making_orders()
# 短暂休眠
time.sleep(0.5)
except KeyboardInterrupt:
logger.info("收到中断信号,停止策略")
break
except Exception as e:
logger.error(f"策略运行异常: {e}")
time.sleep(1)
# 清理:刷新页面,手动撤销挂单
logger.info("清理挂单...")
try:
self.browser_manager.page.refresh()
time.sleep(1)
# 注意:实际撤销操作需要在页面上手动点击
with self.order_lock:
for order_id in list(self.pending_orders.keys()):
if self.pending_orders[order_id].status == "pending":
self.pending_orders[order_id].status = "cancelled"
except Exception as e:
logger.error(f"清理挂单失败: {e}")
logger.info("做市策略已停止")
def stop(self):
"""停止策略"""
self.running = False
# ================================================================
# 🚀 主程序
# ================================================================
if __name__ == '__main__':
config = MarketMakingConfig(
contract_symbol="ETHUSDT",
tge_id=196495, # TGE浏览器ID
trading_url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT",
spread_percent=0.01, # 0.01%价差
order_size_usdt=10.0, # 每单10 USDT
max_position_usdt=100.0, # 最大持仓100 USDT
order_refresh_interval=2.0, # 2秒刷新一次
order_timeout=60.0, # 60秒超时
max_daily_loss=50.0, # 每日最大亏损50 USDT
max_daily_trades=1000, # 每日最大1000笔
leverage="100",
open_type="cross"
)
strategy = MarketMakingStrategy(config)
try:
strategy.run()
except Exception as e:
logger.error(f"程序异常: {e}")
send_dingtalk_message(f"做市策略异常: {e}", error=True)

View File

Can't render this file because it is too large.

View File

Can't render this file because it is too large.

View File

Can't render this file because it is too large.

View File

Can't render this file because it is too large.

View File

Can't render this file because it is too large.

264
bitmart/框架.py Normal file
View File

@@ -0,0 +1,264 @@
import time
import uuid
import datetime
import requests
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from tqdm import tqdm
from loguru import logger
from bitmart.api_contract import APIContract
from bitmart.lib.cloud_exceptions import APIException
from 交易.tools import send_dingtalk_message
class BitmartFuturesTransaction:
def __init__(self, tge_id):
self.tge_url = "http://127.0.0.1:50326"
self.tge_id = tge_id
self.tge_headers = {
"Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
"Content-Type": "application/json"
}
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=30, desc="等待K线", ncols=80)
self.last_kline_time = None
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式(你的“成本开仓”需求)
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
def get_klines(self):
"""获取最近3根30分钟K线step=30"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新3根
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=30, # 30分钟
start_time=end_time - 3600 * 10, # 取最近10小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted # 最近3根: kline_1 (最老), kline_2, kline_3 (最新)
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(error=True, msg="获取K线异常")
return None
def get_current_price(self):
"""获取当前最新价格,用于计算张数"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 3, # 取最近10小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
# 获取当前持仓方向
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
# 设置杠杆和全仓
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
res = requests.post(
f"{self.tge_url}/api/browser/start",
json={"envId": self.tge_id},
headers=self.tge_headers
)
self.tge_port = res.json()["data"]["port"]
return True
except:
return False
def take_over_browser(self):
"""接管浏览器"""
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
return True
except:
return False
def close_extra_tabs(self):
"""关闭多余 tab"""
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click()
return True
except:
return False
def 平仓(self):
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价最多或者做空1是最多-1是做空
limitPriceShortOrder 限价最多或者做空
"""
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def action(self):
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
# 1. 打开浏览器
if not self.openBrowser():
self.ding("打开 TGE 失败!", error=True)
return
logger.info("TGE 端口获取成功")
# 2. 接管浏览器
if not self.take_over_browser():
self.ding("接管浏览器失败!", error=True)
return
logger.info("浏览器接管成功")
self.close_extra_tabs()
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=3000, clear=True)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
if __name__ == '__main__':
BitmartFuturesTransaction(tge_id=196495).action()

View File

@@ -13,7 +13,7 @@ from 交易.tools import send_dingtalk_message
class BitmartFuturesTransaction:
def __init__(self,tge_id):
def __init__(self, tge_id):
self.tge_url = "http://127.0.0.1:50326"
self.tge_id = tge_id
@@ -199,6 +199,37 @@ class BitmartFuturesTransaction:
except:
return False
def 平仓(self):
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价最多或者做空1是最多-1是做空
limitPriceShortOrder 限价最多或者做空
"""
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def action(self):
# 启动时设置全仓高杠杆
if not self.set_leverage():
@@ -220,8 +251,10 @@ class BitmartFuturesTransaction:
self.close_extra_tabs()
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=3000, clear=True)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')