This commit is contained in:
27942
2026-01-28 10:23:55 +08:00
parent c472a0c0c9
commit c855ee7006
4 changed files with 4641 additions and 0 deletions

View File

@@ -0,0 +1,656 @@
"""
量化交易回测系统 - 三分之一回归策略(优化版)
========== 策略规则 ==========
1. 开多条件:
- 找到实体>=0.1的前一根K线如果前一根实体<0.1,继续往前找)
- 前一根是阴线close < open
- 当前K线的最高价包括影线涨到前一根阴线实体的 1/3 处
- 即当前high >= prev_close + (prev_open - prev_close) / 3
2. 平多/开空条件:
- 找到实体>=0.1的前一根K线如果前一根实体<0.1,继续往前找)
- 前一根是阳线close > open
- 当前K线的最低价包括影线跌到前一根阳线实体的 1/3 处
- 即当前low <= prev_close - (prev_close - prev_open) / 3
3. 执行逻辑:
- 做多时遇到开空信号 -> 平多并反手开空
- 做空时遇到开多信号 -> 平空并反手开多
4. 实体过滤:
- 如果前一根K线的实体部分|open - close|< 0.1,继续往前查找
- 直到找到实体>=0.1的K线再用那根K线来判断1/3位置
"""
import datetime
import calendar
import os
from typing import List, Dict, Optional
from loguru import logger
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
import matplotlib
try:
import plotly.graph_objects as go
except Exception:
go = None
from models.bitmart_klines import BitMartETH5M
# 配置中文字体
import matplotlib.font_manager as fm
import warnings
# 忽略matplotlib的字体警告
warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib.font_manager')
warnings.filterwarnings('ignore', message='.*Glyph.*missing.*', category=UserWarning)
# 尝试设置中文字体,按优先级尝试
chinese_fonts = ['SimHei', 'Microsoft YaHei', 'SimSun', 'KaiTi', 'FangSong', 'STSong', 'STHeiti']
available_fonts = [f.name for f in fm.fontManager.ttflist]
# 找到第一个可用的中文字体
font_found = None
for font_name in chinese_fonts:
if font_name in available_fonts:
font_found = font_name
break
if font_found:
plt.rcParams['font.sans-serif'] = [font_found] + ['DejaVu Sans']
logger.info(f"使用中文字体: {font_found}")
else:
# 如果没有找到中文字体,尝试使用系统默认字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS', 'DejaVu Sans']
logger.warning("未找到中文字体,使用默认配置")
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
plt.rcParams['font.size'] = 10 # 设置默认字体大小
# 尝试清除字体缓存(如果可能)
try:
# 不强制重建,避免性能问题
pass
except:
pass
# 获取当前脚本所在目录
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# ========================= 工具函数 =========================
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_third_level(prev):
"""
计算前一根K线实体的 1/3 回归位置
返回:(触发价格, 方向)
- 如果前一根是阴线返回向上1/3价格方向为 'long'
- 如果前一根是阳线返回向下1/3价格方向为 'short'
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
if is_bearish(prev): # 阴线,向上回归
# 阴线实体 = open - close
body = p_open - p_close
trigger_price = p_close + body / 3 # 从低点涨 1/3
return trigger_price, 'long'
elif is_bullish(prev): # 阳线,向下回归
# 阳线实体 = close - open
body = p_close - p_open
trigger_price = p_close - body / 3 # 从高点跌 1/3
return trigger_price, 'short'
return None, None
def check_trigger(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
检查当前K线是否触发了交易信号
返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None)
规则考虑影线部分high/low因为实际交易中价格会到达影线位置
"""
if current_idx <= 0:
return None, None, None
curr = all_data[current_idx]
# 查找实体>=0.1的前一根K线
valid_prev_idx, prev = find_valid_prev_bar(all_data, current_idx, min_body_size)
if prev is None:
return None, None, None
trigger_price, direction = get_one_third_level(prev)
if trigger_price is None:
return None, None, None
# 使用影线部分high/low来判断因为实际交易中价格会到达这些位置
c_high = float(curr['high'])
c_low = float(curr['low'])
# 做多前一根阴线当前K线的最高价包括影线达到触发价格
if direction == 'long' and c_high >= trigger_price:
return 'long', trigger_price, valid_prev_idx
# 做空前一根阳线当前K线的最低价包括影线达到触发价格
if direction == 'short' and c_low <= trigger_price:
return 'short', trigger_price, valid_prev_idx
return None, None, None
def get_data_by_date(model, date_str: str):
"""按天获取指定表的数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_one_third_strategy(dates: List[str]):
"""三分之一回归策略回测"""
all_data: List[Dict] = []
for d in dates:
day_data = get_data_by_date(BitMartETH5M, d)
all_data.extend(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {len(all_data)} 条K线数据")
if not all_data:
logger.warning("未获取到任何数据")
return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}}
all_data.sort(key=lambda x: x['id'])
if len(all_data) > 1:
first_time = datetime.datetime.fromtimestamp(all_data[0]['id'] / 1000)
last_time = datetime.datetime.fromtimestamp(all_data[-1]['id'] / 1000)
logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M')}{last_time.strftime('%Y-%m-%d %H:%M')}")
# 验证排序打印前5条数据
logger.info("===== 前5条数据验证排序=====")
for i in range(min(5, len(all_data))):
d = all_data[i]
t = datetime.datetime.fromtimestamp(d['id'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
k_type = "阳线" if is_bullish(d) else ("阴线" if is_bearish(d) else "十字星")
logger.info(f" [{i}] {t} | {k_type} | O={d['open']:.2f} H={d['high']:.2f} L={d['low']:.2f} C={d['close']:.2f}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
trades: List[Dict] = []
current_position: Optional[Dict] = None
for idx in range(1, len(all_data)):
curr = all_data[idx]
# 使用新的check_trigger函数它会自动查找实体>=0.1的前一根K线
direction, trigger_price, valid_prev_idx = check_trigger(all_data, idx, min_body_size=0.1)
# 获取有效的前一根K线用于日志输出
valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None
# 空仓时,有信号就开仓
if current_position is None:
if direction and valid_prev is not None:
# 调试打印开仓时的K线信息
prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M')
curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星")
curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星")
prev_body = get_body_size(valid_prev)
logger.info(f"【开仓】{direction} @ {trigger_price:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}")
logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}")
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx
}
stats[direction]['count'] += 1
continue
# 有仓位时,检查是否有反向信号
pos_dir = current_position['direction']
if direction and direction != pos_dir and valid_prev is not None:
# 平仓
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
# 调试打印平仓时的K线信息
prev_time = datetime.datetime.fromtimestamp(valid_prev['id'] / 1000).strftime('%Y-%m-%d %H:%M')
curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
prev_type = "阳线" if is_bullish(valid_prev) else ("阴线" if is_bearish(valid_prev) else "十字星")
curr_type = "阳线" if is_bullish(curr) else ("阴线" if is_bearish(curr) else "十字星")
prev_body = get_body_size(valid_prev)
logger.info(f"【平仓反手】{pos_dir} -> {direction} @ {exit_price:.2f}, 盈亏: {diff:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} H={valid_prev['high']:.2f} L={valid_prev['low']:.2f} C={valid_prev['close']:.2f}")
logger.info(f" 当前根[{curr_time}]: {curr_type} O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f}")
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': curr['id'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
# 反手开仓
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx
}
stats[direction]['count'] += 1
# 尾仓处理
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': last['id'],
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
return trades, stats, all_data
# ========================= 绘图函数 =========================
def plot_trades(all_data: List[Dict], trades: List[Dict], save_path: str = None):
"""
绘制K线图并标注交易点位
"""
if not all_data:
logger.warning("没有数据可绘制")
return
# 转换为 DataFrame
df = pd.DataFrame(all_data)
df['datetime'] = pd.to_datetime(df['id'], unit='ms')
df.set_index('datetime', inplace=True)
df = df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'})
df = df[['Open', 'High', 'Low', 'Close']]
# 准备标记点
buy_signals = [] # 做多开仓
sell_signals = [] # 做空开仓
buy_exits = [] # 做多平仓
sell_exits = [] # 做空平仓
for trade in trades:
entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms')
exit_time = pd.to_datetime(trade['exit_time_ms'], unit='ms')
direction = trade['direction']
entry_price = trade['entry']
exit_price = trade['exit']
if direction == '做多':
buy_signals.append((entry_time, entry_price))
buy_exits.append((exit_time, exit_price))
else:
sell_signals.append((entry_time, entry_price))
sell_exits.append((exit_time, exit_price))
# 创建标记序列
buy_markers = pd.Series(index=df.index, dtype=float)
sell_markers = pd.Series(index=df.index, dtype=float)
buy_exit_markers = pd.Series(index=df.index, dtype=float)
sell_exit_markers = pd.Series(index=df.index, dtype=float)
for t, p in buy_signals:
if t in buy_markers.index:
buy_markers[t] = p
for t, p in sell_signals:
if t in sell_markers.index:
sell_markers[t] = p
for t, p in buy_exits:
if t in buy_exit_markers.index:
buy_exit_markers[t] = p
for t, p in sell_exits:
if t in sell_exit_markers.index:
sell_exit_markers[t] = p
# 添加标记
add_plots = []
if buy_markers.notna().any():
add_plots.append(mpf.make_addplot(buy_markers, type='scatter', markersize=100,
marker='^', color='green', label='做多开仓'))
if sell_markers.notna().any():
add_plots.append(mpf.make_addplot(sell_markers, type='scatter', markersize=100,
marker='v', color='red', label='做空开仓'))
if buy_exit_markers.notna().any():
add_plots.append(mpf.make_addplot(buy_exit_markers, type='scatter', markersize=80,
marker='x', color='darkgreen', label='做多平仓'))
if sell_exit_markers.notna().any():
add_plots.append(mpf.make_addplot(sell_exit_markers, type='scatter', markersize=80,
marker='x', color='darkred', label='做空平仓'))
# 绘制K线图更接近交易所风格
market_colors = mpf.make_marketcolors(
up='#26a69a', # 常见交易所绿色
down='#ef5350', # 常见交易所红色
edge='inherit',
wick='inherit',
volume='inherit'
)
style = mpf.make_mpf_style(
base_mpf_style='binance',
marketcolors=market_colors,
gridstyle='-',
gridcolor='#e6e6e6'
)
fig, axes = mpf.plot(
df,
type='candle',
style=style,
title='三分之一回归策略回测',
ylabel='价格',
addplot=add_plots if add_plots else None,
figsize=(16, 9),
returnfig=True
)
# 添加图例
axes[0].legend(['做多开仓 ▲', '做空开仓 ▼', '做多平仓 ✕', '做空平仓 ✕'], loc='upper left')
# 标注开仓细节(方向、价格、时间)
if trades:
ax = axes[0]
max_annotate = 60 # 过多会拥挤,可按需调大/调小
annotated = 0
for i, trade in enumerate(trades):
if annotated >= max_annotate:
break
entry_time = pd.to_datetime(trade['entry_time_ms'], unit='ms')
if entry_time not in df.index:
continue
entry_price = trade['entry']
direction = trade['direction']
color = 'green' if direction == '做多' else 'red'
text = f"{direction} @ {entry_price:.2f}\n{entry_time.strftime('%m-%d %H:%M')}"
y_offset = 20 if (i % 2 == 0) else -30
ax.annotate(
text,
xy=(entry_time, entry_price),
xytext=(0, y_offset),
textcoords='offset points',
ha='center',
va='bottom' if y_offset > 0 else 'top',
fontsize=8,
color=color,
arrowprops=dict(arrowstyle='->', color=color, lw=0.6, alpha=0.6)
)
annotated += 1
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches='tight')
logger.info(f"图表已保存到: {save_path}")
plt.show()
def plot_trades_interactive(all_data: List[Dict], trades: List[Dict], html_path: str = None):
"""
交互式K线图TradingView风格支持缩放、时间区间/价格区间平移缩放)
"""
if not all_data:
logger.warning("没有数据可绘制")
return
if go is None:
logger.warning("未安装 plotly无法绘制交互式图。请先安装pip install plotly")
return
df = pd.DataFrame(all_data)
df['datetime'] = pd.to_datetime(df['id'], unit='ms')
df.sort_values('datetime', inplace=True)
fig = go.Figure(
data=[
go.Candlestick(
x=df['datetime'],
open=df['open'],
high=df['high'],
low=df['low'],
close=df['close'],
increasing_line_color='#26a69a',
decreasing_line_color='#ef5350',
name='K线'
)
]
)
# 标注开仓点
if trades:
entry_x = []
entry_y = []
entry_text = []
entry_color = []
for t in trades:
entry_x.append(pd.to_datetime(t['entry_time_ms'], unit='ms'))
entry_y.append(t['entry'])
entry_text.append(f"{t['direction']} @ {t['entry']:.2f}<br>{t['entry_time']}")
entry_color.append('#26a69a' if t['direction'] == '做多' else '#ef5350')
fig.add_trace(
go.Scatter(
x=entry_x,
y=entry_y,
mode='markers',
marker=dict(size=8, color=entry_color),
name='开仓',
text=entry_text,
hoverinfo='text'
)
)
# TradingView风格深色背景 + 交互缩放
fig.update_layout(
title='三分之一回归策略回测(交互式)',
xaxis=dict(
rangeslider=dict(visible=True),
type='date',
showgrid=False
),
yaxis=dict(
showgrid=False,
fixedrange=False
),
plot_bgcolor='#0b0e11',
paper_bgcolor='#0b0e11',
font=dict(color='#d1d4dc'),
hovermode='x unified',
dragmode='zoom'
)
fig.show()
if html_path:
fig.write_html(html_path)
logger.info(f"交互图已保存到: {html_path}")
# ========================= 主程序 =========================
if __name__ == '__main__':
# ==================== 配置参数 ====================
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
# ==================== 生成日期列表 ====================
dates = []
if START_DATE and END_DATE:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"查询日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
# ==================== 执行回测 ====================
trades, stats, all_data = backtest_one_third_strategy(dates)
# ==================== 输出结果 ====================
logger.info("===== 每笔交易详情 =====")
contract_size = 10000
open_fee_fixed = 5
close_fee_rate = 0.0005
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_p = t['exit']
direction = t['direction']
point_diff = t['diff']
money_profit = point_diff / entry * contract_size
fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate)
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction} "
f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f}"
)
# ==================== 汇总统计 ====================
total_net_profit = total_money_profit - total_fee
print(f"\n{'='*60}")
print(f"【三分之一回归策略 回测结果】")
print(f"{'='*60}")
print(f"交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利:{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}")
print(f"{'='*60}")
print("\n===== 方向统计 =====")
for k, v in stats.items():
count = v['count']
wins = v['wins']
total_p = v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{v['name']}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
# ==================== 绘制图表 ====================
if trades and all_data:
# 如果数据太多只绘制最近一部分比如最近500根K线
max_bars = 500
if len(all_data) > max_bars:
logger.info(f"数据量较大({len(all_data)}条),只绘制最近 {max_bars} 根K线")
plot_data = all_data[-max_bars:]
# 过滤出在这个时间范围内的交易
min_time = datetime.datetime.fromtimestamp(plot_data[0]['id'] / 1000)
plot_trades_filtered = [t for t in trades if t['entry_time'] >= min_time]
else:
plot_data = all_data
plot_trades_filtered = trades
save_path = os.path.join(SCRIPT_DIR, '回测图表.png')
plot_trades(plot_data, plot_trades_filtered, save_path=save_path)
# 交互式版本TradingView风格支持时间区间/价格缩放
html_path = os.path.join(SCRIPT_DIR, '回测图表_交互式.html')
plot_trades_interactive(plot_data, plot_trades_filtered, html_path=html_path)

BIN
bitmart/回测图表.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 134 KiB

File diff suppressed because one or more lines are too long

97
models/bitmart_klines.py Normal file
View File

@@ -0,0 +1,97 @@
"""
BitMart 多周期K线数据模型
包含 1分钟、3分钟、5分钟、15分钟、30分钟、1小时 K线数据表
"""
from peewee import *
from models import db
# ==================== 1分钟 K线 ====================
class BitMartETH1M(Model):
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1m'
# ==================== 3分钟 K线 ====================
class BitMartETH3M(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_3m'
# ==================== 5分钟 K线 ====================
class BitMartETH5M(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_5m'
# ==================== 15分钟 K线 ====================
class BitMartETH15M(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_15m'
# ==================== 30分钟 K线 ====================
class BitMartETH30M(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_30m'
# ==================== 1小时 K线 ====================
class BitMartETH1H(Model):
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1h'
# 连接数据库并创建表
db.connect(reuse_if_open=True)
db.create_tables([
BitMartETH1M,
BitMartETH3M,
BitMartETH5M,
BitMartETH15M,
BitMartETH30M,
BitMartETH1H,
], safe=True)