添加本地文件

This commit is contained in:
Administrator
2026-01-28 16:32:54 +08:00
parent e48e11de45
commit 73931aa69a
7 changed files with 5506 additions and 0 deletions

29
bitmart/test_api.py Normal file
View File

@@ -0,0 +1,29 @@
"""测试 BitMart API K线获取"""
import time
from bitmart.api_contract import APIContract
api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
memo = "数据抓取"
contractAPI = APIContract(api_key, secret_key, memo, timeout=(5, 15))
# 测试获取最近1小时的15分钟K线
end_time = int(time.time())
start_time = end_time - 3600 # 1小时前
print(f"当前时间戳: {end_time}")
print(f"开始时间戳: {start_time}")
print(f"当前时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))}")
print(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")
try:
response = contractAPI.get_kline(
contract_symbol="ETHUSDT",
step=15,
start_time=start_time,
end_time=end_time
)
print(f"\n响应: {response}")
except Exception as e:
print(f"\n错误: {e}")

View File

@@ -0,0 +1,656 @@
"""
量化交易回测系统 - 三分之一回归策略(优化版)
========== 策略规则 ==========
1. 开多条件:
- 找到实体>=0.1的前一根K线如果前一根实体<0.1,继续往前找)
- 前一根是阴线close < open
- 当前K线的最高价包括影线涨到前一根阴线实体的 1/3 处
- 即当前high >= prev_close + (prev_open - prev_close) / 3
2. 平多/开空条件:
- 找到实体>=0.1的前一根K线如果前一根实体<0.1,继续往前找)
- 前一根是阳线close > open
- 当前K线的最低价包括影线跌到前一根阳线实体的 1/3 处
- 即当前low <= prev_close - (prev_close - prev_open) / 3
3. 执行逻辑:
- 做多时遇到开空信号 -> 平多并反手开空
- 做空时遇到开多信号 -> 平空并反手开多
4. 实体过滤:
- 如果前一根K线的实体部分|open - close|< 0.1,继续往前查找
- 直到找到实体>=0.1的K线再用那根K线来判断1/3位置
"""
import datetime
import calendar
import os
from typing import List, Dict, Optional
from loguru import logger
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
import matplotlib
try:
import plotly.graph_objects as go
except Exception:
go = None
from models.bitmart_klines import BitMartETH15M
# 配置中文字体
import matplotlib.font_manager as fm
import warnings
# 忽略matplotlib的字体警告
warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib.font_manager')
warnings.filterwarnings('ignore', message='.*Glyph.*missing.*', category=UserWarning)
# 尝试设置中文字体,按优先级尝试
chinese_fonts = ['SimHei', 'Microsoft YaHei', 'SimSun', 'KaiTi', 'FangSong', 'STSong', 'STHeiti']
available_fonts = [f.name for f in fm.fontManager.ttflist]
# 找到第一个可用的中文字体
font_found = None
for font_name in chinese_fonts:
if font_name in available_fonts:
font_found = font_name
break
if font_found:
plt.rcParams['font.sans-serif'] = [font_found] + ['DejaVu Sans']
logger.info(f"使用中文字体: {font_found}")
else:
# 如果没有找到中文字体,尝试使用系统默认字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS', 'DejaVu Sans']
logger.warning("未找到中文字体,使用默认配置")
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
plt.rcParams['font.size'] = 10 # 设置默认字体大小
# 尝试清除字体缓存(如果可能)
try:
# 不强制重建,避免性能问题
pass
except:
pass
# 获取当前脚本所在目录
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# ========================= 工具函数 =========================
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_third_level(prev):
"""
计算前一根K线实体的 1/3 回归位置
返回:(触发价格, 方向)
- 如果前一根是阴线返回向上1/3价格方向为 'long'
- 如果前一根是阳线返回向下1/3价格方向为 'short'
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
if is_bearish(prev): # 阴线,向上回归
# 阴线实体 = open - close
body = p_open - p_close
trigger_price = p_close + body / 3 # 从低点涨 1/3
return trigger_price, 'long'
elif is_bullish(prev): # 阳线,向下回归
# 阳线实体 = close - open
body = p_close - p_open
trigger_price = p_close - body / 3 # 从高点跌 1/3
return trigger_price, 'short'
return None, None
def check_trigger(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
检查当前K线是否触发了交易信号
返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None)
规则考虑影线部分high/low因为实际交易中价格会到达影线位置
"""
if current_idx <= 0:
return None, None, None
curr = all_data[current_idx]
# 查找实体>=0.1的前一根K线
valid_prev_idx, prev = find_valid_prev_bar(all_data, current_idx, min_body_size)
if prev is None:
return None, None, None
trigger_price, direction = get_one_third_level(prev)
if trigger_price is None:
return None, None, None
# 使用影线部分high/low来判断因为实际交易中价格会到达这些位置
c_high = float(curr['high'])
c_low = float(curr['low'])
# 做多前一根阴线当前K线的最高价包括影线达到触发价格
if direction == 'long' and c_high >= trigger_price:
return 'long', trigger_price, valid_prev_idx
# 做空前一根阳线当前K线的最低价包括影线达到触发价格
if direction == 'short' and c_low <= trigger_price:
return 'short', trigger_price, valid_prev_idx
return None, None, None
def get_data_by_date(model, date_str: str):
"""按天获取指定表的数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_one_third_strategy(dates: List[str]):
"""三分之一回归策略回测"""
all_data: List[Dict] = []
for d in dates:
day_data = get_data_by_date(BitMartETH15M, d)
all_data.extend(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {len(all_data)} 条K线数据")
if not all_data:
logger.warning("未获取到任何数据")
return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}}
all_data.sort(key=lambda x: x['id'])
if len(all_data) > 1:
first_time = datetime.datetime.fromtimestamp(all_data[0]['id'] / 1000)
last_time = datetime.datetime.fromtimestamp(all_data[-1]['id'] / 1000)
logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M')}{last_time.strftime('%Y-%m-%d %H:%M')}")
# 验证排序打印前5条数据
logger.info("===== 前5条数据验证排序=====")
for i in range(min(5, len(all_data))):
d = all_data[i]
t = datetime.datetime.fromtimestamp(d['id'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
k_type = "阳线" if is_bullish(d) else ("阴线" if is_bearish(d) else "十字星")
logger.info(f" [{i}] {t} | {k_type} | O={d['open']:.2f} H={d['high']:.2f} L={d['low']:.2f} C={d['close']:.2f}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
trades: List[Dict] = []
current_position: Optional[Dict] = None
for idx in range(1, len(all_data)):
curr = all_data[idx]
# 使用新的check_trigger函数它会自动查找实体>=0.1的前一根K线
direction, trigger_price, valid_prev_idx = check_trigger(all_data, idx, min_body_size=0.1)
# 获取有效的前一根K线用于日志输出
valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None
# 空仓时,有信号就开仓
if current_position is None:
if direction and valid_prev is not None:
# 调试打印开仓时的K线信息
prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M')
curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星")
curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星")
prev_body = get_body_size(valid_prev)
logger.info(f"【开仓】{direction} @ {trigger_price:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}")
logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}")
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx
}
stats[direction]['count'] += 1
continue
# 有仓位时,检查是否有反向信号
pos_dir = current_position['direction']
if direction and direction != pos_dir and valid_prev is not None:
# 平仓
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
# 调试打印平仓时的K线信息
prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M')
curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星")
curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星")
prev_body = get_body_size(valid_prev)
logger.info(f"【平仓反手】{pos_dir} -> {direction} @ {exit_price:.2f}, 盈亏: {diff:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}")
logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}")
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': curr['id'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
# 反手开仓
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx
}
stats[direction]['count'] += 1
# 尾仓处理
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': last['id'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
return trades, stats, all_data
# ========================= 绘图函数 =========================
def plot_trades(all_data: List[Dict], trades: List[Dict], save_path: str = None):
"""
绘制K线图并标注交易点位
"""
if not all_data:
logger.warning("没有数据可绘制")
return
# 转换为 DataFrame
df = pd.DataFrame(all_data)
df['datetime'] = pd.to_datetime(df['id'], unit='ms')
df.set_index('datetime', inplace=True)
df = df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'})
df = df[['Open', 'High', 'Low', 'Close']]
# 准备标记点
buy_signals = [] # 做多开仓
sell_signals = [] # 做空开仓
buy_exits = [] # 做多平仓
sell_exits = [] # 做空平仓
for trade in trades:
entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms')
exit_time = pd.to_datetime(trade['exit_time_ms'], unit='ms')
direction = trade['direction']
entry_price = trade['entry']
exit_price = trade['exit']
if direction == '做多':
buy_signals.append((entry_time, entry_price))
buy_exits.append((exit_time, exit_price))
else:
sell_signals.append((entry_time, entry_price))
sell_exits.append((exit_time, exit_price))
# 创建标记序列
buy_markers = pd.Series(index=df.index, dtype=float)
sell_markers = pd.Series(index=df.index, dtype=float)
buy_exit_markers = pd.Series(index=df.index, dtype=float)
sell_exit_markers = pd.Series(index=df.index, dtype=float)
for t, p in buy_signals:
if t in buy_markers.index:
buy_markers[t] = p
for t, p in sell_signals:
if t in sell_markers.index:
sell_markers[t] = p
for t, p in buy_exits:
if t in buy_exit_markers.index:
buy_exit_markers[t] = p
for t, p in sell_exits:
if t in sell_exit_markers.index:
sell_exit_markers[t] = p
# 添加标记
add_plots = []
if buy_markers.notna().any():
add_plots.append(mpf.make_addplot(buy_markers, type='scatter', markersize=100,
marker='^', color='green', label='做多开仓'))
if sell_markers.notna().any():
add_plots.append(mpf.make_addplot(sell_markers, type='scatter', markersize=100,
marker='v', color='red', label='做空开仓'))
if buy_exit_markers.notna().any():
add_plots.append(mpf.make_addplot(buy_exit_markers, type='scatter', markersize=80,
marker='x', color='darkgreen', label='做多平仓'))
if sell_exit_markers.notna().any():
add_plots.append(mpf.make_addplot(sell_exit_markers, type='scatter', markersize=80,
marker='x', color='darkred', label='做空平仓'))
# 绘制K线图更接近交易所风格
market_colors = mpf.make_marketcolors(
up='#26a69a', # 常见交易所绿色
down='#ef5350', # 常见交易所红色
edge='inherit',
wick='inherit',
volume='inherit'
)
style = mpf.make_mpf_style(
base_mpf_style='binance',
marketcolors=market_colors,
gridstyle='-',
gridcolor='#e6e6e6'
)
fig, axes = mpf.plot(
df,
type='candle',
style=style,
title='三分之一回归策略回测',
ylabel='价格',
addplot=add_plots if add_plots else None,
figsize=(16, 9),
returnfig=True
)
# 添加图例
axes[0].legend(['做多开仓 ▲', '做空开仓 ▼', '做多平仓 ✕', '做空平仓 ✕'], loc='upper left')
# 标注开仓细节(方向、价格、时间)
if trades:
ax = axes[0]
max_annotate = 60 # 过多会拥挤,可按需调大/调小
annotated = 0
for i, trade in enumerate(trades):
if annotated >= max_annotate:
break
entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms')
if entry_time not in df.index:
continue
entry_price = trade['entry']
direction = trade['direction']
color = 'green' if direction == '做多' else 'red'
text = f"{direction} @ {entry_price:.2f}\n{entry_time.strftime('%m-%d %H:%M')}"
y_offset = 20 if (i % 2 == 0) else -30
ax.annotate(
text,
xy=(entry_time, entry_price),
xytext=(0, y_offset),
textcoords='offset points',
ha='center',
va='bottom' if y_offset > 0 else 'top',
fontsize=8,
color=color,
arrowprops=dict(arrowstyle='->', color=color, lw=0.6, alpha=0.6)
)
annotated += 1
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches='tight')
logger.info(f"图表已保存到: {save_path}")
plt.show()
def plot_trades_interactive(all_data: List[Dict], trades: List[Dict], html_path: str = None):
"""
交互式K线图TradingView风格支持缩放、时间区间/价格区间平移缩放)
"""
if not all_data:
logger.warning("没有数据可绘制")
return
if go is None:
logger.warning("未安装 plotly无法绘制交互式图。请先安装pip install plotly")
return
df = pd.DataFrame(all_data)
df['datetime'] = pd.to_datetime(df['id'], unit='ms')
df.sort_values('datetime', inplace=True)
fig = go.Figure(
data=[
go.Candlestick(
x=df['datetime'],
open=df['open'],
high=df['high'],
low=df['low'],
close=df['close'],
increasing_line_color='#26a69a',
decreasing_line_color='#ef5350',
name='K线'
)
]
)
# 标注开仓点
if trades:
entry_x = []
entry_y = []
entry_text = []
entry_color = []
for t in trades:
entry_x.append(pd.to_datetime(t['entry_time_ms'], unit='ms'))
entry_y.append(t['entry'])
entry_text.append(f"{t['direction']} @ {t['entry']:.2f}<br>{t['entry_time']}")
entry_color.append('#26a69a' if t['direction'] == '做多' else '#ef5350')
fig.add_trace(
go.Scatter(
x=entry_x,
y=entry_y,
mode='markers',
marker=dict(size=8, color=entry_color),
name='开仓',
text=entry_text,
hoverinfo='text'
)
)
# TradingView风格深色背景 + 交互缩放
fig.update_layout(
title='三分之一回归策略回测(交互式)',
xaxis=dict(
rangeslider=dict(visible=True),
type='date',
showgrid=False
),
yaxis=dict(
showgrid=False,
fixedrange=False
),
plot_bgcolor='#0b0e11',
paper_bgcolor='#0b0e11',
font=dict(color='#d1d4dc'),
hovermode='x unified',
dragmode='zoom'
)
fig.show()
if html_path:
fig.write_html(html_path)
logger.info(f"交互图已保存到: {html_path}")
# ========================= 主程序 =========================
if __name__ == '__main__':
# ==================== 配置参数 ====================
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
# ==================== 生成日期列表 ====================
dates = []
if START_DATE and END_DATE:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"查询日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
# ==================== 执行回测 ====================
trades, stats, all_data = backtest_one_third_strategy(dates)
# ==================== 输出结果 ====================
logger.info("===== 每笔交易详情 =====")
contract_size = 10000
open_fee_fixed = 5
close_fee_rate = 0.0005
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_p = t['exit']
direction = t['direction']
point_diff = t['diff']
money_profit = point_diff / entry * contract_size
fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate)
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction} "
f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f}"
)
# ==================== 汇总统计 ====================
total_net_profit = total_money_profit - total_fee
print(f"\n{'='*60}")
print(f"【三分之一回归策略 回测结果】")
print(f"{'='*60}")
print(f"交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利:{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}")
print(f"{'='*60}")
print("\n===== 方向统计 =====")
for k, v in stats.items():
count = v['count']
wins = v['wins']
total_p = v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{v['name']}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
# ==================== 绘制图表 ====================
if trades and all_data:
# 如果数据太多只绘制最近一部分比如最近500根K线
max_bars = 500
if len(all_data) > max_bars:
logger.info(f"数据量较大({len(all_data)}条),只绘制最近 {max_bars} 根K线")
plot_data = all_data[-max_bars:]
# 过滤出在这个时间范围内的交易
min_time = datetime.datetime.fromtimestamp(plot_data[0]['id'] / 1000)
plot_trades_filtered = [t for t in trades if t['entry_time'] >= min_time]
else:
plot_data = all_data
plot_trades_filtered = trades
save_path = os.path.join(SCRIPT_DIR, '回测图表.png')
plot_trades(plot_data, plot_trades_filtered, save_path=save_path)
# 交互式版本TradingView风格支持时间区间/价格缩放
html_path = os.path.join(SCRIPT_DIR, '回测图表_交互式.html')
plot_trades_interactive(plot_data, plot_trades_filtered, html_path=html_path)

BIN
bitmart/回测图表.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 149 KiB

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,279 @@
"""
BitMart 多周期K线数据抓取脚本
支持同时获取 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据
自动创建对应的数据库表
"""
import time
import datetime
from pathlib import Path
from loguru import logger
from peewee import *
from bitmart.api_contract import APIContract
# 数据库配置
DB_PATH = Path(__file__).parent.parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
# K线周期配置step值 -> 表名后缀
KLINE_CONFIGS = {
1: '1m', # 1分钟
3: '3m', # 3分钟
5: '5m', # 5分钟
15: '15m', # 15分钟
30: '30m', # 30分钟
60: '1h', # 1小时
}
def create_kline_model(step: int):
"""
动态创建K线数据模型
:param step: K线周期分钟
:return: Model类
"""
suffix = KLINE_CONFIGS.get(step, f'{step}m')
tbl_name = f'bitmart_eth_{suffix}'
# 使用 type() 动态创建类,避免闭包问题
attrs = {
'id': BigIntegerField(primary_key=True),
'open': FloatField(null=True),
'high': FloatField(null=True),
'low': FloatField(null=True),
'close': FloatField(null=True),
}
# 创建 Meta 类
meta_attrs = {
'database': db,
'table_name': tbl_name,
}
Meta = type('Meta', (), meta_attrs)
attrs['Meta'] = Meta
# 动态创建 Model 类
model_name = f'BitMartETH{suffix.upper()}'
KlineModel = type(model_name, (Model,), attrs)
return KlineModel
class BitMartMultiKlineCollector:
"""多周期K线数据抓取器"""
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "数据抓取"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
# 存储各周期的模型
self.models = {}
# 初始化数据库连接和表
self._init_database()
def _init_database(self):
"""初始化数据库,创建所有周期的表"""
db.connect(reuse_if_open=True)
for step in KLINE_CONFIGS.keys():
model = create_kline_model(step)
self.models[step] = model
# 创建表(如果不存在)
db.create_tables([model], safe=True)
logger.info(f"初始化表: {model._meta.table_name}")
def get_klines(self, step: int, start_time: int, end_time: int):
"""
获取K线数据
:param step: K线周期分钟
:param start_time: 开始时间戳(秒级)
:param end_time: 结束时间戳(秒级)
:return: K线数据列表
"""
try:
# 确保是整数
start_time = int(start_time)
end_time = int(end_time)
logger.debug(f"API请求: step={step}, start={start_time}, end={end_time}")
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=step,
start_time=start_time,
end_time=end_time
)[0]
if response['code'] != 1000:
logger.error(f"获取 {step}分钟 K线失败: {response}")
return []
klines = response.get('data', [])
formatted = []
for k in klines:
timestamp_ms = int(k["timestamp"]) * 1000
formatted.append({
'id': timestamp_ms,
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
logger.error(f"获取 {step}分钟 K线异常: {e}")
return []
def save_klines(self, step: int, klines: list):
"""
保存K线数据到数据库
:param step: K线周期
:param klines: K线数据列表
:return: 保存的数量
"""
model = self.models.get(step)
if not model:
logger.error(f"未找到 {step}分钟 的数据模型")
return 0
saved_count = 0
for kline in klines:
try:
model.get_or_create(
id=kline['id'],
defaults={
'open': kline['open'],
'high': kline['high'],
'low': kline['low'],
'close': kline['close'],
}
)
saved_count += 1
except Exception as e:
logger.error(f"保存 {step}分钟 K线数据失败 {kline['id']}: {e}")
return saved_count
def collect_single_period(self, step: int, start_date: str = None, days: int = None):
"""
抓取单个周期的历史数据从当前时间向前抓取直到遇到API限制
:param step: K线周期分钟
:param start_date: 起始日期 'YYYY-MM-DD'(目标,可能无法达到)
:param days: 抓取天数(目标,可能无法达到)
"""
suffix = KLINE_CONFIGS.get(step, f'{step}m')
now = int(time.time())
if start_date:
start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
target_start_time = int(start_dt.timestamp())
logger.info(f"开始抓取 {suffix} K线数据: 目标从 {start_date} 开始(从现在向前抓取)")
elif days:
target_start_time = now - 3600 * 24 * days
logger.info(f"开始抓取 {suffix} K线数据: 目标最近 {days}")
else:
target_start_time = now - 3600 * 24 * 30
logger.info(f"开始抓取 {suffix} K线数据: 目标最近 30 天")
# 根据周期调整批次大小
if step <= 5:
batch_seconds = 3600 * 6 # 小周期每次6小时
elif step <= 30:
batch_seconds = 3600 * 24 # 中周期每次1天
else:
batch_seconds = 3600 * 24 * 3 # 大周期每次3天
total_saved = 0
fail_count = 0
max_fail = 3
# 从当前时间向前抓取
current_end = now
while current_end > target_start_time:
current_start = current_end - batch_seconds
# 打印时间范围
start_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start))
end_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end))
logger.info(f"[{suffix}] 抓取: {start_str} -> {end_str}")
klines = self.get_klines(step, current_start, current_end)
if klines:
saved = self.save_klines(step, klines)
total_saved += saved
logger.info(f"[{suffix}] 保存 {saved} 条,累计 {total_saved}")
fail_count = 0
else:
fail_count += 1
logger.warning(f"[{suffix}] 未获取到数据 (连续失败 {fail_count} 次)")
if fail_count >= max_fail:
earliest = time.strftime('%Y-%m-%d', time.localtime(current_end))
logger.warning(f"[{suffix}] 达到API历史数据限制最早可用数据约 {earliest}")
break
current_end = current_start
time.sleep(0.3) # API请求间隔
logger.success(f"[{suffix}] 抓取完成,共保存 {total_saved} 条数据")
return total_saved
def collect_all_periods(self, start_date: str = None, days: int = None,
periods: list = None):
"""
抓取所有周期的历史数据
:param start_date: 起始日期 'YYYY-MM-DD'
:param days: 抓取天数
:param periods: 要抓取的周期列表,如 [1, 5, 15],默认全部
"""
if periods is None:
periods = list(KLINE_CONFIGS.keys())
logger.info(f"开始抓取多周期K线数据周期: {[KLINE_CONFIGS[p] for p in periods]}")
results = {}
for step in periods:
if step not in KLINE_CONFIGS:
logger.warning(f"不支持的周期: {step}分钟,跳过")
continue
logger.info(f"\n{'='*50}")
logger.info(f"开始抓取 {KLINE_CONFIGS[step]} K线")
logger.info(f"{'='*50}")
saved = self.collect_single_period(step, start_date, days)
results[KLINE_CONFIGS[step]] = saved
time.sleep(1) # 不同周期之间间隔
logger.info(f"\n{'='*50}")
logger.info("所有周期抓取完成!统计:")
for period, count in results.items():
logger.info(f" {period}: {count}")
logger.info(f"{'='*50}")
return results
def close(self):
"""关闭数据库连接"""
if not db.is_closed():
db.close()
if __name__ == '__main__':
collector = BitMartMultiKlineCollector()
try:
# 抓取尽可能多的历史数据从现在向前直到遇到API限制自动停止
# 目标2025-01-01但实际能抓取多少取决于 BitMart API 的历史数据限制
collector.collect_all_periods(
start_date='2025-01-01', # 目标起始日期
periods=[1, 3, 5, 15, 30, 60] # 所有周期
)
finally:
collector.close()

97
models/bitmart_klines.py Normal file
View File

@@ -0,0 +1,97 @@
"""
BitMart 多周期K线数据模型
包含 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据表
"""
from peewee import *
from models import db
# ==================== 1分钟 K线 ====================
class BitMartETH1M(Model):
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1m'
# ==================== 3分钟 K线 ====================
class BitMartETH3M(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_3m'
# ==================== 5分钟 K线 ====================
class BitMartETH5M(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_5m'
# ==================== 15分钟 K线 ====================
class BitMartETH15M(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_15m'
# ==================== 30分钟 K线 ====================
class BitMartETH30M(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_30m'
# ==================== 1小时 K线 ====================
class BitMartETH1H(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1h'
# 连接数据库并创建表
db.connect(reuse_if_open=True)
db.create_tables([
BitMartETH1M,
BitMartETH3M,
BitMartETH5M,
BitMartETH15M,
BitMartETH30M,
BitMartETH1H,
], safe=True)

View File

@@ -0,0 +1,557 @@
"""
BitMart 三分之一回归策略交易
使用5分钟K线周期
策略规则:
1. 开多条件:
- 找到实体>=0.1的前一根K线如果前一根实体<0.1,继续往前找)
- 前一根是阴线close < open
- 当前K线的最高价包括影线涨到前一根阴线实体的 1/3 处
- 即当前high >= prev_close + (prev_open - prev_close) / 3
2. 平多/开空条件:
- 找到实体>=0.1的前一根K线
- 前一根是阳线close > open
- 当前K线的最低价包括影线跌到前一根阳线实体的 1/3 处
- 即当前low <= prev_close - (prev_close - prev_open) / 3
3. 执行逻辑:
- 做多时遇到开空信号 -> 平多并反手开空
- 做空时遇到开多信号 -> 平空并反手开多
"""
import time
import datetime
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
from 交易.tools import send_dingtalk_message
class BitmartOneThirdStrategy:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=5, desc="等待K线", ncols=80) # 5分钟周期
self.last_kline_time = None
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
# 三分之一策略参数
self.min_body_size = 0.1 # 最小实体大小
self.kline_step = 5 # K线周期5分钟
self.kline_count = 20 # 获取的K线数量用于向前查找有效K线
# ========================= 三分之一策略核心函数 =========================
def is_bullish(self, c):
"""判断阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(self, c):
"""判断阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(self, candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = self.get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_third_level(self, prev):
"""
计算前一根K线实体的 1/3 回归位置
返回:(触发价格, 方向)
- 如果前一根是阴线返回向上1/3价格方向为 'long'
- 如果前一根是阳线返回向下1/3价格方向为 'short'
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
if self.is_bearish(prev): # 阴线,向上回归
# 阴线实体 = open - close
body = p_open - p_close
trigger_price = p_close + body / 3 # 从低点涨 1/3
return trigger_price, 'long'
elif self.is_bullish(prev): # 阳线,向下回归
# 阳线实体 = close - open
body = p_close - p_open
trigger_price = p_close - body / 3 # 从高点跌 1/3
return trigger_price, 'short'
return None, None
def check_trigger(self, all_data, current_idx):
"""
检查当前K线是否触发了交易信号
返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None)
规则考虑影线部分high/low因为实际交易中价格会到达影线位置
"""
if current_idx <= 0:
return None, None, None
curr = all_data[current_idx]
# 查找实体>=min_body_size的前一根K线
valid_prev_idx, prev = self.find_valid_prev_bar(all_data, current_idx, self.min_body_size)
if prev is None:
return None, None, None
trigger_price, direction = self.get_one_third_level(prev)
if trigger_price is None:
return None, None, None
# 使用影线部分high/low来判断
c_high = float(curr['high'])
c_low = float(curr['low'])
# 做多前一根阴线当前K线的最高价包括影线达到触发价格
if direction == 'long' and c_high >= trigger_price:
return 'long', trigger_price, valid_prev_idx
# 做空前一根阳线当前K线的最低价包括影线达到触发价格
if direction == 'short' and c_low <= trigger_price:
return 'short', trigger_price, valid_prev_idx
return None, None, None
# ========================= BitMart API 函数 =========================
def get_klines(self):
"""获取最近N根5分钟K线"""
try:
end_time = int(time.time())
# 获取足够多的K线用于向前查找有效K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=self.kline_step, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
error_msg = str(e)
# 检查是否是429限流错误
if "429" in error_msg or "too many requests" in error_msg.lower():
logger.warning(f"API限流等待60秒后重试: {e}")
time.sleep(60)
else:
logger.error(f"获取K线异常: {e}")
self.ding(msg="获取K线异常", error=True)
return None
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ========================= 浏览器自动化函数 =========================
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def close_extra_tabs_in_browser(self):
"""关闭多余 tab"""
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
"""市价平仓"""
logger.info("执行平仓操作...")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
time.sleep(0.5)
self.ding(msg="执行平仓操作")
def 开单(self, marketPriceLongOrder=0, size=None):
"""
市价开单
marketPriceLongOrder: 1 做多, -1 做空
"""
if size is None or size <= 0:
logger.warning("开单金额无效")
return False
direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
logger.info(f"执行{direction_str}操作,金额: {size}")
try:
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.ding(msg=f"执行{direction_str}操作,金额: {size}")
return True
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def ding(self, msg, error=False):
"""统一消息格式"""
prefix = "❌三分之一策略:" if error else "🔔三分之一策略:"
if error:
logger.error(msg)
for i in range(10):
send_dingtalk_message(f"{prefix}{msg}")
else:
logger.info(msg)
send_dingtalk_message(f"{prefix}{msg}")
# ========================= 时间计算函数 =========================
def get_now_time(self):
"""获取当前5分钟整点时间戳"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的5分钟整点
minute = current_datetime.minute
target_minute = (minute // 5) * 5 # 向下取整到5分钟
target_datetime = current_datetime.replace(minute=target_minute, second=0, microsecond=0)
return int(target_datetime.timestamp())
def get_time_to_next_5min(self):
"""获取距离下一个5分钟的秒数"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
minute = current_datetime.minute
next_5min = ((minute // 5) + 1) * 5
if next_5min >= 60:
next_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)
else:
next_datetime = current_datetime.replace(minute=next_5min, second=0, microsecond=0)
return (next_datetime - current_datetime).total_seconds()
# ========================= 主运行函数 =========================
def action(self):
"""主运行逻辑"""
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
# 1. 打开浏览器
if not self.openBrowser():
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
else:
logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80)
self.time_start = None # 时间状态,避免同一时段重复处理
while True:
# 更新进度条
current_time = time.localtime()
current_minute = current_time.tm_min
self.pbar.n = current_minute % 5
self.pbar.refresh()
# 检查是否已处理过当前时间段
if self.time_start == self.get_now_time():
time.sleep(3)
continue
# 获取K线数据
kline_data = self.get_klines()
if not kline_data:
logger.warning("获取K线数据失败")
time.sleep(5)
continue
# 检查数据是否是最新的
if len(kline_data) < 3:
logger.warning("K线数据不足")
time.sleep(5)
continue
# 判断最新K线时间
latest_kline_time = kline_data[-1]['id']
if self.get_now_time() != latest_kline_time:
time.sleep(3)
continue
self.time_start = self.get_now_time()
# 获取持仓状态
if not self.get_position_status():
logger.warning("获取仓位信息失败")
self.ding(msg="获取仓位信息失败!", error=True)
continue
logger.info(f"当前持仓状态: {self.start} (1=多, -1=空, 0=无)")
# ========== 三分之一策略信号检测 ==========
current_idx = len(kline_data) - 1
direction, trigger_price, valid_prev_idx = self.check_trigger(kline_data, current_idx)
if direction:
# 获取有效前一根K线用于日志
valid_prev = kline_data[valid_prev_idx] if valid_prev_idx is not None else None
curr = kline_data[current_idx]
if valid_prev:
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
curr_time = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev)
logger.info(f"检测到{direction}信号,触发价格: {trigger_price:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f}")
logger.info(f" 当前根[{curr_time}]: H={curr['high']:.2f} L={curr['low']:.2f}")
# ========== 执行交易逻辑 ==========
balance = self.get_available_balance()
if balance is None:
balance = 0
trade_size = balance * self.risk_percent
if direction == "long":
if self.start == -1: # 当前空仓,平空开多
logger.info("平空仓,反手开多")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
elif self.start == 0: # 当前无仓,直接开多
logger.info("无仓位,开多")
self.开单(marketPriceLongOrder=1, size=trade_size)
# 已有多仓则不操作
elif direction == "short":
if self.start == 1: # 当前多仓,平多开空
logger.info("平多仓,反手开空")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
elif self.start == 0: # 当前无仓,直接开空
logger.info("无仓位,开空")
self.开单(marketPriceLongOrder=-1, size=trade_size)
# 已有空仓则不操作
# ========== 发送持仓信息 ==========
self._send_position_message(kline_data[-1])
self.pbar.reset()
def _send_position_message(self, latest_kline):
"""发送持仓信息到钉钉"""
current_price = float(latest_kline["close"])
balance = self.get_available_balance()
self.balance = balance if balance is not None else 0.0
if self.start != 0:
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
current_amount = float(self.current_amount) if self.current_amount else 0.0
position_cross = float(self.position_cross) if hasattr(self, 'position_cross') and self.position_cross else 0.0
# 计算浮动盈亏
if self.start == 1: # 多头
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
else: # 空头
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
# 计算收益率
if open_avg_price > 0:
if self.start == 1:
pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
else:
pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000
rate_str = f" ({pnl_rate:+.2f}%)"
else:
rate_str = ""
direction_str = "" if self.start == -1 else ""
pnl_str = f"{unrealized_pnl:+.2f} USDT"
msg = (
f"【三分之一策略 {self.contract_symbol} 5分钟】\n"
f"当前方向:{direction_str}\n"
f"当前现价:{current_price:.2f} USDT\n"
f"开仓均价:{open_avg_price:.2f} USDT\n"
f"持仓量(eth){float(current_amount) / 1000} eth\n"
f"持仓量(usdt){position_cross} usdt\n"
f"浮动盈亏:{pnl_str}{rate_str}\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
else:
msg = (
f"【三分之一策略 {self.contract_symbol} 5分钟】\n"
f"当前方向:无\n"
f"当前现价:{current_price:.2f} USDT\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
self.ding(msg=msg)
if __name__ == '__main__':
# 启动三分之一策略交易
BitmartOneThirdStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action()