Files
lm_code/bitmart/回测.py
2025-12-19 19:16:41 +08:00

84 lines
2.5 KiB
Python

import csv
import os
import time

import loguru
from bitmart.api_contract import APIContract
# ------------------ Configuration ------------------
START_YEAR = 2025  # download starts at Jan 1 of this year
CONTRACT_SYMBOL = "ETHUSDT"
STEP = 3  # kline period, in minutes
CSV_FILE = f"kline_{STEP}.csv"  # output file, named after the kline period

# SECURITY: credentials must not live in source control. They are read from
# the environment first (BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO);
# the literals below are kept only as backward-compatible fallbacks.
# TODO(review): rotate these keys (they have been committed) and then delete
# the fallback values.
memo = os.environ.get("BITMART_MEMO", "合约交易")
api_key = os.environ.get(
    "BITMART_API_KEY",
    "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8",
)
secret_key = os.environ.get(
    "BITMART_SECRET_KEY",
    "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5",
)

# timeout is presumably (connect, read) in seconds -- confirm against the SDK.
contractAPI = APIContract(api_key, secret_key, memo, timeout=(5, 15))
# ------------------ Timestamps ------------------
# Bounds of the fetch window, both as integer Unix timestamps in seconds.
# The lower bound is midnight on Jan 1 of START_YEAR interpreted by
# time.mktime() as local time, with the DST flag (last field) forced to 0.
_year_start_fields = (START_YEAR, 1, 1, 0, 0, 0, 0, 0, 0)
start_of_year = int(time.mktime(_year_start_fields))
# The upper bound is captured once, when the script starts.
current_time = int(time.time())
# ------------------ Fetch data ------------------
# Page size this script assumes for one get_kline call: each request window
# is sized to hold this many klines, and a response of at least this length
# is treated as possibly truncated.
KLINES_PER_REQUEST = 500

all_data = []         # every kline row collected so far, in fetch order
existing_ids = set()  # timestamps already stored; used to drop duplicates
start_time = start_of_year

# Time span covered by one request = STEP minutes * KLINES_PER_REQUEST klines.
# NOTE: despite the "_ms" suffix this value is in SECONDS, the same unit as
# start_of_year / current_time (time.mktime() / time.time()).
request_interval_ms = STEP * 60 * KLINES_PER_REQUEST

while start_time < current_time:
    end_time = min(start_time + request_interval_ms, current_time)
    loguru.logger.info(f"抓取时间段: {start_time} ~ {end_time}")
    try:
        response = contractAPI.get_kline(
            contract_symbol=CONTRACT_SYMBOL,
            step=STEP,
            start_time=start_time,
            end_time=end_time
        )[0]["data"]

        # Convert each raw kline to a CSV row, skipping timestamps that an
        # earlier (overlapping) window already delivered.
        formatted = []
        for k in response:
            print(k)
            k_id = int(k["timestamp"])
            if k_id in existing_ids:
                continue
            existing_ids.add(k_id)
            formatted.append({
                'id': k_id,
                'open': float(k["open_price"]),
                'high': float(k["high_price"]),
                'low': float(k["low_price"]),
                'close': float(k["close_price"]),
                'volume': float(k["volume"])
            })
        formatted.sort(key=lambda x: x['id'])
        all_data.extend(formatted)

        if len(response) < KLINES_PER_REQUEST:
            # Short page: the whole window was returned, move to the next one.
            next_start = end_time
        else:
            # Full page: the window may have been truncated, so resume right
            # after the newest kline returned. This is derived from the raw
            # response rather than `formatted`, which is empty when every row
            # was a duplicate (indexing it used to raise IndexError and make
            # the loop retry the same window forever).
            next_start = max(int(k["timestamp"]) for k in response) + 1

        # Always move forward; end_time is strictly greater than start_time
        # inside this loop, so this fallback cannot stall.
        if next_start <= start_time:
            next_start = end_time
        start_time = next_start
    except Exception as e:
        # Best-effort retry: start_time is unchanged, so the same window is
        # requested again after the pause.
        print(f"请求出错: {e},等待 60 秒后重试")
        time.sleep(60)
    time.sleep(0.2)  # throttle: stay at or below 12 requests per 2 seconds
# ------------------ Save CSV ------------------
# Write every collected kline row to CSV_FILE, overwriting any existing file.
csv_columns = ['id', 'open', 'high', 'low', 'close', 'volume']
try:
    # newline='' lets the csv module control line endings itself; the
    # explicit encoding keeps the output independent of the platform default.
    with open(CSV_FILE, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
        writer.writeheader()
        writer.writerows(all_data)
    print(f"数据已保存到 {CSV_FILE},共 {len(all_data)}")
except OSError as exc:
    # Report the underlying cause instead of a bare "I/O error" so that a
    # failed write (permissions, missing directory, full disk) is diagnosable.
    print(f"I/O error: {exc}")