# Polymarket 15-minute up/down market snapshot collector.
import json
|
|
import time
|
|
import sqlite3
|
|
import requests
|
|
from typing import Any, Dict, Optional, Tuple
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
GAMMA = "https://gamma-api.polymarket.com"
|
|
CLOB = "https://clob.polymarket.com"
|
|
INTERVAL_SEC = 15 * 60 # 900
|
|
|
|
|
|
def make_session() -> requests.Session:
    """Build a requests.Session with automatic GET retries and pooled adapters.

    Retries up to 5 times with exponential backoff on transient HTTP errors
    (429/5xx); raise_on_status=False leaves status handling to the caller.
    """
    sess = requests.Session()
    sess.headers.update({"User-Agent": "Mozilla/5.0"})

    retry_policy = Retry(
        total=5,
        backoff_factor=0.4,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=("GET",),
        raise_on_status=False,
    )
    pooled = HTTPAdapter(max_retries=retry_policy, pool_connections=20, pool_maxsize=20)

    # Same adapter for both schemes so retry/pooling behaviour is uniform.
    for scheme in ("https://", "http://"):
        sess.mount(scheme, pooled)
    return sess
|
|
|
|
|
|
def parse_jsonish_list(v) -> list:
    """Coerce a loosely-typed API field into a list.

    Accepts:
      * None             -> []
      * an actual list   -> returned unchanged
      * '["a", "b"]'     -> decoded with json.loads; if the JSON is malformed
                            or does not decode to a list, the bracketed
                            content falls back to comma-splitting
      * "a, b, c"        -> split on commas, entries stripped, blanks dropped
    Any other type yields [].
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if isinstance(v, str):
        s = v.strip()
        if s.startswith("[") and s.endswith("]"):
            # BUG FIX: the original let json.JSONDecodeError propagate on
            # malformed bracketed strings and returned non-list JSON values
            # unchecked. Fall back to comma-splitting the bracket contents.
            try:
                parsed = json.loads(s)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, list):
                return parsed
            s = s[1:-1]
        return [part.strip() for part in s.split(",") if part.strip()]
    return []
|
|
|
|
|
|
def get_market_by_slug(session: requests.Session, slug: str) -> Dict[str, Any]:
    """Fetch market metadata for *slug* from the Gamma API.

    Raises:
        FileNotFoundError: on HTTP 404, so callers can probe other buckets.
        requests.HTTPError: on any other non-2xx status.
    """
    resp = session.get(f"{GAMMA}/markets/slug/{slug}", timeout=20)
    if resp.status_code == 404:
        # A missing slug is an expected condition — signal it distinctly.
        raise FileNotFoundError(slug)
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
def get_price(session: requests.Session, token_id: str, side: str) -> Optional[float]:
    """Return the CLOB price for one side of a token, or None if absent.

    side is passed straight to the API (e.g. "buy" / "sell" — see callers).
    """
    resp = session.get(
        f"{CLOB}/price",
        params={"token_id": token_id, "side": side},
        timeout=20,
    )
    resp.raise_for_status()
    raw = resp.json().get("price")
    if raw is None:
        return None
    return float(raw)
|
|
|
|
|
|
def get_book(session: requests.Session, token_id: str) -> Dict[str, Any]:
    """Fetch the raw order book for *token_id* from the CLOB API."""
    resp = session.get(f"{CLOB}/book", params={"token_id": token_id}, timeout=20)
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
def compute_mid(bid: Optional[float], ask: Optional[float]) -> Tuple[Optional[float], Optional[float]]:
    """Return (mid, spread) from best bid and best ask.

    When only one side is quoted, that side's price stands in for the mid and
    the spread is None; with no quotes at all, both values are None.
    """
    if bid is not None and ask is not None:
        return (bid + ask) / 2.0, ask - bid
    if bid is not None:
        return bid, None
    if ask is not None:
        return ask, None
    return None, None
|
|
|
|
|
|
def trim_book(book: Dict[str, Any], top_n: int = 10) -> Dict[str, Any]:
    """Keep only the best *top_n* levels per side.

    Bids are ordered best-first (highest price), asks best-first (lowest
    price). Missing/None sides become empty lists.
    """
    def level_price(level):
        return float(level["price"])

    best_bids = sorted(book.get("bids") or [], key=level_price, reverse=True)
    best_asks = sorted(book.get("asks") or [], key=level_price)
    return {"bids": best_bids[:top_n], "asks": best_asks[:top_n]}
|
|
|
|
|
|
def fetch_token_bundle(session: requests.Session, token_id: str, top_n: int = 10) -> Dict[str, Any]:
    """Fetch bid, ask and order book for one token concurrently.

    Returns a dict with token_id, bid, ask, mid, spread and a trimmed book
    (top_n levels per side).
    """
    bundle: Dict[str, Any] = {"token_id": token_id}
    with ThreadPoolExecutor(max_workers=3) as pool:
        # One future per field; the mapping tells us where each result lands.
        pending = {
            pool.submit(get_price, session, token_id, "buy"): "bid",
            pool.submit(get_price, session, token_id, "sell"): "ask",
            pool.submit(get_book, session, token_id): "book",
        }
        for done in as_completed(pending):
            bundle[pending[done]] = done.result()

    mid, spread = compute_mid(bundle.get("bid"), bundle.get("ask"))
    bundle["mid"] = mid
    bundle["spread"] = spread
    bundle["book"] = trim_book(bundle.get("book") or {}, top_n=top_n)
    return bundle
|
|
|
|
|
|
def current_bucket_ts(
    interval_sec: Optional[int] = None,
    mode: str = "floor",
    now: Optional[int] = None,
) -> int:
    """Snap a timestamp to an interval boundary.

    Args:
        interval_sec: bucket width in seconds; defaults to INTERVAL_SEC.
        mode: "floor" (default) snaps down; "ceil" snaps up to the next
            boundary (timestamps already on a boundary stay put).
        now: epoch seconds to snap; defaults to the current wall clock.
            Exposed as a parameter so the bucketing math is testable with a
            fixed clock — a backward-compatible generalization.

    Returns:
        Epoch seconds of the bucket boundary.
    """
    if interval_sec is None:
        interval_sec = INTERVAL_SEC
    ts = int(time.time()) if now is None else int(now)
    if mode == "ceil":
        # Round up without floating point: adding interval_sec - 1 before the
        # floor-divide leaves exact multiples unchanged.
        return ((ts + interval_sec - 1) // interval_sec) * interval_sec
    return (ts // interval_sec) * interval_sec
|
|
|
|
|
|
def build_slug(prefix: str, bucket_ts: int) -> str:
    """Compose the market slug: prefix and bucket timestamp joined by '-'."""
    return "{}-{}".format(prefix, bucket_ts)
|
|
|
|
|
|
def fetch_updown_by_slug(session: requests.Session, slug: str, decimals: int = 0, top_n: int = 10) -> Dict[str, Any]:
    """Fetch an Up/Down market by slug and return normalized outcome prices.

    Resolves the market's two outcome tokens, fetches both tokens' prices and
    books in parallel, then normalizes the two mids into percentages that sum
    to ~100.

    Args:
        session: configured requests session.
        slug: Gamma market slug.
        decimals: rounding for the percentage fields.
        top_n: order-book depth kept per side.

    Raises:
        FileNotFoundError: if the slug does not exist (from get_market_by_slug).
        RuntimeError: if the market exposes fewer than two CLOB token ids.
    """
    m = get_market_by_slug(session, slug)
    outcomes = [str(x) for x in parse_jsonish_list(m.get("outcomes"))]
    token_ids = [str(x) for x in parse_jsonish_list(m.get("clobTokenIds"))]
    if len(token_ids) < 2:
        raise RuntimeError(f"clobTokenIds missing/too short: {token_ids}")

    # Map outcome labels to token ids; fall back to positional order when the
    # labels are not the expected Up/Down (or Yes/No) pair.
    token_map = dict(zip(outcomes, token_ids))
    up_id = token_map.get("Up") or token_map.get("Yes") or token_ids[0]
    dn_id = token_map.get("Down") or token_map.get("No") or token_ids[1]

    # BUG FIX: the original called .result() on the first future before
    # submitting the second, so the two bundles were fetched sequentially
    # despite the 2-worker pool. Submit both first, then collect.
    with ThreadPoolExecutor(max_workers=2) as ex:
        up_fut = ex.submit(fetch_token_bundle, session, up_id, top_n)
        dn_fut = ex.submit(fetch_token_bundle, session, dn_id, top_n)
        up = up_fut.result()
        dn = dn_fut.result()

    # Guard the normalization: missing mids (and the degenerate both-zero
    # case, which would divide by zero) return the raw bundles with an error
    # marker instead.
    if (
        up["mid"] is None
        or dn["mid"] is None
        or float(up["mid"]) + float(dn["mid"]) == 0.0
    ):
        return {
            "error": "missing mid price",
            "slug": slug,
            "market_id": m.get("id"),
            "question": m.get("question"),
            "up": up,
            "down": dn,
        }

    total = float(up["mid"]) + float(dn["mid"])
    up_pct = round(float(up["mid"]) / total * 100, decimals)
    dn_pct = round(float(dn["mid"]) / total * 100, decimals)

    return {
        "question": m.get("question"),
        "slug": m.get("slug"),
        "market_id": m.get("id"),
        "up_pct": up_pct,
        "down_pct": dn_pct,
        "up": up,
        "down": dn,
    }
|
|
|
|
|
|
def fetch_current_market(
    slug_prefix: str = "eth-updown-15m",
    decimals: int = 0,
    top_n: int = 10,
    ts_mode: str = "floor",
    probe_offsets: Tuple[int, ...] = (0, -INTERVAL_SEC, INTERVAL_SEC),
) -> Dict[str, Any]:
    """Locate and fetch the up/down market for the current time bucket.

    Builds candidate slugs from the current bucket timestamp plus each offset
    in probe_offsets, and returns the snapshot of the first slug that exists,
    annotated with bucket_ts and fetched_at.

    Raises:
        RuntimeError: when none of the probed slugs resolves to a market.
    """
    session = make_session()
    base_ts = current_bucket_ts(INTERVAL_SEC, mode=ts_mode)
    fetched_at = int(time.time())  # timestamp of this scrape

    last_err: Optional[Exception] = None
    for offset in probe_offsets:
        candidate_ts = base_ts + offset
        candidate_slug = build_slug(slug_prefix, candidate_ts)
        try:
            snapshot = fetch_updown_by_slug(
                session, candidate_slug, decimals=decimals, top_n=top_n
            )
        except FileNotFoundError as err:
            # No market for this bucket — remember the miss and probe the next.
            last_err = err
            continue
        snapshot["bucket_ts"] = candidate_ts
        snapshot["fetched_at"] = fetched_at
        return snapshot

    raise RuntimeError(f"Could not find market for current bucket. last_err={last_err}")
|
|
|
|
|
|
# ----------------------------
# SQLite (3 columns)
# ----------------------------
# Schema for snapshot storage. One row per (bucket_ts, fetched_at) scrape;
# INSERT OR IGNORE in save_snapshot makes duplicates a no-op.
# NOTE: the original literal was whitespace-mangled (one SQL token per line);
# reformatted here — SQLite treats the whitespace identically.
DDL = """
CREATE TABLE IF NOT EXISTS pm_snapshots (
    bucket_ts  INTEGER NOT NULL,  -- bucket boundary (epoch seconds)
    fetched_at INTEGER NOT NULL,  -- scrape time (epoch seconds)
    raw_json   TEXT    NOT NULL,  -- full snapshot payload as JSON
    PRIMARY KEY (bucket_ts, fetched_at)
);

CREATE INDEX IF NOT EXISTS idx_pm_snapshots_fetched_at ON pm_snapshots(fetched_at);
"""
|
|
|
|
|
|
def init_db(db_path: str = "polymarket.db") -> None:
    """Create the snapshot table and index if they do not exist.

    Idempotent (the DDL uses IF NOT EXISTS). Also switches the database file
    to WAL journaling.
    """
    db = sqlite3.connect(db_path)
    try:
        db.execute("PRAGMA journal_mode=WAL;")
        db.executescript(DDL)
        db.commit()
    finally:
        # Close unconditionally so a failed executescript doesn't leak the
        # connection.
        db.close()
|
|
|
|
|
|
def save_snapshot(bucket_ts: int, fetched_at: int, raw: Dict[str, Any], db_path: str = "polymarket.db") -> None:
    """Persist one snapshot row.

    Duplicate (bucket_ts, fetched_at) pairs are silently skipped via
    INSERT OR IGNORE (they form the table's primary key).
    """
    row = (int(bucket_ts), int(fetched_at), json.dumps(raw, ensure_ascii=False))
    db = sqlite3.connect(db_path)
    try:
        db.execute("PRAGMA journal_mode=WAL;")
        db.execute(
            "INSERT OR IGNORE INTO pm_snapshots(bucket_ts, fetched_at, raw_json) VALUES (?, ?, ?)",
            row,
        )
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
if __name__ == "__main__":
    init_db("polymarket.db")

    # BUG FIX: the original loop had no sleep at all, hammering the API in a
    # busy loop even though markets are bucketed into 15-minute intervals.
    POLL_SEC = 60  # seconds between polls

    while True:
        try:
            data = fetch_current_market(
                slug_prefix="eth-updown-15m",
                decimals=0,
                top_n=10,
                ts_mode="floor",  # switch to 'ceil' if the next bucket should be targeted
                probe_offsets=(0, -900, 900),  # try current bucket, then previous/next
            )
            save_snapshot(data["bucket_ts"], data["fetched_at"], data, "polymarket.db")
            print("saved:", data["bucket_ts"], data["fetched_at"])
        except Exception as e:
            # Top-level boundary of a long-running collector: log the failure
            # and keep polling instead of dying on a transient network error.
            print("fetch failed:", e)
        time.sleep(POLL_SEC)