# Websea futures quoting bot: a small REST client plus a two-sided limit-order loop.
import hashlib
import os
import random
import string
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import requests


# -------------------------
# Config
# -------------------------
@dataclass
class BotConfig:
    """Tunable parameters for the quoting loop.

    All ``*_frac``, ``*_offset`` and ``*_threshold`` values are plain
    fractions (``0.0006`` means 0.06 %).
    """

    # Contract symbol sent with every API call.
    symbol: str = "ETH-USDT"
    # Distance of each quote from the touch: buys rest at bid * (1 - offset),
    # sells at ask * (1 + offset).
    quote_offset: float = 0.0006
    # Relative move of the mid price (or of a resting quote from its target)
    # that triggers a cancel-and-replace.
    refresh_threshold: float = 0.0005
    # Size sent with each opening order.
    amount: float = 1.0
    # Leverage sent with each order.
    lever_rate: int = 3
    # Forwarded unchanged as the API's ``is_full`` parameter
    # (presumably the margin mode -- confirm against the Websea docs).
    is_full: int = 1
    # Number of book levels requested from the depth endpoint.
    depth_limit: int = 20
    # Seconds slept between loop iterations.
    loop_interval: float = 2.0
    # When True nothing is sent to the exchange; intended actions are printed.
    dry_run: bool = True
    # Opening buys stop once net >= this value; opening sells once net <= -value.
    max_net_pos_contracts: float = 5.0
    # Quoting is skipped while the relative spread is outside [min, max].
    min_spread_frac: float = 0.00015
    max_spread_frac: float = 0.0030
    close_threshold_frac: float = 0.8  # New: Fraction of max_net to trigger closing
    close_offset: float = 0.001  # New: Offset for closing prices


# -------------------------
# Helper Functions
# -------------------------
def best_bid_ask(depth: Dict[str, Any]) -> Tuple[float, float]:
    """Extract the best bid and best ask price from a depth response.

    Args:
        depth: Decoded depth payload. The book is read from
            ``depth["result"]``, which should hold ``"bids"`` and ``"asks"``
            lists. Each level may be a mapping with a ``"price"`` key or a
            sequence whose first element is the price.

    Returns:
        ``(best_bid, best_ask)`` as floats: the highest bid and the lowest ask.

    Raises:
        ValueError: If either side of the book is missing or empty.
    """
    # ``or {}`` / ``or []`` also cover keys that are present but null.
    book = depth.get("result") or {}
    bids = book.get("bids") or []
    asks = book.get("asks") or []
    if not bids or not asks:
        raise ValueError("Invalid depth data")

    def level_price(level: Any) -> float:
        # Accept {"price": ...} levels as well as [price, size] pairs.
        raw = level["price"] if isinstance(level, dict) else level[0]
        return float(raw)

    # Use max/min rather than element 0 so the result does not depend on the
    # order in which the feed lists its levels.
    best_bid = max(level_price(level) for level in bids)
    best_ask = min(level_price(level) for level in asks)
    return best_bid, best_ask
def compute_net_position(pos_data: Dict[str, Any], symbol: str) -> float:
    """Compute the net position (long minus short) for ``symbol``.

    Args:
        pos_data: Decoded positions payload; entries are read from
            ``pos_data["result"]``.
        symbol: Only entries whose ``"symbol"`` equals this value are counted.

    Returns:
        Sum of long sizes minus sum of short sizes. An entry counts as long
        when its ``"side"`` contains ``buy`` or ``long`` and as short when it
        contains ``sell`` or ``short`` (case-insensitive); any other side is
        ignored. The size is taken from ``"hold_vol"``, falling back to
        ``"position"`` and then to 0.
    """
    net = 0.0
    # ``or []`` covers a "result" key that is present but null.
    for entry in pos_data.get("result") or []:
        if entry.get("symbol") != symbol:
            continue
        # A null side or size must not crash the whole loop iteration.
        side = str(entry.get("side") or "").lower()
        size = float(entry.get("hold_vol") or entry.get("position") or 0)
        if "buy" in side or "long" in side:
            net += size
        elif "sell" in side or "short" in side:
            net -= size
    return net
def classify_current_orders(cur_data: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
    """Split the open orders of a current-orders response by side.

    Args:
        cur_data: Decoded payload; orders are read from
            ``cur_data["result"]["list"]``.

    Returns:
        ``(buy_orders, sell_orders)``. An order is a buy when its ``"type"``
        starts with ``buy`` and a sell when it starts with ``sell``
        (case-insensitive); orders matching neither are dropped.
    """
    result = cur_data.get("result") or {}
    # Tolerate a bare list under "result" instead of failing on ``.get``.
    orders = result if isinstance(result, list) else (result.get("list") or [])

    buy_orders: List[Dict] = []
    sell_orders: List[Dict] = []
    for order in orders:
        # A null "type" is treated like a missing one.
        kind = str(order.get("type") or "").lower()
        if kind.startswith("buy"):
            buy_orders.append(order)
        elif kind.startswith("sell"):
            sell_orders.append(order)
    return buy_orders, sell_orders


# -------------------------
# Websea REST Client (optimized)
# -------------------------
class WebseaAPIError(RuntimeError):
    """Error reported by the API inside an otherwise successful HTTP response.

    Attributes:
        errno: Numeric error code taken from the response.
        errmsg: Error message taken from the response.
        payload: The full decoded response body, kept for diagnostics.
    """

    def __init__(self, errno: int, errmsg: str, payload: Dict[str, Any]):
        super().__init__(f"Websea API error: errno={errno} errmsg={errmsg}")
        self.errno = errno
        self.errmsg = errmsg
        self.payload = payload
class WebseaClient:
    """Small REST client for the Websea futures endpoints used by the bot.

    Authenticated calls carry ``Token``, ``Nonce`` and ``Signature`` headers;
    the signature is derived in :meth:`_signature`.
    """

    def __init__(self, token: str, secret: str, base_url: str = "https://coapi.websea.com", timeout: int = 15):
        """Store the credentials and open a reusable HTTP session.

        Args:
            token: API token; surrounding whitespace is stripped.
            secret: API secret; surrounding whitespace is stripped.
            base_url: Scheme and host of the API; a trailing ``/`` is removed.
            timeout: Per-request timeout in seconds.
        """
        self.token = token.strip()
        self.secret = secret.strip()
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()

    @staticmethod
    def _nonce() -> str:
        """Return ``"<unix seconds>_<5 random lowercase/digit chars>"``."""
        ts = str(int(time.time()))
        rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))
        return f"{ts}_{rand}"

    @staticmethod
    def _stringify_params(d: Optional[Dict[str, Any]]) -> Dict[str, str]:
        """Convert non-None values to str for consistent signing/parsing.

        ``None`` values are dropped, booleans become ``"1"``/``"0"`` and
        everything else goes through ``str()``. ``None`` or an empty mapping
        yields ``{}``.
        """
        if not d:
            return {}
        out: Dict[str, str] = {}
        for key, value in d.items():
            if value is None:
                continue
            # bool is checked explicitly so True/False do not become "True"/"False".
            out[key] = ("1" if value else "0") if isinstance(value, bool) else str(value)
        return out

    def _signature(self, nonce: str, params: Dict[str, str]) -> str:
        """Return the request signature as a SHA-1 hex digest.

        The token, the secret, the nonce and one ``key=value`` string per
        parameter are sorted lexicographically, concatenated without a
        separator and hashed.
        """
        parts: List[str] = [self.token, self.secret, nonce]
        parts.extend(f"{k}={v}" for k, v in params.items())
        parts.sort()
        return hashlib.sha1("".join(parts).encode("utf-8")).hexdigest()

    def _request(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Dict[str, Any]] = None,
        auth: bool = False,
    ) -> Dict[str, Any]:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP verb, case-insensitive.
            path: Path appended to ``base_url``.
            params: Query-string parameters, sent with every method.
            data: Form fields, sent only for POST, PUT and DELETE.
            auth: When True, add the ``Token``/``Nonce``/``Signature`` headers.

        Returns:
            The decoded JSON payload.

        Raises:
            requests.HTTPError: On a non-2xx status (the body is printed first).
            ValueError: If the body is not valid JSON (the body is printed first).
            WebseaAPIError: If the payload is a dict with a non-zero ``errno``.
        """
        method = method.upper()
        url = f"{self.base_url}{path}"
        sends_form = method in ("POST", "PUT", "DELETE")

        query = self._stringify_params(params)
        # Form fields are only transmitted on body-carrying methods.
        form = self._stringify_params(data) if sends_form else {}

        headers: Dict[str, str] = {
            "Content-Type": "application/x-www-form-urlencoded" if sends_form else "application/json"
        }

        if auth:
            nonce = self._nonce()
            # Sign exactly what goes on the wire. Signing parameters that are
            # never sent (or sending unsigned ones) cannot match the server's
            # own computation.
            signature = self._signature(nonce, {**query, **form})
            headers.update({
                "Token": self.token,
                "Nonce": nonce,
                "Signature": signature,
            })

        resp = self.session.request(
            method=method,
            url=url,
            params=query or None,
            data=form or None,
            timeout=self.timeout,
            headers=headers,
        )
        try:
            resp.raise_for_status()
        except requests.HTTPError as e:
            print(f"HTTP Error: {e} - Response: {resp.text}")
            raise

        try:
            payload = resp.json()
        except ValueError:
            print(f"Invalid JSON: {resp.text}")
            raise

        if isinstance(payload, dict):
            errno = payload.get("errno")
            # "0" is accepted too so a string-typed success code is not an error.
            if errno not in (None, 0, "0"):
                raise WebseaAPIError(int(errno), str(payload.get("errmsg")), payload)
        return payload

    def futures_depth(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
        """GET ``/openApi/contract/depth`` (unauthenticated)."""
        return self._request("GET", "/openApi/contract/depth", params={"symbol": symbol, "limit": limit}, auth=False)

    def futures_24hr(self, symbol: str) -> Dict[str, Any]:
        """GET ``/openApi/market/24hr`` (unauthenticated)."""
        return self._request("GET", "/openApi/market/24hr", params={"symbol": symbol}, auth=False)

    def positions(self, symbol: str, is_full: int = 1) -> Dict[str, Any]:
        """GET ``/openApi/contract/position`` (authenticated)."""
        return self._request("GET", "/openApi/contract/position", params={"symbol": symbol, "is_full": is_full}, auth=True)

    def current_orders(self, symbol: str, is_full: int = 1, limit: int = 100, direct: str = "prev") -> Dict[str, Any]:
        """GET ``/openApi/contract/currentList`` (authenticated)."""
        return self._request(
            "GET",
            "/openApi/contract/currentList",
            params={"symbol": symbol, "is_full": is_full, "limit": limit, "direct": direct},
            auth=True,
        )

    def add_order(
        self,
        *,
        contract_type: str,  # open/close
        order_type: str,  # buy-limit / sell-limit
        symbol: str,
        amount: float,
        price: Optional[float] = None,
        lever_rate: Optional[int] = None,
        is_full: int = 1,
    ) -> Dict[str, Any]:
        """POST ``/openApi/contract/add`` (authenticated).

        ``price`` and ``lever_rate`` are omitted from the request when None.
        ``order_type`` is sent under the form key ``"type"``.
        """
        return self._request(
            "POST",
            "/openApi/contract/add",
            data={
                "contract_type": contract_type,
                "type": order_type,
                "symbol": symbol,
                "amount": amount,
                "price": price,
                "lever_rate": lever_rate,
                "is_full": is_full,
            },
            auth=True,
        )

    def cancel_orders(self, *, symbol: str, order_ids: List[str]) -> Dict[str, Any]:
        """POST ``/openApi/contract/cancel`` with a comma-joined id list."""
        # str() each id so numeric ids taken straight from a response work too.
        joined_ids = ",".join(str(order_id) for order_id in order_ids)
        return self._request(
            "POST",
            "/openApi/contract/cancel",
            data={"symbol": symbol, "order_ids": joined_ids},
            auth=True,
        )

    def auth_ping(self, symbol: str, is_full: int) -> None:
        """Minimal auth test: fetch positions to confirm user recognition."""
        _ = self.positions(symbol=symbol, is_full=is_full)


# -------------------------
# Bot loop
# -------------------------
def _quotes_far_from(orders: List[Dict[str, Any]], target: float, tolerance: float) -> bool:
    """Return True when no order is priced within ``tolerance`` of ``target``.

    ``tolerance`` is a fraction of ``target``. Orders without a usable
    ``"price"`` are ignored; an empty or price-less list counts as far.
    """
    prices = [float(o["price"]) for o in orders if o.get("price")]
    if not prices:
        return True
    nearest = min(prices, key=lambda px: abs(px - target))
    return abs(nearest - target) / target > tolerance


def _refresh_quotes(
    client: WebseaClient,
    cfg: BotConfig,
    *,
    mid: float,
    net: float,
    buy_px: float,
    sell_px: float,
    resting: List[Dict[str, Any]],
) -> None:
    """Cancel the ``resting`` orders and place one fresh quote per allowed side.

    In dry-run mode nothing is sent; the intended actions are printed instead.
    """
    ids: List[str] = [str(o.get("order_id")) for o in resting if o.get("order_id")]
    if ids:
        if cfg.dry_run:
            print(f"[DRY] cancel {len(ids)} orders")
        else:
            client.cancel_orders(symbol=cfg.symbol, order_ids=ids)
            print(f"[OK] canceled {len(ids)} orders")

    rounded_buy_px = round(buy_px, 2)
    rounded_sell_px = round(sell_px, 2)

    if cfg.dry_run:
        print(f"[DRY] mid={mid:.2f} net={net:.2f} buy@{rounded_buy_px:.2f} sell@{rounded_sell_px:.2f}")
        return

    # A side is only quoted while the net position is inside the cap on that side.
    sides = (
        ("buy", "buy-limit", rounded_buy_px, net < cfg.max_net_pos_contracts),
        ("sell", "sell-limit", rounded_sell_px, net > -cfg.max_net_pos_contracts),
    )
    for label, order_type, price, allowed in sides:
        if not allowed:
            continue
        resp = client.add_order(
            contract_type="open",
            order_type=order_type,
            symbol=cfg.symbol,
            amount=cfg.amount,
            price=price,
            lever_rate=cfg.lever_rate,
            is_full=cfg.is_full,
        )
        print(f"[OK] {label}:", resp)


def _close_excess_position(client: WebseaClient, cfg: BotConfig, *, mid: float, net: float) -> None:
    """Send one limit order that closes the whole net position.

    Longs are closed with a sell at ``mid * (1 - close_offset)``, shorts with
    a buy at ``mid * (1 + close_offset)``.
    """
    close_type = "close"
    if net > 0:
        close_order_type = "sell-limit"
        close_px = mid * (1.0 - cfg.close_offset)
    else:
        close_order_type = "buy-limit"
        close_px = mid * (1.0 + cfg.close_offset)
    close_amt = abs(net)
    rounded_close_px = round(close_px, 2)

    print(f"[WARN] Net position high ({net:.2f}), attempting to {close_type} with {close_order_type} amt={close_amt:.2f} @ {rounded_close_px:.2f}")
    r_close = client.add_order(
        contract_type=close_type,
        order_type=close_order_type,
        symbol=cfg.symbol,
        amount=close_amt,
        price=rounded_close_px,
        lever_rate=cfg.lever_rate,
        is_full=cfg.is_full,
    )
    print("[OK] close:", r_close)


def run_bot(client: WebseaClient, cfg: BotConfig) -> None:
    """Run the quoting loop forever.

    Each iteration reads the book, skips when the relative spread is outside
    ``[min_spread_frac, max_spread_frac]``, and otherwise re-quotes both sides
    when the mid has moved by ``refresh_threshold`` since the last quote or
    when a side has no resting order close to its target price. Outside
    dry-run mode a closing order is sent whenever the absolute net position
    exceeds ``max_net_pos_contracts * close_threshold_frac``.

    API, HTTP and other ``Exception`` errors are printed and the loop goes on;
    the function only returns through an exception that is not an
    ``Exception`` subclass (for example ``KeyboardInterrupt``).
    """
    last_mid: Optional[float] = None
    print(f"[BOT] symbol={cfg.symbol} dry_run={cfg.dry_run} max_net={cfg.max_net_pos_contracts}")

    while True:
        try:
            depth = client.futures_depth(cfg.symbol, limit=cfg.depth_limit)
            bid, ask = best_bid_ask(depth)
            mid = (bid + ask) / 2.0
            spread_frac = (ask - bid) / mid

            if spread_frac < cfg.min_spread_frac or spread_frac > cfg.max_spread_frac:
                print(f"[SKIP] spread={spread_frac:.6f} bid={bid:.2f} ask={ask:.2f}")
                # ``continue`` bypasses the sleep at the bottom of the loop.
                time.sleep(cfg.loop_interval)
                continue

            pos = client.positions(symbol=cfg.symbol, is_full=cfg.is_full)
            net = compute_net_position(pos, cfg.symbol)

            cur = client.current_orders(symbol=cfg.symbol, is_full=cfg.is_full, limit=100)
            buy_orders, sell_orders = classify_current_orders(cur)

            buy_px = bid * (1.0 - cfg.quote_offset)
            sell_px = ask * (1.0 + cfg.quote_offset)

            need_refresh = last_mid is None or abs(mid - last_mid) / last_mid >= cfg.refresh_threshold

            # Resting quotes get a slightly tighter tolerance than the mid check.
            tolerance = cfg.refresh_threshold * 0.8
            if _quotes_far_from(buy_orders, buy_px, tolerance) or _quotes_far_from(sell_orders, sell_px, tolerance):
                need_refresh = True

            if need_refresh:
                _refresh_quotes(
                    client,
                    cfg,
                    mid=mid,
                    net=net,
                    buy_px=buy_px,
                    sell_px=sell_px,
                    resting=buy_orders + sell_orders,
                )
                last_mid = mid
            else:
                print(f"[HOLD] mid={mid:.2f} spread={spread_frac:.6f} net={net:.2f} "
                      f"orders(buy={len(buy_orders)} sell={len(sell_orders)})")

            # Enhanced closing logic if net is extreme
            # NOTE(review): this fires on every iteration while the condition
            # holds and does not look for a close order that is already
            # resting -- confirm the exchange rejects or nets duplicates.
            if abs(net) > cfg.max_net_pos_contracts * cfg.close_threshold_frac and not cfg.dry_run:
                _close_excess_position(client, cfg, mid=mid, net=net)

        except WebseaAPIError as e:
            if e.errno == 20522:
                print("[AUTH] User not recognized: Check Token/Secret or API permissions (enable Futures Trading).")
            else:
                print(f"[API] {e} payload={e.payload}")
        except requests.HTTPError as e:
            print(f"[HTTP] {e}")
        except Exception as e:
            print(f"[ERR] {e}")

        time.sleep(cfg.loop_interval)
def env_bool(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset or blank.

    Returns:
        True when the trimmed, lower-cased value is one of ``1``, ``true``,
        ``yes``, ``y`` or ``on``; False for any other non-blank value.
    """
    raw = os.environ.get(name)
    # A blank value (e.g. ``DRY_RUN=``) is treated as unset so it cannot
    # silently override a safety default with False.
    if raw is None or not raw.strip():
        return default
    return raw.strip().lower() in ("1", "true", "yes", "y", "on")
def main() -> None:
    """Entry point: read credentials, run two connectivity checks, start the bot.

    Reads ``WEBSEA_TOKEN`` and ``WEBSEA_SECRET`` from the environment and the
    ``DRY_RUN`` flag (default on). A failing public check is only reported;
    a failing auth check is reported and re-raised.

    Raises:
        SystemExit: If either credential is missing or blank.
        WebseaAPIError: If the authenticated check is rejected by the API.
    """
    # Strip before the presence check so a whitespace-only value is rejected
    # here instead of reaching the client as an empty credential.
    token = os.environ.get("WEBSEA_TOKEN", "").strip()
    secret = os.environ.get("WEBSEA_SECRET", "").strip()
    if not token or not secret:
        raise SystemExit("Please set environment variables WEBSEA_TOKEN and WEBSEA_SECRET")

    cfg = BotConfig(dry_run=env_bool("DRY_RUN", True))

    client = WebseaClient(token=token, secret=secret)

    # Public check
    try:
        t = client.futures_24hr(cfg.symbol)
        print("[API] 24hr ok:", t)
    except Exception as e:
        # Best effort: the bot may still work if only this endpoint is down.
        print("[API CHECK FAILED]", e)

    # Auth check
    try:
        client.auth_ping(cfg.symbol, cfg.is_full)
        print("[AUTH] ok (user recognized)")
    except WebseaAPIError as e:
        print(f"[AUTH FAILED] errno={e.errno} errmsg={e.errmsg} payload={e.payload}")
        print("Tips: Ensure API Key has Futures permissions enabled in Websea dashboard.")
        raise

    run_bot(client, cfg)
# Run the bot only when executed as a script, not when imported.
if __name__ == "__main__":
    main()