diff --git a/1111 b/1111 index b087fb6..5343185 100644 --- a/1111 +++ b/1111 @@ -10,9 +10,15 @@ websea 账号:yangyicheng6666@gmail.com 密码:Wg138333. +api交易 +yto0mg6r05kkyolwefqw +db2e87093c120766bca60d2282y72760688 + yx20250715@gmail.com Abc12345678 + + 3. 更多交易信号 包住形态(原策略) 锤子线和上吊线 @@ -28,4 +34,12 @@ RSI超买超卖 时间止损:持仓超过24小时自动平仓 凯利公式改进版:仓位管理 最大回撤控制 -手续费考虑:0.05%交易手续费 \ No newline at end of file +手续费考虑:0.05%交易手续费 + + +申请API原因:我计划通过API实现自动化量化交易策略,以减少人工干预,提高交易效率并减少人为错误。通过API,我能够实时获取市场数据、执行交易指令、监控账户状态,并根据市场变化自动调整策略。 +申请API具体方式用途:我将使用API来获取实时的市场行情数据,执行各种量化策略(如网格交易、动量策略、均值回归等),并进行账户管理。API将自动监控并调整我的交易参数,比如仓位、止损、止盈等,同时还需要定期回测策略和优化模型。 +申请API主要交易什么币种:我主要通过API进行ETH和BTC的合约交易,还有一起其他的主流币 +申请API操作频率:预计API的调用频率将根据市场波动而变化,通常情况下,我的API请求频率大约在每秒5-10次,有时会根据策略的调整需要进行更频繁的请求。 +申请API大概体量:预计每日API请求量为5,000 - 15,000次,具体取决于市场波动和交易策略的复杂度。月度交易量预估为50,000 - 200,000 ETH/BTC。 +申请API对接后预估增长目标:通过API对接,我预计能够实现量化策略的高效执行和自动化交易,减少人工干预和错误。目标是在对接后的3个月内,将交易频率提高50%,并将月盈利提高20%至30%。 diff --git a/bit_tools.py b/bit_tools.py index cea8a63..51f073e 100644 --- a/bit_tools.py +++ b/bit_tools.py @@ -19,7 +19,8 @@ def createBrowser( port=None, proxyUserName=None, proxyPassword=None, - name='google' + name='google', + proxyType="socks5" ): # 创建或者更新窗口,指纹参数 browserFingerPrint 如没有特定需求,只需要指定下内核即可,如果需要更详细的参数,请参考文档 json_data = { "groupId": groupId, # 分组id @@ -27,7 +28,7 @@ def createBrowser( 'remark': '', # 备注 'proxyMethod': 1, # 代理方式 2自定义 3 提取IP # 代理类型 ['noproxy', 'http', 'https', 'socks5', 'ssh'] - 'proxyType': 'socks5', + 'proxyType': proxyType, 'host': host, # 代理主机EE 'port': port, # 代理端口 'proxyUserName': proxyUserName, # 代理账号 @@ -209,28 +210,27 @@ if __name__ == '__main__': # proxyPassword=ips_info.password # ) - # fz_datas = get_group_lists() - # # fz_datas['推特'] - # - # for i in range(10): - # for i in get_browser_lists_Browser(id=fz_datas['推特'], page=i): - # x_start_info = Xstart.get_or_none( - # Xstart.bit_id == i["id"] - # ) - # - # if not x_start_info: - # deleteBrowser(id=i["id"]) - # - # continue - # - # if x_start_info.start: - # continue - # - # deleteBrowser(id=i["id"]) - # - # # x_start_info.delete_instance() - # x_start_info.bit_id = None - # x_start_info.save() + fz_datas = get_group_lists() + # fz_datas['推特'] + + for i in range(10): + for i in get_browser_lists_Browser(id=fz_datas['推特'], page=i): + x_start_info = Xstart.get_or_none( + Xstart.bit_id == i["id"] + ) + + if not x_start_info: + deleteBrowser(id=i["id"]) + + continue + + if x_start_info.start: + continue + + deleteBrowser(id=i["id"]) + + x_start_info.bit_id = None + x_start_info.save() # for i in Xstart.select(): # res = browser_detail(id=i.bit_id) @@ -243,20 +243,20 @@ if __name__ == '__main__': # print(browser_detail(id="532651f5330e4caa917e644f9b676b")) # # 批量修改代理 - # for i in Xstart.select(): + # for i in Xstart.select().where(Xstart.start == 0): # update_proxy_Browser(id=i.bit_id, proxyType="http", host="104.168.59.92", port=random.randint(20001, 25000), ) - fz_datas = get_group_lists() - print(fz_datas) - bit_id_list = [] - for i in XToken.select().where(XToken.account_start == 2): - sql_info = Xstart.get_or_none( - Xstart.x_id == i.id - ) - - bit_id_list.append(sql_info.bit_id) - - print(len(bit_id_list)) - print(bit_id_list) - - print(group_update(fz_datas["西班牙语"], bit_id_list)) + # fz_datas = get_group_lists() + # print(fz_datas) + # bit_id_list = [] + # for i in XToken.select().where(XToken.account_start == 2): + # sql_info = Xstart.get_or_none( + # Xstart.x_id == i.id + # ) + # + # bit_id_list.append(sql_info.bit_id) + # + # print(len(bit_id_list)) + # print(bit_id_list) + # + # print(group_update(fz_datas["西班牙语"], bit_id_list)) diff --git a/bitmart/api高频交易.py b/bitmart/api高频交易.py new file mode 100644 index 0000000..1eeb99e --- /dev/null +++ b/bitmart/api高频交易.py @@ -0,0 +1,291 @@ +import time +import uuid +import datetime +from tqdm import tqdm +from loguru import logger +from bitmart.api_contract import APIContract +from bitmart.lib.cloud_exceptions import APIException +from 交易.tools import send_dingtalk_message + + +class BitmartFuturesTransaction: + def __init__(self): + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "合约交易" + + self.contract_symbol = "ETHUSDT" + + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 + self.direction = None + + self.pbar = tqdm(total=30, desc="等待K线", ncols=80) + + self.last_kline_time = None + + self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位) + self.open_type = "cross" # 全仓模式(你的“成本开仓”需求) + self.risk_percent = 0.01 # 每次开仓使用可用余额的 1% + + self.open_avg_price = None # 开仓价格 + self.current_amount = None # 持仓量 + + def is_bullish(self, c): + return float(c['close']) > float(c['open']) + + def is_bearish(self, c): + return float(c['close']) < float(c['open']) + + def is_trending(self, klines): + """判断是否为单边行情,通过布林带或RSI""" + close_prices = [kline['close'] for kline in klines] + rsi_value = self.calculate_rsi(close_prices, 14) # 使用14期的RSI + if rsi_value > 70 or rsi_value < 30: + return True # 单边行情 + return False # 震荡行情 + + def calculate_rsi(self, prices, period=14): + """计算RSI指标""" + deltas = [prices[i] - prices[i - 1] for i in range(1, len(prices))] + gains = [delta if delta > 0 else 0 for delta in deltas] + losses = [-delta if delta < 0 else 0 for delta in deltas] + + avg_gain = sum(gains[:period]) / period + avg_loss = sum(losses[:period]) / period + + rs = avg_gain / avg_loss if avg_loss != 0 else 0 + rsi = 100 - (100 / (1 + rs)) + return rsi + + def get_klines(self): + """获取最近3根30分钟K线(step=30)""" + try: + end_time = int(time.time()) + # 获取足够多的条目确保有最新3根 + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=30, # 30分钟 + start_time=end_time - 3600 * 10, # 取最近10小时 + end_time=end_time + )[0]["data"] + + # 每根: [timestamp, open, high, low, close, volume] + formatted = [] + for k in response: + formatted.append({ + 'id': int(k["timestamp"]), + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + formatted.sort(key=lambda x: x['id']) + return formatted[-3:] # 最近3根: kline_1 (最老), kline_2, kline_3 (最新) + except Exception as e: + logger.error(f"获取K线异常: {e}") + self.ding(error=True, msg="获取K线异常") + return None + + def get_current_price(self): + """获取当前最新价格,用于计算张数""" + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, # 1分钟 + start_time=end_time - 3600 * 10, # 取最近10小时 + end_time=end_time + )[0] + if response['code'] == 1000: + return float(response['data'][0]["close_price"]) + return None + except Exception as e: + logger.error(f"获取价格异常: {e}") + return None + + def get_available_balance(self): + """获取合约账户可用USDT余额""" + try: + response = self.contractAPI.get_assets_detail()[0] + if response['code'] == 1000: + data = response['data'] + if isinstance(data, dict): + return float(data.get('available_balance', 0)) + elif isinstance(data, list): + for asset in data: + if asset.get('currency') == 'USDT': + return float(asset.get('available_balance', 0)) + return None + except Exception as e: + logger.error(f"余额查询异常: {e}") + return None + + def get_position_status(self): + """获取当前持仓方向""" + try: + response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] + if response['code'] == 1000: + positions = response['data'] + if not positions: + self.start = 0 + return True + self.start = 1 if positions[0]['position_type'] == 1 else -1 + self.open_avg_price = positions[0]['open_avg_price'] + self.current_amount = positions[0]['current_amount'] + self.position_cross = positions[0]["position_cross"] + return True + + else: + return False + + except Exception as e: + logger.error(f"持仓查询异常: {e}") + return False + + def calculate_size(self): + """计算开仓张数:使用可用余额的1%作为保证金""" + balance = self.get_available_balance() + self.balance = balance + if not balance or balance < 10: + logger.warning("余额不足,无法开仓") + return 0 + + price = self.get_current_price() + if not price: + price = 3000 # 保守估计,避免size过大 + + leverage = int(self.leverage) + + margin = balance * self.risk_percent # 使用1%余额 + # ETHUSDT 1张 ≈ 0.001 ETH + size = int((margin * leverage) / (price * 0.001)) + size = max(1, size) + + logger.info(f"余额 {balance:.2f} USDT → 使用 {margin:.2f} USDT (1%) → 开仓 {size} 张 (价格≈{price})") + return size + + def place_market_order(self, side: int, size: int): + if size <= 0: + return False + + client_order_id = f"auto_{int(time.time())}_{uuid.uuid4().hex[:8]}" + + try: + response = self.contractAPI.post_submit_order( + contract_symbol=self.contract_symbol, + client_order_id=client_order_id, + side=side, + mode=1, + type='market', + leverage=self.leverage, + open_type=self.open_type, + size=size + )[0] + + if response['code'] == 1000: + logger.success( + f"下单成功: {'开多' if side in [1] else '开空' if side in [4] else '平多' if side in [3] else '平空'} {size}张") + return True + else: + logger.error(f"下单失败: {response}") + return False + except APIException as e: + logger.error(f"API下单异常: {e}") + return False + + def check_signal(self, prev, curr): + """简化英戈尔夫形态""" + if self.is_bullish(curr) and self.is_bearish(prev) and float(curr['close']) >= float(prev['open']): + return "long" + if self.is_bearish(curr) and self.is_bullish(prev) and float(curr['close']) <= float(prev['open']): + return "short" + return None + + def execute_trade(self): + """执行交易逻辑,根据市场状态切换策略""" + klines = self.get_klines() + if not klines or len(klines) < 3: + return + + if self.is_trending(klines): # 单边行情 + self.direction = self.check_signal(klines[1], klines[2]) + if self.direction: + logger.success(f"检测到{self.direction}信号,准备开仓(用余额1%)") + self.execute_trade() # 执行趋势跟随交易 + + else: # 震荡行情 + self.execute_grid_trade() # 执行网格交易策略 + + def execute_grid_trade(self): + """网格交易策略""" + logger.info("开始网格交易") + + # 获取当前价格 + current_price = self.get_current_price() + if not current_price: + logger.error("无法获取当前价格,网格交易无法执行") + return + + # 假设的网格区间(可以根据需要调整) + grid_step = 10 # 每次10USDT为一个网格 + grid_size = 1 # 每次开仓数量,单位ETH + + # 计算上网格和下网格价格 + lower_price = current_price - grid_step # 下网格价格 + upper_price = current_price + grid_step # 上网格价格 + + # 生成买卖网格订单 + try: + # 设置买单 + buy_order = self.place_market_order(side=1, size=grid_size) # 开多 + if buy_order: + logger.info(f"已成功设置买单,买入价格:{lower_price},数量:{grid_size} ETH") + + # 设置卖单 + sell_order = self.place_market_order(side=4, size=grid_size) # 开空 + if sell_order: + logger.info(f"已成功设置卖单,卖出价格:{upper_price},数量:{grid_size} ETH") + + except Exception as e: + logger.error(f"网格交易下单失败: {e}") + + def set_leverage(self): + """程序启动时设置全仓 + 高杠杆""" + try: + response = self.contractAPI.post_submit_leverage( + contract_symbol=self.contract_symbol, + leverage=self.leverage, + open_type=self.open_type + )[0] + if response['code'] == 1000: + logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") + return True + else: + logger.error(f"杠杆设置失败: {response}") + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + return False + + def action(self): + # 启动时设置全仓高杠杆 + if not self.set_leverage(): + logger.error("杠杆设置失败,程序继续运行但可能下单失败") + return + + while True: + current_minute = datetime.datetime.now().minute + if current_minute < 30: + self.pbar.n = current_minute + else: + self.pbar.n = current_minute - 30 + self.pbar.refresh() + + self.execute_trade() + time.sleep(2) # 高频交易,减少等待时间 + + +if __name__ == '__main__': + BitmartFuturesTransaction().action() diff --git a/bitmart/strategy_summary_2025_12.txt b/bitmart/strategy_summary_2025_12.txt deleted file mode 100644 index d8b324a..0000000 --- a/bitmart/strategy_summary_2025_12.txt +++ /dev/null @@ -1,19 +0,0 @@ -============================================================ -ETHUSDT ײܽ -============================================================ - -: ƸǶ & ͸̬ -: 1K -ʱ: 202512 -ʼʽ: $10000.00 -λС: 0.1 ETH -ֹ: 2.0% -ֹӯ: 3.0% - -ܽ״: 35 -ӯ: 15 -: 20 -ʤ: 42.86% -ӯ: $14.84 -ʽ: $10014.84 -: 0.15% diff --git a/bitmart/乌云盖顶和刺透形态.py b/bitmart/乌云盖顶和刺透形态.py deleted file mode 100644 index 75c2e05..0000000 --- a/bitmart/乌云盖顶和刺透形态.py +++ /dev/null @@ -1,448 +0,0 @@ -import time -import csv -from datetime import datetime -import pandas as pd -import numpy as np -from typing import List, Dict, Optional, Tuple - -# ------------------ 配置 ------------------ -START_YEAR = 2025 -CONTRACT_SYMBOL = "ETHUSDT" -STEP = 3 # K 线周期,单位分钟 -CSV_FILE = f"kline_{STEP}.csv" - - -# ------------------ 策略参数 ------------------ -class TradingStrategy: - def __init__(self, initial_balance: float = 10000): - self.positions = [] # 存储持仓信息 - self.trades = [] # 存储交易记录 - self.initial_balance = initial_balance - self.current_balance = initial_balance - self.position_size = 0.1 # 每次开仓数量(ETH) - self.stop_loss_pct = 0.02 # 止损比例 2% - self.take_profit_pct = 0.03 # 止盈比例 3% - self.max_positions = 1 # 最大同时持仓数 - - def detect_dark_cloud_cover(self, df: pd.DataFrame, i: int) -> bool: - """检测乌云盖顶形态(看跌)""" - if i < 1: - return False - - prev_candle = df.iloc[i - 1] - curr_candle = df.iloc[i] - - # 前一根是阳线 - if prev_candle['close'] <= prev_candle['open']: - return False - - # 当前是阴线 - if curr_candle['close'] >= curr_candle['open']: - return False - - # 当前开盘价高于前一根最高价 - if curr_candle['open'] <= prev_candle['high']: - return False - - # 当前收盘价低于前一根实体的50%以下 - prev_body_mid = (prev_candle['open'] + prev_candle['close']) / 2 - if curr_candle['close'] >= prev_body_mid: - return False - - # 可选:添加成交量确认 - # if curr_candle['volume'] < prev_candle['volume']: - # return False - - return True - - def detect_piercing_pattern(self, df: pd.DataFrame, i: int) -> bool: - """检测刺透形态(看涨)""" - if i < 1: - return False - - prev_candle = df.iloc[i - 1] - curr_candle = df.iloc[i] - - # 前一根是阴线 - if prev_candle['close'] >= prev_candle['open']: - return False - - # 当前是阳线 - if curr_candle['close'] <= curr_candle['open']: - return False - - # 当前开盘价低于前一根最低价 - if curr_candle['open'] >= prev_candle['low']: - return False - - # 当前收盘价高于前一根实体的50%以上 - prev_body_mid = (prev_candle['open'] + prev_candle['close']) / 2 - if curr_candle['close'] <= prev_body_mid: - return False - - # 可选:添加成交量确认 - # if curr_candle['volume'] < prev_candle['volume']: - # return False - - return True - - def open_position(self, direction: str, price: float, timestamp: int, reason: str): - """开仓""" - # 检查是否达到最大持仓限制 - if len(self.positions) >= self.max_positions: - return False - - position = { - 'direction': direction, # 'long' 或 'short' - 'open_price': price, - 'open_time': timestamp, - 'open_reason': reason, - 'size': self.position_size, - 'status': 'open', - 'stop_loss': self.calculate_stop_loss(direction, price), - 'take_profit': self.calculate_take_profit(direction, price) - } - self.positions.append(position) - - print(f"\n📈 开仓信号 @ {datetime.fromtimestamp(timestamp)}") - print(f" 方向: {'做多' if direction == 'long' else '做空'}") - print(f" 价格: ${price:.2f}") - print(f" 数量: {self.position_size} ETH") - print(f" 止损: ${position['stop_loss']:.2f}") - print(f" 止盈: ${position['take_profit']:.2f}") - print(f" 原因: {reason}") - - return True - - def calculate_stop_loss(self, direction: str, price: float) -> float: - """计算止损价""" - if direction == 'long': - return price * (1 - self.stop_loss_pct) - else: - return price * (1 + self.stop_loss_pct) - - def calculate_take_profit(self, direction: str, price: float) -> float: - """计算止盈价""" - if direction == 'long': - return price * (1 + self.take_profit_pct) - else: - return price * (1 - self.take_profit_pct) - - def close_position(self, position_idx: int, price: float, timestamp: int, reason: str): - """平仓""" - position = self.positions[position_idx] - - # 计算盈亏 - if position['direction'] == 'long': - pnl_pct = (price - position['open_price']) / position['open_price'] - else: - pnl_pct = (position['open_price'] - price) / position['open_price'] - - pnl_amount = self.position_size * position['open_price'] * pnl_pct - - # 更新持仓状态 - position.update({ - 'close_price': price, - 'close_time': timestamp, - 'close_reason': reason, - 'pnl_pct': pnl_pct * 100, # 百分比 - 'pnl_amount': pnl_amount, - 'status': 'closed' - }) - - # 更新余额 - self.current_balance += pnl_amount - - print(f"\n📉 平仓信号 @ {datetime.fromtimestamp(timestamp)}") - print(f" 方向: {'平多单' if position['direction'] == 'long' else '平空单'}") - print(f" 开仓价: ${position['open_price']:.2f}") - print(f" 平仓价: ${price:.2f}") - print(f" 盈亏: {pnl_pct * 100:.2f}% (${pnl_amount:.2f})") - print(f" 原因: {reason}") - print(f" 当前余额: ${self.current_balance:.2f}") - - # 记录交易 - trade_record = position.copy() - trade_record['duration'] = timestamp - position['open_time'] - trade_record['duration_minutes'] = trade_record['duration'] / 60 - self.trades.append(trade_record) - - # 从持仓列表中移除已平仓的仓位 - self.positions.pop(position_idx) - - return True - - def check_stop_loss_take_profit(self, df: pd.DataFrame, i: int): - """检查止损止盈""" - current_price = df.iloc[i]['close'] - current_time = df.iloc[i]['id'] - current_high = df.iloc[i]['high'] - current_low = df.iloc[i]['low'] - - positions_to_close = [] - - for idx, position in enumerate(self.positions): - if position['status'] != 'open': - continue - - close_reason = None - close_price = current_price - - if position['direction'] == 'long': - # 多头止损检查:最低价是否触及止损 - if current_low <= position['stop_loss']: - close_reason = "止损触发" - close_price = position['stop_loss'] # 使用止损价 - # 多头止盈检查:最高价是否触及止盈 - elif current_high >= position['take_profit']: - close_reason = "止盈触发" - close_price = position['take_profit'] # 使用止盈价 - else: - # 空头止损检查:最高价是否触及止损 - if current_high >= position['stop_loss']: - close_reason = "止损触发" - close_price = position['stop_loss'] # 使用止损价 - # 空头止盈检查:最低价是否触及止盈 - elif current_low <= position['take_profit']: - close_reason = "止盈触发" - close_price = position['take_profit'] # 使用止盈价 - - if close_reason: - positions_to_close.append((idx, close_price, close_reason)) - - # 按索引从大到小平仓,避免索引错乱 - positions_to_close.sort(reverse=True) - for idx, close_price, close_reason in positions_to_close: - self.close_position(idx, close_price, current_time, close_reason) - - def analyze_trades(self): - """分析交易结果""" - if not self.trades: - return { - 'total_trades': 0, - 'winning_trades': 0, - 'losing_trades': 0, - 'win_rate': 0, - 'total_pnl': 0, - 'avg_pnl': 0, - 'max_win': 0, - 'max_loss': 0, - 'profit_factor': 0 - } - - total_trades = len(self.trades) - winning_trades = [t for t in self.trades if t['pnl_amount'] > 0] - losing_trades = [t for t in self.trades if t['pnl_amount'] < 0] - - total_pnl = sum(t['pnl_amount'] for t in self.trades) - total_win = sum(t['pnl_amount'] for t in winning_trades) - total_loss = abs(sum(t['pnl_amount'] for t in losing_trades)) - - win_rate = len(winning_trades) / total_trades * 100 if total_trades > 0 else 0 - avg_pnl = total_pnl / total_trades if total_trades > 0 else 0 - max_win = max(t['pnl_amount'] for t in winning_trades) if winning_trades else 0 - max_loss = min(t['pnl_amount'] for t in losing_trades) if losing_trades else 0 - profit_factor = total_win / total_loss if total_loss > 0 else float('inf') - - return { - 'total_trades': total_trades, - 'winning_trades': len(winning_trades), - 'losing_trades': len(losing_trades), - 'win_rate': win_rate, - 'total_pnl': total_pnl, - 'avg_pnl': avg_pnl, - 'max_win': max_win, - 'max_loss': max_loss, - 'profit_factor': profit_factor, - 'final_balance': self.current_balance, - 'total_return': ((self.current_balance - self.initial_balance) / self.initial_balance * 100) - } - - -# ------------------ 主程序 ------------------ -def main(): - print("=" * 60) - print("ETHUSDT 交易策略 - 乌云盖顶 & 刺透形态") - print(f"数据周期: {STEP}分钟K线") - print(f"分析时间: 2025年12月") - print("=" * 60) - - # 1. 从CSV文件读取数据 - print(f"\n📥 正在从 {CSV_FILE} 读取数据...") - - try: - df = pd.read_csv(CSV_FILE) - print(f"成功读取 {len(df)} 条K线数据") - - # 转换时间戳为datetime - df['datetime'] = pd.to_datetime(df['id'], unit='s') - df.set_index('datetime', inplace=True) - - # 按时间排序 - df = df.sort_index() - - except FileNotFoundError: - print(f"❌ 错误: 文件 {CSV_FILE} 不存在") - return - except Exception as e: - print(f"❌ 读取文件时出错: {e}") - return - - # 2. 过滤出12月份数据 - dec_2025_start = pd.Timestamp('2025-12-01') - dec_2025_end = pd.Timestamp('2025-12-31 23:59:59') - - # 确保数据在指定范围内 - mask = (df.index >= dec_2025_start) & (df.index <= dec_2025_end) - dec_df = df.loc[mask].copy() - - if len(dec_df) == 0: - print("❌ 未找到2025年12月的数据") - print(f"数据时间范围: {df.index[0]} 到 {df.index[-1]}") - return - - print(f"\n📊 12月份数据: {len(dec_df)} 条K线") - print(f"时间范围: {dec_df.index[0]} 到 {dec_df.index[-1]}") - - # 显示数据预览 - print(f"\n数据预览:") - print(dec_df[['open', 'high', 'low', 'close', 'volume']].head()) - - # 3. 初始化策略 - strategy = TradingStrategy(initial_balance=10000) - - # 4. 运行策略 - print("\n" + "=" * 60) - print("开始执行交易策略...") - print("=" * 60) - - for i in range(1, len(dec_df)): - current_time = int(dec_df.iloc[i]['id']) - current_price = dec_df.iloc[i]['close'] - - # 首先检查止损止盈 - strategy.check_stop_loss_take_profit(dec_df, i) - - # 如果有持仓,跳过新信号(单次只持有一个仓位) - if len(strategy.positions) >= strategy.max_positions: - continue - - # 检测形态 - dark_cloud = strategy.detect_dark_cloud_cover(dec_df, i) - piercing = strategy.detect_piercing_pattern(dec_df, i) - - # 处理信号 - if dark_cloud: - # 乌云盖顶 - 看跌信号,开空仓 - strategy.open_position('short', current_price, current_time, "乌云盖顶形态") - - elif piercing: - # 刺透形态 - 看涨信号,开多仓 - strategy.open_position('long', current_price, current_time, "刺透形态") - - # 5. 强制平掉所有未平仓 - print("\n" + "=" * 60) - print("强制平仓所有持仓...") - print("=" * 60) - - if strategy.positions: - last_price = dec_df.iloc[-1]['close'] - last_time = dec_df.iloc[-1]['id'] - - # 按索引从大到小平仓 - for idx in range(len(strategy.positions) - 1, -1, -1): - strategy.close_position(idx, last_price, last_time, "策略结束强制平仓") - else: - print("没有需要平仓的持仓") - - # 6. 生成交易报告 - print("\n" + "=" * 60) - print("📊 交易汇总报告") - print("=" * 60) - - analysis = strategy.analyze_trades() - - if strategy.trades: - print(f"总交易次数: {analysis['total_trades']}") - print(f"盈利交易: {analysis['winning_trades']} 次") - print(f"亏损交易: {analysis['losing_trades']} 次") - print(f"胜率: {analysis['win_rate']:.2f}%") - print(f"总盈亏: ${analysis['total_pnl']:.2f}") - print(f"平均每笔盈亏: ${analysis['avg_pnl']:.2f}") - print(f"最大盈利: ${analysis['max_win']:.2f}") - print(f"最大亏损: ${analysis['max_loss']:.2f}") - print(f"盈利因子: {analysis['profit_factor']:.2f}") - print(f"初始资金: ${strategy.initial_balance:.2f}") - print(f"最终资金: ${analysis['final_balance']:.2f}") - print(f"总收益率: {analysis['total_return']:.2f}%") - - # 打印每笔交易详情 - print("\n" + "-" * 60) - print("详细交易记录:") - print("-" * 60) - - for i, trade in enumerate(strategy.trades, 1): - print(f"\n交易 #{i}:") - print(f" 方向: {'做多' if trade['direction'] == 'long' else '做空'}") - print(f" 开仓时间: {datetime.fromtimestamp(trade['open_time'])}") - print(f" 开仓价格: ${trade['open_price']:.2f}") - print(f" 平仓时间: {datetime.fromtimestamp(trade['close_time'])}") - print(f" 平仓价格: ${trade['close_price']:.2f}") - print(f" 持仓时间: {trade['duration_minutes']:.1f} 分钟") - print(f" 盈亏: ${trade['pnl_amount']:.2f} ({trade['pnl_pct']:.2f}%)") - print(f" 原因: {trade['open_reason']} -> {trade['close_reason']}") - else: - print("本月无交易记录") - - # 7. 保存交易记录到CSV - if strategy.trades: - trades_df = pd.DataFrame(strategy.trades) - - # 格式化时间列 - trades_df['open_datetime'] = trades_df['open_time'].apply(lambda x: datetime.fromtimestamp(x)) - trades_df['close_datetime'] = trades_df['close_time'].apply(lambda x: datetime.fromtimestamp(x)) - - # 选择要保存的列 - columns_to_save = [ - 'open_datetime', 'close_datetime', 'direction', 'open_price', - 'close_price', 'size', 'pnl_amount', 'pnl_pct', 'duration_minutes', - 'open_reason', 'close_reason' - ] - - trades_csv = f"eth_trades_{START_YEAR}_12.csv" - trades_df[columns_to_save].to_csv(trades_csv, index=False) - print(f"\n✅ 交易记录已保存到: {trades_csv}") - - # 8. 保存策略参数和结果 - with open(f"strategy_summary_{START_YEAR}_12.txt", 'w') as f: - f.write("=" * 60 + "\n") - f.write("ETHUSDT 交易策略总结\n") - f.write("=" * 60 + "\n\n") - f.write(f"策略名称: 乌云盖顶 & 刺透形态\n") - f.write(f"数据周期: {STEP}分钟K线\n") - f.write(f"分析时间: 2025年12月\n") - f.write(f"初始资金: ${strategy.initial_balance:.2f}\n") - f.write(f"仓位大小: {strategy.position_size} ETH\n") - f.write(f"止损比例: {strategy.stop_loss_pct * 100}%\n") - f.write(f"止盈比例: {strategy.take_profit_pct * 100}%\n\n") - - if strategy.trades: - f.write(f"总交易次数: {analysis['total_trades']}\n") - f.write(f"盈利交易: {analysis['winning_trades']} 次\n") - f.write(f"亏损交易: {analysis['losing_trades']} 次\n") - f.write(f"胜率: {analysis['win_rate']:.2f}%\n") - f.write(f"总盈亏: ${analysis['total_pnl']:.2f}\n") - f.write(f"最终资金: ${analysis['final_balance']:.2f}\n") - f.write(f"总收益率: {analysis['total_return']:.2f}%\n") - else: - f.write("本月无交易记录\n") - - print(f"\n✅ 策略总结已保存到: strategy_summary_{START_YEAR}_12.txt") - - print("\n" + "=" * 60) - print("策略执行完成!") - print("=" * 60) - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/bitmart/指数平均线.py b/bitmart/指数平均线.py deleted file mode 100644 index 7cfeef7..0000000 --- a/bitmart/指数平均线.py +++ /dev/null @@ -1,768 +0,0 @@ -""" -量化交易回测系统 - EMA交叉策略(双EMA/三EMA) -""" -import csv -import datetime -import numpy as np -from typing import List, Dict, Optional, Tuple - - -# ========================= EMA计算函数 ========================= - -def calculate_ema(prices: List[float], period: int) -> List[Optional[float]]: - """ - 计算指数移动平均线(EMA) - - Args: - prices: 价格列表(通常是收盘价) - period: EMA周期 - - Returns: - EMA值列表,前period-1个为None - """ - if len(prices) < period: - return [None] * len(prices) - - ema_values = [None] * (period - 1) - - # 计算初始SMA作为EMA的起点 - sma = sum(prices[:period]) / period - - # EMA计算公式:EMA_today = (Price_today * (2/(period+1))) + (EMA_yesterday * (1 - (2/(period+1)))) - multiplier = 2 / (period + 1) - - # 第一个EMA值 - ema = sma - ema_values.append(ema) - - # 计算后续EMA值 - for price in prices[period:]: - ema = (price * multiplier) + (ema * (1 - multiplier)) - ema_values.append(ema) - - return ema_values - - -# ========================= 策略核心函数 ========================= - -def check_ema_cross(ema_fast: List[Optional[float]], - ema_slow: List[Optional[float]], - idx: int) -> Optional[str]: - """ - 检查EMA金叉/死叉 - - Args: - ema_fast: 快线EMA值列表 - ema_slow: 慢线EMA值列表 - idx: 当前K线索引 - - Returns: - "golden" - 金叉(做多信号) - "dead" - 死叉(做空信号) - None - 无交叉 - """ - if idx < 1: - return None - - # 确保有足够的数据 - if ema_fast[idx] is None or ema_fast[idx - 1] is None: - return None - if ema_slow[idx] is None or ema_slow[idx - 1] is None: - return None - - # 前一根K线快线在慢线下方,当前K线快线上穿慢线 -> 金叉 - if ema_fast[idx - 1] < ema_slow[idx - 1] and ema_fast[idx] > ema_slow[idx]: - return "golden" - - # 前一根K线快线在慢线上方,当前K线快线下穿慢线 -> 死叉 - if ema_fast[idx - 1] > ema_slow[idx - 1] and ema_fast[idx] < ema_slow[idx]: - return "dead" - - return None - - -def check_triple_ema_cross(ema_fast: List[Optional[float]], - ema_mid: List[Optional[float]], - ema_slow: List[Optional[float]], - idx: int) -> Optional[str]: - """ - 检查三EMA交叉(更稳定的信号) - 规则:快线 > 中线 > 慢线 -> 多头排列 -> 做多 - 快线 < 中线 < 慢线 -> 空头排列 -> 做空 - - Args: - ema_fast: 快线(如EMA7) - ema_mid: 中线(如EMA14) - ema_slow: 慢线(如EMA30) - idx: 当前K线索引 - - Returns: - "golden" - 金叉多头排列 - "dead" - 死叉空头排列 - None - 无明确信号 - """ - if idx < 1: - return None - - # 确保有足够的数据 - if any(ema[idx] is None or ema[idx - 1] is None for ema in [ema_fast, ema_mid, ema_slow]): - return None - - # 检查是否形成多头排列(EMA快 > 中 > 慢) - current_fast = ema_fast[idx] - current_mid = ema_mid[idx] - current_slow = ema_slow[idx] - prev_fast = ema_fast[idx - 1] - prev_mid = ema_mid[idx - 1] - prev_slow = ema_slow[idx - 1] - - # 多头排列条件:快线 > 中线 > 慢线 - is_golden_triple = current_fast > current_mid > current_slow - was_not_golden = not (prev_fast > prev_mid > prev_slow) - - # 空头排列条件:快线 < 中线 < 慢线 - is_dead_triple = current_fast < current_mid < current_slow - was_not_dead = not (prev_fast < prev_mid < prev_slow) - - if is_golden_triple and was_not_golden: - return "golden" - elif is_dead_triple and was_not_dead: - return "dead" - - return None - - -# ========================= 回测引擎 ========================= - -class EMABacktester: - """EMA交叉策略回测器""" - - def __init__(self, - fast_period: int = 12, - slow_period: int = 26, - signal_period: int = 9, # 用于MACD信号线 - use_triple_ema: bool = False, - mid_period: int = 14, # 三EMA时的中间周期 - use_macd_confirmation: bool = False, - stop_loss_pct: float = 0.02, # 2%止损 - take_profit_pct: float = 0.05, # 5%止盈 - trailing_stop_pct: float = 0.03): # 3%移动止损 - - self.fast_period = fast_period - self.slow_period = slow_period - self.signal_period = signal_period - self.use_triple_ema = use_triple_ema - self.mid_period = mid_period - self.use_macd_confirmation = use_macd_confirmation - self.stop_loss_pct = stop_loss_pct - self.take_profit_pct = take_profit_pct - self.trailing_stop_pct = trailing_stop_pct - - self.stats = { - 'golden_cross': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '金叉做多'}, - 'dead_cross': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '死叉做空'}, - } - - def calculate_macd(self, prices: List[float]) -> Tuple[List[Optional[float]], - List[Optional[float]], - List[Optional[float]]]: - """ - 计算MACD指标 - Returns: (MACD线, 信号线, 柱状图) - """ - # 计算快慢EMA - ema_fast = calculate_ema(prices, self.fast_period) - ema_slow = calculate_ema(prices, self.slow_period) - - # 计算MACD线 = EMA快线 - EMA慢线 - macd_line = [] - for i in range(len(prices)): - if ema_fast[i] is not None and ema_slow[i] is not None: - macd_line.append(ema_fast[i] - ema_slow[i]) - else: - macd_line.append(None) - - # 计算信号线(MACD的EMA) - signal_line = calculate_ema([x for x in macd_line if x is not None] if any(macd_line) else [], - self.signal_period) - - # 补全None值 - signal_line_extended = [None] * (len(prices) - len(signal_line)) + signal_line if len(signal_line) < len( - prices) else signal_line - - # 计算柱状图 - histogram = [] - for i in range(len(prices)): - if macd_line[i] is not None and signal_line_extended[i] is not None: - histogram.append(macd_line[i] - signal_line_extended[i]) - else: - histogram.append(None) - - return macd_line, signal_line_extended, histogram - - def backtest(self, data: List[Dict]) -> Tuple[List[Dict], Dict]: - """ - 执行EMA交叉策略回测 - - Args: - data: K线数据列表 - - Returns: - trades: 交易记录列表 - stats: 统计数据 - """ - # 提取收盘价 - close_prices = [float(c['close']) for c in data] - - # 计算EMA - ema_fast = calculate_ema(close_prices, self.fast_period) - ema_slow = calculate_ema(close_prices, self.slow_period) - - # 如果需要三EMA,计算中间EMA - ema_mid = None - if self.use_triple_ema: - ema_mid = calculate_ema(close_prices, self.mid_period) - - # 如果需要MACD确认,计算MACD - macd_line, signal_line, histogram = None, None, None - if self.use_macd_confirmation: - macd_line, signal_line, histogram = self.calculate_macd(close_prices) - - trades: List[Dict] = [] - current_position: Optional[Dict] = None - highest_price_since_entry = 0 # 用于移动止损 - lowest_price_since_entry = float('inf') # 用于移动止损 - - # 遍历K线数据(跳过前几个没有EMA值的) - start_idx = max(self.fast_period, self.slow_period) - if self.use_triple_ema: - start_idx = max(start_idx, self.mid_period) - - for idx in range(start_idx, len(data)): - current_bar = data[idx] - current_price = float(current_bar['close']) - open_price = float(current_bar['open']) - - # ========== 信号检测 ========== - signal = None - - # 基础双EMA交叉信号 - if not self.use_triple_ema: - signal = check_ema_cross(ema_fast, ema_slow, idx) - # 三EMA排列信号 - else: - signal = check_triple_ema_cross(ema_fast, ema_mid, ema_slow, idx) - - # MACD确认(可选) - if signal and self.use_macd_confirmation: - macd_confirmed = False - if signal == "golden": - # 金叉确认:MACD线在信号线上方且柱状图为正 - if macd_line[idx] is not None and signal_line[idx] is not None: - macd_confirmed = (macd_line[idx] > signal_line[idx]) and ( - histogram[idx] is not None and histogram[idx] > 0) - elif signal == "dead": - # 死叉确认:MACD线在信号线下方且柱状图为负 - if macd_line[idx] is not None and signal_line[idx] is not None: - macd_confirmed = (macd_line[idx] < signal_line[idx]) and ( - histogram[idx] is not None and histogram[idx] < 0) - - if not macd_confirmed: - signal = None - - # ========== 空仓时开仓 ========== - if current_position is None and signal: - # 下一根K线开盘价入场 - if idx + 1 < len(data): - entry_price = float(data[idx + 1]['open']) - - if signal == "golden": # 做多 - current_position = { - 'direction': 'long', - 'entry_price': entry_price, - 'entry_time': data[idx + 1]['id'], - 'entry_idx': idx + 1, - 'signal': 'golden_cross', - 'highest_price': entry_price, # 用于移动止损 - 'lowest_price': entry_price, # 用于空头的移动止损 - } - self.stats['golden_cross']['count'] += 1 - - elif signal == "dead": # 做空 - current_position = { - 'direction': 'short', - 'entry_price': entry_price, - 'entry_time': data[idx + 1]['id'], - 'entry_idx': idx + 1, - 'signal': 'dead_cross', - 'highest_price': entry_price, - 'lowest_price': entry_price, - } - self.stats['dead_cross']['count'] += 1 - - # 跳过下一根,因为已经在这根K线开盘入场 - continue - - # ========== 持仓时处理 ========== - if current_position: - pos_dir = current_position['direction'] - entry_price = current_position['entry_price'] - signal_key = current_position['signal'] - - # 更新最高/最低价(用于移动止损) - if pos_dir == 'long': - current_position['highest_price'] = max(current_position['highest_price'], current_price) - else: # short - current_position['lowest_price'] = min(current_position['lowest_price'], current_price) - - # ========== 止损止盈检查 ========== - should_exit = False - exit_reason = "" - exit_price = current_price # 默认用收盘价平仓 - - # 固定止损 - if pos_dir == 'long': - stop_loss_price = entry_price * (1 - self.stop_loss_pct) - if current_price <= stop_loss_price: - should_exit = True - exit_reason = "止损" - exit_price = stop_loss_price - - # 固定止盈 - take_profit_price = entry_price * (1 + self.take_profit_pct) - if current_price >= take_profit_price: - should_exit = True - exit_reason = "止盈" - exit_price = take_profit_price - - # 移动止损 - trailing_stop_price = current_position['highest_price'] * (1 - self.trailing_stop_pct) - if current_price <= trailing_stop_price: - should_exit = True - exit_reason = "移动止损" - exit_price = trailing_stop_price - - else: # short - stop_loss_price = entry_price * (1 + self.stop_loss_pct) - if current_price >= stop_loss_price: - should_exit = True - exit_reason = "止损" - exit_price = stop_loss_price - - # 固定止盈 - take_profit_price = entry_price * (1 - self.take_profit_pct) - if current_price <= take_profit_price: - should_exit = True - exit_reason = "止盈" - exit_price = take_profit_price - - # 移动止损 - trailing_stop_price = current_position['lowest_price'] * (1 + self.trailing_stop_pct) - if current_price >= trailing_stop_price: - should_exit = True - exit_reason = "移动止损" - exit_price = trailing_stop_price - - # ========== 反向信号检查 ========== - if signal and ( - (signal == "dead" and pos_dir == "long") or - (signal == "golden" and pos_dir == "short") - ): - should_exit = True - exit_reason = "反向信号" - # 反向信号用下一根开盘价平仓 - if idx + 1 < len(data): - exit_price = float(data[idx + 1]['open']) - - # ========== 执行平仓 ========== - if should_exit: - # 计算盈亏 - if pos_dir == 'long': - diff = exit_price - entry_price - else: # short - diff = entry_price - exit_price - - # 记录交易 - trade = { - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time']), - 'exit_time': datetime.datetime.fromtimestamp(current_bar['id']), - 'signal': self.stats[signal_key]['name'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': entry_price, - 'exit': exit_price, - 'diff': diff, - 'exit_reason': exit_reason, - 'holding_bars': idx - current_position['entry_idx'] + 1, - } - trades.append(trade) - - # 更新统计 - self.stats[signal_key]['total_profit'] += diff - if diff > 0: - self.stats[signal_key]['wins'] += 1 - - # 平仓 - current_position = None - - # 如果是因为反向信号平仓,立即反手开仓 - if exit_reason == "反向信号" and signal and idx + 1 < len(data): - if signal == "golden": # 反手做多 - current_position = { - 'direction': 'long', - 'entry_price': exit_price, # 同价反手 - 'entry_time': data[idx + 1]['id'], - 'entry_idx': idx + 1, - 'signal': 'golden_cross', - 'highest_price': exit_price, - 'lowest_price': exit_price, - } - self.stats['golden_cross']['count'] += 1 - elif signal == "dead": # 反手做空 - current_position = { - 'direction': 'short', - 'entry_price': exit_price, - 'entry_time': data[idx + 1]['id'], - 'entry_idx': idx + 1, - 'signal': 'dead_cross', - 'highest_price': exit_price, - 'lowest_price': exit_price, - } - self.stats['dead_cross']['count'] += 1 - - # 跳过下一根K线 - continue - - # ========== 尾仓处理 ========== - if current_position: - last_bar = data[-1] - exit_price = float(last_bar['close']) - pos_dir = current_position['direction'] - entry_price = current_position['entry_price'] - signal_key = current_position['signal'] - - diff = (exit_price - entry_price) if pos_dir == 'long' else (entry_price - exit_price) - - trade = { - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time']), - 'exit_time': datetime.datetime.fromtimestamp(last_bar['id']), - 'signal': self.stats[signal_key]['name'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': entry_price, - 'exit': exit_price, - 'diff': diff, - 'exit_reason': "尾仓平仓", - 'holding_bars': len(data) - current_position['entry_idx'], - } - trades.append(trade) - - self.stats[signal_key]['total_profit'] += diff - if diff > 0: - self.stats[signal_key]['wins'] += 1 - - return trades, self.stats - - -# ========================= 可视化分析 ========================= - -def analyze_trades(trades: List[Dict], stats: Dict) -> Dict: - """深入分析交易结果""" - if not trades: - return {} - - # 基础统计 - total_trades = len(trades) - winning_trades = [t for t in trades if t['diff'] > 0] - losing_trades = [t for t in trades if t['diff'] <= 0] - - win_rate = len(winning_trades) / total_trades * 100 if total_trades > 0 else 0 - - # 盈亏统计 - total_profit = sum(t['diff'] for t in trades) - avg_profit_per_trade = total_profit / total_trades if total_trades > 0 else 0 - - # 胜率相关 - avg_win = np.mean([t['diff'] for t in winning_trades]) if winning_trades else 0 - avg_loss = np.mean([t['diff'] for t in losing_trades]) if losing_trades else 0 - - # 盈亏比 - profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf') - - # 最大连续盈利/亏损 - max_consecutive_wins = 0 - max_consecutive_losses = 0 - current_wins = 0 - current_losses = 0 - - for trade in trades: - if trade['diff'] > 0: - current_wins += 1 - current_losses = 0 - max_consecutive_wins = max(max_consecutive_wins, current_wins) - else: - current_losses += 1 - current_wins = 0 - max_consecutive_losses = max(max_consecutive_losses, current_losses) - - # 持仓时间分析 - holding_bars = [t.get('holding_bars', 0) for t in trades] - avg_holding_bars = np.mean(holding_bars) if holding_bars else 0 - - # 按平仓原因分析 - exit_reasons = {} - for trade in trades: - reason = trade.get('exit_reason', '未知') - exit_reasons[reason] = exit_reasons.get(reason, 0) + 1 - - return { - 'total_trades': total_trades, - 'win_rate': win_rate, - 'total_profit': total_profit, - 'avg_profit_per_trade': avg_profit_per_trade, - 'avg_win': avg_win, - 'avg_loss': avg_loss, - 'profit_factor': profit_factor, - 'max_consecutive_wins': max_consecutive_wins, - 'max_consecutive_losses': max_consecutive_losses, - 'avg_holding_bars': avg_holding_bars, - 'exit_reasons': exit_reasons, - 'winning_trades_count': len(winning_trades), - 'losing_trades_count': len(losing_trades), - } - - -# ========================= 主程序 ========================= - -if __name__ == '__main__': - # 从CSV文件读取数据 - csv_file = "kline_3.csv" # 请替换为你的CSV文件路径 - read_data = [] - - try: - with open(csv_file, 'r') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - read_data.append({ - 'id': int(row['id']), - 'open': float(row['open']), - 'high': float(row['high']), - 'low': float(row['low']), - 'close': float(row['close']) - }) - print(f"成功读取 {len(read_data)} 条K线数据") - except FileNotFoundError: - print(f"文件 {csv_file} 未找到,请检查路径") - exit(1) - except Exception as e: - print(f"读取CSV文件时出错: {e}") - exit(1) - - # 按时间排序 - read_data.sort(key=lambda x: x['id']) - - print("\n" + "=" * 60) - print("EMA交叉策略回测系统") - print("=" * 60) - - # 策略参数选择 - print("\n请选择EMA策略类型:") - print("1. 双EMA交叉策略 (默认: EMA12/EMA26)") - print("2. 三EMA排列策略 (默认: EMA7/EMA14/EMA30)") - print("3. EMA+MACD双重确认策略") - - choice = input("\n请输入选择 (1-3,默认1): ").strip() - - if choice == "2": - # 三EMA策略 - backtester = EMABacktester( - fast_period=7, - slow_period=30, - use_triple_ema=True, - mid_period=14, - use_macd_confirmation=False, - stop_loss_pct=0.01, # 1.5%止损 - take_profit_pct=0.4, # 4%止盈 - trailing_stop_pct=0.04 # 2%移动止损 - ) - strategy_name = "三EMA排列策略(7/14/30)" - - elif choice == "3": - # EMA+MACD策略 - backtester = EMABacktester( - fast_period=12, - slow_period=26, - signal_period=9, - use_triple_ema=False, - use_macd_confirmation=True, - stop_loss_pct=0.1, # 1%止损 - take_profit_pct=0.4, # 3%止盈 - trailing_stop_pct=0.04 # 1.5%移动止损 - ) - strategy_name = "EMA+MACD双重确认策略(12/26/9)" - - else: - # 默认双EMA策略 - backtester = EMABacktester( - fast_period=12, - slow_period=26, - use_triple_ema=False, - use_macd_confirmation=False, - stop_loss_pct=0.02, # 2%止损 - take_profit_pct=0.05, # 5%止盈 - trailing_stop_pct=0.03 # 3%移动止损 - ) - strategy_name = "双EMA交叉策略(12/26)" - - print(f"\n使用策略: {strategy_name}") - print("开始回测...") - - # 执行回测 - trades, stats = backtester.backtest(read_data) - - # ========== 交易详情和盈利计算 ========== - print(f"\n{'=' * 60}") - print(f"回测结果 - {strategy_name}") - print(f"{'=' * 60}") - - # 参数设定 - contract_size = 10000 # 合约规模 - open_fee_fixed = 5 # 固定开仓手续费 - close_fee_rate = 0.0005 # 平仓手续费率 - - total_points_profit = 0 # 累计点差 - total_money_profit = 0 # 累计金额盈利 - total_fee = 0 # 累计手续费 - - print(f"\n交易详情 (共{len(trades)}笔):") - print("-" * 100) - - for i, t in enumerate(trades, 1): - entry = t['entry'] - exit = t['exit'] - direction = t['direction'] - - # 原始价差 - point_diff = (exit - entry) if direction == '做多' else (entry - exit) - - # 金额盈利 - money_profit = point_diff / entry * contract_size - - # 手续费 - fee = open_fee_fixed + (contract_size / entry * exit * close_fee_rate) - - # 净利润 - net_profit = money_profit - fee - - # 保存结果 - t.update({ - 'point_diff': point_diff, - 'raw_profit': money_profit, - 'fee': fee, - 'net_profit': net_profit - }) - - total_points_profit += point_diff - total_money_profit += money_profit - total_fee += fee - - # 输出交易详情 - profit_color = "\033[92m" if net_profit > 0 else "\033[91m" - reset_color = "\033[0m" - - print(f"{i:3d}. {t['entry_time'].strftime('%Y-%m-%d %H:%M')} -> " - f"{t['exit_time'].strftime('%Y-%m-%d %H:%M')} " - f"{direction}({t['signal']}) " - f"入={entry:.2f} 出={exit:.2f} " - f"{profit_color}净利={net_profit:+.2f}{reset_color} " - f"(持有:{t.get('holding_bars', '?')}根K线, 原因:{t.get('exit_reason', '未知')})") - - # ========== 汇总统计 ========== - total_net_profit = total_money_profit - total_fee - - print(f"\n{'=' * 60}") - print("汇总统计") - print(f"{'=' * 60}") - - print(f"总交易笔数: {len(trades)}") - print(f"总点差: {total_points_profit:.2f}") - print(f"总原始盈利(未扣费): {total_money_profit:.2f}") - print(f"总手续费: {total_fee:.2f}") - print(f"总净利润: {total_net_profit:.2f}") - - # 深入分析 - analysis = analyze_trades(trades, stats) - - if analysis: - print(f"\n策略分析:") - print(f"- 胜率: {analysis['win_rate']:.2f}%") - print(f"- 平均每笔盈利: {analysis['avg_profit_per_trade']:.2f}") - print(f"- 平均盈利: {analysis['avg_win']:.2f}") - print(f"- 平均亏损: {analysis['avg_loss']:.2f}") - print(f"- 盈亏比: {analysis['profit_factor']:.2f}") - print(f"- 最大连续盈利: {analysis['max_consecutive_wins']} 笔") - print(f"- 最大连续亏损: {analysis['max_consecutive_losses']} 笔") - print(f"- 平均持仓K线数: {analysis['avg_holding_bars']:.1f} 根") - - print(f"\n平仓原因统计:") - for reason, count in analysis['exit_reasons'].items(): - percentage = count / len(trades) * 100 - print(f" - {reason}: {count} 笔 ({percentage:.1f}%)") - - # ========== 信号统计 ========== - print(f"\n{'=' * 60}") - print("信号统计") - print(f"{'=' * 60}") - - for k, v in stats.items(): - name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit'] - if count > 0: - win_rate = (wins / count * 100) - avg_p = total_p / count - profit_color = "\033[92m" if total_p > 0 else "\033[91m" - reset_color = "\033[0m" - - print(f"{name}:") - print(f" 信号次数: {count}") - print(f" 胜率: {win_rate:.2f}%") - print(f" 总价差: {profit_color}{total_p:.2f}{reset_color}") - print(f" 平均价差: {avg_p:.2f}") - - # ========== 风险指标 ========== - if len(trades) > 1: - returns = [t['net_profit'] for t in trades] - - # 夏普比率(简化版) - avg_return = np.mean(returns) - std_return = np.std(returns) - sharpe_ratio = avg_return / std_return if std_return > 0 else 0 - - # 最大回撤 - cumulative_returns = np.cumsum(returns) - running_max = np.maximum.accumulate(cumulative_returns) - drawdown = cumulative_returns - running_max - max_drawdown = abs(np.min(drawdown)) if len(drawdown) > 0 else 0 - - print(f"\n{'=' * 60}") - print("风险指标") - print(f"{'=' * 60}") - print(f"夏普比率(简化): {sharpe_ratio:.4f}") - print(f"最大回撤: {max_drawdown:.2f}") - - # 盈亏分布 - print(f"\n盈亏分布:") - profit_ranges = { - "大亏 (< -100)": len([r for r in returns if r < -100]), - "中亏 (-100 ~ -50)": len([r for r in returns if -100 <= r < -50]), - "小亏 (-50 ~ 0)": len([r for r in returns if -50 <= r < 0]), - "小盈 (0 ~ 50)": len([r for r in returns if 0 <= r < 50]), - "中盈 (50 ~ 100)": len([r for r in returns if 50 <= r < 100]), - "大盈 (> 100)": len([r for r in returns if r >= 100]), - } - - for range_name, count in profit_ranges.items(): - if count > 0: - percentage = count / len(returns) * 100 - print(f" {range_name}: {count} 笔 ({percentage:.1f}%)") - - print(f"\n{'=' * 60}") - print("回测完成!") - print(f"{'=' * 60}") \ No newline at end of file diff --git a/bitmart/锤子线和上吊线.py b/bitmart/锤子线和上吊线.py deleted file mode 100644 index 0183570..0000000 --- a/bitmart/锤子线和上吊线.py +++ /dev/null @@ -1,133 +0,0 @@ -import csv -from datetime import datetime, timezone -import matplotlib.pyplot as plt - -# ---------------- 配置 ---------------- -CSV_FILE = "kline_3.csv" # CSV 文件路径 -LEVERAGE = 100 -CAPITAL = 10000 -POSITION_RATIO = 0.01 -FEE_RATE = 0.0005 - -# ---------------- 读取 CSV ---------------- -data = [] -with open(CSV_FILE, 'r') as f: - reader = csv.DictReader(f) - for row in reader: - ts = int(row['id']) - dt = datetime.fromtimestamp(ts, tz=timezone.utc) - if dt.year == 2025 and dt.month == 1: - data.append({ - 'time': dt, - 'open': float(row['open']), - 'high': float(row['high']), - 'low': float(row['low']), - 'close': float(row['close']), - }) - -# ---------------- 策略回测 ---------------- -total_profit = 0 -total_fee = 0 -trades = [] # 保存每笔交易详情 - -for i in range(len(data) - 1): - k = data[i] - k_next = data[i + 1] - - body = abs(k['close'] - k['open']) - upper_shadow = k['high'] - max(k['close'], k['open']) - lower_shadow = min(k['close'], k['open']) - k['low'] - position_usdt = CAPITAL * POSITION_RATIO - leveraged_position = position_usdt * LEVERAGE - - # ---------------- 锤子线 → 做多 ---------------- - if body != 0 and lower_shadow >= 2 * body: - profit_raw = (k_next['close'] - k['close']) / k['close'] * leveraged_position - fee_open = leveraged_position * FEE_RATE - fee_close = leveraged_position * FEE_RATE - profit_net = profit_raw - fee_open - fee_close - - total_profit += profit_raw - total_fee += fee_open + fee_close - - trades.append({ - '方向': '多', - '开仓时间': k['time'].strftime("%Y-%m-%d %H:%M"), - '开仓价格': k['close'], - '平仓时间': k_next['time'].strftime("%Y-%m-%d %H:%M"), - '平仓价格': k_next['close'], - '本金': position_usdt, - '杠杆仓位': leveraged_position, - '开仓手续费': fee_open, - '平仓手续费': fee_close, - '原始盈亏': profit_raw, - '净盈亏': profit_net - }) - - # ---------------- 上吊线 → 做空 ---------------- - elif body != 0 and upper_shadow >= 2 * body: - profit_raw = (k['close'] - k_next['close']) / k['close'] * leveraged_position - fee_open = leveraged_position * FEE_RATE - fee_close = leveraged_position * FEE_RATE - profit_net = profit_raw - fee_open - fee_close - - total_profit += profit_raw - total_fee += fee_open + fee_close - - trades.append({ - '方向': '空', - '开仓时间': k['time'].strftime("%Y-%m-%d %H:%M"), - '开仓价格': k['close'], - '平仓时间': k_next['time'].strftime("%Y-%m-%d %H:%M"), - '平仓价格': k_next['close'], - '本金': position_usdt, - '杠杆仓位': leveraged_position, - '开仓手续费': fee_open, - '平仓手续费': fee_close, - '原始盈亏': profit_raw, - '净盈亏': profit_net - }) - -# ---------------- 输出统计 ---------------- -print(f"1月总原始盈亏: {total_profit:.2f} USDT") -print(f"1月总手续费: {total_fee:.2f} USDT") -print(f"1月净盈亏: {total_profit - total_fee:.2f} USDT") -print("\n交易明细(前10笔示例):") - -n = 0 -for t in trades: - if t['原始盈亏'] > 10 or t['原始盈亏'] < -10: - print(f"{t['方向']}仓 | 开仓: {t['开仓时间']} @ {t['开仓价格']:.2f} | " - f"平仓: {t['平仓时间']} @ {t['平仓价格']:.2f} | " - f"本金: {t['本金']:.2f} | 杠杆仓位: {t['杠杆仓位']:.2f} | " - f"开仓手续费: {t['开仓手续费']:.2f} | 平仓手续费: {t['平仓手续费']:.2f} | " - f"原始盈亏: {t['原始盈亏']:.2f} | 净盈亏: {t['净盈亏']:.2f}") - - n += t['原始盈亏'] - -print(n) - -# # ---------------- 绘制 K 线图 + 交易点 ---------------- -# times = [k['time'] for k in data] -# closes = [k['close'] for k in data] -# -# plt.figure(figsize=(16,6)) -# plt.plot(times, closes, color='black', label='ETH 收盘价') -# -# for t in trades: -# open_time = datetime.strptime(t['开仓时间'], "%Y-%m-%d %H:%M") -# close_time = datetime.strptime(t['平仓时间'], "%Y-%m-%d %H:%M") -# if t['方向'] == '多': -# plt.scatter(open_time, t['开仓价格'], color='green', marker='^', s=100, label='多开' if '多开' not in plt.gca().get_legend_handles_labels()[1] else "") -# plt.scatter(close_time, t['平仓价格'], color='red', marker='v', s=100, label='多平' if '多平' not in plt.gca().get_legend_handles_labels()[1] else "") -# else: -# plt.scatter(open_time, t['开仓价格'], color='red', marker='v', s=100, label='空开' if '空开' not in plt.gca().get_legend_handles_labels()[1] else "") -# plt.scatter(close_time, t['平仓价格'], color='green', marker='^', s=100, label='空平' if '空平' not in plt.gca().get_legend_handles_labels()[1] else "") -# -# plt.xlabel('时间') -# plt.ylabel('价格(USDT)') -# plt.title('ETH 永续合约 1 月交易回测(100倍杠杆)') -# plt.legend() -# plt.grid(True) -# plt.gcf().autofmt_xdate() -# plt.show() diff --git a/test.py b/test.py index 3fb1ec7..c4474cb 100644 --- a/test.py +++ b/test.py @@ -18,7 +18,7 @@ class StrategyConfig: # ===== 合约 ===== contract_symbol: str = "ETHUSDT" open_type: str = "cross" - leverage: str = "2" # 1~2 更稳 + leverage: str = "50" # 1~2 更稳 # ===== K线与指标 ===== step_min: int = 1 @@ -40,7 +40,7 @@ class StrategyConfig: cooldown_sec_after_exit: int = 10 # 平仓后冷却10秒,防抖 # ===== 下单/仓位 ===== - risk_percent: float = 0.0015 # 每次用可用余额的0.15%作为保证金预算 + risk_percent: float = 0.005 # 每次用可用余额的0.15%作为保证金预算 min_size: int = 1 max_size: int = 5000 @@ -530,3 +530,4 @@ if __name__ == "__main__": bot.action() # 9274.08 +# 9260.59 diff --git a/test1.py b/test1.py index 12d247f..b5fce02 100644 --- a/test1.py +++ b/test1.py @@ -1,92 +1,692 @@ -import csv -from datetime import datetime, timezone -import matplotlib.pyplot as plt +import os +import time +import uuid +import datetime +from dataclasses import dataclass -# ---------------- 配置 ---------------- -CSV_FILE = "bitmart/kline_3.csv" # 你的 CSV 文件路径 -LEVERAGE = 100 -CAPITAL = 10000 -POSITION_RATIO = 0.01 -FEE_RATE = 0.00015 +from tqdm import tqdm +from loguru import logger -# ---------------- 读取 CSV ---------------- -data = [] -with open(CSV_FILE, 'r') as f: - reader = csv.DictReader(f) - for row in reader: - ts = int(row['id']) - dt = datetime.fromtimestamp(ts, tz=timezone.utc) - if dt.year == 2025 and dt.month == 1: - data.append({ - 'time': dt, - 'open': float(row['open']), - 'high': float(row['high']), - 'low': float(row['low']), - 'close': float(row['close']), - }) +from bitmart.api_contract import APIContract +from bitmart.lib.cloud_exceptions import APIException -# ---------------- 识别交易信号 ---------------- -trades = [] -position_usdt = CAPITAL * POSITION_RATIO -leveraged_position = position_usdt * LEVERAGE +from 交易.tools import send_dingtalk_message -for i in range(len(data)-1): - k1 = data[i] - k2 = data[i+1] - # 刺透形态(Piercing Line,多头) - if k1['close'] < k1['open'] and k2['close'] > k2['open']: - midpoint = (k1['open'] + k1['close']) / 2 - if k2['open'] < k1['close'] and k2['close'] > midpoint: - trades.append({ - '方向': '多', - '开仓时间': k2['time'], - '开仓价格': k2['open'], - '平仓时间': k2['time'], - '平仓价格': k2['close'] - }) +@dataclass +class StrategyConfig: + # ============================= + # 1m | ETH 永续 | 控止损≤5/日 + # ============================= - # 乌云盖顶(Dark Cloud Cover,空头) - elif k1['close'] > k1['open'] and k2['close'] < k2['open']: - midpoint = (k1['open'] + k1['close']) / 2 - if k2['open'] > k1['close'] and k2['close'] < midpoint: - trades.append({ - '方向': '空', - '开仓时间': k2['time'], - '开仓价格': k2['open'], - '平仓时间': k2['time'], - '平仓价格': k2['close'] - }) + # ===== 合约 ===== + contract_symbol: str = "ETHUSDT" + open_type: str = "cross" + leverage: str = "30" # 50 -> 30:显著降低1m噪声导致的连环止损与回撤波动 -# ---------------- 绘制 K 线图 ---------------- -plt.figure(figsize=(16,6)) + # ===== K线与指标 ===== + step_min: int = 1 + lookback_min: int = 240 + ema_len: int = 36 # 30 -> 36:均值更稳,信号更挑剔 + atr_len: int = 14 -times = [k['time'] for k in data] -opens = [k['open'] for k in data] -closes = [k['close'] for k in data] -highs = [k['high'] for k in data] -lows = [k['low'] for k in data] + # ===== 动态阈值基础(自适应行情)===== + entry_dev_floor: float = 0.0012 # 0.10% -> 0.12%:过滤小噪声进场 + tp_floor: float = 0.0006 # 0.05% -> 0.06%:更接近“净盈利” + sl_floor: float = 0.0018 # 0.15% -> 0.18%:ETH 1m插针多,底线略放宽 -# 绘制 K 线(用竖线表示最高最低价,用矩形表示开收盘价) -for i in range(len(data)): - color = 'green' if closes[i] >= opens[i] else 'red' - plt.plot([times[i], times[i]], [lows[i], highs[i]], color='black') # 高低价 - plt.plot([times[i]-0.0005, times[i]+0.0005], [opens[i], opens[i]], color=color, linewidth=5) # 开盘价 - plt.plot([times[i]-0.0005, times[i]+0.0005], [closes[i], closes[i]], color=color, linewidth=5) # 收盘价 + # 更挑剔、更少止损(进场更苛刻;止损不过度随波动放大) + entry_k: float = 1.45 # 1.20 -> 1.45:减少进场频率 + tp_k: float = 0.65 # 0.60 -> 0.65:略抬止盈 + sl_k: float = 1.05 # 1.20 -> 1.05:配合sl_floor,避免高波动下止损无限变大 -# ---------------- 标注交易信号 ---------------- -for t in trades: - if t['方向'] == '多': - plt.scatter(t['开仓时间'], t['开仓价格'], color='green', marker='^', s=100, label='多开' if '多开' not in plt.gca().get_legend_handles_labels()[1] else "") - plt.scatter(t['平仓时间'], t['平仓价格'], color='red', marker='v', s=100, label='多平' if '多平' not in plt.gca().get_legend_handles_labels()[1] else "") - else: - plt.scatter(t['开仓时间'], t['开仓价格'], color='red', marker='v', s=100, label='空开' if '空开' not in plt.gca().get_legend_handles_labels()[1] else "") - plt.scatter(t['平仓时间'], t['平仓价格'], color='green', marker='^', s=100, label='空平' if '空平' not in plt.gca().get_legend_handles_labels()[1] else "") + # ===== 时间/冷却 ===== + max_hold_sec: int = 75 # 90/120 -> 75:1m回归不恋战 + cooldown_sec_after_exit: int = 20 # 10 -> 20:减少“刚出又进”连环单 -plt.xlabel('时间') -plt.ylabel('价格(USDT)') -plt.title('ETH 永续合约 1 月交易回测(刺透 & 乌云形态)') -plt.legend() -plt.grid(True) -plt.gcf().autofmt_xdate() -plt.show() + # ===== 下单/仓位 ===== + risk_percent: float = 0.004 # 0.005 -> 0.004:再压一点波动,更贴合止损≤5/日 + min_size: int = 1 + max_size: int = 5000 + + # ===== 日内风控 ===== + daily_loss_limit: float = 0.02 # -2% 停机 + daily_profit_cap: float = 0.01 # +1% 封顶停机 + + # ===== 危险模式过滤(1m ETH 更敏感)===== + atr_ratio_kill: float = 0.0038 # 0.0045 -> 0.0038:更早暂停开仓 + big_body_kill: float = 0.010 # 0.012 -> 0.010:更敏感 + + # ===== 轮询节奏 ===== + klines_refresh_sec: int = 10 + tick_refresh_sec: int = 1 + status_notify_sec: int = 60 + + # ========================================================= + # ✅ 止损后同向入场加门槛(但不禁止同向重入) + # ========================================================= + reentry_penalty_mult: float = 1.55 # 同向入场门槛×1.55:大幅降低连环止损概率 + reentry_penalty_max_sec: int = 180 # 罚时最长持续 + reset_band_k: float = 0.45 # dev回到更靠近均值才解除罚则 + reset_band_floor: float = 0.0006 # 最小复位带宽(0.06%) + + # ========================================================= + # ✅ 自动阈值:ATR/Price 分位数基准(更稳,不被短时噪声带跑) + # ========================================================= + vol_baseline_window: int = 120 + vol_baseline_quantile: float = 0.65 + vol_scale_min: float = 0.80 + vol_scale_max: float = 1.60 + + # ========================================================= + # ✅ 升级:止损后同方向 SL 放宽幅度与“止损时 vol_scale”联动 + # ========================================================= + post_sl_sl_max_sec: int = 90 # 只照顾“扫损后很快反弹”的窗口 + post_sl_mult_min: float = 1.02 + post_sl_mult_max: float = 1.16 + post_sl_vol_alpha: float = 0.20 # mult = 1 + alpha*(vol_scale_at_sl - 1) + + +class BitmartFuturesMeanReversionBot: + def __init__(self, cfg: StrategyConfig): + self.cfg = cfg + + # ✅ 只从环境变量读(请务必更换曾经硬编码泄露过的 key) + self.api_key = os.getenv("BITMART_API_KEY", "").strip() + self.secret_key = os.getenv("BITMART_SECRET_KEY", "").strip() + self.memo = os.getenv("BITMART_MEMO", "合约交易").strip() + + if not self.api_key or not self.secret_key: + raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)") + + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + # 持仓状态: -1 空, 0 无, 1 多 + self.pos = 0 + self.entry_price = None + self.entry_ts = None + self.last_exit_ts = 0 + + # 日内权益基准 + self.day_start_equity = None + self.trading_enabled = True + self.day_tag = datetime.date.today() + + # 缓存 + self._klines_cache = None + self._klines_cache_ts = 0 + self._last_status_notify_ts = 0 + + # ✅ 止损后“同向入场加门槛”状态 + self.last_sl_dir = 0 # 1=多止损,-1=空止损,0=无 + self.last_sl_ts = 0.0 + + # ✅ 止损后“同方向 SL 联动放宽”状态 + self.post_sl_dir = 0 + self.post_sl_ts = 0.0 + self.post_sl_vol_scale = 1.0 # 记录止损时的 vol_scale + + self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90) + + # ----------------- 通用工具 ----------------- + def ding(self, msg, error=False): + prefix = "❌bitmart:" if error else "🔔bitmart:" + if error: + for _ in range(3): + send_dingtalk_message(f"{prefix}{msg}") + else: + send_dingtalk_message(f"{prefix}{msg}") + + def set_leverage(self) -> bool: + try: + resp = self.contractAPI.post_submit_leverage( + contract_symbol=self.cfg.contract_symbol, + leverage=self.cfg.leverage, + open_type=self.cfg.open_type + )[0] + if resp.get("code") == 1000: + logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x") + return True + logger.error(f"设置杠杆失败: {resp}") + self.ding(f"设置杠杆失败: {resp}", error=True) + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + self.ding(f"设置杠杆异常: {e}", error=True) + return False + + # ----------------- 行情/指标 ----------------- + def get_klines_cached(self): + now = time.time() + if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec: + return self._klines_cache + + kl = self.get_klines() + if kl: + self._klines_cache = kl + self._klines_cache_ts = now + return self._klines_cache + + def get_klines(self): + try: + end_time = int(time.time()) + start_time = end_time - 60 * self.cfg.lookback_min + + resp = self.contractAPI.get_kline( + contract_symbol=self.cfg.contract_symbol, + step=self.cfg.step_min, + start_time=start_time, + end_time=end_time + )[0] + + if resp.get("code") != 1000: + logger.error(f"获取K线失败: {resp}") + return None + + data = resp.get("data", []) + formatted = [] + for k in data: + formatted.append({ + "id": int(k["timestamp"]), + "open": float(k["open_price"]), + "high": float(k["high_price"]), + "low": float(k["low_price"]), + "close": float(k["close_price"]), + }) + formatted.sort(key=lambda x: x["id"]) + return formatted + except Exception as e: + logger.error(f"获取K线异常: {e}") + self.ding(f"获取K线异常: {e}", error=True) + return None + + def get_last_price(self, fallback_close: float) -> float: + """ + 优先取更实时的最新价;若SDK不支持/字段不同,回退到K线close。 + """ + try: + if hasattr(self.contractAPI, "get_contract_details"): + r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0] + d = r.get("data") if isinstance(r, dict) else None + if isinstance(d, dict): + for key in ("last_price", "mark_price", "index_price"): + if key in d and d[key] is not None: + return float(d[key]) + + if hasattr(self.contractAPI, "get_ticker"): + r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0] + d = r.get("data") if isinstance(r, dict) else None + if isinstance(d, dict): + for key in ("last_price", "price", "last", "close"): + if key in d and d[key] is not None: + return float(d[key]) + except Exception: + pass + + return float(fallback_close) + + @staticmethod + def ema(values, n: int) -> float: + k = 2 / (n + 1) + e = values[0] + for v in values[1:]: + e = v * k + e * (1 - k) + return e + + @staticmethod + def atr(klines, n: int) -> float: + if len(klines) < n + 1: + return 0.0 + trs = [] + for i in range(-n, 0): + cur = klines[i] + prev = klines[i - 1] + tr = max( + cur["high"] - cur["low"], + abs(cur["high"] - prev["close"]), + abs(cur["low"] - prev["close"]), + ) + trs.append(tr) + return sum(trs) / len(trs) + + def is_danger_market(self, klines, price: float) -> bool: + last = klines[-1] + body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0 + if body >= self.cfg.big_body_kill: + return True + + a = self.atr(klines, self.cfg.atr_len) + atr_ratio = (a / price) if price > 0 else 0.0 + if atr_ratio >= self.cfg.atr_ratio_kill: + return True + + return False + + def atr_ratio_baseline(self, klines) -> float: + """ + 自动阈值基准:最近 window 根的 atr_ratio 分布的 quantile 作为“典型波动” + """ + window = self.cfg.vol_baseline_window + if len(klines) < (window + self.cfg.atr_len + 5): + return 0.0 + + ratios = [] + for i in range(-window, 0): + sub = klines[:i] if i != 0 else klines + a = self.atr(sub, self.cfg.atr_len) + p = sub[-1]["close"] + if p > 0 and a > 0: + ratios.append(a / p) + + if not ratios: + return 0.0 + + ratios.sort() + q = max(0.0, min(1.0, self.cfg.vol_baseline_quantile)) + idx = int(q * (len(ratios) - 1)) + return ratios[idx] + + def dynamic_thresholds(self, atr_ratio: float, base_ratio: float): + """ + 动态阈值:atr_ratio * vol_scale,并带 floor + """ + if base_ratio <= 0: + vol_scale = 1.0 + else: + raw = atr_ratio / base_ratio + vol_scale = max(self.cfg.vol_scale_min, min(self.cfg.vol_scale_max, raw)) + + entry_dev = max(self.cfg.entry_dev_floor, self.cfg.entry_k * vol_scale * atr_ratio) + tp = max(self.cfg.tp_floor, self.cfg.tp_k * vol_scale * atr_ratio) + sl = max(self.cfg.sl_floor, self.cfg.sl_k * vol_scale * atr_ratio) + return entry_dev, tp, sl, vol_scale + + # ----------------- 账户/仓位 ----------------- + def get_assets_available(self) -> float: + try: + resp = self.contractAPI.get_assets_detail()[0] + if resp.get("code") != 1000: + return 0.0 + data = resp.get("data") + if isinstance(data, dict): + return float(data.get("available_balance", 0)) + if isinstance(data, list): + for asset in data: + if asset.get("currency") == "USDT": + return float(asset.get("available_balance", 0)) + return 0.0 + except Exception as e: + logger.error(f"余额查询异常: {e}") + return 0.0 + + def get_position_status(self) -> bool: + try: + resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0] + if resp.get("code") != 1000: + return False + + positions = resp.get("data", []) + if not positions: + self.pos = 0 + return True + + p = positions[0] + self.pos = 1 if p["position_type"] == 1 else -1 + return True + except Exception as e: + logger.error(f"持仓查询异常: {e}") + self.ding(f"持仓查询异常: {e}", error=True) + return False + + def get_equity_proxy(self) -> float: + return self.get_assets_available() + + def refresh_daily_baseline(self): + today = datetime.date.today() + if today != self.day_tag: + self.day_tag = today + self.day_start_equity = None + self.trading_enabled = True + self.ding(f"新的一天({today}):重置日内风控基准") + + def risk_kill_switch(self): + self.refresh_daily_baseline() + equity = self.get_equity_proxy() + if equity <= 0: + return + + if self.day_start_equity is None: + self.day_start_equity = equity + logger.info(f"日内权益基准设定:{equity:.2f} USDT") + return + + pnl = (equity - self.day_start_equity) / self.day_start_equity + if pnl <= -self.cfg.daily_loss_limit: + self.trading_enabled = False + self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True) + + if pnl >= self.cfg.daily_profit_cap: + self.trading_enabled = False + self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机") + + # ----------------- 下单 ----------------- + def calculate_size(self, price: float) -> int: + """ + 保守仓位估算:按 1张≈0.001ETH(沿用你原假设) + """ + bal = self.get_assets_available() + if bal < 10: + return 0 + + margin = bal * self.cfg.risk_percent + lev = int(self.cfg.leverage) + + size = int((margin * lev) / (price * 0.001)) + size = max(self.cfg.min_size, size) + size = min(self.cfg.max_size, size) + return size + + def place_market_order(self, side: int, size: int) -> bool: + """ + side: + 1 开多 + 2 平空 + 3 平多 + 4 开空 + """ + if size <= 0: + return False + + client_order_id = f"mr_{int(time.time())}_{uuid.uuid4().hex[:8]}" + try: + resp = self.contractAPI.post_submit_order( + contract_symbol=self.cfg.contract_symbol, + client_order_id=client_order_id, + side=side, + mode=1, + type="market", + leverage=self.cfg.leverage, + open_type=self.cfg.open_type, + size=size + )[0] + + logger.info(f"order_resp: {resp}") + + if resp.get("code") == 1000: + return True + + self.ding(f"下单失败: {resp}", error=True) + return False + + except APIException as e: + logger.error(f"API下单异常: {e}") + self.ding(f"API下单异常: {e}", error=True) + return False + + except Exception as e: + logger.error(f"下单未知异常: {e}") + self.ding(f"下单未知异常: {e}", error=True) + return False + + def close_position_all(self): + if self.pos == 1: + ok = self.place_market_order(3, 999999) + if ok: + self.pos = 0 + elif self.pos == -1: + ok = self.place_market_order(2, 999999) + if ok: + self.pos = 0 + + # ----------------- 止损后机制(核心优化) ----------------- + def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool: + """ + 止损后同向入场加门槛: + - 只要 dev 还没有回到中性区,就对“上次止损方向”的同向入场门槛提高 + - dev 回到 abs(dev) <= reset_band 后自动解除 + - 超过 max_sec 自动解除(避免一直卡住) + """ + if self.last_sl_dir == 0: + return False + + if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec: + self.last_sl_dir = 0 + return False + + reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev) + if abs(dev) <= reset_band: + self.last_sl_dir = 0 + return False + + return True + + def _post_sl_dynamic_mult(self) -> float: + """ + 止损后同方向 SL 放宽倍数与“止损时 vol_scale”联动: + mult = 1 + alpha*(vol_scale_at_sl - 1) + 并做上下限裁剪 + 有效期控制 + """ + if self.post_sl_dir == 0: + return 1.0 + + if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec: + self.post_sl_dir = 0 + self.post_sl_vol_scale = 1.0 + return 1.0 + + raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0) + raw = max(1.0, raw) # 不缩小止损,只放宽 + return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw)) + + # ----------------- 交易逻辑 ----------------- + def in_cooldown(self) -> bool: + return (time.time() - self.last_exit_ts) < self.cfg.cooldown_sec_after_exit + + def maybe_enter(self, price: float, ema_value: float, entry_dev: float): + if self.pos != 0: + return + if self.in_cooldown(): + return + + dev = (price - ema_value) / ema_value if ema_value else 0.0 + size = self.calculate_size(price) + if size <= 0: + return + + penalty_active = self._reentry_penalty_active(dev, entry_dev) + + # 基础阈值 + long_th = -entry_dev + short_th = entry_dev + + # 若罚则生效:对“上次止损方向”的同向阈值提高 + if penalty_active: + if self.last_sl_dir == 1: + long_th = -entry_dev * self.cfg.reentry_penalty_mult + elif self.last_sl_dir == -1: + short_th = entry_dev * self.cfg.reentry_penalty_mult + + logger.info( + f"enter_check: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% " + f"(entry_dev={entry_dev * 100:.3f}%, long_th={long_th * 100:.3f}%, short_th={short_th * 100:.3f}%) " + f"size={size}, penalty={penalty_active}, last_sl_dir={self.last_sl_dir}" + ) + + if dev <= long_th: + if self.place_market_order(1, size): # 开多 + self.pos = 1 + self.entry_price = price + self.entry_ts = time.time() + self.ding(f"✅开多:dev={dev * 100:.3f}% size={size} entry={price:.2f}") + + elif dev >= short_th: + if self.place_market_order(4, size): # 开空 + self.pos = -1 + self.entry_price = price + self.entry_ts = time.time() + self.ding(f"✅开空:dev={dev * 100:.3f}% size={size} entry={price:.2f}") + + def maybe_exit(self, price: float, tp: float, sl: float, vol_scale: float): + if self.pos == 0 or self.entry_price is None or self.entry_ts is None: + return + + hold = time.time() - self.entry_ts + + if self.pos == 1: + pnl = (price - self.entry_price) / self.entry_price + else: + pnl = (self.entry_price - price) / self.entry_price + + # ✅ 同方向止损后:在有效期内放宽 SL(与止损时 vol_scale 联动) + sl_mult = 1.0 + if self.post_sl_dir == self.pos and self.post_sl_dir != 0: + sl_mult = self._post_sl_dynamic_mult() + effective_sl = sl * sl_mult + + if pnl >= tp: + self.close_position_all() + self.ding(f"🎯止盈:pnl={pnl * 100:.3f}% price={price:.2f} tp={tp * 100:.3f}%") + self.entry_price, self.entry_ts = None, None + self.last_exit_ts = time.time() + + elif pnl <= -effective_sl: + # 记录止损方向 + sl_dir = self.pos # 1=多止损,-1=空止损 + + self.close_position_all() + self.ding( + f"🛑止损:pnl={pnl * 100:.3f}% price={price:.2f} " + f"sl={sl * 100:.3f}% effective_sl={effective_sl * 100:.3f}%(×{sl_mult:.2f})", + error=True + ) + + # ✅ 开启:同向入场加门槛 + self.last_sl_dir = sl_dir + self.last_sl_ts = time.time() + + # ✅ 开启:同向 SL 联动放宽(记录止损时 vol_scale) + self.post_sl_dir = sl_dir + self.post_sl_ts = time.time() + self.post_sl_vol_scale = float(vol_scale) + + self.entry_price, self.entry_ts = None, None + self.last_exit_ts = time.time() + + elif hold >= self.cfg.max_hold_sec: + self.close_position_all() + self.ding(f"⏱超时:hold={int(hold)}s pnl={pnl * 100:.3f}% price={price:.2f}") + self.entry_price, self.entry_ts = None, None + self.last_exit_ts = time.time() + + def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float, + atr_ratio: float, base_ratio: float, vol_scale: float, + entry_dev: float, tp: float, sl: float): + now = time.time() + if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec: + return + self._last_status_notify_ts = now + + direction_str = "多" if self.pos == 1 else ("空" if self.pos == -1 else "无") + penalty_active = self._reentry_penalty_active(dev, entry_dev) + + sl_mult = 1.0 + if self.pos != 0 and self.post_sl_dir == self.pos: + sl_mult = self._post_sl_dynamic_mult() + + msg = ( + f"【BitMart {self.cfg.contract_symbol}|1m均值回归(自动阈值+止损智能)】\n" + f"方向:{direction_str}\n" + f"现价:{price:.2f}\n" + f"EMA{self.cfg.ema_len}:{ema_value:.2f}\n" + f"dev:{dev * 100:.3f}%(entry_dev={entry_dev * 100:.3f}%)\n" + f"ATR比:{atr_ratio * 100:.3f}% 基准:{base_ratio * 100:.3f}% vol_scale={vol_scale:.2f}\n" + f"tp/sl:{tp * 100:.3f}% / {sl * 100:.3f}%(postSL×{sl_mult:.2f}, sl@scale={self.post_sl_vol_scale:.2f})\n" + f"止损同向加门槛:{'ON' if penalty_active else 'OFF'}(last_sl_dir={self.last_sl_dir})\n" + f"可用余额:{bal:.2f} USDT 杠杆:{self.cfg.leverage}x\n" + f"超时:{self.cfg.max_hold_sec}s 冷却:{self.cfg.cooldown_sec_after_exit}s" + ) + self.ding(msg) + + def action(self): + if not self.set_leverage(): + self.ding("杠杆设置失败,停止运行", error=True) + return + + while True: + now_dt = datetime.datetime.now() + self.pbar.n = now_dt.second + self.pbar.refresh() + + klines = self.get_klines_cached() + if not klines or len(klines) < (self.cfg.ema_len + 5): + time.sleep(1) + continue + + last_k = klines[-1] + closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]] + ema_value = self.ema(closes, self.cfg.ema_len) + + price = self.get_last_price(fallback_close=float(last_k["close"])) + dev = (price - ema_value) / ema_value if ema_value else 0.0 + + # 自动阈值 + a = self.atr(klines, self.cfg.atr_len) + atr_ratio = (a / price) if price > 0 else 0.0 + base_ratio = self.atr_ratio_baseline(klines) + entry_dev, tp, sl, vol_scale = self.dynamic_thresholds(atr_ratio, base_ratio) + + # 日内风控 + self.risk_kill_switch() + + # 刷新仓位 + if not self.get_position_status(): + time.sleep(1) + continue + + # 停机:平仓+不再开仓 + if not self.trading_enabled: + if self.pos != 0: + self.close_position_all() + time.sleep(5) + continue + + # 危险市场:不新开仓(允许已有仓按 tp/sl/超时 退出) + if self.is_danger_market(klines, price): + logger.warning("危险模式:高波动/大实体K,暂停开仓") + self.maybe_exit(price, tp, sl, vol_scale) + time.sleep(self.cfg.tick_refresh_sec) + continue + + # 先出场再入场 + self.maybe_exit(price, tp, sl, vol_scale) + self.maybe_enter(price, ema_value, entry_dev) + + # 状态通知(限频) + bal = self.get_assets_available() + self.notify_status_throttled( + price, ema_value, dev, bal, + atr_ratio, base_ratio, vol_scale, + entry_dev, tp, sl + ) + + time.sleep(self.cfg.tick_refresh_sec) + + +if __name__ == "__main__": + """ + Windows PowerShell: + setx BITMART_API_KEY "你的key" + setx BITMART_SECRET_KEY "你的secret" + setx BITMART_MEMO "合约交易" + 重新打开终端再运行。 + + Linux/macOS: + export BITMART_API_KEY="你的key" + export BITMART_SECRET_KEY="你的secret" + export BITMART_MEMO="合约交易" + """ + cfg = StrategyConfig() + bot = BitmartFuturesMeanReversionBot(cfg) + bot.action() + +# 9208.96 diff --git a/test2.py b/test2.py index c3885c7..85da5f0 100644 --- a/test2.py +++ b/test2.py @@ -1,63 +1,949 @@ -import oss2 import os +import time import uuid -from pathlib import Path +import datetime +from dataclasses import dataclass -def upload_file_to_oss(local_file_path): - # 建议使用环境变量存储敏感信息,这里为了示例方便,假设已通过环境变量设置 - endpoint = 'https://oss-cn-beijing.aliyuncs.com' - access_key_id = "LTAI5tRMxrM95Pi8JEEmqRcg" - access_key_secret = "8vueGCsRVeFyQMcAA7sysO7LSnuJDG" +from tqdm import tqdm +from loguru import logger - if not access_key_id or not access_key_secret: - print('❌ 错误: 未找到有效的 OSS 访问密钥,请检查环境变量。') - return None +from bitmart.api_contract import APIContract +from bitmart.lib.cloud_exceptions import APIException - # 生成唯一 Bucket 名称 - bucket_name = f'oss-bucket-yj' - print(f"创建 Bucket: {bucket_name}") +from 交易.tools import send_dingtalk_message - # 初始化 Bucket 对象 - auth = oss2.Auth(access_key_id, access_key_secret) - bucket = oss2.Bucket(auth, endpoint, bucket_name) + +@dataclass +class StrategyConfig: + # ============================= + # 1m | ETH 永续 | 控止损≤5/日 + # ============================= + + # ===== 合约 ===== + contract_symbol: str = "ETHUSDT" + open_type: str = "cross" + leverage: str = "30" + + # ===== K线与指标 ===== + step_min: int = 1 + lookback_min: int = 240 + ema_len: int = 36 + atr_len: int = 14 + + # ===== ADX 趋势过滤(新增)===== + # 目的:单边趋势(ADX高)时,抑制/禁止逆势均值回归单,避免反复反向开仓止损 + enable_adx_filter: bool = True + adx_len: int = 14 + adx_threshold: float = 25.0 # 常用:20~30区间,你可按回测调整 + # 过滤模式: + # - "block_countertrend": 只禁止逆着 DI 的方向开仓(推荐,既防反手又不完全停机) + # - "block_all": ADX 高时直接不允许任何新开仓(更保守) + adx_mode: str = "block_countertrend" + # 趋势保护冷却:当 ADX 高且刚止损,延长冷却,减少“止损->立刻反手”的连环 + cooldown_sec_after_sl_extra: int = 40 + + # ========================================================= + # ✅ 自动阈值:ATR/Price 分位数基准(更稳,不被短时噪声带跑) + # ========================================================= + vol_baseline_window: int = 60 + vol_baseline_quantile: float = 0.65 + vol_scale_min: float = 0.80 + vol_scale_max: float = 1.60 + + # ✅ baseline 每 60 秒刷新一次(体感更明显、也省CPU) + base_ratio_refresh_sec: int = 180 + + # ========================================================= + # ✅ 动态 floor(方案一) + # floor = clamp(min, base_k * base_ratio, max) + # 目的:跟着典型波动变,过滤小噪声;tp/sl 也随环境自适应 + # ========================================================= + # entry_dev_floor 动态 + entry_dev_floor_min: float = 0.0012 # 0.12% + entry_dev_floor_max: float = 0.0030 # 0.30% + entry_dev_floor_base_k: float = 1.10 # entry_floor = 1.10 * base_ratio + + # tp_floor 动态 + tp_floor_min: float = 0.0006 # 0.06% + tp_floor_max: float = 0.0020 # 0.20% + tp_floor_base_k: float = 0.55 # tp_floor = 0.55 * base_ratio + + # sl_floor 动态 + sl_floor_min: float = 0.0018 # 0.18% + sl_floor_max: float = 0.0060 # 0.60% + sl_floor_base_k: float = 1.35 # sl_floor = 1.35 * base_ratio + + # ========================================================= + # ✅ 动态阈值倍率 + # ========================================================= + entry_k: float = 1.45 + tp_k: float = 0.65 + sl_k: float = 1.05 + + # ===== 时间/冷却 ===== + max_hold_sec: int = 75 + cooldown_sec_after_exit: int = 20 + + # ===== 下单/仓位 ===== + risk_percent: float = 0.004 + min_size: int = 1 + max_size: int = 5000 + + # ===== 日内风控 ===== + daily_loss_limit: float = 0.02 + daily_profit_cap: float = 0.01 + + # ===== 危险模式过滤 ===== + atr_ratio_kill: float = 0.0038 + big_body_kill: float = 0.010 + + # ===== 轮询节奏 ===== + klines_refresh_sec: int = 10 + tick_refresh_sec: int = 1 + status_notify_sec: int = 60 + + # ========================================================= + # ✅ 止损后同向入场加门槛(你原来的逻辑保留) + # ========================================================= + reentry_penalty_mult: float = 1.55 + reentry_penalty_max_sec: int = 180 + reset_band_k: float = 0.45 + reset_band_floor: float = 0.0006 + + # ========================================================= + # ✅ 止损后同方向 SL 放宽幅度与"止损时 vol_scale"联动 + # ========================================================= + post_sl_sl_max_sec: int = 90 + post_sl_mult_min: float = 1.02 + post_sl_mult_max: float = 1.16 + post_sl_vol_alpha: float = 0.20 + + +class BitmartFuturesMeanReversionBot: + def __init__(self, cfg: StrategyConfig): + self.cfg = cfg + + self.api_key = os.getenv("BITMART_API_KEY", "").strip() + self.secret_key = os.getenv("BITMART_SECRET_KEY", "").strip() + self.memo = os.getenv("BITMART_MEMO", "合约交易").strip() + + if not self.api_key or not self.secret_key: + raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)") + + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + # 持仓状态: -1 空, 0 无, 1 多 + self.pos = 0 + self.entry_price = None + self.entry_ts = None + self.last_exit_ts = 0 + + # 日内权益基准 + self.day_start_equity = None + self.trading_enabled = True + self.day_tag = datetime.date.today() + + # 缓存 + self._klines_cache = None + self._klines_cache_ts = 0 + self._last_status_notify_ts = 0 + + # ✅ base_ratio 缓存 + self._base_ratio_cached = 0.0015 # 初始化默认值 0.15% + self._base_ratio_ts = 0.0 + + # ✅ 止损后"同向入场加门槛"状态 + self.last_sl_dir = 0 # 1=多止损,-1=空止损,0=无 + self.last_sl_ts = 0.0 + + # ✅ 止损后"同方向 SL 联动放宽"状态 + self.post_sl_dir = 0 + self.post_sl_ts = 0.0 + self.post_sl_vol_scale = 1.0 # 记录止损时的 vol_scale + + self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90) + logger.info(f"初始化完成,基准波动率默认值: {self._base_ratio_cached * 100:.4f}%") + + # ----------------- 通用工具 ----------------- + def ding(self, msg, error=False): + prefix = "❌bitmart:" if error else "🔔bitmart:" + if error: + for _ in range(3): + send_dingtalk_message(f"{prefix}{msg}") + else: + send_dingtalk_message(f"{prefix}{msg}") + + def set_leverage(self) -> bool: + try: + resp = self.contractAPI.post_submit_leverage( + contract_symbol=self.cfg.contract_symbol, + leverage=self.cfg.leverage, + open_type=self.cfg.open_type + )[0] + if resp.get("code") == 1000: + logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x") + return True + logger.error(f"设置杠杆失败: {resp}") + self.ding(f"设置杠杆失败: {resp}", error=True) + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + self.ding(f"设置杠杆异常: {e}", error=True) + return False + + # ----------------- 行情/指标 ----------------- + def get_klines_cached(self): + now = time.time() + if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec: + return self._klines_cache + + kl = self.get_klines() + if kl: + self._klines_cache = kl + self._klines_cache_ts = now + return self._klines_cache + + def get_klines(self): + try: + end_time = int(time.time()) + start_time = end_time - 60 * self.cfg.lookback_min + + resp = self.contractAPI.get_kline( + contract_symbol=self.cfg.contract_symbol, + step=self.cfg.step_min, + start_time=start_time, + end_time=end_time + )[0] + + if resp.get("code") != 1000: + logger.error(f"获取K线失败: {resp}") + return None + + data = resp.get("data", []) + formatted = [] + for k in data: + formatted.append({ + "id": int(k["timestamp"]), + "open": float(k["open_price"]), + "high": float(k["high_price"]), + "low": float(k["low_price"]), + "close": float(k["close_price"]), + }) + formatted.sort(key=lambda x: x["id"]) + return formatted + except Exception as e: + logger.error(f"获取K线异常: {e}") + self.ding(f"获取K线异常: {e}", error=True) + return None + + def get_last_price(self, fallback_close: float) -> float: + try: + if hasattr(self.contractAPI, "get_contract_details"): + r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0] + d = r.get("data") if isinstance(r, dict) else None + if isinstance(d, dict): + for key in ("last_price", "mark_price", "index_price"): + if key in d and d[key] is not None: + return float(d[key]) + + if hasattr(self.contractAPI, "get_ticker"): + r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0] + d = r.get("data") if isinstance(r, dict) else None + if isinstance(d, dict): + for key in ("last_price", "price", "last", "close"): + if key in d and d[key] is not None: + return float(d[key]) + except Exception: + pass + + return float(fallback_close) + + @staticmethod + def ema(values, n: int) -> float: + k = 2 / (n + 1) + e = values[0] + for v in values[1:]: + e = v * k + e * (1 - k) + return e + + @staticmethod + def atr(klines, n: int) -> float: + if len(klines) < n + 1: + return 0.0 + trs = [] + for i in range(-n, 0): + cur = klines[i] + prev = klines[i - 1] + tr = max( + cur["high"] - cur["low"], + abs(cur["high"] - prev["close"]), + abs(cur["low"] - prev["close"]), + ) + trs.append(tr) + return sum(trs) / len(trs) + + @staticmethod + def _wilder_smooth(prev: float, val: float, n: int) -> float: + # Wilder smoothing: prev - prev/n + val + return prev - (prev / n) + val + + def adx(self, klines, n: int): + """ + 返回 (adx, plus_di, minus_di) + 采用经典 Wilder ADX/DI 计算(足够稳,避免趋势期逆势反复开仓) + """ + if len(klines) < (2 * n + 2): + return 0.0, 0.0, 0.0 + + highs = [k["high"] for k in klines] + lows = [k["low"] for k in klines] + closes = [k["close"] for k in klines] + + # 计算 TR, +DM, -DM 序列(从1开始) + tr_list = [] + pdm_list = [] + mdm_list = [] + + for i in range(1, len(klines)): + high = highs[i] + low = lows[i] + prev_close = closes[i - 1] + prev_high = highs[i - 1] + prev_low = lows[i - 1] + + tr = max(high - low, abs(high - prev_close), abs(low - prev_close)) + up_move = high - prev_high + down_move = prev_low - low + + pdm = up_move if (up_move > down_move and up_move > 0) else 0.0 + mdm = down_move if (down_move > up_move and down_move > 0) else 0.0 + + tr_list.append(tr) + pdm_list.append(pdm) + mdm_list.append(mdm) + + # 初始平滑值(第一个n段的和) + tr14 = sum(tr_list[:n]) + pdm14 = sum(pdm_list[:n]) + mdm14 = sum(mdm_list[:n]) + + def safe_div(a, b): + return (a / b) if b != 0 else 0.0 + + plus_di = 100.0 * safe_div(pdm14, tr14) + minus_di = 100.0 * safe_div(mdm14, tr14) + dx = 100.0 * safe_div(abs(plus_di - minus_di), (plus_di + minus_di)) + dx_list = [dx] + + # 继续平滑并计算后续 DX + for i in range(n, len(tr_list)): + tr14 = self._wilder_smooth(tr14, tr_list[i], n) + pdm14 = self._wilder_smooth(pdm14, pdm_list[i], n) + mdm14 = self._wilder_smooth(mdm14, mdm_list[i], n) + + plus_di = 100.0 * safe_div(pdm14, tr14) + minus_di = 100.0 * safe_div(mdm14, tr14) + dx = 100.0 * safe_div(abs(plus_di - minus_di), (plus_di + minus_di)) + dx_list.append(dx) + + # ADX 是 DX 的 Wilder 平滑,常见做法:先取前 n 个 DX 的均值作为初值 + if len(dx_list) < (n + 1): + return 0.0, plus_di, minus_di + + adx0 = sum(dx_list[:n]) / n + adx_val = adx0 + for j in range(n, len(dx_list)): + adx_val = (adx_val * (n - 1) + dx_list[j]) / n + + return float(adx_val), float(plus_di), float(minus_di) + + def is_danger_market(self, klines, price: float) -> bool: + last = klines[-1] + body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0 + if body >= self.cfg.big_body_kill: + return True + + a = self.atr(klines, self.cfg.atr_len) + atr_ratio = (a / price) if price > 0 else 0.0 + if atr_ratio >= self.cfg.atr_ratio_kill: + return True + + return False + + def atr_ratio_baseline(self, klines) -> float: + """简化版ATR基准计算""" + window = min(self.cfg.vol_baseline_window, len(klines) - self.cfg.atr_len - 1) + if window <= 10: + logger.warning(f"数据不足计算基准: {len(klines)}根K线") + return 0.0 + + ratios = [] + step = 3 + for i in range(-window, 0, step): + if len(klines) + i < self.cfg.atr_len + 1: + continue + + start_idx = len(klines) + i - self.cfg.atr_len + end_idx = len(klines) + i + if start_idx < 0 or end_idx <= start_idx: + continue + + sub_klines = klines[start_idx:end_idx] + if len(sub_klines) >= self.cfg.atr_len + 1: + a = self.atr(sub_klines, self.cfg.atr_len) + price = klines[end_idx - 1]["close"] + if a > 0 and price > 0: + ratio = a / price + if 0.0001 < ratio < 0.01: + ratios.append(ratio) + + if len(ratios) < 5: + a = self.atr(klines[-60:], self.cfg.atr_len) + price = klines[-1]["close"] + if a > 0 and price > 0: + baseline = a / price + logger.debug(f"使用全量数据计算基准: {baseline * 100:.4f}%") + return baseline + return 0.0 + + ratios.sort() + idx = min(len(ratios) - 1, max(0, int(self.cfg.vol_baseline_quantile * (len(ratios) - 1)))) + baseline = ratios[idx] + logger.debug( + f"基准计算: 样本数={len(ratios)}, 基准={baseline * 100:.4f}%, " + f"范围=[{ratios[0] * 100:.4f}%, {ratios[-1] * 100:.4f}%]" + ) + return baseline + + def get_base_ratio_cached(self, klines) -> float: + """获取缓存的基准波动率,定期刷新""" + now = time.time() + refresh_sec = self.cfg.base_ratio_refresh_sec + + if (self._base_ratio_cached is None or (now - self._base_ratio_ts) >= refresh_sec): + baseline = self.atr_ratio_baseline(klines) + if baseline > 0.0001: + self._base_ratio_cached = baseline + self._base_ratio_ts = now + logger.info(f"基准波动率更新: {baseline * 100:.4f}%") + else: + current_price = klines[-1]["close"] if klines else 3000 + if current_price > 4000: + default_baseline = 0.0010 + elif current_price > 3500: + default_baseline = 0.0012 + elif current_price > 3000: + default_baseline = 0.0015 + elif current_price > 2500: + default_baseline = 0.0018 + else: + default_baseline = 0.0020 + + self._base_ratio_cached = default_baseline + self._base_ratio_ts = now + logger.warning( + f"使用价格动态默认基准: {default_baseline * 100:.4f}% (价格=${current_price:.0f})" + ) + + return self._base_ratio_cached + + @staticmethod + def _clamp(x: float, lo: float, hi: float) -> float: + return max(lo, min(hi, x)) + + def dynamic_thresholds(self, atr_ratio: float, base_ratio: float): + """ + ✅ entry/tp/sl 全部动态(修复版): + - vol_scale:atr_ratio/base_ratio 限幅 + - floor:方案一 (floor = clamp(min, k*base_ratio, max)) + - 最终阈值:max(floor, k * vol_scale * atr_ratio) + """ + if atr_ratio <= 0: + logger.warning(f"ATR比率异常: {atr_ratio}") + atr_ratio = 0.001 + + if base_ratio < 0.0005: + base_ratio = max(0.001, atr_ratio * 1.2) + logger.debug(f"基准太小,使用调整后的atr_ratio: {base_ratio * 100:.4f}%") + + if base_ratio > 0: + raw_scale = atr_ratio / base_ratio + vol_scale = self._clamp(raw_scale, self.cfg.vol_scale_min, self.cfg.vol_scale_max) + logger.debug( + f"vol_scale: {raw_scale:.2f} → {vol_scale:.2f} (atr={atr_ratio * 100:.3f}%, base={base_ratio * 100:.3f}%)" + ) + else: + vol_scale = 1.0 + logger.warning("基准无效,使用默认vol_scale=1.0") + + entry_floor_raw = self.cfg.entry_dev_floor_base_k * base_ratio + entry_floor = self._clamp(entry_floor_raw, self.cfg.entry_dev_floor_min, self.cfg.entry_dev_floor_max) + + tp_floor_raw = self.cfg.tp_floor_base_k * base_ratio + tp_floor = self._clamp(tp_floor_raw, self.cfg.tp_floor_min, self.cfg.tp_floor_max) + + sl_floor_raw = self.cfg.sl_floor_base_k * base_ratio + sl_floor = self._clamp(sl_floor_raw, self.cfg.sl_floor_min, self.cfg.sl_floor_max) + + entry_dev_atr_part = self.cfg.entry_k * vol_scale * atr_ratio + entry_dev = max(entry_floor, entry_dev_atr_part) + + tp_atr_part = self.cfg.tp_k * vol_scale * atr_ratio + tp = max(tp_floor, tp_atr_part) + + sl_atr_part = self.cfg.sl_k * vol_scale * atr_ratio + sl = max(sl_floor, sl_atr_part) + + entry_dev = max(entry_dev, self.cfg.entry_dev_floor_min) + + logger.info( + f"动态阈值: atr={atr_ratio * 100:.4f}%, base={base_ratio * 100:.4f}%, " + f"vol_scale={vol_scale:.2f}, floor={entry_floor * 100:.4f}%, " + f"atr_part={entry_dev_atr_part * 100:.4f}%, 最终entry_dev={entry_dev * 100:.4f}%" + ) + + return entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor + + # ----------------- 账户/仓位 ----------------- + def get_assets_available(self) -> float: + try: + resp = self.contractAPI.get_assets_detail()[0] + if resp.get("code") != 1000: + return 0.0 + data = resp.get("data") + if isinstance(data, dict): + return float(data.get("available_balance", 0)) + if isinstance(data, list): + for asset in data: + if asset.get("currency") == "USDT": + return float(asset.get("available_balance", 0)) + return 0.0 + except Exception as e: + logger.error(f"余额查询异常: {e}") + return 0.0 + + def get_position_status(self) -> bool: + try: + resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0] + if resp.get("code") != 1000: + return False + + positions = resp.get("data", []) + if not positions: + self.pos = 0 + return True + + p = positions[0] + self.pos = 1 if p["position_type"] == 1 else -1 + return True + except Exception as e: + logger.error(f"持仓查询异常: {e}") + self.ding(f"持仓查询异常: {e}", error=True) + return False + + def get_equity_proxy(self) -> float: + return self.get_assets_available() + + def refresh_daily_baseline(self): + today = datetime.date.today() + if today != self.day_tag: + self.day_tag = today + self.day_start_equity = None + self.trading_enabled = True + self.ding(f"新的一天({today}):重置日内风控基准") + + def risk_kill_switch(self): + self.refresh_daily_baseline() + equity = self.get_equity_proxy() + if equity <= 0: + return + + if self.day_start_equity is None: + self.day_start_equity = equity + logger.info(f"日内权益基准设定:{equity:.2f} USDT") + return + + pnl = (equity - self.day_start_equity) / self.day_start_equity + if pnl <= -self.cfg.daily_loss_limit: + self.trading_enabled = False + self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True) + + if pnl >= self.cfg.daily_profit_cap: + self.trading_enabled = False + self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机") + + # ----------------- 下单 ----------------- + def calculate_size(self, price: float) -> int: + bal = self.get_assets_available() + if bal < 10: + return 0 + + margin = bal * self.cfg.risk_percent + lev = int(self.cfg.leverage) + + # ⚠️ 沿用你的原假设:1张≈0.001ETH + size = int((margin * lev) / (price * 0.001)) + size = max(self.cfg.min_size, size) + size = min(self.cfg.max_size, size) + return size + + def place_market_order(self, side: int, size: int) -> bool: + if size <= 0: + return False + + client_order_id = f"mr_{int(time.time())}_{uuid.uuid4().hex[:8]}" + try: + resp = self.contractAPI.post_submit_order( + contract_symbol=self.cfg.contract_symbol, + client_order_id=client_order_id, + side=side, + mode=1, + type="market", + leverage=self.cfg.leverage, + open_type=self.cfg.open_type, + size=size + )[0] + + logger.info(f"order_resp: {resp}") + + if resp.get("code") == 1000: + return True + + self.ding(f"下单失败: {resp}", error=True) + return False + + except APIException as e: + logger.error(f"API下单异常: {e}") + self.ding(f"API下单异常: {e}", error=True) + return False + + except Exception as e: + logger.error(f"下单未知异常: {e}") + self.ding(f"下单未知异常: {e}", error=True) + return False + + def close_position_all(self): + if self.pos == 1: + ok = self.place_market_order(3, 999999) + if ok: + self.pos = 0 + elif self.pos == -1: + ok = self.place_market_order(2, 999999) + if ok: + self.pos = 0 + + # ----------------- 止损后机制 ----------------- + def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool: + """检查是否需要应用重新入场惩罚(你原逻辑保留)""" + if self.last_sl_dir == 0: + return False + + if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec: + self.last_sl_dir = 0 + return False + + reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev) + if abs(dev) <= reset_band: + self.last_sl_dir = 0 + return False + + return True + + def _post_sl_dynamic_mult(self) -> float: + """计算止损后SL放宽倍数""" + if self.post_sl_dir == 0: + return 1.0 + + if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec: + self.post_sl_dir = 0 + self.post_sl_vol_scale = 1.0 + return 1.0 + + raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0) + raw = max(1.0, raw) # 不缩小止损,只放宽 + return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw)) + + # ----------------- 交易逻辑 ----------------- + def in_cooldown(self) -> bool: + """检查是否在冷却期内(新增:止损后可额外延长冷却,用于抑制反手连环)""" + cd = self.cfg.cooldown_sec_after_exit + if self.last_sl_ts and (time.time() - self.last_sl_ts) < self.cfg.reentry_penalty_max_sec: + cd += max(0, self.cfg.cooldown_sec_after_sl_extra) + return (time.time() - self.last_exit_ts) < cd + + def _adx_blocks_entry(self, adx_val: float, plus_di: float, minus_di: float, want_dir: int) -> bool: + """ + ADX 趋势过滤: + - want_dir: 1=想开多, -1=想开空 + """ + if not self.cfg.enable_adx_filter: + return False + if adx_val < self.cfg.adx_threshold: + return False + + if self.cfg.adx_mode == "block_all": + return True + + # block_countertrend:只禁止逆 DI 方向 + # 上升趋势:+DI > -DI => 禁止开空 + # 下降趋势:-DI > +DI => 禁止开多 + if plus_di > minus_di and want_dir == -1: + return True + if minus_di > plus_di and want_dir == 1: + return True + return False + + def maybe_enter(self, price: float, ema_value: float, entry_dev: float, + adx_val: float, plus_di: float, minus_di: float): + """检查并执行入场(新增:ADX趋势过滤,防止趋势期逆势反复开仓)""" + if self.pos != 0: + return + if self.in_cooldown(): + return + + dev = (price - ema_value) / ema_value if ema_value else 0.0 + size = self.calculate_size(price) + if size <= 0: + return + + penalty_active = self._reentry_penalty_active(dev, entry_dev) + + long_th = -entry_dev + short_th = entry_dev + + if penalty_active: + if self.last_sl_dir == 1: + long_th = -entry_dev * self.cfg.reentry_penalty_mult + logger.info(f"多头止损后惩罚生效: long_th={long_th * 100:.3f}%") + elif self.last_sl_dir == -1: + short_th = entry_dev * self.cfg.reentry_penalty_mult + logger.info(f"空头止损后惩罚生效: short_th={short_th * 100:.3f}%") + + logger.info( + f"入场检查: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% " + f"(entry_dev={entry_dev * 100:.3f}%, long_th={long_th * 100:.3f}%, short_th={short_th * 100:.3f}%) " + f"ADX={adx_val:.2f} +DI={plus_di:.2f} -DI={minus_di:.2f} " + f"size={size}, penalty={penalty_active}, last_sl_dir={self.last_sl_dir}" + ) + + # 先判断信号,再用 ADX 过滤(这样日志更直观) + if dev <= long_th: + if self._adx_blocks_entry(adx_val, plus_di, minus_di, want_dir=1): + logger.warning( + f"ADX过滤:趋势期禁止逆势开多(ADX={adx_val:.2f}, +DI={plus_di:.2f}, -DI={minus_di:.2f})" + ) + return + + if self.place_market_order(1, size): + self.pos = 1 + self.entry_price = price + self.entry_ts = time.time() + self.ding(f"✅开多:dev={dev * 100:.3f}% size={size} entry={price:.2f}") + + elif dev >= short_th: + if self._adx_blocks_entry(adx_val, plus_di, minus_di, want_dir=-1): + logger.warning( + f"ADX过滤:趋势期禁止逆势开空(ADX={adx_val:.2f}, +DI={plus_di:.2f}, -DI={minus_di:.2f})" + ) + return + + if self.place_market_order(4, size): + self.pos = -1 + self.entry_price = price + self.entry_ts = time.time() + self.ding(f"✅开空:dev={dev * 100:.3f}% size={size} entry={price:.2f}") + + def maybe_exit(self, price: float, tp: float, sl: float, vol_scale: float): + """检查并执行出场""" + if self.pos == 0 or self.entry_price is None or self.entry_ts is None: + return + + hold = time.time() - self.entry_ts + + if self.pos == 1: + pnl = (price - self.entry_price) / self.entry_price + else: + pnl = (self.entry_price - price) / self.entry_price + + sl_mult = 1.0 + if self.post_sl_dir == self.pos and self.post_sl_dir != 0: + sl_mult = self._post_sl_dynamic_mult() + effective_sl = sl * sl_mult + + if pnl >= tp: + self.close_position_all() + self.ding(f"🎯止盈:pnl={pnl * 100:.3f}% price={price:.2f} tp={tp * 100:.3f}%") + self.entry_price, self.entry_ts = None, None + self.last_exit_ts = time.time() + + elif pnl <= -effective_sl: + sl_dir = self.pos + self.close_position_all() + self.ding( + f"🛑止损:pnl={pnl * 100:.3f}% price={price:.2f} " + f"sl={sl * 100:.3f}% effective_sl={effective_sl * 100:.3f}%(×{sl_mult:.2f})", + error=True + ) + + # 记录止损方向与时间 + self.last_sl_dir = sl_dir + self.last_sl_ts = time.time() + + self.post_sl_dir = sl_dir + self.post_sl_ts = time.time() + self.post_sl_vol_scale = float(vol_scale) + + self.entry_price, self.entry_ts = None, None + self.last_exit_ts = time.time() + + elif hold >= self.cfg.max_hold_sec: + self.close_position_all() + self.ding(f"⏱超时:hold={int(hold)}s pnl={pnl * 100:.3f}% price={price:.2f}") + self.entry_price, self.entry_ts = None, None + self.last_exit_ts = time.time() + + def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float, + atr_ratio: float, base_ratio: float, vol_scale: float, + entry_dev: float, tp: float, sl: float, + entry_floor: float, tp_floor: float, sl_floor: float, + adx_val: float, plus_di: float, minus_di: float): + """限频状态通知""" + now = time.time() + if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec: + return + self._last_status_notify_ts = now + + direction_str = "多" if self.pos == 1 else ("空" if self.pos == -1 else "无") + penalty_active = self._reentry_penalty_active(dev, entry_dev) + + sl_mult = 1.0 + if self.pos != 0 and self.post_sl_dir == self.pos: + sl_mult = self._post_sl_dynamic_mult() + + base_age = int(now - self._base_ratio_ts) if self._base_ratio_ts else -1 + + msg = ( + f"【BitMart {self.cfg.contract_symbol}|1m均值回归(动态阈值+ADX过滤)】\n" + f"📊 状态:{direction_str}\n" + f"💰 现价:{price:.2f} | EMA{self.cfg.ema_len}:{ema_value:.2f}\n" + f"📈 偏离:{dev * 100:.3f}% (入场阈值:±{entry_dev * 100:.3f}%)\n" + f"🌊 波动率:ATR比={atr_ratio * 100:.3f}% | 基准={base_ratio * 100:.3f}% | 缩放={vol_scale:.2f}\n" + f"🧭 趋势:ADX={adx_val:.2f} | +DI={plus_di:.2f} | -DI={minus_di:.2f} " + f"(阈值={self.cfg.adx_threshold:.1f}, 模式={self.cfg.adx_mode})\n" + f"🎯 动态Floor:入场={entry_floor * 100:.3f}% | 止盈={tp_floor * 100:.3f}% | 止损={sl_floor * 100:.3f}%\n" + f"💰 止盈/止损:{tp * 100:.3f}% / {sl * 100:.3f}% (盈亏比:{tp / sl:.2f})\n" + f"🔄 基准刷新:{self.cfg.base_ratio_refresh_sec}s (已过={base_age}s)\n" + f"⚠️ 止损同向加门槛:{'开启' if penalty_active else '关闭'} (方向={self.last_sl_dir})\n" + f"💳 可用余额:{bal:.2f} USDT | 杠杆:{self.cfg.leverage}x\n" + f"⏱️ 持仓限制:{self.cfg.max_hold_sec}s | 冷却:{self.cfg.cooldown_sec_after_exit}s" + ) + self.ding(msg) + + def action(self): + """主循环""" + if not self.set_leverage(): + self.ding("杠杆设置失败,停止运行", error=True) + return + + while True: + now_dt = datetime.datetime.now() + self.pbar.n = now_dt.second + self.pbar.refresh() + + # 1. 获取K线数据 + klines = self.get_klines_cached() + if not klines or len(klines) < (max(self.cfg.ema_len + 5, 2 * self.cfg.adx_len + 5)): + logger.warning("K线数据不足,等待...") + time.sleep(1) + continue + + # 2. 计算技术指标 + last_k = klines[-1] + closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]] + ema_value = self.ema(closes, self.cfg.ema_len) + + price = self.get_last_price(fallback_close=float(last_k["close"])) + dev = (price - ema_value) / ema_value if ema_value else 0.0 + + # 3. 计算波动率相关指标 + a = self.atr(klines, self.cfg.atr_len) + atr_ratio = (a / price) if price > 0 else 0.0 + base_ratio = self.get_base_ratio_cached(klines) + + # 4. 计算动态阈值 + entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor = self.dynamic_thresholds( + atr_ratio, base_ratio + ) + + # 5. 计算 ADX(新增) + adx_val, plus_di, minus_di = self.adx(klines, self.cfg.adx_len) + logger.info(f"ADX: {adx_val:.2f} (+DI={plus_di:.2f}, -DI={minus_di:.2f})") + + # 6. 风控检查 + self.risk_kill_switch() + + # 7. 获取持仓状态 + if not self.get_position_status(): + time.sleep(1) + continue + + # 8. 检查交易是否启用 + if not self.trading_enabled: + if self.pos != 0: + self.close_position_all() + logger.warning("交易被禁用(风控触发),等待...") + time.sleep(5) + continue + + # 9. 检查危险市场 + if self.is_danger_market(klines, price): + logger.warning("危险模式:高波动/大实体K,暂停开仓") + self.maybe_exit(price, tp, sl, vol_scale) + time.sleep(self.cfg.tick_refresh_sec) + continue + + # 10. 执行交易逻辑 + self.maybe_exit(price, tp, sl, vol_scale) + self.maybe_enter(price, ema_value, entry_dev, adx_val, plus_di, minus_di) + + # 11. 状态通知 + bal = self.get_assets_available() + self.notify_status_throttled( + price, ema_value, dev, bal, + atr_ratio, base_ratio, vol_scale, + entry_dev, tp, sl, + entry_floor, tp_floor, sl_floor, + adx_val, plus_di, minus_di + ) + + time.sleep(self.cfg.tick_refresh_sec) + + +if __name__ == "__main__": + """ + Windows PowerShell: + setx BITMART_API_KEY "你的key" + setx BITMART_SECRET_KEY "你的secret" + setx BITMART_MEMO "合约交易" + 重新打开终端再运行。 + + Linux/macOS: + export BITMART_API_KEY="你的key" + export BITMART_SECRET_KEY="你的secret" + export BITMART_MEMO "合约交易" + """ + cfg = StrategyConfig() + bot = BitmartFuturesMeanReversionBot(cfg) + + logger.remove() + logger.add(lambda msg: tqdm.write(msg, end=""), level="INFO") try: - # 1. 创建 Bucket - bucket.create_bucket(oss2.models.BUCKET_ACL_PUBLIC_READ) # 设置 Bucket 为公共读权限 - print(f'✅ 成功创建 Bucket: {bucket_name}') - except oss2.exceptions.BucketAlreadyExists: - print(f'⚠️ Bucket {bucket_name} 已经存在') - print('提示:请使用不同的 Bucket 名称或使用现有 Bucket') - except oss2.exceptions.OssError as e: - print(f'❌ OSS 错误: {e}') - print(f' 错误码: {e.code}') - print(f' 请求 ID: {e.request_id}') - return None - - # 2. 验证本地文件是否存在 - if not Path(local_file_path).exists(): - print(f'❌ 文件错误: 本地文件 {local_file_path} 不存在') - return None - - file_name = os.path.basename(local_file_path) - oss_object_name = uuid.uuid4().hex[:12] + file_name - result = bucket.put_object_from_file(oss_object_name, local_file_path) - - if result.status == 200: - print(f'✅ 文件上传成功') - print(f' ETag: {result.etag}') - print(f' 文件大小: {result.resp.headers.get("content-length")} bytes') - - # 生成长期可访问的 URL - public_url = f'https://{bucket_name}.{endpoint.replace("https://", "")}/{oss_object_name}' - print(f' 文件公共 URL(长期有效): {public_url}') - return public_url - else: - print(f'❌ 文件上传失败,状态码: {result.status}') - return None - -# 示例调用 -local_file = r'C:\Users\27942\Desktop\codes\lm_code\test.py' -signed_url = upload_file_to_oss(local_file) -if signed_url: - print(f"可访问文件的 URL: {signed_url}") \ No newline at end of file + bot.action() + except KeyboardInterrupt: + logger.info("程序被用户中断") + bot.ding("🤖 策略已手动停止") + except Exception as e: + logger.error(f"程序异常退出: {e}") + bot.ding(f"❌ 策略异常退出: {e}", error=True) + raise diff --git a/websea/api交易.py b/websea/api交易.py new file mode 100644 index 0000000..4fea0b8 --- /dev/null +++ b/websea/api交易.py @@ -0,0 +1,390 @@ +import os +import time +import random +import string +import hashlib +import requests +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple + + +# ------------------------- +# Config +# ------------------------- +@dataclass +class BotConfig: + symbol: str = "ETH-USDT" + quote_offset: float = 0.0006 + refresh_threshold: float = 0.0005 + amount: float = 1.0 + lever_rate: int = 3 + is_full: int = 1 + depth_limit: int = 20 + loop_interval: float = 2.0 + dry_run: bool = True + max_net_pos_contracts: float = 5.0 + min_spread_frac: float = 0.00015 + max_spread_frac: float = 0.0030 + close_threshold_frac: float = 0.8 # New: Fraction of max_net to trigger closing + close_offset: float = 0.001 # New: Offset for closing prices + + +# ------------------------- +# Helper Functions +# ------------------------- +def best_bid_ask(depth: Dict[str, Any]) -> Tuple[float, float]: + """Extract best bid and ask from depth data.""" + result = depth.get("result", {}) + bids = result.get("bids", []) # Assuming list of dicts: [{'price': str, 'number': str}, ...], bids descending + asks = result.get("asks", []) # asks ascending + if not bids or not asks: + raise ValueError("Invalid depth data") + best_bid = float(bids[0]["price"]) + best_ask = float(asks[0]["price"]) + return best_bid, best_ask + + +def compute_net_position(pos_data: Dict[str, Any], symbol: str) -> float: + """Compute net position contracts (long - short) for the symbol.""" + result = pos_data.get("result", []) + net = 0.0 + for p in result: + if p.get("symbol") == symbol: + side = p.get("side", "").lower() # Assuming 'buy' or 'long' for long, 'sell' or 'short' for short + amount = float(p.get("hold_vol", 0) or p.get("position", 0)) # Common field names + if "buy" in side or "long" in side: + net += amount + elif "sell" in side or "short" in side: + net -= amount + return net + + +def classify_current_orders(cur_data: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]: + """Classify current orders into buy_orders and sell_orders.""" + orders = cur_data.get("result", {}).get("list", []) + buy_orders = [o for o in orders if o.get("type", "").lower().startswith("buy")] + sell_orders = [o for o in orders if o.get("type", "").lower().startswith("sell")] + return buy_orders, sell_orders + + +# ------------------------- +# Websea REST Client (optimized) +# ------------------------- +class WebseaAPIError(RuntimeError): + def __init__(self, errno: int, errmsg: str, payload: Dict[str, Any]): + super().__init__(f"Websea API error: errno={errno} errmsg={errmsg}") + self.errno = errno + self.errmsg = errmsg + self.payload = payload + + +class WebseaClient: + def __init__(self, token: str, secret: str, base_url: str = "https://coapi.websea.com", timeout: int = 15): + self.token = token.strip() + self.secret = secret.strip() + self.base_url = base_url.rstrip("/") + self.timeout = timeout + self.session = requests.Session() + + @staticmethod + def _nonce() -> str: + ts = str(int(time.time())) + rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=5)) + return f"{ts}_{rand}" + + @staticmethod + def _stringify_params(d: Optional[Dict[str, Any]]) -> Dict[str, str]: + """Convert non-None values to str for consistent signing/parsing.""" + if not d: + return {} + out: Dict[str, str] = {} + for k, v in d.items(): + if v is None: + continue + if isinstance(v, bool): + out[k] = "1" if v else "0" + else: + out[k] = str(v) + return out + + def _signature(self, nonce: str, params: Dict[str, str]) -> str: + tmp: List[str] = [self.token, self.secret, nonce] + for k, v in params.items(): + tmp.append(f"{k}={v}") + tmp.sort() + payload = "".join(tmp) + return hashlib.sha1(payload.encode("utf-8")).hexdigest() + + def _request( + self, + method: str, + path: str, + *, + params: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, + auth: bool = False, + ) -> Dict[str, Any]: + method = method.upper() + url = f"{self.base_url}{path}" + + q = self._stringify_params(params) + b = self._stringify_params(data) + + sig_params: Dict[str, str] = {**q, **b} + + headers: Dict[str, str] = { + "Content-Type": "application/x-www-form-urlencoded" if method in ("POST", "PUT", "DELETE") else "application/json" + } + + if auth: + nonce = self._nonce() + signature = self._signature(nonce, sig_params) + headers.update({ + "Token": self.token, + "Nonce": nonce, + "Signature": signature, + }) + + resp = self.session.request( + method=method, + url=url, + params=q if q and method == "GET" else None, + data=b if (method in ("POST", "PUT", "DELETE") and b) else None, + json=None if b else data, # Fallback if needed + timeout=self.timeout, + headers=headers, + ) + try: + resp.raise_for_status() + except requests.HTTPError as e: + print(f"HTTP Error: {e} - Response: {resp.text}") + raise + + try: + payload = resp.json() + except ValueError: + print(f"Invalid JSON: {resp.text}") + raise + + if isinstance(payload, dict): + errno = payload.get("errno") + if errno not in (None, 0): + raise WebseaAPIError(int(errno), str(payload.get("errmsg")), payload) + return payload + + def futures_depth(self, symbol: str, limit: int = 20) -> Dict[str, Any]: + return self._request("GET", "/openApi/contract/depth", params={"symbol": symbol, "limit": limit}, auth=False) + + def futures_24hr(self, symbol: str) -> Dict[str, Any]: + return self._request("GET", "/openApi/market/24hr", params={"symbol": symbol}, auth=False) + + def positions(self, symbol: str, is_full: int = 1) -> Dict[str, Any]: + return self._request("GET", "/openApi/contract/position", params={"symbol": symbol, "is_full": is_full}, auth=True) + + def current_orders(self, symbol: str, is_full: int = 1, limit: int = 100, direct: str = "prev") -> Dict[str, Any]: + return self._request("GET", "/openApi/contract/currentList", + params={"symbol": symbol, "is_full": is_full, "limit": limit, "direct": direct}, auth=True) + + def add_order( + self, + *, + contract_type: str, # open/close + order_type: str, # buy-limit / sell-limit + symbol: str, + amount: float, + price: Optional[float] = None, + lever_rate: Optional[int] = None, + is_full: int = 1, + ) -> Dict[str, Any]: + return self._request( + "POST", + "/openApi/contract/add", + data={ + "contract_type": contract_type, + "type": order_type, + "symbol": symbol, + "amount": amount, + "price": price, + "lever_rate": lever_rate, + "is_full": is_full, + }, + auth=True, + ) + + def cancel_orders(self, *, symbol: str, order_ids: List[str]) -> Dict[str, Any]: + return self._request( + "POST", + "/openApi/contract/cancel", + data={"symbol": symbol, "order_ids": ",".join(order_ids)}, + auth=True, + ) + + def auth_ping(self, symbol: str, is_full: int) -> None: + """Minimal auth test: fetch positions to confirm user recognition.""" + _ = self.positions(symbol=symbol, is_full=is_full) + + +# ------------------------- +# Bot loop +# ------------------------- +def run_bot(client: WebseaClient, cfg: BotConfig) -> None: + last_mid: Optional[float] = None + print(f"[BOT] symbol={cfg.symbol} dry_run={cfg.dry_run} max_net={cfg.max_net_pos_contracts}") + + while True: + try: + depth = client.futures_depth(cfg.symbol, limit=cfg.depth_limit) + bid, ask = best_bid_ask(depth) + mid = (bid + ask) / 2.0 + spread_frac = (ask - bid) / mid + + if spread_frac < cfg.min_spread_frac or spread_frac > cfg.max_spread_frac: + print(f"[SKIP] spread={spread_frac:.6f} bid={bid:.2f} ask={ask:.2f}") + time.sleep(cfg.loop_interval) + continue + + pos = client.positions(symbol=cfg.symbol, is_full=cfg.is_full) + net = compute_net_position(pos, cfg.symbol) + + cur = client.current_orders(symbol=cfg.symbol, is_full=cfg.is_full, limit=100) + buy_orders, sell_orders = classify_current_orders(cur) + + buy_px = bid * (1.0 - cfg.quote_offset) + sell_px = ask * (1.0 + cfg.quote_offset) + + need_refresh = last_mid is None or abs(mid - last_mid) / last_mid >= cfg.refresh_threshold + + def far(orders: List[Dict[str, Any]], target: float) -> bool: + if not orders: + return True + prices = [float(o.get("price") or 0) for o in orders if o.get("price")] + if not prices: + return True + best = min(prices, key=lambda x: abs(x - target)) + return abs(best - target) / target > (cfg.refresh_threshold * 0.8) + + if far(buy_orders, buy_px) or far(sell_orders, sell_px): + need_refresh = True + + allow_buy = net < cfg.max_net_pos_contracts + allow_sell = net > -cfg.max_net_pos_contracts + + if need_refresh: + ids: List[str] = [str(o.get("order_id")) for o in (buy_orders + sell_orders) if o.get("order_id")] + if ids: + if cfg.dry_run: + print(f"[DRY] cancel {len(ids)} orders") + else: + client.cancel_orders(symbol=cfg.symbol, order_ids=ids) + print(f"[OK] canceled {len(ids)} orders") + + rounded_buy_px = round(buy_px, 2) + rounded_sell_px = round(sell_px, 2) + + if cfg.dry_run: + print(f"[DRY] mid={mid:.2f} net={net:.2f} buy@{rounded_buy_px:.2f} sell@{rounded_sell_px:.2f}") + else: + if allow_buy: + r1 = client.add_order( + contract_type="open", + order_type="buy-limit", + symbol=cfg.symbol, + amount=cfg.amount, + price=rounded_buy_px, + lever_rate=cfg.lever_rate, + is_full=cfg.is_full, + ) + print("[OK] buy:", r1) + + if allow_sell: + r2 = client.add_order( + contract_type="open", + order_type="sell-limit", + symbol=cfg.symbol, + amount=cfg.amount, + price=rounded_sell_px, + lever_rate=cfg.lever_rate, + is_full=cfg.is_full, + ) + print("[OK] sell:", r2) + + last_mid = mid + else: + print(f"[HOLD] mid={mid:.2f} spread={spread_frac:.6f} net={net:.2f} " + f"orders(buy={len(buy_orders)} sell={len(sell_orders)})") + + # Enhanced closing logic if net is extreme + if abs(net) > cfg.max_net_pos_contracts * cfg.close_threshold_frac and not cfg.dry_run: + if net > 0: + close_type = "close" + close_order_type = "sell-limit" + close_px = mid * (1.0 - cfg.close_offset) + close_amt = net # Close all long + else: + close_type = "close" + close_order_type = "buy-limit" + close_px = mid * (1.0 + cfg.close_offset) + close_amt = abs(net) # Close all short + print(f"[WARN] Net position high ({net:.2f}), attempting to {close_type} with {close_order_type} amt={close_amt:.2f} @ {round(close_px, 2):.2f}") + r_close = client.add_order( + contract_type=close_type, + order_type=close_order_type, + symbol=cfg.symbol, + amount=close_amt, + price=round(close_px, 2), + lever_rate=cfg.lever_rate, + is_full=cfg.is_full, + ) + print("[OK] close:", r_close) + + except WebseaAPIError as e: + if e.errno == 20522: + print("[AUTH] User not recognized: Check Token/Secret or API permissions (enable Futures Trading).") + else: + print(f"[API] {e} payload={e.payload}") + except requests.HTTPError as e: + print(f"[HTTP] {e}") + except Exception as e: + print(f"[ERR] {e}") + + time.sleep(cfg.loop_interval) + + +def env_bool(name: str, default: bool) -> bool: + v = os.environ.get(name) + if v is None: + return default + return v.strip().lower() in ("1", "true", "yes", "y", "on") + + +def main() -> None: + token = os.environ.get("WEBSEA_TOKEN", "") + secret = os.environ.get("WEBSEA_SECRET", "") + if not token or not secret: + raise SystemExit("Please set environment variables WEBSEA_TOKEN and WEBSEA_SECRET") + + cfg = BotConfig(dry_run=env_bool("DRY_RUN", True)) + + client = WebseaClient(token=token, secret=secret) + + # Public check + try: + t = client.futures_24hr(cfg.symbol) + print("[API] 24hr ok:", t) + except Exception as e: + print("[API CHECK FAILED]", e) + + # Auth check + try: + client.auth_ping(cfg.symbol, cfg.is_full) + print("[AUTH] ok (user recognized)") + except WebseaAPIError as e: + print(f"[AUTH FAILED] errno={e.errno} errmsg={e.errmsg} payload={e.payload}") + print("Tips: Ensure API Key has Futures permissions enabled in Websea dashboard.") + raise + + run_bot(client, cfg) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/推特/main.py b/推特/main.py index aaffad0..d375673 100644 --- a/推特/main.py +++ b/推特/main.py @@ -66,11 +66,12 @@ class Hub_Web: bit_id = createBrowser( name=self.x_info.user_name, + proxyType="http", groupId=fz_datas['推特'], - host=self.ips_info.host, - port=int(self.ips_info.port), - proxyUserName=self.ips_info.username, - proxyPassword=self.ips_info.password, + host="104.168.59.92", + port=int(random.randint(20001, 25000)), + # proxyUserName=self.ips_info.username, + # proxyPassword=self.ips_info.password, ) self.xstart_info.bit_id = bit_id