This commit is contained in:
27942
2025-12-30 13:29:03 +08:00
parent 70903f2472
commit 004a48d990
4 changed files with 380 additions and 0 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

380
text111.py Normal file
View File

@@ -0,0 +1,380 @@
import json
import random
import time
import sqlite3
import requests
import threading
from typing import Any, Dict, Optional, Tuple, List
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Polymarket API endpoints.
GAMMA = "https://gamma-api.polymarket.com"  # Gamma REST API: market metadata (slug lookup)
CLOB = "https://clob.polymarket.com"  # CLOB REST API: prices and order books
INTERVAL_SEC = 15 * 60  # 900 -- length of one "up/down" market bucket, in seconds
FETCH_INTERVAL = 0.5  # fetch once every 0.5 s, i.e. twice per second
# Supported cryptocurrencies: each entry pairs a symbol (used as the DB key)
# with the slug prefix of its 15-minute up/down market on Polymarket.
CRYPTOS = [
    {"symbol": "btc", "slug_prefix": "btc-updown-15m"},
    {"symbol": "sol", "slug_prefix": "sol-updown-15m"},
    {"symbol": "eth", "slug_prefix": "eth-updown-15m"},
    {"symbol": "xrp", "slug_prefix": "xrp-updown-15m"},
]
def make_session() -> requests.Session:
"""创建带有重试机制的HTTP会话"""
s = requests.Session()
proxies = {
'http': f'http://127.0.0.1:{random.randint(20001, 25000)}',
'https': f'http://127.0.0.1:{random.randint(20001, 25000)}',
}
s.proxies.update(proxies)
s.headers.update({"User-Agent": "Mozilla/5.0"})
retry = Retry(
total=5,
backoff_factor=0.4,
status_forcelist=(429, 500, 502, 503, 504),
allowed_methods=("GET",),
raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retry, pool_connections=50, pool_maxsize=50)
s.mount("https://", adapter)
s.mount("http://", adapter)
return s
def parse_jsonish_list(v):
    """Coerce *v* into a list.

    Accepts:
      - None            -> []
      - a list          -> returned unchanged
      - a string that looks like a JSON array (e.g. '["Up", "Down"]')
                        -> parsed with json.loads
      - any other string -> best-effort split on commas, entries stripped
      - anything else   -> []
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if isinstance(v, str):
        s = v.strip()
        if s.startswith("[") and s.endswith("]"):
            try:
                return json.loads(s)
            except ValueError:
                # Robustness fix: a bracketed but malformed JSON string
                # (e.g. "[a, b]") used to raise json.JSONDecodeError here.
                # Fall back to a comma split of the bracket interior instead.
                s = s[1:-1]
        return [x.strip() for x in s.split(",") if x.strip()]
    return []
def get_market_by_slug(session: requests.Session, slug: str) -> Dict[str, Any]:
    """Fetch market metadata for *slug* from the Gamma API.

    Raises FileNotFoundError on HTTP 404 (market does not exist) and
    requests.HTTPError on any other non-2xx status.
    """
    response = session.get(f"{GAMMA}/markets/slug/{slug}", timeout=20)
    if response.status_code == 404:
        raise FileNotFoundError(slug)
    response.raise_for_status()
    return response.json()
def get_price(session: requests.Session, token_id: str, side: str) -> Optional[float]:
    """Return the CLOB price for *token_id* on *side* ("buy"/"sell"), or None if absent."""
    response = session.get(
        f"{CLOB}/price", params={"token_id": token_id, "side": side}, timeout=20
    )
    response.raise_for_status()
    raw = response.json().get("price")
    if raw is None:
        return None
    return float(raw)
def get_book(session: requests.Session, token_id: str) -> Dict[str, Any]:
    """Fetch the full order book for *token_id* from the CLOB API."""
    response = session.get(f"{CLOB}/book", params={"token_id": token_id}, timeout=20)
    response.raise_for_status()
    return response.json()
def compute_mid(bid: Optional[float], ask: Optional[float]) -> Tuple[Optional[float], Optional[float]]:
    """Return (mid, spread) from a bid/ask pair.

    With both sides present: mid = (bid + ask) / 2, spread = ask - bid.
    With only one side: that side's price is used as the mid and spread is None.
    With neither side: (None, None).
    """
    if bid is not None and ask is not None:
        return (bid + ask) / 2.0, (ask - bid)
    if bid is not None:
        return bid, None
    if ask is not None:
        return ask, None
    return None, None
def trim_book(book: Dict[str, Any], top_n: int = 10) -> Dict[str, Any]:
    """Keep only the best *top_n* levels of each side of an order book.

    Bids are ordered highest price first, asks lowest price first; missing
    sides are treated as empty lists.
    """
    def price_of(level):
        return float(level["price"])

    best_bids = sorted(book.get("bids") or [], key=price_of, reverse=True)
    best_asks = sorted(book.get("asks") or [], key=price_of)
    return {"bids": best_bids[:top_n], "asks": best_asks[:top_n]}
def fetch_token_bundle(session: requests.Session, token_id: str, top_n: int = 10) -> Dict[str, Any]:
    """Fetch bid, ask and order book for one token concurrently.

    Returns a dict with token_id, bid, ask, mid, spread and a book trimmed to
    the top *top_n* levels per side.
    """
    bundle: Dict[str, Any] = {"token_id": token_id}
    # Fire the three requests in parallel; the pool's __exit__ waits for all.
    with ThreadPoolExecutor(max_workers=3) as pool:
        pending = {
            "bid": pool.submit(get_price, session, token_id, "buy"),
            "ask": pool.submit(get_price, session, token_id, "sell"),
            "book": pool.submit(get_book, session, token_id),
        }
        for key, fut in pending.items():
            bundle[key] = fut.result()
    bundle["mid"], bundle["spread"] = compute_mid(bundle.get("bid"), bundle.get("ask"))
    bundle["book"] = trim_book(bundle.get("book") or {}, top_n=top_n)
    return bundle
def current_bucket_ts(interval_sec: int = INTERVAL_SEC, mode: str = "floor") -> int:
    """Return the current Unix time snapped to an *interval_sec* boundary.

    mode="floor" (default) rounds down; mode="ceil" rounds up to the next
    boundary (an exact boundary maps to itself).
    """
    now = int(time.time())
    buckets, remainder = divmod(now, interval_sec)
    if mode == "ceil" and remainder:
        buckets += 1
    return buckets * interval_sec
def build_slug(prefix: str, bucket_ts: int) -> str:
    """Compose a market slug: '<prefix>-<bucket_ts>'."""
    return "{}-{}".format(prefix, bucket_ts)
def fetch_updown_by_slug(session: requests.Session, slug: str, decimals: int = 0, top_n: int = 10) -> Dict[str, Any]:
    """Fetch an up/down market by slug and compute normalized Up/Down percentages.

    Resolves the market's two outcome tokens (falling back to positional order
    when names don't match Up/Down or Yes/No), fetches both token bundles in
    parallel, and normalizes the two mid prices to percentages summing to 100.
    Returns an error-shaped dict instead of raising when mids are unavailable.
    Raises FileNotFoundError (via get_market_by_slug) for an unknown slug and
    RuntimeError when the market has fewer than two token ids.
    """
    m = get_market_by_slug(session, slug)
    outcomes = [str(x) for x in parse_jsonish_list(m.get("outcomes"))]
    token_ids = [str(x) for x in parse_jsonish_list(m.get("clobTokenIds"))]
    if len(token_ids) < 2:
        raise RuntimeError(f"clobTokenIds missing/too short: {token_ids}")
    token_map = dict(zip(outcomes, token_ids))
    up_id = token_map.get("Up") or token_map.get("Yes") or token_ids[0]
    dn_id = token_map.get("Down") or token_map.get("No") or token_ids[1]
    # BUG FIX: the original called .result() on the first future before
    # submitting the second, which serialized the two fetches. Submit both
    # futures first, then wait, so Up and Down are fetched concurrently.
    with ThreadPoolExecutor(max_workers=2) as ex:
        up_fut = ex.submit(fetch_token_bundle, session, up_id, top_n)
        dn_fut = ex.submit(fetch_token_bundle, session, dn_id, top_n)
        up = up_fut.result()
        dn = dn_fut.result()
    if up["mid"] is None or dn["mid"] is None:
        return {
            "error": "missing mid price",
            "slug": slug,
            "market_id": m.get("id"),
            "question": m.get("question"),
            "up": up,
            "down": dn,
        }
    s = float(up["mid"]) + float(dn["mid"])
    if s <= 0:
        # Robustness: both mids zero would otherwise raise ZeroDivisionError.
        return {
            "error": "zero mid price sum",
            "slug": slug,
            "market_id": m.get("id"),
            "question": m.get("question"),
            "up": up,
            "down": dn,
        }
    up_pct = round(float(up["mid"]) / s * 100, decimals)
    dn_pct = round(float(dn["mid"]) / s * 100, decimals)
    return {
        "question": m.get("question"),
        "slug": m.get("slug"),
        "market_id": m.get("id"),
        "up_pct": up_pct,
        "down_pct": dn_pct,
        "up": up,
        "down": dn,
    }
def fetch_current_market(
    session: requests.Session,
    slug_prefix: str = "eth-updown-15m",
    decimals: int = 0,
    top_n: int = 10,
    ts_mode: str = "floor",
    probe_offsets: Tuple[int, ...] = (0, -INTERVAL_SEC, INTERVAL_SEC),
) -> Dict[str, Any]:
    """Find and fetch the up/down market for the current time bucket.

    Probes the current bucket plus the offsets in *probe_offsets* (by default
    the current, previous and next 15-minute buckets) until a market slug
    resolves; annotates the result with bucket_ts and fetched_at. Raises
    RuntimeError when no probed slug exists.
    """
    anchor = current_bucket_ts(INTERVAL_SEC, mode=ts_mode)
    stamp = int(time.time())
    last_err: Optional[Exception] = None
    for offset in probe_offsets:
        bucket = anchor + offset
        candidate = build_slug(slug_prefix, bucket)
        try:
            snapshot = fetch_updown_by_slug(session, candidate, decimals=decimals, top_n=top_n)
        except FileNotFoundError as exc:
            # Slug not listed (404): remember the failure and probe the next offset.
            last_err = exc
            continue
        snapshot["bucket_ts"] = bucket
        snapshot["fetched_at"] = stamp
        return snapshot
    raise RuntimeError(f"Could not find market for current bucket. last_err={last_err}")
# ----------------------------
# SQLite persistence (schema keyed by crypto_symbol)
# ----------------------------
# NOTE(review): the trailing backslashes below are Python line-continuations
# inside the triple-quoted string; they join the SQL onto fewer lines at
# runtime and SQLite ignores the whitespace either way. The string is left
# byte-identical because executescript() runs exactly this text.
DDL = """
CREATE TABLE IF NOT EXISTS pm_snapshots
(
crypto_symbol \
TEXT \
NOT \
NULL,
bucket_ts \
INTEGER \
NOT \
NULL,
fetched_at \
INTEGER \
NOT \
NULL,
raw_json \
TEXT \
NOT \
NULL,
PRIMARY \
KEY \
( \
crypto_symbol, \
bucket_ts, \
fetched_at \
)
);
CREATE INDEX IF NOT EXISTS idx_pm_snapshots_crypto_fetched ON pm_snapshots(crypto_symbol, fetched_at);
CREATE INDEX IF NOT EXISTS idx_pm_snapshots_fetched_at ON pm_snapshots(fetched_at); \
"""
def _check_column_exists(conn: sqlite3.Connection, table_name: str, column_name: str) -> bool:
"""检查表中是否存在指定列"""
cursor = conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()]
return column_name in columns
def _migrate_database(conn: sqlite3.Connection) -> None:
    """Upgrade a legacy pm_snapshots table (without crypto_symbol) to the new schema.

    No-op when the table does not exist or already has the crypto_symbol
    column. Otherwise the old rows are copied into pm_snapshots_backup, the
    old table and index are dropped, and the current schema is re-created.
    """
    cur = conn.cursor()
    # Does the table exist at all?
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='pm_snapshots'")
    if cur.fetchone() is None:
        return
    # Already migrated?
    if _check_column_exists(conn, "pm_snapshots", "crypto_symbol"):
        return
    print("检测到旧版数据库,开始迁移...")
    # Preserve the old rows before dropping anything.
    cur.execute("CREATE TABLE IF NOT EXISTS pm_snapshots_backup AS SELECT * FROM pm_snapshots")
    print("已备份旧数据到 pm_snapshots_backup 表")
    cur.execute("DROP INDEX IF EXISTS idx_pm_snapshots_fetched_at")
    cur.execute("DROP TABLE IF EXISTS pm_snapshots")
    # Recreate the current schema.
    cur.executescript(DDL)
    conn.commit()
    print("数据库迁移完成!旧数据已备份到 pm_snapshots_backup 表")
def init_db(db_path: str = "polymarket.db") -> None:
    """Open (or create) the snapshot database and ensure the schema is current."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        _migrate_database(conn)  # upgrade a legacy schema first, if present
        conn.executescript(DDL)  # then make sure tables and indexes exist
        conn.commit()
    finally:
        conn.close()
def save_snapshot(crypto_symbol: str, bucket_ts: int, fetched_at: int, raw: Dict[str, Any],
                  db_path: str = "polymarket.db") -> None:
    """Persist one snapshot row as JSON.

    Duplicate (crypto_symbol, bucket_ts, fetched_at) keys are silently
    ignored via INSERT OR IGNORE.
    """
    payload = json.dumps(raw, ensure_ascii=False)
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute(
            "INSERT OR IGNORE INTO pm_snapshots(crypto_symbol, bucket_ts, fetched_at, raw_json) VALUES (?, ?, ?, ?)",
            (crypto_symbol, int(bucket_ts), int(fetched_at), payload),
        )
        conn.commit()
    finally:
        conn.close()
def fetch_crypto_worker(crypto_config: Dict[str, str], db_path: str, stop_event: threading.Event):
    """Poll one crypto's up/down market roughly every FETCH_INTERVAL seconds.

    Intended to run in its own thread with a dedicated HTTP session; loops
    until *stop_event* is set. Each iteration fetches the current market,
    persists it, then sleeps whatever remains of the 0.5 s budget. Errors are
    logged and retried after a short pause instead of killing the thread.
    """
    symbol = crypto_config["symbol"]
    slug_prefix = crypto_config["slug_prefix"]
    # Dedicated session per worker so threads don't share connection state.
    session = make_session()
    print(f"[{symbol.upper()}] 开始抓取数据...")
    while not stop_event.is_set():
        try:
            started = time.time()
            data = fetch_current_market(
                session=session,
                slug_prefix=slug_prefix,
                decimals=0,
                top_n=10,
                ts_mode="floor",
                probe_offsets=(0, -900, 900),
            )
            save_snapshot(symbol, data["bucket_ts"], data["fetched_at"], data, db_path)
            elapsed = time.time() - started
            print(
                f"[{symbol.upper()}] 已保存: bucket_ts={data['bucket_ts']}, fetched_at={data['fetched_at']}, 耗时={elapsed:.2f}")
            # Sleep only the remainder of the interval so the cadence holds.
            time.sleep(max(0, FETCH_INTERVAL - elapsed))
        except Exception as e:
            print(f"[{symbol.upper()}] 错误: {e}")
            # Brief back-off so a persistent failure doesn't spin the loop.
            time.sleep(0.5)
def main():
    """Initialize the database and run one daemon polling thread per crypto.

    Blocks on the worker threads; Ctrl-C sets the shared stop event and gives
    each thread up to 2 seconds to exit.
    """
    db_path = "polymarket.db"
    init_db(db_path)
    stop_event = threading.Event()
    threads: List[threading.Thread] = []
    banner = "=" * 60
    print(banner)
    print(f"启动多币种数据抓取程序")
    print(f"币种数量: {len(CRYPTOS)}")
    print(f"抓取频率: 每 {FETCH_INTERVAL} 秒一次(每秒 {1 / FETCH_INTERVAL} 次)")
    print(banner)
    try:
        # One independent worker thread per configured crypto.
        for cfg in CRYPTOS:
            worker = threading.Thread(
                target=fetch_crypto_worker,
                args=(cfg, db_path, stop_event),
                daemon=True,
            )
            worker.start()
            threads.append(worker)
            print(f"[{cfg['symbol'].upper()}] 线程已启动")
        # Block on the workers (they run until interrupted).
        for worker in threads:
            worker.join()
    except KeyboardInterrupt:
        print("\n收到中断信号,正在停止...")
        stop_event.set()
        for worker in threads:
            worker.join(timeout=2)
        print("程序已停止")
if __name__ == "__main__":
    main()