from __future__ import annotations import asyncio import json import os import sys from pathlib import Path import websockets from dotenv import load_dotenv BASE_DIR = Path(__file__).resolve().parents[1] ENV_FILE = BASE_DIR / ".env" load_dotenv(ENV_FILE) BASE_HTTP_URL = os.getenv("EXCHANGE_BASE_URL", "").strip().rstrip("/") API_KEY = os.getenv("EXCHANGE_API_KEY", "").strip() SYMBOL = os.getenv("DEFAULT_SYMBOL", "BTC/USD_LEVERAGE").strip() TIMEOUT = int(os.getenv("EXCHANGE_TIMEOUT_SEC", "10")) FOUND = False def to_ws_url(raw_url: str) -> str: if raw_url.startswith("https://"): return raw_url.replace("https://", "wss://", 1) if raw_url.startswith("http://"): return raw_url.replace("http://", "ws://", 1) return raw_url async def try_connect( *, label: str, url: str, headers: dict[str, str] | None = None, subprotocols: list[str] | None = None, send_payload: dict | None = None, ) -> None: global FOUND if FOUND: return print(f"\n=== {label} ===") print(f"URL: {url}") try: async with websockets.connect( url, extra_headers=headers or {}, subprotocols=subprotocols, ping_interval=20, open_timeout=TIMEOUT, ) as websocket: print("CONNECTED: 101 Switching Protocols") FOUND = True if send_payload is not None: raw_payload = json.dumps(send_payload) print(f"SEND: {raw_payload}") await websocket.send(raw_payload) try: message = await asyncio.wait_for(websocket.recv(), timeout=5) print(f"RECV: {message}") except asyncio.TimeoutError: print("RECV: timeout after 5s") except Exception as exc: print(f"FAILED: {type(exc).__name__}: {exc}") async def main() -> None: global FOUND if not BASE_HTTP_URL: print("EXCHANGE_BASE_URL is empty") sys.exit(1) base_ws = to_ws_url(BASE_HTTP_URL) payload = { "limit": 5, "symbol": SYMBOL, } header_sets = [{}] if API_KEY: header_sets.append({"X-MBX-APIKEY": API_KEY}) paths = [ "/api/v2/depth", "/api/v1/depth", "/ws", "/websocket", ] query_variants = [ "", f"?symbol={SYMBOL}&limit=5", ] subprotocol_variants = [ None, ["json"], ] count = 0 for path in paths: if FOUND: break for query in query_variants: if FOUND: break url = f"{base_ws}{path}{query}" for headers in header_sets: if FOUND: break for subprotocols in subprotocol_variants: if FOUND: break send_payload = None if query else payload count += 1 await try_connect( label=f"probe #{count}", url=url, headers=headers, subprotocols=subprotocols, send_payload=send_payload, ) if FOUND: print("\nSUCCESS: working WebSocket endpoint found") else: print("\nFAILED: no WebSocket endpoint found") if __name__ == "__main__": asyncio.run(main())