Files
dzentra_bot/app/tools/ws_probe.py

149 lines
3.4 KiB
Python

from __future__ import annotations
import asyncio
import json
import os
import sys
from pathlib import Path
import websockets
from dotenv import load_dotenv
BASE_DIR = Path(__file__).resolve().parents[1]
ENV_FILE = BASE_DIR / ".env"
load_dotenv(ENV_FILE)
BASE_HTTP_URL = os.getenv("EXCHANGE_BASE_URL", "").strip().rstrip("/")
API_KEY = os.getenv("EXCHANGE_API_KEY", "").strip()
SYMBOL = os.getenv("DEFAULT_SYMBOL", "BTC/USD_LEVERAGE").strip()
TIMEOUT = int(os.getenv("EXCHANGE_TIMEOUT_SEC", "10"))
FOUND = False
def to_ws_url(raw_url: str) -> str:
if raw_url.startswith("https://"):
return raw_url.replace("https://", "wss://", 1)
if raw_url.startswith("http://"):
return raw_url.replace("http://", "ws://", 1)
return raw_url
async def try_connect(
*,
label: str,
url: str,
headers: dict[str, str] | None = None,
subprotocols: list[str] | None = None,
send_payload: dict | None = None,
) -> None:
global FOUND
if FOUND:
return
print(f"\n=== {label} ===")
print(f"URL: {url}")
try:
async with websockets.connect(
url,
extra_headers=headers or {},
subprotocols=subprotocols,
ping_interval=20,
open_timeout=TIMEOUT,
) as websocket:
print("CONNECTED: 101 Switching Protocols")
FOUND = True
if send_payload is not None:
raw_payload = json.dumps(send_payload)
print(f"SEND: {raw_payload}")
await websocket.send(raw_payload)
try:
message = await asyncio.wait_for(websocket.recv(), timeout=5)
print(f"RECV: {message}")
except asyncio.TimeoutError:
print("RECV: timeout after 5s")
except Exception as exc:
print(f"FAILED: {type(exc).__name__}: {exc}")
async def main() -> None:
global FOUND
if not BASE_HTTP_URL:
print("EXCHANGE_BASE_URL is empty")
sys.exit(1)
base_ws = to_ws_url(BASE_HTTP_URL)
payload = {
"limit": 5,
"symbol": SYMBOL,
}
header_sets = [{}]
if API_KEY:
header_sets.append({"X-MBX-APIKEY": API_KEY})
paths = [
"/api/v2/depth",
"/api/v1/depth",
"/ws",
"/websocket",
]
query_variants = [
"",
f"?symbol={SYMBOL}&limit=5",
]
subprotocol_variants = [
None,
["json"],
]
count = 0
for path in paths:
if FOUND:
break
for query in query_variants:
if FOUND:
break
url = f"{base_ws}{path}{query}"
for headers in header_sets:
if FOUND:
break
for subprotocols in subprotocol_variants:
if FOUND:
break
send_payload = None if query else payload
count += 1
await try_connect(
label=f"probe #{count}",
url=url,
headers=headers,
subprotocols=subprotocols,
send_payload=send_payload,
)
if FOUND:
print("\nSUCCESS: working WebSocket endpoint found")
else:
print("\nFAILED: no WebSocket endpoint found")
if __name__ == "__main__":
asyncio.run(main())