This commit is contained in:
27942
2026-01-18 19:36:48 +08:00
parent 7d9eb698fc
commit 51f3401ea4
3 changed files with 716 additions and 0 deletions

659
mexc/获取数据.py Normal file
View File

@@ -0,0 +1,659 @@
import requests
import pandas as pd
import datetime
import time
from models.mexc import Mexc1, Mexc15, Mexc30, Mexc1Hour
def get_mexc_klines(symbol="SOLUSDT", interval="30m", limit=500):
url = "https://api.mexc.com/api/v3/klines"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
params = {
"symbol": symbol.upper(),
"interval": interval,
"limit": limit
}
try:
resp = requests.get(url, params=params, headers=headers, timeout=10)
resp.raise_for_status() # 提前抛出 HTTP 错误
data = resp.json()
if not isinstance(data, list) or not data:
print("返回数据异常:", data)
return None
# 使用正确的 8 列
columns = [
"open_time", "open", "high", "low", "close",
"volume", "close_time", "quote_volume"
]
df = pd.DataFrame(data, columns=columns)
# 保存原始时间戳用于数据库id
df["open_time_ms"] = df["open_time"].astype(int)
# 时间戳转可读时间
df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")
# 常用字段排序 + 类型转换
df = df[[
"open_time", "open", "high", "low", "close",
"volume", "quote_volume", "open_time_ms"
]]
# 转成浮点数(方便后续计算)
for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
df[col] = pd.to_numeric(df[col], errors="coerce")
print(f"成功获取 {len(df)}{interval} K线{symbol}")
return df
except Exception as e:
print("请求失败:", str(e))
return None
def normalize_futures_symbol(symbol):
"""将现货格式转换为合约格式,如 SOLUSDT -> SOL_USDT"""
if "_" in symbol:
return symbol.upper()
symbol = symbol.upper()
for quote in ["USDT", "USDC", "USD", "BTC", "ETH"]:
if symbol.endswith(quote):
base = symbol[:-len(quote)]
return f"{base}_{quote}"
return symbol
def map_futures_interval(interval):
"""合约K线周期映射"""
mapping = {
"1m": "Min1",
"5m": "Min5",
"15m": "Min15",
"30m": "Min30",
"1h": "Min60",
"4h": "Hour4",
"8h": "Hour8",
"1d": "Day1",
"1w": "Week1",
"1M": "Month1",
}
return mapping.get(interval, interval)
def get_mexc_klines_with_time(symbol="SOLUSDT", interval="30m", start_time_ms=None, end_time_ms=None, limit=1000):
"""
获取指定时间范围内的K线数据
:param symbol: 交易对符号
:param interval: 时间间隔,如 "30m", "1m", "15m", "1h"
:param start_time_ms: 开始时间(毫秒级时间戳)
:param end_time_ms: 结束时间(毫秒级时间戳)
:param limit: 单次获取数量最大1000
:return: DataFrame或None
"""
def _fetch(params):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
resp = requests.get(url, params=params, headers=headers, timeout=10)
resp.raise_for_status()
data = resp.json()
return resp, data
url = "https://api.mexc.com/api/v3/klines"
params = {
"symbol": symbol.upper(),
"interval": interval,
"limit": limit
}
if start_time_ms:
params["startTime"] = start_time_ms
if end_time_ms:
params["endTime"] = end_time_ms
try:
resp, data = _fetch(params)
# 检查是否是错误响应
if isinstance(data, dict) and 'code' in data:
print(f"API返回错误: {data}, 请求参数: {params}")
return None
# 尝试秒级时间戳回退(部分接口可能只接受秒)
if (not isinstance(data, list) or not data) and (start_time_ms or end_time_ms):
params_s = params.copy()
if start_time_ms:
params_s["startTime"] = int(start_time_ms // 1000)
if end_time_ms:
params_s["endTime"] = int(end_time_ms // 1000)
resp_s, data_s = _fetch(params_s)
if isinstance(data_s, list) and data_s:
print("检测到秒级时间戳可用,已自动回退到秒级参数")
resp, data = resp_s, data_s
params = params_s
if not isinstance(data, list) or not data:
print(f"API返回数据异常: {data}, 请求参数: {params}")
print(f"完整URL: {resp.url}")
return None
columns = [
"open_time", "open", "high", "low", "close",
"volume", "close_time", "quote_volume"
]
df = pd.DataFrame(data, columns=columns)
# 判断时间单位(秒/毫秒)
max_open = pd.to_numeric(df["open_time"], errors="coerce").max()
if max_open < 1_000_000_000_000: # 小于1e12视为秒
df["open_time_ms"] = (df["open_time"].astype(int) * 1000)
df["open_time"] = pd.to_datetime(df["open_time"], unit="s")
df["close_time"] = pd.to_datetime(df["close_time"], unit="s")
else:
df["open_time_ms"] = df["open_time"].astype(int)
df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")
df = df[[
"open_time", "open", "high", "low", "close",
"volume", "quote_volume", "open_time_ms"
]]
for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
df[col] = pd.to_numeric(df[col], errors="coerce")
return df
except Exception as e:
print(f"请求失败: {str(e)}")
return None
def get_mexc_futures_klines(symbol="SOLUSDT", interval="30m", limit=500):
return get_mexc_futures_klines_with_time(
symbol=symbol,
interval=interval,
start_time_ms=None,
end_time_ms=None,
limit=limit
)
def get_mexc_futures_klines_with_time(symbol="SOLUSDT", interval="30m", start_time_ms=None, end_time_ms=None, limit=1000):
"""
获取合约K线数据使用 contract.mexc.com
"""
url = "https://contract.mexc.com/api/v1/contract/kline"
symbol = normalize_futures_symbol(symbol)
interval = map_futures_interval(interval)
def _fetch(params):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json"
}
resp = requests.get(url, params=params, headers=headers, timeout=10)
resp.raise_for_status()
return resp, resp.json()
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
# 合约接口通常使用 start/end秒级这里默认秒级
if start_time_ms:
params["start"] = int(start_time_ms // 1000)
if end_time_ms:
params["end"] = int(end_time_ms // 1000)
try:
resp, data = _fetch(params)
# 兼容错误结构
if isinstance(data, dict) and ("success" in data and data.get("success") is False):
print(f"合约API返回错误: {data}, 请求参数: {params}")
return None
payload = data
if isinstance(data, dict) and "data" in data:
payload = data["data"]
# 兼容返回为 dict of arrays
if isinstance(payload, dict) and "time" in payload:
times = payload.get("time", [])
opens = payload.get("open", [])
highs = payload.get("high", [])
lows = payload.get("low", [])
closes = payload.get("close", [])
vols = payload.get("vol", [])
rows = list(zip(times, opens, highs, lows, closes, vols))
payload = rows
if not isinstance(payload, list) or not payload:
print(f"合约API返回数据异常: {payload}, 请求参数: {params}")
print(f"完整URL: {resp.url}")
return None
# 兼容 list of lists: [time, open, high, low, close, vol, ...]
columns = ["open_time", "open", "high", "low", "close", "volume"]
df = pd.DataFrame(payload, columns=columns + list(range(len(payload[0]) - len(columns))))
df = df[columns]
# 判断时间单位(秒/毫秒)
max_open = pd.to_numeric(df["open_time"], errors="coerce").max()
if max_open < 1_000_000_000_000:
df["open_time_ms"] = (df["open_time"].astype(int) * 1000)
df["open_time"] = pd.to_datetime(df["open_time"], unit="s")
else:
df["open_time_ms"] = df["open_time"].astype(int)
df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
for col in ["open", "high", "low", "close", "volume"]:
df[col] = pd.to_numeric(df[col], errors="coerce")
# 合约接口不一定返回 quote_volume这里保持一致字段结构
df["quote_volume"] = pd.NA
df = df[[
"open_time", "open", "high", "low", "close",
"volume", "quote_volume", "open_time_ms"
]]
return df
except Exception as e:
print(f"合约请求失败: {str(e)}")
return None
def get_interval_ms(interval):
"""
将时间间隔字符串转换为毫秒数
:param interval: 如 "1m", "15m", "30m", "1h"
:return: 毫秒数
"""
if interval.endswith("m"):
minutes = int(interval[:-1])
return minutes * 60 * 1000
elif interval.endswith("h"):
hours = int(interval[:-1])
return hours * 60 * 60 * 1000
elif interval.endswith("d"):
days = int(interval[:-1])
return days * 24 * 60 * 60 * 1000
else:
return 60 * 1000 # 默认1分钟
def fetch_historical_data(symbol="SOLUSDT", interval="30m", start_date="2024-01-01", end_date=None, auto_save=True, mode="auto", market="spot"):
"""
批量获取历史K线数据并存储到数据库
:param symbol: 交易对符号
:param interval: 时间间隔,如 "30m", "1m", "15m", "1h"
:param start_date: 开始日期,格式 "YYYY-MM-DD",默认 "2024-01-01"
:param end_date: 结束日期,格式 "YYYY-MM-DD",默认当前时间
:param auto_save: 是否自动保存到数据库默认True
:param mode: 获取模式,"auto" 自动判断,"forward" 正向,"reverse" 反向
:return: 总共获取的数据条数
"""
# 转换日期为时间戳(毫秒)
start_dt = datetime.datetime.strptime(start_date, "%Y-%m-%d")
start_dt = start_dt.replace(hour=0, minute=0, second=0, microsecond=0)
start_time_ms = int(start_dt.timestamp() * 1000)
if end_date:
end_dt = datetime.datetime.strptime(end_date, "%Y-%m-%d")
end_dt = end_dt.replace(hour=23, minute=59, second=59, microsecond=999999)
else:
end_dt = datetime.datetime.now()
end_time_ms = int(end_dt.timestamp() * 1000)
print(f"开始获取 {symbol} {interval} K线数据")
print(f"时间范围: {start_date}{end_dt.strftime('%Y-%m-%d %H:%M:%S')}")
current_start = start_time_ms
total_count = 0
limit = 1000 # API最大限制
interval_ms = get_interval_ms(interval)
# 计算每次请求的时间窗口1000条数据的时间跨度
window_ms = limit * interval_ms
if mode == "reverse":
return fetch_historical_data_reverse(
symbol=symbol,
interval=interval,
start_date=start_date,
end_date=end_date,
auto_save=auto_save,
market=market
)
fetch_fn = get_mexc_klines_with_time
latest_fn = get_mexc_klines
if market == "futures":
fetch_fn = get_mexc_futures_klines_with_time
latest_fn = get_mexc_futures_klines
if mode == "auto":
# 探测:如果 start+end 为空、start 单独返回最新数据,说明 startTime 被忽略,改用反向获取
probe_end = min(start_time_ms + window_ms, end_time_ms)
probe_df = fetch_fn(
symbol=symbol,
interval=interval,
start_time_ms=start_time_ms,
end_time_ms=probe_end,
limit=10
)
if probe_df is None or probe_df.empty:
probe_df2 = fetch_fn(
symbol=symbol,
interval=interval,
start_time_ms=start_time_ms,
end_time_ms=None,
limit=10
)
if probe_df2 is not None and not probe_df2.empty:
latest_ms = int(probe_df2.iloc[-1]["open_time_ms"])
if latest_ms > probe_end:
print("检测到 startTime 被忽略,自动改为反向获取模式")
return fetch_historical_data_reverse(
symbol=symbol,
interval=interval,
start_date=start_date,
end_date=end_date,
auto_save=auto_save,
market=market
)
while current_start < end_time_ms:
current_end = min(current_start + window_ms, end_time_ms)
print(f"正在获取: {datetime.datetime.fromtimestamp(current_start/1000)}{datetime.datetime.fromtimestamp(current_end/1000)}")
print(f"时间戳: {current_start}{current_end}")
df = fetch_fn(
symbol=symbol,
interval=interval,
start_time_ms=current_start,
end_time_ms=current_end,
limit=limit
)
if df is None or df.empty:
# 先测试一下不带时间参数的最新数据是否能获取到
if current_start == start_time_ms and total_count == 0:
print("\n=== 开始调试 ===")
print("测试1尝试获取最新数据不带时间参数...")
test_df = latest_fn(symbol=symbol, interval=interval, limit=10)
if test_df is not None and not test_df.empty:
print(f"✓ 测试1成功最新数据可以获取")
print(f" 最新时间: {test_df.iloc[-1]['open_time']}")
print(f" 最新时间戳: {test_df.iloc[-1]['open_time_ms']}")
# 测试2只带startTime不带endTime
print(f"\n测试2尝试只带startTime{start_date}开始)...")
test_df2 = fetch_fn(
symbol=symbol,
interval=interval,
start_time_ms=current_start,
end_time_ms=None,
limit=100
)
if test_df2 is not None and not test_df2.empty:
print(f"✓ 测试2成功从指定时间开始获取数据")
print(f" 获取到 {len(test_df2)} 条数据")
print(f" 第一条时间: {test_df2.iloc[0]['open_time']}")
print(f" 最后一条时间: {test_df2.iloc[-1]['open_time']}")
print("=== 调试结束 ===\n")
# 使用测试2的方法继续赋值给df
df = test_df2
# 不break继续下面的处理流程
else:
print("✗ 测试2失败带startTime无法获取数据")
print("\n建议MEXC API可能不支持从历史日期开始查询")
print("可以尝试1. 检查时间范围是否在API支持范围内")
print(" 2. 使用反向获取:从最新数据往前推")
print("=== 调试结束 ===\n")
break
else:
print("✗ 测试1失败即使不带时间参数也无法获取数据")
print("可能原因:交易对符号或时间间隔不正确")
print("=== 调试结束 ===\n")
break
# 如果测试后df仍然是None或空说明无法获取数据
if df is None or df.empty:
if total_count > 0:
print("已到达数据末尾")
else:
print("本次获取数据为空,跳过")
break
count = len(df)
total_count += count
print(f"本次获取 {count} 条数据,累计 {total_count}")
# 自动保存到数据库
if auto_save:
save_to_database(df, interval=interval, symbol=symbol)
# 更新下一次的起始时间(从最后一条数据的开盘时间 + 一个间隔)
last_open_ms = int(df.iloc[-1]['open_time_ms'])
current_start = last_open_ms + interval_ms
# 避免请求过快
time.sleep(0.2)
print(f"完成!总共获取 {total_count}{symbol} {interval} K线数据")
return total_count
def fetch_historical_data_reverse(symbol="SOLUSDT", interval="30m", start_date="2024-01-01", end_date=None, auto_save=True, market="spot"):
"""
反向获取历史K线数据从最新往前推使用startTime参数
"""
start_dt = datetime.datetime.strptime(start_date, "%Y-%m-%d")
start_dt = start_dt.replace(hour=0, minute=0, second=0, microsecond=0)
start_time_ms = int(start_dt.timestamp() * 1000)
if end_date:
end_dt = datetime.datetime.strptime(end_date, "%Y-%m-%d")
end_dt = end_dt.replace(hour=23, minute=59, second=59, microsecond=999999)
else:
end_dt = datetime.datetime.now()
end_time_ms = int(end_dt.timestamp() * 1000)
print(f"开始反向获取 {symbol} {interval} K线数据从最新往前推")
print(f"目标时间范围: {start_date}{end_dt.strftime('%Y-%m-%d %H:%M:%S')}")
total_count = 0
limit = 1000 # 每次尝试获取1000条
interval_ms = get_interval_ms(interval)
# 使用带时间参数的函数
fetch_fn = get_mexc_klines_with_time
if market == "futures":
fetch_fn = get_mexc_futures_klines_with_time
# 先获取最新数据,确定当前最新时间
print("\n第 1 批:获取最新数据...")
df_latest = get_mexc_klines(symbol=symbol, interval=interval, limit=100)
if df_latest is None or df_latest.empty:
print("无法获取最新数据,停止")
return 0
latest_time_ms = int(df_latest.iloc[-1]["open_time_ms"])
earliest_fetched_ms = int(df_latest.iloc[0]["open_time_ms"])
# 保存第一批数据
df_latest_filtered = df_latest[df_latest["open_time_ms"] >= start_time_ms]
if not df_latest_filtered.empty:
count = len(df_latest_filtered)
total_count += count
print(f"获取 {count} 条最新数据,时间范围: {datetime.datetime.fromtimestamp(earliest_fetched_ms/1000)}{datetime.datetime.fromtimestamp(latest_time_ms/1000)}")
if auto_save:
save_to_database(df_latest_filtered, interval=interval, symbol=symbol)
# 从已获取的最早时间往前推
current_start_ms = earliest_fetched_ms - interval_ms
batch_num = 1
while current_start_ms >= start_time_ms:
batch_num += 1
# 计算本次请求的时间窗口往前推limit条数据的时间跨度
window_ms = limit * interval_ms
batch_start_ms = max(current_start_ms - window_ms, start_time_ms)
print(f"\n{batch_num} 批:从 {datetime.datetime.fromtimestamp(batch_start_ms/1000)} 开始获取...")
# 使用startTime参数尝试获取更早的数据
df = fetch_fn(
symbol=symbol,
interval=interval,
start_time_ms=batch_start_ms,
end_time_ms=current_start_ms,
limit=limit
)
if df is None or df.empty:
# 如果返回空尝试只使用startTime不带endTime
print(" 带endTime返回空尝试只使用startTime...")
df = fetch_fn(
symbol=symbol,
interval=interval,
start_time_ms=batch_start_ms,
end_time_ms=None,
limit=limit
)
if df is None or df.empty:
print(f" 无法获取数据可能已到达API历史数据限制")
break
# 检查返回的数据是否真的从startTime开始判断startTime是否被忽略
returned_earliest = int(df.iloc[0]["open_time_ms"])
returned_latest = int(df.iloc[-1]["open_time_ms"])
# 如果返回的数据仍然是最新的说明startTime被忽略
if returned_latest >= latest_time_ms - 1000: # 允许1秒误差
print(f" 检测到startTime被忽略返回最新数据无法继续往前获取")
break
# 过滤掉已获取的数据和早于start_date的数据
df = df[df["open_time_ms"] < current_start_ms]
df = df[df["open_time_ms"] >= start_time_ms]
if df.empty:
print(f" 过滤后无新数据")
# 如果已经到达目标起始时间,完成
if batch_start_ms <= start_time_ms:
print(f"已到达目标起始时间({start_date}),完成")
break
# 否则继续往前推
current_start_ms = batch_start_ms - interval_ms
time.sleep(0.3)
continue
# 更新已获取的最早时间戳
current_earliest = int(df.iloc[0]["open_time_ms"])
if current_earliest < earliest_fetched_ms:
earliest_fetched_ms = current_earliest
count = len(df)
total_count += count
print(f" 获取 {count} 条新数据,最早时间: {datetime.datetime.fromtimestamp(current_earliest/1000)}")
print(f" 累计获取 {total_count} 条数据")
if auto_save:
save_to_database(df, interval=interval, symbol=symbol)
# 更新下一次的起始时间
current_start_ms = current_earliest - interval_ms
# 避免请求过快
time.sleep(0.3)
# 安全限制
if batch_num > 500:
print("批次过多,停止")
break
print(f"\n完成!总共获取 {total_count}{symbol} {interval} K线数据")
if earliest_fetched_ms:
print(f"最早数据时间: {datetime.datetime.fromtimestamp(earliest_fetched_ms/1000)}")
if earliest_fetched_ms > start_time_ms:
print(f"⚠️ 注意:最早数据时间 ({datetime.datetime.fromtimestamp(earliest_fetched_ms/1000)}) 晚于目标起始时间 ({start_date})")
print(f" 这可能是因为MEXC API的历史数据限制无法获取更早的数据")
return total_count
def save_to_database(df, interval="30m", symbol="SOLUSDT"):
"""
将K线数据存储到数据库
:param df: pandas DataFrame包含K线数据
:param interval: 时间间隔,如 "30m", "1m", "15m", "1h"
:param symbol: 交易对符号
"""
if df is None or df.empty:
print("数据为空,无法存储")
return
# 根据时间间隔选择对应的模型
interval_map = {
"1m": Mexc1,
"15m": Mexc15,
"30m": Mexc30,
"1h": Mexc1Hour,
}
model_class = interval_map.get(interval)
if model_class is None:
print(f"不支持的时间间隔: {interval}")
return
# 存储数据到数据库
saved_count = 0
for _, row in df.iterrows():
# 使用原始时间戳(毫秒级)作为 id
timestamp_ms = int(row['open_time_ms'])
model_class.get_or_create(
id=timestamp_ms,
defaults={
'open': float(row['open']),
'high': float(row['high']),
'low': float(row['low']),
'close': float(row['close']),
}
)
saved_count += 1
print(f"成功存储 {saved_count}{interval} K线数据{symbol})到数据库")
# 使用示例
if __name__ == "__main__":
# 方式1: 获取最新200条数据
# df = get_mexc_klines("SOLUSDT", "30m", 200)
# if df is not None:
# print(df.tail(8))
# save_to_database(df, interval="30m", symbol="SOLUSDT")
# 方式2: 批量获取2024年至今的历史数据使用现货接口反向获取
fetch_historical_data(
symbol="SOLUSDT",
interval="30m",
start_date="2024-01-01",
end_date=None, # None表示到当前时间
auto_save=True,
market="spot", # 使用现货接口
mode="reverse" # 强制使用反向获取模式(从最新往前推)
)

Binary file not shown.

57
models/mexc.py Normal file
View File

@@ -0,0 +1,57 @@
from peewee import *
from models import db
class Mexc1(Model):
id = IntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'mexc_1'
class Mexc15(Model):
id = IntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'mexc_15'
class Mexc30(Model):
id = IntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'mexc_30'
class Mexc1Hour(Model):
id = IntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'mexc_1_hour'
# 连接到数据库
db.connect()
# 创建表(如果表不存在)
db.create_tables([Mexc1, Mexc15, Mexc30, Mexc1Hour])