"""Polymarket 15-minute up/down market scraper.

Polls the Gamma/CLOB APIs for several crypto up/down markets in parallel
(one thread per symbol) and persists each snapshot to a local SQLite file.
"""

import json
import random
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

GAMMA = "https://gamma-api.polymarket.com"
CLOB = "https://clob.polymarket.com"
INTERVAL_SEC = 15 * 60  # 900 — one market bucket every 15 minutes
FETCH_INTERVAL = 0.5  # poll period per symbol: every 0.5 s (2x per second)

# Supported symbols and the slug prefix of their 15-minute up/down market.
CRYPTOS = [
    {"symbol": "btc", "slug_prefix": "btc-updown-15m"},
    {"symbol": "sol", "slug_prefix": "sol-updown-15m"},
    {"symbol": "eth", "slug_prefix": "eth-updown-15m"},
    {"symbol": "xrp", "slug_prefix": "xrp-updown-15m"},
]


def make_session() -> requests.Session:
    """Create an HTTP session with retry/backoff and a randomized local proxy port.

    NOTE(review): the proxy port is picked once per session from 20001-25000;
    presumably a local rotating-proxy pool listens on that range — confirm.
    """
    s = requests.Session()
    proxies = {
        'http': f'http://192.168.1.79:{random.randint(20001, 25000)}',
        'https': f'http://192.168.1.79:{random.randint(20001, 25000)}',
    }
    s.proxies.update(proxies)
    s.headers.update({"User-Agent": "Mozilla/5.0"})
    retry = Retry(
        total=5,
        backoff_factor=0.4,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET",),
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry, pool_connections=50, pool_maxsize=50)
    s.mount("https://", adapter)
    s.mount("http://", adapter)
    return s


def parse_jsonish_list(v) -> list:
    """Coerce a value that may be a list, a JSON array string, or a CSV string into a list.

    Returns [] for None or unrecognized types.
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if isinstance(v, str):
        s = v.strip()
        if s.startswith("[") and s.endswith("]"):
            return json.loads(s)
        return [x.strip() for x in s.split(",") if x.strip()]
    return []


def get_market_by_slug(session: requests.Session, slug: str) -> Dict[str, Any]:
    """Fetch a market document from the Gamma API by slug.

    Raises FileNotFoundError on HTTP 404 (callers use it to probe adjacent
    time buckets); raises requests.HTTPError on other failures.
    """
    r = session.get(f"{GAMMA}/markets/slug/{slug}", timeout=20)
    if r.status_code == 404:
        raise FileNotFoundError(slug)
    r.raise_for_status()
    return r.json()


def get_price(session: requests.Session, token_id: str, side: str) -> Optional[float]:
    """Fetch the CLOB price for a token on one side ("buy" or "sell").

    Returns None when the API omits the "price" field.
    """
    r = session.get(f"{CLOB}/price", params={"token_id": token_id, "side": side}, timeout=20)
    r.raise_for_status()
    p = r.json().get("price")
    return float(p) if p is not None else None


def get_book(session: requests.Session, token_id: str) -> Dict[str, Any]:
    """Fetch the full CLOB order book for a token."""
    r = session.get(f"{CLOB}/book", params={"token_id": token_id}, timeout=20)
    r.raise_for_status()
    return r.json()


def compute_mid(bid: Optional[float], ask: Optional[float]) -> Tuple[Optional[float], Optional[float]]:
    """Return (mid, spread) from best bid/ask.

    With only one side available, that side's price stands in for the mid and
    the spread is None; with neither, both are None.
    """
    if bid is None and ask is None:
        return None, None
    if bid is None:
        return ask, None
    if ask is None:
        return bid, None
    return (bid + ask) / 2.0, (ask - bid)


def trim_book(book: Dict[str, Any], top_n: int = 10) -> Dict[str, Any]:
    """Keep only the best top_n levels per side (bids descending, asks ascending)."""
    bids = book.get("bids") or []
    asks = book.get("asks") or []
    bids = sorted(bids, key=lambda x: float(x["price"]), reverse=True)[:top_n]
    asks = sorted(asks, key=lambda x: float(x["price"]))[:top_n]
    return {"bids": bids, "asks": asks}


def fetch_token_bundle(session: requests.Session, token_id: str, top_n: int = 10) -> Dict[str, Any]:
    """Fetch bid, ask, and trimmed book for one token concurrently.

    Returns {"token_id", "bid", "ask", "mid", "spread", "book"}.
    """
    with ThreadPoolExecutor(max_workers=3) as ex:
        futs = {
            ex.submit(get_price, session, token_id, "buy"): "bid",
            ex.submit(get_price, session, token_id, "sell"): "ask",
            ex.submit(get_book, session, token_id): "book",
        }
        out: Dict[str, Any] = {"token_id": token_id}
        for fut in as_completed(futs):
            out[futs[fut]] = fut.result()
    mid, spread = compute_mid(out.get("bid"), out.get("ask"))
    out["mid"] = mid
    out["spread"] = spread
    out["book"] = trim_book(out.get("book") or {}, top_n=top_n)
    return out


def current_bucket_ts(interval_sec: int = INTERVAL_SEC, mode: str = "floor") -> int:
    """Return the current time snapped to the interval boundary (floor or ceil)."""
    now = int(time.time())
    if mode == "ceil":
        return ((now + interval_sec - 1) // interval_sec) * interval_sec
    return (now // interval_sec) * interval_sec


def build_slug(prefix: str, bucket_ts: int) -> str:
    """Compose a market slug like 'eth-updown-15m-1700000000'."""
    return f"{prefix}-{bucket_ts}"


def fetch_updown_by_slug(session: requests.Session, slug: str, decimals: int = 0, top_n: int = 10) -> Dict[str, Any]:
    """Fetch both outcome tokens of an up/down market and compute Up/Down percentages.

    Returns an error-shaped dict (with an "error" key) when either mid price is
    unavailable; raises RuntimeError when the market lacks two token ids.
    """
    m = get_market_by_slug(session, slug)
    outcomes = [str(x) for x in parse_jsonish_list(m.get("outcomes"))]
    token_ids = [str(x) for x in parse_jsonish_list(m.get("clobTokenIds"))]
    if len(token_ids) < 2:
        raise RuntimeError(f"clobTokenIds missing/too short: {token_ids}")
    token_map = dict(zip(outcomes, token_ids))
    up_id = token_map.get("Up") or token_map.get("Yes") or token_ids[0]
    dn_id = token_map.get("Down") or token_map.get("No") or token_ids[1]
    # BUG FIX: the original called .result() on the first future before
    # submitting the second, so the two fetches ran sequentially. Submit both
    # first, then wait, so they actually run in parallel.
    with ThreadPoolExecutor(max_workers=2) as ex:
        up_fut = ex.submit(fetch_token_bundle, session, up_id, top_n)
        dn_fut = ex.submit(fetch_token_bundle, session, dn_id, top_n)
        up = up_fut.result()
        dn = dn_fut.result()
    # Guard s == 0 as well: percentages are undefined when both mids are zero.
    if up["mid"] is None or dn["mid"] is None or float(up["mid"]) + float(dn["mid"]) == 0.0:
        return {
            "error": "missing mid price",
            "slug": slug,
            "market_id": m.get("id"),
            "question": m.get("question"),
            "up": up,
            "down": dn,
        }
    s = float(up["mid"]) + float(dn["mid"])
    up_pct = round(float(up["mid"]) / s * 100, decimals)
    dn_pct = round(float(dn["mid"]) / s * 100, decimals)
    return {
        "question": m.get("question"),
        "slug": m.get("slug"),
        "market_id": m.get("id"),
        "up_pct": up_pct,
        "down_pct": dn_pct,
        "up": up,
        "down": dn,
    }


def fetch_current_market(
    session: requests.Session,
    slug_prefix: str = "eth-updown-15m",
    decimals: int = 0,
    top_n: int = 10,
    ts_mode: str = "floor",
    probe_offsets: Tuple[int, ...] = (0, -INTERVAL_SEC, INTERVAL_SEC),
) -> Dict[str, Any]:
    """Locate and fetch the market for the current 15-minute bucket.

    Probes the current bucket plus the offsets in probe_offsets (to tolerate
    clock skew around bucket boundaries); raises RuntimeError if none exists.
    """
    base_ts = current_bucket_ts(INTERVAL_SEC, mode=ts_mode)
    fetched_at = int(time.time())
    last_err: Optional[Exception] = None
    for off in probe_offsets:
        bucket_ts = base_ts + off
        slug = build_slug(slug_prefix, bucket_ts)
        try:
            data = fetch_updown_by_slug(session, slug, decimals=decimals, top_n=top_n)
            data["bucket_ts"] = bucket_ts
            data["fetched_at"] = fetched_at
            return data
        except FileNotFoundError as e:
            last_err = e
            continue
    raise RuntimeError(f"Could not find market for current bucket. last_err={last_err}")


# ----------------------------
# SQLite storage (snapshots keyed by symbol + bucket + fetch time)
# ----------------------------
DDL = """
CREATE TABLE IF NOT EXISTS pm_snapshots (
    crypto_symbol TEXT NOT NULL,
    bucket_ts INTEGER NOT NULL,
    fetched_at INTEGER NOT NULL,
    raw_json TEXT NOT NULL,
    PRIMARY KEY (crypto_symbol, bucket_ts, fetched_at)
);
CREATE INDEX IF NOT EXISTS idx_pm_snapshots_crypto_fetched ON pm_snapshots(crypto_symbol, fetched_at);
CREATE INDEX IF NOT EXISTS idx_pm_snapshots_fetched_at ON pm_snapshots(fetched_at);
"""


def _check_column_exists(conn: sqlite3.Connection, table_name: str, column_name: str) -> bool:
    """Return True if the given column exists on the given table."""
    cursor = conn.cursor()
    cursor.execute(f"PRAGMA table_info({table_name})")
    columns = [row[1] for row in cursor.fetchall()]
    return column_name in columns


def _migrate_database(conn: sqlite3.Connection) -> None:
    """Migrate a pre-crypto_symbol database: back up the old table and recreate it."""
    cursor = conn.cursor()
    # Does the table exist at all?
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='pm_snapshots'")
    if not cursor.fetchone():
        return  # table absent; nothing to migrate
    # Migration is only needed when crypto_symbol is missing.
    if _check_column_exists(conn, "pm_snapshots", "crypto_symbol"):
        return  # column present; nothing to migrate
    print("检测到旧版数据库,开始迁移...")
    # Back up old rows before dropping the table.
    cursor.execute("CREATE TABLE IF NOT EXISTS pm_snapshots_backup AS SELECT * FROM pm_snapshots")
    print("已备份旧数据到 pm_snapshots_backup 表")
    # Drop the old table and its index.
    cursor.execute("DROP INDEX IF EXISTS idx_pm_snapshots_fetched_at")
    cursor.execute("DROP TABLE IF EXISTS pm_snapshots")
    # Recreate with the new schema.
    cursor.executescript(DDL)
    conn.commit()
    print("数据库迁移完成!旧数据已备份到 pm_snapshots_backup 表")


def init_db(db_path: str = "polymarket.db") -> None:
    """Create/upgrade the snapshots database (WAL mode, migrate, ensure schema)."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        # Migrate first (if the old schema is detected), then apply the DDL.
        _migrate_database(conn)
        conn.executescript(DDL)
        conn.commit()
    finally:
        conn.close()


def save_snapshot(crypto_symbol: str, bucket_ts: int, fetched_at: int, raw: Dict[str, Any],
                  db_path: str = "polymarket.db") -> None:
    """Insert one snapshot row; duplicate primary keys are silently ignored."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute(
            "INSERT OR IGNORE INTO pm_snapshots(crypto_symbol, bucket_ts, fetched_at, raw_json) VALUES (?, ?, ?, ?)",
            (crypto_symbol, int(bucket_ts), int(fetched_at), json.dumps(raw, ensure_ascii=False)),
        )
        conn.commit()
    finally:
        conn.close()


def fetch_crypto_worker(crypto_config: Dict[str, str], db_path: str, stop_event: threading.Event) -> None:
    """Worker loop for one symbol: fetch + persist every FETCH_INTERVAL seconds."""
    symbol = crypto_config["symbol"]
    slug_prefix = crypto_config["slug_prefix"]
    # Each symbol gets its own session (own connection pool / proxy port).
    session = make_session()
    print(f"[{symbol.upper()}] 开始抓取数据...")
    while not stop_event.is_set():
        try:
            start_time = time.time()
            data = fetch_current_market(
                session=session,
                slug_prefix=slug_prefix,
                decimals=0,
                top_n=10,
                ts_mode="floor",
                probe_offsets=(0, -900, 900),
            )
            save_snapshot(symbol, data["bucket_ts"], data["fetched_at"], data, db_path)
            elapsed = time.time() - start_time
            print(
                f"[{symbol.upper()}] 已保存: bucket_ts={data['bucket_ts']}, fetched_at={data['fetched_at']}, 耗时={elapsed:.2f}秒")
            # Sleep the remainder of the interval so each cycle is ~0.5 s.
            sleep_time = max(0, FETCH_INTERVAL - elapsed)
            time.sleep(sleep_time)
        except Exception as e:
            print(f"[{symbol.upper()}] 错误: {e}")
            # Back off briefly on error to avoid hammering the API.
            time.sleep(0.5)


def main() -> None:
    """Entry point: spawn one daemon worker thread per symbol and wait."""
    db_path = "polymarket.db"
    init_db(db_path)
    stop_event = threading.Event()
    threads: List[threading.Thread] = []
    print("=" * 60)
    print("启动多币种数据抓取程序")
    print(f"币种数量: {len(CRYPTOS)}")
    print(f"抓取频率: 每 {FETCH_INTERVAL} 秒一次(每秒 {1 / FETCH_INTERVAL} 次)")
    print("=" * 60)
    try:
        # One independent worker thread per symbol.
        for crypto_config in CRYPTOS:
            thread = threading.Thread(
                target=fetch_crypto_worker,
                args=(crypto_config, db_path, stop_event),
                daemon=True,
            )
            thread.start()
            threads.append(thread)
            print(f"[{crypto_config['symbol'].upper()}] 线程已启动")
        # Block until the workers exit (they loop until stop_event is set).
        for thread in threads:
            thread.join()
    except KeyboardInterrupt:
        print("\n收到中断信号,正在停止...")
        stop_event.set()
        for thread in threads:
            thread.join(timeout=2)
        print("程序已停止")


if __name__ == "__main__":
    main()