"""Polymarket multi-crypto 15-minute Up/Down snapshot collector.

For each configured crypto this script resolves the current 15-minute
"up/down" market on Polymarket (Gamma API), fetches best bid/ask and the
order book for both outcome tokens (CLOB API), and appends the raw
snapshot JSON to a local SQLite database.  One daemon thread per crypto
polls roughly every ``FETCH_INTERVAL`` seconds.
"""

import json
import random
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

GAMMA = "https://gamma-api.polymarket.com"
CLOB = "https://clob.polymarket.com"
INTERVAL_SEC = 15 * 60  # market bucket length: 900 seconds
FETCH_INTERVAL = 0.5  # seconds between polls per crypto (2 polls/second)

# Supported cryptos and the slug prefix of their 15-minute up/down markets.
CRYPTOS = [
    {"symbol": "btc", "slug_prefix": "btc-updown-15m"},
    {"symbol": "sol", "slug_prefix": "sol-updown-15m"},
    {"symbol": "eth", "slug_prefix": "eth-updown-15m"},
    {"symbol": "xrp", "slug_prefix": "xrp-updown-15m"},
]


def make_session() -> requests.Session:
    """Create an HTTP session with retry/backoff and a local proxy.

    NOTE(review): http and https each get an independently random port in
    127.0.0.1:20001-25000 -- assumes a local proxy pool listens on that
    range; confirm the two schemes are meant to use different ports.
    """
    s = requests.Session()

    proxies = {
        'http': f'http://127.0.0.1:{random.randint(20001, 25000)}',
        'https': f'http://127.0.0.1:{random.randint(20001, 25000)}',
    }
    s.proxies.update(proxies)

    s.headers.update({"User-Agent": "Mozilla/5.0"})
    retry = Retry(
        total=5,
        backoff_factor=0.4,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET",),
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry, pool_connections=50, pool_maxsize=50)
    s.mount("https://", adapter)
    s.mount("http://", adapter)
    return s


def parse_jsonish_list(v) -> list:
    """Coerce a value that may be a list, a JSON-encoded list string, or a
    comma-separated string into a plain Python list.

    Gamma returns fields like ``outcomes`` / ``clobTokenIds`` in varying
    encodings, hence the defensive parsing.  ``None`` -> ``[]``.
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if isinstance(v, str):
        s = v.strip()
        if s.startswith("[") and s.endswith("]"):
            return json.loads(s)
        return [x.strip() for x in s.split(",") if x.strip()]
    return []


def get_market_by_slug(session: requests.Session, slug: str) -> Dict[str, Any]:
    """Fetch market metadata from Gamma by slug.

    Raises FileNotFoundError on HTTP 404 so callers can probe adjacent
    time buckets; other HTTP errors raise via ``raise_for_status``.
    """
    r = session.get(f"{GAMMA}/markets/slug/{slug}", timeout=20)
    if r.status_code == 404:
        raise FileNotFoundError(slug)
    r.raise_for_status()
    return r.json()


def get_price(session: requests.Session, token_id: str, side: str) -> Optional[float]:
    """Fetch the CLOB price for one token side ("buy" or "sell").

    Returns None when the API reports no price for that side.
    """
    r = session.get(f"{CLOB}/price", params={"token_id": token_id, "side": side}, timeout=20)
    r.raise_for_status()
    p = r.json().get("price")
    return float(p) if p is not None else None


def get_book(session: requests.Session, token_id: str) -> Dict[str, Any]:
    """Fetch the full CLOB order book for a token."""
    r = session.get(f"{CLOB}/book", params={"token_id": token_id}, timeout=20)
    r.raise_for_status()
    return r.json()


def compute_mid(bid: Optional[float], ask: Optional[float]) -> Tuple[Optional[float], Optional[float]]:
    """Return (mid, spread) from best bid/ask.

    One-sided quotes fall back to the available price with spread None;
    both missing yields (None, None).
    """
    if bid is None and ask is None:
        return None, None
    if bid is None:
        return ask, None
    if ask is None:
        return bid, None
    return (bid + ask) / 2.0, (ask - bid)


def trim_book(book: Dict[str, Any], top_n: int = 10) -> Dict[str, Any]:
    """Keep only the best ``top_n`` levels: highest bids, lowest asks."""
    bids = book.get("bids") or []
    asks = book.get("asks") or []
    bids = sorted(bids, key=lambda x: float(x["price"]), reverse=True)[:top_n]
    asks = sorted(asks, key=lambda x: float(x["price"]))[:top_n]
    return {"bids": bids, "asks": asks}


def fetch_token_bundle(session: requests.Session, token_id: str, top_n: int = 10) -> Dict[str, Any]:
    """Fetch bid, ask and order book for one token in parallel and derive
    mid/spread plus a trimmed book."""
    with ThreadPoolExecutor(max_workers=3) as ex:
        futs = {
            ex.submit(get_price, session, token_id, "buy"): "bid",
            ex.submit(get_price, session, token_id, "sell"): "ask",
            ex.submit(get_book, session, token_id): "book",
        }
        out: Dict[str, Any] = {"token_id": token_id}
        for fut in as_completed(futs):
            out[futs[fut]] = fut.result()

    mid, spread = compute_mid(out.get("bid"), out.get("ask"))
    out["mid"] = mid
    out["spread"] = spread
    out["book"] = trim_book(out.get("book") or {}, top_n=top_n)
    return out


def current_bucket_ts(interval_sec: int = INTERVAL_SEC, mode: str = "floor") -> int:
    """Return the current time rounded to a bucket boundary.

    mode "floor" (default) gives the start of the current bucket;
    "ceil" gives the next boundary.
    """
    now = int(time.time())
    if mode == "ceil":
        return ((now + interval_sec - 1) // interval_sec) * interval_sec
    return (now // interval_sec) * interval_sec


def build_slug(prefix: str, bucket_ts: int) -> str:
    """Compose a market slug like ``eth-updown-15m-1700000000``."""
    return f"{prefix}-{bucket_ts}"


def fetch_updown_by_slug(session: requests.Session, slug: str, decimals: int = 0, top_n: int = 10) -> Dict[str, Any]:
    """Fetch both outcome tokens of an up/down market and compute
    normalized Up/Down percentages.

    Returns an error-shaped dict (with key "error") when either mid is
    unavailable or both mids are zero; raises RuntimeError when the market
    lacks two token ids.
    """
    m = get_market_by_slug(session, slug)
    outcomes = [str(x) for x in parse_jsonish_list(m.get("outcomes"))]
    token_ids = [str(x) for x in parse_jsonish_list(m.get("clobTokenIds"))]
    if len(token_ids) < 2:
        raise RuntimeError(f"clobTokenIds missing/too short: {token_ids}")

    token_map = dict(zip(outcomes, token_ids))
    up_id = token_map.get("Up") or token_map.get("Yes") or token_ids[0]
    dn_id = token_map.get("Down") or token_map.get("No") or token_ids[1]

    # Submit both bundles before collecting results so the two tokens are
    # actually fetched concurrently (submitting then immediately calling
    # .result() would serialize them).
    with ThreadPoolExecutor(max_workers=2) as ex:
        up_fut = ex.submit(fetch_token_bundle, session, up_id, top_n)
        dn_fut = ex.submit(fetch_token_bundle, session, dn_id, top_n)
        up = up_fut.result()
        dn = dn_fut.result()

    if up["mid"] is None or dn["mid"] is None:
        return {
            "error": "missing mid price",
            "slug": slug,
            "market_id": m.get("id"),
            "question": m.get("question"),
            "up": up,
            "down": dn,
        }

    s = float(up["mid"]) + float(dn["mid"])
    if s == 0:
        # Both mids present but zero -- percentages are undefined; report
        # the same error shape instead of raising ZeroDivisionError.
        return {
            "error": "zero mid prices",
            "slug": slug,
            "market_id": m.get("id"),
            "question": m.get("question"),
            "up": up,
            "down": dn,
        }
    up_pct = round(float(up["mid"]) / s * 100, decimals)
    dn_pct = round(float(dn["mid"]) / s * 100, decimals)

    return {
        "question": m.get("question"),
        "slug": m.get("slug"),
        "market_id": m.get("id"),
        "up_pct": up_pct,
        "down_pct": dn_pct,
        "up": up,
        "down": dn,
    }


def fetch_current_market(
    session: requests.Session,
    slug_prefix: str = "eth-updown-15m",
    decimals: int = 0,
    top_n: int = 10,
    ts_mode: str = "floor",
    probe_offsets: Tuple[int, ...] = (0, -INTERVAL_SEC, INTERVAL_SEC),
) -> Dict[str, Any]:
    """Locate and fetch the market for the current time bucket.

    Probes the current bucket first, then the neighbors listed in
    ``probe_offsets`` (clock skew / bucket-rollover tolerance).  Adds
    "bucket_ts" and "fetched_at" to the returned snapshot.  Raises
    RuntimeError when no probed slug exists.
    """
    base_ts = current_bucket_ts(INTERVAL_SEC, mode=ts_mode)
    fetched_at = int(time.time())

    last_err: Optional[Exception] = None
    for off in probe_offsets:
        bucket_ts = base_ts + off
        slug = build_slug(slug_prefix, bucket_ts)
        try:
            data = fetch_updown_by_slug(session, slug, decimals=decimals, top_n=top_n)
            data["bucket_ts"] = bucket_ts
            data["fetched_at"] = fetched_at
            return data
        except FileNotFoundError as e:
            # 404 for this bucket -- try the next offset.
            last_err = e
            continue

    raise RuntimeError(f"Could not find market for current bucket. last_err={last_err}")


# ----------------------------
# SQLite storage (snapshots keyed by crypto symbol)
# ----------------------------
DDL = """
CREATE TABLE IF NOT EXISTS pm_snapshots (
    crypto_symbol TEXT NOT NULL,
    bucket_ts INTEGER NOT NULL,
    fetched_at INTEGER NOT NULL,
    raw_json TEXT NOT NULL,
    PRIMARY KEY (crypto_symbol, bucket_ts, fetched_at)
);

CREATE INDEX IF NOT EXISTS idx_pm_snapshots_crypto_fetched ON pm_snapshots(crypto_symbol, fetched_at);
CREATE INDEX IF NOT EXISTS idx_pm_snapshots_fetched_at ON pm_snapshots(fetched_at);
"""


def _check_column_exists(conn: sqlite3.Connection, table_name: str, column_name: str) -> bool:
    """Return True when ``column_name`` exists on ``table_name``."""
    cursor = conn.cursor()
    cursor.execute(f"PRAGMA table_info({table_name})")
    columns = [row[1] for row in cursor.fetchall()]
    return column_name in columns


def _migrate_database(conn: sqlite3.Connection) -> None:
    """Migrate a pre-multi-crypto database to the current schema.

    Old databases lack the ``crypto_symbol`` column; their rows are copied
    to ``pm_snapshots_backup`` before the table is dropped and recreated.
    No-op when the table is absent or already migrated.
    """
    cursor = conn.cursor()

    # Nothing to migrate if the table does not exist yet.
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='pm_snapshots'")
    if not cursor.fetchone():
        return

    # Already on the new schema.
    if _check_column_exists(conn, "pm_snapshots", "crypto_symbol"):
        return

    print("检测到旧版数据库,开始迁移...")

    # Preserve the old rows before dropping the table.
    cursor.execute("CREATE TABLE IF NOT EXISTS pm_snapshots_backup AS SELECT * FROM pm_snapshots")
    print("已备份旧数据到 pm_snapshots_backup 表")

    cursor.execute("DROP INDEX IF EXISTS idx_pm_snapshots_fetched_at")
    cursor.execute("DROP TABLE IF EXISTS pm_snapshots")

    cursor.executescript(DDL)
    conn.commit()

    print("数据库迁移完成!旧数据已备份到 pm_snapshots_backup 表")


def init_db(db_path: str = "polymarket.db") -> None:
    """Open the database, migrate if needed, and ensure the schema exists."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        _migrate_database(conn)
        # DDL is idempotent (IF NOT EXISTS), safe to run unconditionally.
        conn.executescript(DDL)
        conn.commit()
    finally:
        conn.close()


def save_snapshot(crypto_symbol: str, bucket_ts: int, fetched_at: int, raw: Dict[str, Any],
                  db_path: str = "polymarket.db") -> None:
    """Persist one snapshot row; duplicates on the PK are silently ignored.

    Opens a fresh connection per call so each worker thread stays
    independent (sqlite3 connections are not shared across threads here).
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute(
            "INSERT OR IGNORE INTO pm_snapshots(crypto_symbol, bucket_ts, fetched_at, raw_json) VALUES (?, ?, ?, ?)",
            (crypto_symbol, int(bucket_ts), int(fetched_at), json.dumps(raw, ensure_ascii=False)),
        )
        conn.commit()
    finally:
        conn.close()


def fetch_crypto_worker(crypto_config: Dict[str, str], db_path: str, stop_event: threading.Event):
    """Worker loop for one crypto: fetch, save, sleep; repeat until stopped.

    Each worker owns a private session (and therefore its own proxy port
    and connection pool).  Errors are logged and the loop continues after
    a short pause.
    """
    symbol = crypto_config["symbol"]
    slug_prefix = crypto_config["slug_prefix"]

    session = make_session()

    print(f"[{symbol.upper()}] 开始抓取数据...")

    while not stop_event.is_set():
        try:
            start_time = time.time()

            data = fetch_current_market(
                session=session,
                slug_prefix=slug_prefix,
                decimals=0,
                top_n=10,
                ts_mode="floor",
                # Same values as before, expressed via the shared constant.
                probe_offsets=(0, -INTERVAL_SEC, INTERVAL_SEC),
            )

            save_snapshot(symbol, data["bucket_ts"], data["fetched_at"], data, db_path)

            elapsed = time.time() - start_time
            print(
                f"[{symbol.upper()}] 已保存: bucket_ts={data['bucket_ts']}, fetched_at={data['fetched_at']}, 耗时={elapsed:.2f}秒")

            # Subtract elapsed work time so iterations stay ~FETCH_INTERVAL apart.
            sleep_time = max(0, FETCH_INTERVAL - elapsed)
            time.sleep(sleep_time)

        except Exception as e:
            print(f"[{symbol.upper()}] 错误: {e}")
            # Pause before retrying to avoid hammering the API on errors.
            time.sleep(0.5)


def main():
    """Entry point: start one daemon worker thread per configured crypto."""
    db_path = "polymarket.db"
    init_db(db_path)

    stop_event = threading.Event()
    threads: List[threading.Thread] = []

    print("=" * 60)
    print("启动多币种数据抓取程序")
    print(f"币种数量: {len(CRYPTOS)}")
    print(f"抓取频率: 每 {FETCH_INTERVAL} 秒一次(每秒 {1 / FETCH_INTERVAL} 次)")
    print("=" * 60)

    try:
        for crypto_config in CRYPTOS:
            thread = threading.Thread(
                target=fetch_crypto_worker,
                args=(crypto_config, db_path, stop_event),
                daemon=True
            )
            thread.start()
            threads.append(thread)
            print(f"[{crypto_config['symbol'].upper()}] 线程已启动")

        # Block until interrupted; daemon threads die with the process.
        for thread in threads:
            thread.join()

    except KeyboardInterrupt:
        print("\n收到中断信号,正在停止...")
        stop_event.set()
        for thread in threads:
            thread.join(timeout=2)
        print("程序已停止")


if __name__ == "__main__":
    main()