diff --git a/mexc/获取数据.py b/mexc/获取数据.py new file mode 100644 index 0000000..158d622 --- /dev/null +++ b/mexc/获取数据.py @@ -0,0 +1,659 @@ +import requests +import pandas as pd +import datetime +import time +from models.mexc import Mexc1, Mexc15, Mexc30, Mexc1Hour + + +def get_mexc_klines(symbol="SOLUSDT", interval="30m", limit=500): + url = "https://api.mexc.com/api/v3/klines" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + } + params = { + "symbol": symbol.upper(), + "interval": interval, + "limit": limit + } + + try: + resp = requests.get(url, params=params, headers=headers, timeout=10) + resp.raise_for_status() # 提前抛出 HTTP 错误 + data = resp.json() + + if not isinstance(data, list) or not data: + print("返回数据异常:", data) + return None + + # 使用正确的 8 列 + columns = [ + "open_time", "open", "high", "low", "close", + "volume", "close_time", "quote_volume" + ] + + df = pd.DataFrame(data, columns=columns) + + # 保存原始时间戳(用于数据库id) + df["open_time_ms"] = df["open_time"].astype(int) + + # 时间戳转可读时间 + df["open_time"] = pd.to_datetime(df["open_time"], unit="ms") + df["close_time"] = pd.to_datetime(df["close_time"], unit="ms") + + # 常用字段排序 + 类型转换 + df = df[[ + "open_time", "open", "high", "low", "close", + "volume", "quote_volume", "open_time_ms" + ]] + + # 转成浮点数(方便后续计算) + for col in ["open", "high", "low", "close", "volume", "quote_volume"]: + df[col] = pd.to_numeric(df[col], errors="coerce") + + print(f"成功获取 {len(df)} 条 {interval} K线({symbol})") + return df + + except Exception as e: + print("请求失败:", str(e)) + return None + + +def normalize_futures_symbol(symbol): + """将现货格式转换为合约格式,如 SOLUSDT -> SOL_USDT""" + if "_" in symbol: + return symbol.upper() + symbol = symbol.upper() + for quote in ["USDT", "USDC", "USD", "BTC", "ETH"]: + if symbol.endswith(quote): + base = symbol[:-len(quote)] + return f"{base}_{quote}" + return symbol + + +def map_futures_interval(interval): + """合约K线周期映射""" + mapping = { + "1m": "Min1", + "5m": "Min5", + "15m": "Min15", + "30m": "Min30", + "1h": "Min60", + "4h": "Hour4", + "8h": "Hour8", + "1d": "Day1", + "1w": "Week1", + "1M": "Month1", + } + return mapping.get(interval, interval) + + +def get_mexc_klines_with_time(symbol="SOLUSDT", interval="30m", start_time_ms=None, end_time_ms=None, limit=1000): + """ + 获取指定时间范围内的K线数据 + :param symbol: 交易对符号 + :param interval: 时间间隔,如 "30m", "1m", "15m", "1h" + :param start_time_ms: 开始时间(毫秒级时间戳) + :param end_time_ms: 结束时间(毫秒级时间戳) + :param limit: 单次获取数量(最大1000) + :return: DataFrame或None + """ + def _fetch(params): + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + } + resp = requests.get(url, params=params, headers=headers, timeout=10) + resp.raise_for_status() + data = resp.json() + return resp, data + + url = "https://api.mexc.com/api/v3/klines" + params = { + "symbol": symbol.upper(), + "interval": interval, + "limit": limit + } + if start_time_ms: + params["startTime"] = start_time_ms + if end_time_ms: + params["endTime"] = end_time_ms + + try: + resp, data = _fetch(params) + + # 检查是否是错误响应 + if isinstance(data, dict) and 'code' in data: + print(f"API返回错误: {data}, 请求参数: {params}") + return None + + # 尝试秒级时间戳回退(部分接口可能只接受秒) + if (not isinstance(data, list) or not data) and (start_time_ms or end_time_ms): + params_s = params.copy() + if start_time_ms: + params_s["startTime"] = int(start_time_ms // 1000) + if end_time_ms: + params_s["endTime"] = int(end_time_ms // 1000) + resp_s, data_s = _fetch(params_s) + if isinstance(data_s, list) and data_s: + print("检测到秒级时间戳可用,已自动回退到秒级参数") + resp, data = resp_s, data_s + params = params_s + + if not isinstance(data, list) or not data: + print(f"API返回数据异常: {data}, 请求参数: {params}") + print(f"完整URL: {resp.url}") + return None + + columns = [ + "open_time", "open", "high", "low", "close", + "volume", "close_time", "quote_volume" + ] + + df = pd.DataFrame(data, columns=columns) + # 判断时间单位(秒/毫秒) + max_open = pd.to_numeric(df["open_time"], errors="coerce").max() + if max_open < 1_000_000_000_000: # 小于1e12视为秒 + df["open_time_ms"] = (df["open_time"].astype(int) * 1000) + df["open_time"] = pd.to_datetime(df["open_time"], unit="s") + df["close_time"] = pd.to_datetime(df["close_time"], unit="s") + else: + df["open_time_ms"] = df["open_time"].astype(int) + df["open_time"] = pd.to_datetime(df["open_time"], unit="ms") + df["close_time"] = pd.to_datetime(df["close_time"], unit="ms") + + df = df[[ + "open_time", "open", "high", "low", "close", + "volume", "quote_volume", "open_time_ms" + ]] + + for col in ["open", "high", "low", "close", "volume", "quote_volume"]: + df[col] = pd.to_numeric(df[col], errors="coerce") + + return df + + except Exception as e: + print(f"请求失败: {str(e)}") + return None + + +def get_mexc_futures_klines(symbol="SOLUSDT", interval="30m", limit=500): + return get_mexc_futures_klines_with_time( + symbol=symbol, + interval=interval, + start_time_ms=None, + end_time_ms=None, + limit=limit + ) + + +def get_mexc_futures_klines_with_time(symbol="SOLUSDT", interval="30m", start_time_ms=None, end_time_ms=None, limit=1000): + """ + 获取合约K线数据(使用 contract.mexc.com) + """ + url = "https://contract.mexc.com/api/v1/contract/kline" + symbol = normalize_futures_symbol(symbol) + interval = map_futures_interval(interval) + + def _fetch(params): + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Content-Type": "application/json" + } + resp = requests.get(url, params=params, headers=headers, timeout=10) + resp.raise_for_status() + return resp, resp.json() + + params = { + "symbol": symbol, + "interval": interval, + "limit": limit + } + # 合约接口通常使用 start/end(秒级),这里默认秒级 + if start_time_ms: + params["start"] = int(start_time_ms // 1000) + if end_time_ms: + params["end"] = int(end_time_ms // 1000) + + try: + resp, data = _fetch(params) + + # 兼容错误结构 + if isinstance(data, dict) and ("success" in data and data.get("success") is False): + print(f"合约API返回错误: {data}, 请求参数: {params}") + return None + + payload = data + if isinstance(data, dict) and "data" in data: + payload = data["data"] + + # 兼容返回为 dict of arrays + if isinstance(payload, dict) and "time" in payload: + times = payload.get("time", []) + opens = payload.get("open", []) + highs = payload.get("high", []) + lows = payload.get("low", []) + closes = payload.get("close", []) + vols = payload.get("vol", []) + rows = list(zip(times, opens, highs, lows, closes, vols)) + payload = rows + + if not isinstance(payload, list) or not payload: + print(f"合约API返回数据异常: {payload}, 请求参数: {params}") + print(f"完整URL: {resp.url}") + return None + + # 兼容 list of lists: [time, open, high, low, close, vol, ...] + columns = ["open_time", "open", "high", "low", "close", "volume"] + df = pd.DataFrame(payload, columns=columns + list(range(len(payload[0]) - len(columns)))) + df = df[columns] + + # 判断时间单位(秒/毫秒) + max_open = pd.to_numeric(df["open_time"], errors="coerce").max() + if max_open < 1_000_000_000_000: + df["open_time_ms"] = (df["open_time"].astype(int) * 1000) + df["open_time"] = pd.to_datetime(df["open_time"], unit="s") + else: + df["open_time_ms"] = df["open_time"].astype(int) + df["open_time"] = pd.to_datetime(df["open_time"], unit="ms") + + for col in ["open", "high", "low", "close", "volume"]: + df[col] = pd.to_numeric(df[col], errors="coerce") + + # 合约接口不一定返回 quote_volume,这里保持一致字段结构 + df["quote_volume"] = pd.NA + df = df[[ + "open_time", "open", "high", "low", "close", + "volume", "quote_volume", "open_time_ms" + ]] + + return df + + except Exception as e: + print(f"合约请求失败: {str(e)}") + return None + + +def get_interval_ms(interval): + """ + 将时间间隔字符串转换为毫秒数 + :param interval: 如 "1m", "15m", "30m", "1h" + :return: 毫秒数 + """ + if interval.endswith("m"): + minutes = int(interval[:-1]) + return minutes * 60 * 1000 + elif interval.endswith("h"): + hours = int(interval[:-1]) + return hours * 60 * 60 * 1000 + elif interval.endswith("d"): + days = int(interval[:-1]) + return days * 24 * 60 * 60 * 1000 + else: + return 60 * 1000 # 默认1分钟 + + +def fetch_historical_data(symbol="SOLUSDT", interval="30m", start_date="2024-01-01", end_date=None, auto_save=True, mode="auto", market="spot"): + """ + 批量获取历史K线数据并存储到数据库 + :param symbol: 交易对符号 + :param interval: 时间间隔,如 "30m", "1m", "15m", "1h" + :param start_date: 开始日期,格式 "YYYY-MM-DD",默认 "2024-01-01" + :param end_date: 结束日期,格式 "YYYY-MM-DD",默认当前时间 + :param auto_save: 是否自动保存到数据库,默认True + :param mode: 获取模式,"auto" 自动判断,"forward" 正向,"reverse" 反向 + :return: 总共获取的数据条数 + """ + # 转换日期为时间戳(毫秒) + start_dt = datetime.datetime.strptime(start_date, "%Y-%m-%d") + start_dt = start_dt.replace(hour=0, minute=0, second=0, microsecond=0) + start_time_ms = int(start_dt.timestamp() * 1000) + + if end_date: + end_dt = datetime.datetime.strptime(end_date, "%Y-%m-%d") + end_dt = end_dt.replace(hour=23, minute=59, second=59, microsecond=999999) + else: + end_dt = datetime.datetime.now() + end_time_ms = int(end_dt.timestamp() * 1000) + + print(f"开始获取 {symbol} {interval} K线数据") + print(f"时间范围: {start_date} 至 {end_dt.strftime('%Y-%m-%d %H:%M:%S')}") + + current_start = start_time_ms + total_count = 0 + limit = 1000 # API最大限制 + interval_ms = get_interval_ms(interval) + + # 计算每次请求的时间窗口(1000条数据的时间跨度) + window_ms = limit * interval_ms + + if mode == "reverse": + return fetch_historical_data_reverse( + symbol=symbol, + interval=interval, + start_date=start_date, + end_date=end_date, + auto_save=auto_save, + market=market + ) + + fetch_fn = get_mexc_klines_with_time + latest_fn = get_mexc_klines + if market == "futures": + fetch_fn = get_mexc_futures_klines_with_time + latest_fn = get_mexc_futures_klines + + if mode == "auto": + # 探测:如果 start+end 为空、start 单独返回最新数据,说明 startTime 被忽略,改用反向获取 + probe_end = min(start_time_ms + window_ms, end_time_ms) + probe_df = fetch_fn( + symbol=symbol, + interval=interval, + start_time_ms=start_time_ms, + end_time_ms=probe_end, + limit=10 + ) + if probe_df is None or probe_df.empty: + probe_df2 = fetch_fn( + symbol=symbol, + interval=interval, + start_time_ms=start_time_ms, + end_time_ms=None, + limit=10 + ) + if probe_df2 is not None and not probe_df2.empty: + latest_ms = int(probe_df2.iloc[-1]["open_time_ms"]) + if latest_ms > probe_end: + print("检测到 startTime 被忽略,自动改为反向获取模式") + return fetch_historical_data_reverse( + symbol=symbol, + interval=interval, + start_date=start_date, + end_date=end_date, + auto_save=auto_save, + market=market + ) + + while current_start < end_time_ms: + current_end = min(current_start + window_ms, end_time_ms) + + print(f"正在获取: {datetime.datetime.fromtimestamp(current_start/1000)} 至 {datetime.datetime.fromtimestamp(current_end/1000)}") + print(f"时间戳: {current_start} 至 {current_end}") + + df = fetch_fn( + symbol=symbol, + interval=interval, + start_time_ms=current_start, + end_time_ms=current_end, + limit=limit + ) + + if df is None or df.empty: + # 先测试一下不带时间参数的最新数据是否能获取到 + if current_start == start_time_ms and total_count == 0: + print("\n=== 开始调试 ===") + print("测试1:尝试获取最新数据(不带时间参数)...") + test_df = latest_fn(symbol=symbol, interval=interval, limit=10) + if test_df is not None and not test_df.empty: + print(f"✓ 测试1成功!最新数据可以获取") + print(f" 最新时间: {test_df.iloc[-1]['open_time']}") + print(f" 最新时间戳: {test_df.iloc[-1]['open_time_ms']}") + + # 测试2:只带startTime,不带endTime + print(f"\n测试2:尝试只带startTime(从{start_date}开始)...") + test_df2 = fetch_fn( + symbol=symbol, + interval=interval, + start_time_ms=current_start, + end_time_ms=None, + limit=100 + ) + if test_df2 is not None and not test_df2.empty: + print(f"✓ 测试2成功!从指定时间开始获取数据") + print(f" 获取到 {len(test_df2)} 条数据") + print(f" 第一条时间: {test_df2.iloc[0]['open_time']}") + print(f" 最后一条时间: {test_df2.iloc[-1]['open_time']}") + print("=== 调试结束 ===\n") + # 使用测试2的方法继续,赋值给df + df = test_df2 + # 不break,继续下面的处理流程 + else: + print("✗ 测试2失败:带startTime无法获取数据") + print("\n建议:MEXC API可能不支持从历史日期开始查询") + print("可以尝试:1. 检查时间范围是否在API支持范围内") + print(" 2. 使用反向获取:从最新数据往前推") + print("=== 调试结束 ===\n") + break + else: + print("✗ 测试1失败:即使不带时间参数也无法获取数据") + print("可能原因:交易对符号或时间间隔不正确") + print("=== 调试结束 ===\n") + break + + # 如果测试后df仍然是None或空,说明无法获取数据 + if df is None or df.empty: + if total_count > 0: + print("已到达数据末尾") + else: + print("本次获取数据为空,跳过") + break + + count = len(df) + total_count += count + print(f"本次获取 {count} 条数据,累计 {total_count} 条") + + # 自动保存到数据库 + if auto_save: + save_to_database(df, interval=interval, symbol=symbol) + + # 更新下一次的起始时间(从最后一条数据的开盘时间 + 一个间隔) + last_open_ms = int(df.iloc[-1]['open_time_ms']) + current_start = last_open_ms + interval_ms + + # 避免请求过快 + time.sleep(0.2) + + print(f"完成!总共获取 {total_count} 条 {symbol} {interval} K线数据") + return total_count + + +def fetch_historical_data_reverse(symbol="SOLUSDT", interval="30m", start_date="2024-01-01", end_date=None, auto_save=True, market="spot"): + """ + 反向获取历史K线数据(从最新往前推,使用startTime参数) + """ + start_dt = datetime.datetime.strptime(start_date, "%Y-%m-%d") + start_dt = start_dt.replace(hour=0, minute=0, second=0, microsecond=0) + start_time_ms = int(start_dt.timestamp() * 1000) + + if end_date: + end_dt = datetime.datetime.strptime(end_date, "%Y-%m-%d") + end_dt = end_dt.replace(hour=23, minute=59, second=59, microsecond=999999) + else: + end_dt = datetime.datetime.now() + end_time_ms = int(end_dt.timestamp() * 1000) + + print(f"开始反向获取 {symbol} {interval} K线数据(从最新往前推)") + print(f"目标时间范围: {start_date} 至 {end_dt.strftime('%Y-%m-%d %H:%M:%S')}") + + total_count = 0 + limit = 1000 # 每次尝试获取1000条 + interval_ms = get_interval_ms(interval) + + # 使用带时间参数的函数 + fetch_fn = get_mexc_klines_with_time + if market == "futures": + fetch_fn = get_mexc_futures_klines_with_time + + # 先获取最新数据,确定当前最新时间 + print("\n第 1 批:获取最新数据...") + df_latest = get_mexc_klines(symbol=symbol, interval=interval, limit=100) + if df_latest is None or df_latest.empty: + print("无法获取最新数据,停止") + return 0 + + latest_time_ms = int(df_latest.iloc[-1]["open_time_ms"]) + earliest_fetched_ms = int(df_latest.iloc[0]["open_time_ms"]) + + # 保存第一批数据 + df_latest_filtered = df_latest[df_latest["open_time_ms"] >= start_time_ms] + if not df_latest_filtered.empty: + count = len(df_latest_filtered) + total_count += count + print(f"获取 {count} 条最新数据,时间范围: {datetime.datetime.fromtimestamp(earliest_fetched_ms/1000)} 至 {datetime.datetime.fromtimestamp(latest_time_ms/1000)}") + if auto_save: + save_to_database(df_latest_filtered, interval=interval, symbol=symbol) + + # 从已获取的最早时间往前推 + current_start_ms = earliest_fetched_ms - interval_ms + batch_num = 1 + + while current_start_ms >= start_time_ms: + batch_num += 1 + # 计算本次请求的时间窗口(往前推limit条数据的时间跨度) + window_ms = limit * interval_ms + batch_start_ms = max(current_start_ms - window_ms, start_time_ms) + + print(f"\n第 {batch_num} 批:从 {datetime.datetime.fromtimestamp(batch_start_ms/1000)} 开始获取...") + + # 使用startTime参数,尝试获取更早的数据 + df = fetch_fn( + symbol=symbol, + interval=interval, + start_time_ms=batch_start_ms, + end_time_ms=current_start_ms, + limit=limit + ) + + if df is None or df.empty: + # 如果返回空,尝试只使用startTime,不带endTime + print(" 带endTime返回空,尝试只使用startTime...") + df = fetch_fn( + symbol=symbol, + interval=interval, + start_time_ms=batch_start_ms, + end_time_ms=None, + limit=limit + ) + + if df is None or df.empty: + print(f" 无法获取数据,可能已到达API历史数据限制") + break + + # 检查返回的数据是否真的从startTime开始(判断startTime是否被忽略) + returned_earliest = int(df.iloc[0]["open_time_ms"]) + returned_latest = int(df.iloc[-1]["open_time_ms"]) + + # 如果返回的数据仍然是最新的,说明startTime被忽略 + if returned_latest >= latest_time_ms - 1000: # 允许1秒误差 + print(f" 检测到startTime被忽略(返回最新数据),无法继续往前获取") + break + + # 过滤掉已获取的数据和早于start_date的数据 + df = df[df["open_time_ms"] < current_start_ms] + df = df[df["open_time_ms"] >= start_time_ms] + + if df.empty: + print(f" 过滤后无新数据") + # 如果已经到达目标起始时间,完成 + if batch_start_ms <= start_time_ms: + print(f"已到达目标起始时间({start_date}),完成") + break + # 否则继续往前推 + current_start_ms = batch_start_ms - interval_ms + time.sleep(0.3) + continue + + # 更新已获取的最早时间戳 + current_earliest = int(df.iloc[0]["open_time_ms"]) + if current_earliest < earliest_fetched_ms: + earliest_fetched_ms = current_earliest + + count = len(df) + total_count += count + print(f" 获取 {count} 条新数据,最早时间: {datetime.datetime.fromtimestamp(current_earliest/1000)}") + print(f" 累计获取 {total_count} 条数据") + + if auto_save: + save_to_database(df, interval=interval, symbol=symbol) + + # 更新下一次的起始时间 + current_start_ms = current_earliest - interval_ms + + # 避免请求过快 + time.sleep(0.3) + + # 安全限制 + if batch_num > 500: + print("批次过多,停止") + break + + print(f"\n完成!总共获取 {total_count} 条 {symbol} {interval} K线数据") + if earliest_fetched_ms: + print(f"最早数据时间: {datetime.datetime.fromtimestamp(earliest_fetched_ms/1000)}") + if earliest_fetched_ms > start_time_ms: + print(f"⚠️ 注意:最早数据时间 ({datetime.datetime.fromtimestamp(earliest_fetched_ms/1000)}) 晚于目标起始时间 ({start_date})") + print(f" 这可能是因为MEXC API的历史数据限制,无法获取更早的数据") + return total_count + + +def save_to_database(df, interval="30m", symbol="SOLUSDT"): + """ + 将K线数据存储到数据库 + :param df: pandas DataFrame,包含K线数据 + :param interval: 时间间隔,如 "30m", "1m", "15m", "1h" + :param symbol: 交易对符号 + """ + if df is None or df.empty: + print("数据为空,无法存储") + return + + # 根据时间间隔选择对应的模型 + interval_map = { + "1m": Mexc1, + "15m": Mexc15, + "30m": Mexc30, + "1h": Mexc1Hour, + } + + model_class = interval_map.get(interval) + if model_class is None: + print(f"不支持的时间间隔: {interval}") + return + + # 存储数据到数据库 + saved_count = 0 + for _, row in df.iterrows(): + # 使用原始时间戳(毫秒级)作为 id + timestamp_ms = int(row['open_time_ms']) + + model_class.get_or_create( + id=timestamp_ms, + defaults={ + 'open': float(row['open']), + 'high': float(row['high']), + 'low': float(row['low']), + 'close': float(row['close']), + } + ) + saved_count += 1 + + print(f"成功存储 {saved_count} 条 {interval} K线数据({symbol})到数据库") + + +# 使用示例 +if __name__ == "__main__": + # 方式1: 获取最新200条数据 + # df = get_mexc_klines("SOLUSDT", "30m", 200) + # if df is not None: + # print(df.tail(8)) + # save_to_database(df, interval="30m", symbol="SOLUSDT") + + # 方式2: 批量获取2024年至今的历史数据(使用现货接口,反向获取) + fetch_historical_data( + symbol="SOLUSDT", + interval="30m", + start_date="2024-01-01", + end_date=None, # None表示到当前时间 + auto_save=True, + market="spot", # 使用现货接口 + mode="reverse" # 强制使用反向获取模式(从最新往前推) + ) \ No newline at end of file diff --git a/models/database.db b/models/database.db index cdcb63b..af6a2c2 100644 Binary files a/models/database.db and b/models/database.db differ diff --git a/models/mexc.py b/models/mexc.py new file mode 100644 index 0000000..54e05e9 --- /dev/null +++ b/models/mexc.py @@ -0,0 +1,57 @@ +from peewee import * + +from models import db + + +class Mexc1(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'mexc_1' + + +class Mexc15(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'mexc_15' + + +class Mexc30(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'mexc_30' + + +class Mexc1Hour(Model): + id = IntegerField(primary_key=True) + open = FloatField(null=True) + high = FloatField(null=True) + low = FloatField(null=True) + close = FloatField(null=True) + + class Meta: + database = db + table_name = 'mexc_1_hour' + + +# 连接到数据库 +db.connect() +# 创建表(如果表不存在) +db.create_tables([Mexc1, Mexc15, Mexc30, Mexc1Hour])