diff --git a/bitmart/抓取数据_30分钟.py b/bitmart/抓取数据_30分钟.py new file mode 100644 index 0000000..3101693 --- /dev/null +++ b/bitmart/抓取数据_30分钟.py @@ -0,0 +1,150 @@ +""" +BitMart 30分钟K线数据抓取脚本 +从 BitMart API 获取30分钟K线数据并存储到数据库 +""" + +import time +from loguru import logger +from bitmart.api_contract import APIContract +from models.bitmart import BitMart30 + + +class BitMartDataCollector: + def __init__(self): + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "数据抓取" + self.contract_symbol = "ETHUSDT" + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + def get_klines(self, start_time=None, end_time=None, limit=200): + """ + 获取K线数据 + :param start_time: 开始时间戳(秒级) + :param end_time: 结束时间戳(秒级) + :param limit: 获取数量限制 + :return: K线数据列表 + """ + try: + if not end_time: + end_time = int(time.time()) + if not start_time: + start_time = end_time - 3600 * 24 * 7 # 默认获取最近7天 + + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=30, # 30分钟 + start_time=start_time, + end_time=end_time + )[0] + + if response['code'] != 1000: + logger.error(f"获取K线失败: {response}") + return [] + + klines = response.get('data', []) + formatted = [] + for k in klines: + # BitMart API 返回的时间戳是秒级,需要转换为毫秒级 + # 根据 bitmart/框架.py 中的使用方式,API返回的是秒级时间戳 + timestamp_ms = int(k["timestamp"]) * 1000 + + formatted.append({ + 'id': timestamp_ms, + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + + # 按时间戳排序 + formatted.sort(key=lambda x: x['id']) + return formatted + except Exception as e: + logger.error(f"获取K线异常: {e}") + return [] + + def save_klines(self, klines): + """ + 保存K线数据到数据库 + :param klines: K线数据列表 + :return: 保存的数量 + """ + saved_count = 0 + for kline in klines: + try: + BitMart30.get_or_create( + id=kline['id'], + defaults={ + 'open': kline['open'], + 'high': kline['high'], + 'low': kline['low'], + 'close': kline['close'], + } + ) + saved_count += 1 + except Exception as e: + logger.error(f"保存K线数据失败 {kline['id']}: {e}") + + return saved_count + + def collect_historical_data(self, days=30): + """ + 抓取历史数据 + :param days: 抓取最近多少天的数据 + """ + logger.info(f"开始抓取 BitMart {self.contract_symbol} 最近 {days} 天的30分钟K线数据") + + end_time = int(time.time()) + start_time = end_time - 3600 * 24 * days + + # 分批获取,每次获取7天的数据 + batch_days = 7 + total_saved = 0 + + current_start = start_time + while current_start < end_time: + current_end = min(current_start + 3600 * 24 * batch_days, end_time) + + logger.info(f"抓取时间段: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_start))} " + f"到 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_end))}") + + klines = self.get_klines(start_time=current_start, end_time=current_end) + if klines: + saved = self.save_klines(klines) + total_saved += saved + logger.info(f"本批次保存 {saved} 条数据,累计 {total_saved} 条") + else: + logger.warning("本批次未获取到数据") + + current_start = current_end + time.sleep(1) # 避免请求过快 + + logger.success(f"数据抓取完成,共保存 {total_saved} 条K线数据") + + def collect_realtime_data(self): + """ + 实时抓取最新数据(用于定时任务) + """ + logger.info("开始抓取 BitMart 最新30分钟K线数据") + + # 获取最近1小时的数据(确保能获取到最新的K线) + end_time = int(time.time()) + start_time = end_time - 3600 * 2 # 最近2小时 + + klines = self.get_klines(start_time=start_time, end_time=end_time) + if klines: + saved = self.save_klines(klines) + logger.success(f"保存 {saved} 条最新K线数据") + else: + logger.warning("未获取到最新数据") + + +if __name__ == '__main__': + collector = BitMartDataCollector() + + # 抓取最近30天的历史数据 + collector.collect_historical_data(days=30) + + # 如果需要实时抓取,可以取消下面的注释 + # collector.collect_realtime_data() diff --git a/models/bitmart.py b/models/bitmart.py new file mode 100644 index 0000000..106fb7a --- /dev/null +++ b/models/bitmart.py @@ -0,0 +1,21 @@ +from peewee import * + +from models import db + + +class BitMart30(Model): + id = IntegerField(primary_key=True) # 时间戳(毫秒级) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'bitmart_30' + + +# 连接到数据库 +db.connect() +# 创建表(如果表不存在) +db.create_tables([BitMart30]) diff --git a/models/database.db b/models/database.db index e28aa4b..787d267 100644 Binary files a/models/database.db and b/models/database.db differ diff --git a/weex/自动化抓取数据.py b/weex/自动化抓取数据.py index 4f129fc..1bf8e70 100644 --- a/weex/自动化抓取数据.py +++ b/weex/自动化抓取数据.py @@ -6,26 +6,24 @@ from models.weex import Weex1, Weex1Hour, Weex15, Weex30 if __name__ == '__main__': - # bit_port = openBrowser(id="8dcb4f744cf64ab190e465e153088515") + bit_port = openBrowser(id="f2320f57e24c45529a009e1541e25961") - response = requests.post( - f"http://127.0.0.1:50326/api/browser/start", - json={"envId": 146473}, - headers={ - "Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91", - "Content-Type": "application/json" - } - ) + # response = requests.post( + # f"http://127.0.0.1:50326/api/browser/start", + # json={"envId": 146473}, + # headers={ + # "Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91", + # "Content-Type": "application/json" + # } + # ) co = ChromiumOptions() - co.set_local_port(port=response.json()["data"]["port"]) - + co.set_local_port(port=bit_port) page = ChromiumPage(addr_or_opts=co) page.set.window.max() - page.listen.start("public/quote/v1/getKlineV2") page.listen.start("public/quote/v1/getKlineV2") page.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") diff --git a/weex/长期持有信号/database.db b/weex/长期持有信号/database.db deleted file mode 100644 index df6f204..0000000 Binary files a/weex/长期持有信号/database.db and /dev/null differ diff --git a/weex/长期持有信号/读取数据库数据-30分钟版.py b/weex/长期持有信号/读取数据库数据-30分钟版.py index 5d16fcb..74be8e0 100644 --- a/weex/长期持有信号/读取数据库数据-30分钟版.py +++ b/weex/长期持有信号/读取数据库数据-30分钟版.py @@ -1,5 +1,23 @@ """ -量化交易回测系统 - 仅15分钟K线 & 信号续持/反手/单根反色平仓逻辑(完整版) +量化交易回测系统 - 30分钟K线策略回测(Weex数据源) + +========== 策略规则 ========== +重要:所有开仓和平仓操作都在下一根K线的开盘价执行 + +1. 开仓条件(信号出现时,下一根K线开盘价开仓): + - 阳包阴(涨包跌):前一根是跌(阴线),后一根是涨(阳线),且涨的收盘价 > 跌的开盘价 + -> 下一根K线开盘价开多 + - 阴包阳(跌包涨):前一根是涨(阳线),后一根是跌(阴线),且跌的收盘价 < 涨的开盘价 + -> 下一根K线开盘价开空 + +2. 平仓条件(所有平仓都在下一根K线开盘价执行): + - 持有多单时:遇到两根连续的阴线 -> 下一根K线开盘价平仓 + - 持有空单时:遇到两根连续的阳线 -> 下一根K线开盘价平仓 + - 遇到反向信号:下一根K线开盘价平仓并反手开仓 + +3. 续持条件: + - 遇到同向信号:续持 + - 未满足平仓条件:续持 """ import datetime @@ -23,9 +41,17 @@ def is_bearish(c): # 阴线 def check_signal(prev, curr): """ 包住形态信号判定(优化版): - 只看两种信号: - - 跌包涨(前涨后跌):前一根是涨(阳线),后一根是跌(阴线),且跌的收盘价 < 涨的开盘价 -> 做空 - - 涨包跌(前跌后涨):前一根是跌(阴线),后一根是涨(阳线),且涨的收盘价 > 跌的开盘价 -> 做多 + 只看两种信号,严格按照收盘价与开盘价的比较: + + 1. 跌包涨(前涨后跌)-> 做空: + - 前一根是涨(阳线:close > open) + - 后一根是跌(阴线:close < open) + - 且:跌的收盘价 < 涨的开盘价(curr['close'] < prev['open']) + + 2. 涨包跌(前跌后涨)-> 做多: + - 前一根是跌(阴线:close < open) + - 后一根是涨(阳线:close > open) + - 且:涨的收盘价 > 跌的开盘价(curr['close'] > prev['open']) """ p_open = float(prev['open']) c_close = float(curr['close']) @@ -42,18 +68,32 @@ def check_signal(prev, curr): def get_data_by_date(model, date_str: str): - """按天获取指定表的数据(15分钟)""" + """ + 按天获取指定表的数据(30分钟K线) + 数据格式:时间戳(毫秒级) 开盘价 最高价 最低价 收盘价 + 例如:1767461400000 3106.68 3109.1 3106.22 3107.22 + + 注意:返回的数据已按时间戳(id)升序排序 + """ try: target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') except ValueError: logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") return [] + # 将日期转换为毫秒级时间戳进行查询 start_ts = int(target_date.timestamp() * 1000) end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + # 查询时按时间戳升序排序 query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) - return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + + # 确保数据已排序 + if data: + data.sort(key=lambda x: x['id']) + + return data # ========================= 回测逻辑 ========================= @@ -77,82 +117,76 @@ def backtest_15m_trend_optimized(dates: List[str]): trades: List[Dict] = [] current_position: Optional[Dict] = None # 开仓信息 + consecutive_opposite_count = 0 # 连续反色K线计数 idx = 1 while idx < len(all_data) - 1: prev, curr, next_bar = all_data[idx - 1], all_data[idx], all_data[idx + 1] direction, signal_key = check_signal(prev, curr) - # 空仓 -> 碰到信号则开仓 - if current_position is None and direction: - entry_price = float(next_bar['open']) - current_position = { - 'direction': direction, - 'signal': stats[signal_key]['name'], - 'signal_key': signal_key, - 'entry_price': entry_price, - 'entry_time': next_bar['id'] - } - stats[signal_key]['count'] += 1 - idx += 1 - continue - - if current_position: - pos_dir = current_position['direction'] - pos_sig_key = current_position['signal_key'] - - # 反向信号 -> 下一根开盘平仓 + 同价反手 - if direction and direction != pos_dir: - exit_price = float(next_bar['open']) - diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( - current_position['entry_price'] - exit_price) - trades.append({ - 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), - 'signal': current_position['signal'], - 'direction': '做多' if pos_dir == 'long' else '做空', - 'entry': current_position['entry_price'], - 'exit': exit_price, - 'diff': diff - }) - stats[pos_sig_key]['total_profit'] += diff - if diff > 0: stats[pos_sig_key]['wins'] += 1 - + # 空仓 -> 碰到信号则开仓(下一根K线开盘价) + if current_position is None: + if direction: + entry_price = float(next_bar['open']) current_position = { 'direction': direction, 'signal': stats[signal_key]['name'], 'signal_key': signal_key, - 'entry_price': exit_price, + 'entry_price': entry_price, 'entry_time': next_bar['id'] } + consecutive_opposite_count = 0 # 重置连续反色计数 stats[signal_key]['count'] += 1 - idx += 1 - continue + idx += 1 + continue - # 同向信号 -> 续持 - if direction and direction == pos_dir: - idx += 1 - continue + # 有仓位状态:检查平仓条件 + pos_dir = current_position['direction'] + pos_sig_key = current_position['signal_key'] - # 单根反色K线 -> 判断后续是否能组成信号 - curr_is_opposite = (pos_dir == 'long' and is_bearish(curr)) or (pos_dir == 'short' and is_bullish(curr)) - if curr_is_opposite: - can_peek = idx + 1 < len(all_data) - if can_peek: - lookahead_dir, _ = check_signal(curr, all_data[idx + 1]) - if lookahead_dir is not None: - idx += 1 - continue # 后续可组成信号,等待信号处理 + # 1. 反向信号 -> 下一根K线开盘价平仓并反手开仓 + if direction and direction != pos_dir: + exit_price = float(next_bar['open']) + diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( + current_position['entry_price'] - exit_price) + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做多' if pos_dir == 'long' else '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 - # 否则按收盘价平仓 - exit_price = float(next_bar['close']) - diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else ( - current_position['entry_price'] - exit_price) + # 反手开仓 + current_position = { + 'direction': direction, + 'signal': stats[signal_key]['name'], + 'signal_key': signal_key, + 'entry_price': exit_price, + 'entry_time': next_bar['id'] + } + consecutive_opposite_count = 0 # 重置连续反色计数 + stats[signal_key]['count'] += 1 + idx += 1 + continue + + # 2. 检查连续反色K线平仓条件(下一根K线开盘价平仓) + # 持有多单:检查是否连续两根阴线 + if pos_dir == 'long' and is_bearish(curr): + consecutive_opposite_count += 1 + # 如果已经连续两根阴线,下一根K线开盘价平仓 + if consecutive_opposite_count >= 2: + exit_price = float(next_bar['open']) + diff = exit_price - current_position['entry_price'] trades.append({ 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), - 'exit_time': datetime.datetime.fromtimestamp(all_data[idx + 1]['id'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), 'signal': current_position['signal'], - 'direction': '做多' if pos_dir == 'long' else '做空', + 'direction': '做多', 'entry': current_position['entry_price'], 'exit': exit_price, 'diff': diff @@ -160,6 +194,50 @@ def backtest_15m_trend_optimized(dates: List[str]): stats[pos_sig_key]['total_profit'] += diff if diff > 0: stats[pos_sig_key]['wins'] += 1 current_position = None + consecutive_opposite_count = 0 + idx += 1 + continue + else: + # 只有一根阴线,续持 + idx += 1 + continue + + # 持有空单:检查是否连续两根阳线 + elif pos_dir == 'short' and is_bullish(curr): + consecutive_opposite_count += 1 + # 如果已经连续两根阳线,下一根K线开盘价平仓 + if consecutive_opposite_count >= 2: + exit_price = float(next_bar['open']) + diff = current_position['entry_price'] - exit_price + trades.append({ + 'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + 'exit_time': datetime.datetime.fromtimestamp(next_bar['id'] / 1000), + 'signal': current_position['signal'], + 'direction': '做空', + 'entry': current_position['entry_price'], + 'exit': exit_price, + 'diff': diff + }) + stats[pos_sig_key]['total_profit'] += diff + if diff > 0: stats[pos_sig_key]['wins'] += 1 + current_position = None + consecutive_opposite_count = 0 + idx += 1 + continue + else: + # 只有一根阳线,续持 + idx += 1 + continue + + # 3. 同向K线或同向信号 -> 续持,重置连续反色计数 + if (pos_dir == 'long' and is_bullish(curr)) or (pos_dir == 'short' and is_bearish(curr)): + consecutive_opposite_count = 0 # 重置连续反色计数 + + # 同向信号 -> 续持 + if direction and direction == pos_dir: + consecutive_opposite_count = 0 # 重置连续反色计数 + idx += 1 + continue idx += 1 @@ -188,7 +266,7 @@ def backtest_15m_trend_optimized(dates: List[str]): # ========================= 运行示例(优化版盈利计算) ========================= if __name__ == '__main__': dates = [] - for month in range(1, 11): + for month in range(1, 13): # 获取该月的实际天数 days_in_month = calendar.monthrange(2025, month)[1] for day in range(1, days_in_month + 1):