-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathwebsocket.py
91 lines (77 loc) · 2.97 KB
/
websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import asyncio
from fastapi import Depends, HTTPException, WebSocket, WebSocketDisconnect
from app.dependencies.auth import (
AcaPyAuthVerified,
get_acapy_auth,
get_acapy_auth_verified,
)
from app.services.event_handling.websocket_manager import WebsocketManager
from shared.log_config import get_logger
logger = get_logger(__name__)
DISCONNECT_CHECK_PERIOD = 0.1
async def get_websocket_api_key(websocket: WebSocket) -> str:
for header, value in websocket.headers.items():
if header.lower() == "x-api-key":
return value
return ""
async def websocket_auth(
websocket: WebSocket,
api_key: str = Depends(get_websocket_api_key),
) -> AcaPyAuthVerified:
try:
auth = get_acapy_auth(api_key)
return get_acapy_auth_verified(auth)
except HTTPException:
await websocket.accept() # Accept to send unauthorized message
await websocket.send_text("Unauthorized")
await websocket.close(code=1008)
logger.info("Unauthorized WebSocket connection closed.")
async def handle_websocket(
websocket: WebSocket,
*,
group_id: str,
wallet_id: str,
topic: str,
auth: AcaPyAuthVerified,
) -> None:
bound_logger = logger.bind(body={"wallet_id": wallet_id, "topic": topic})
bound_logger.debug("Accepting websocket")
await websocket.accept()
bound_logger.debug("Accepted websocket")
if (
not auth
or (
wallet_id and auth.wallet_id not in ("admin", wallet_id)
) # tenants can subscribe to their own wallets; admin can subscribe to any wallet
or (
not wallet_id and auth.wallet_id != "admin"
) # only admin can subscribe to topic (wallet_id == "")
):
bound_logger.debug("Notifying user is unauthorized")
await websocket.send_text("Unauthorized")
await websocket.close(code=1008)
bound_logger.info("Unauthorized WebSocket connection closed")
return
if not group_id and not wallet_id and not topic:
bound_logger.debug("Notifying one of group, wallet, or topic must be specified")
await websocket.send_text("One of group, wallet, or topic must be specified")
await websocket.close(code=1008)
bound_logger.info("Closed WebSocket connection with bad request")
return
uuid = None
try:
# Subscribe the WebSocket connection to the wallet / topic
uuid = await WebsocketManager.subscribe(
websocket, group_id=group_id, wallet_id=wallet_id, topic=topic
)
# Keep the connection open until the client disconnects
while True:
await asyncio.sleep(DISCONNECT_CHECK_PERIOD)
except WebSocketDisconnect:
bound_logger.info("WebSocket connection closed.")
if uuid:
await WebsocketManager.unsubscribe(uuid)
except Exception:
bound_logger.exception("Exception caught while handling websocket.")
if uuid:
await WebsocketManager.unsubscribe(uuid)