Skip to content

Commit

Permalink
refactor: Improve WebSocketWorker class and user connection management
Browse files Browse the repository at this point in the history
  • Loading branch information
crushr3sist committed May 16, 2024
1 parent a845c8d commit 9ba8836
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions r3almX_backend/realtime_service/connection_service.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import asyncio
import datetime
import json
import sys
import traceback
from queue import Queue
from typing import Dict

import redis
from fastapi import Depends, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
from jose import JWTError, jwt

from r3almX_backend.auth_service.auth_utils import TokenData
Expand Down Expand Up @@ -38,10 +40,7 @@ def disconnect(self, user_id):

def get_status_cache(self, user_id) -> Dict[str, str]:
cached_status = self.redis_client.hgetall("user_status")
return {
user_id.decode(): status.decode()
for user_id, status in cached_status.item()
}
return {user_id: status for user_id, status in cached_status.items()}

def set_dnd(self, user_id):
# class C: integer notif push (silent number increment)
Expand All @@ -68,11 +67,20 @@ def set_status_cache(self, user_id, status):

connection_manager = Connection()

# write an endpoint to read all of redis and return it as json


@realtime.get("/redis")
async def read_redis(token: str, db=Depends(get_db)):
user = get_user_from_token(token, db)

cached_data = connection_manager.get_status_cache(user.id)
return JSONResponse(cached_data)


@realtime.websocket("/connection")
async def connect(websocket: WebSocket, token: str, db=Depends(get_db)):
# write a wholistic solution for connection so that we can send notif
# we dont need a feedback websocket we need to just read the data from the client through this

user = get_user_from_token(token, db)
if user:
await websocket.accept()
Expand All @@ -81,7 +89,7 @@ async def connect(websocket: WebSocket, token: str, db=Depends(get_db)):
connection_manager.connection_sockets[user.id] = websocket
last_activity = datetime.datetime.now()
heartbeat_interval = 30
expiry_timeout = 300
expiry_timeout = 100
try:
while True:
try:
Expand All @@ -97,6 +105,7 @@ async def connect(websocket: WebSocket, token: str, db=Depends(get_db)):
connection_manager.set_status(
user.id, connection_change_request["status"]
)

except asyncio.TimeoutError:
try:
await websocket.send_text("ping")
Expand All @@ -113,6 +122,7 @@ async def connect(websocket: WebSocket, token: str, db=Depends(get_db)):
await websocket.close()
connection_manager.disconnect(user.id)
break

except WebSocketDisconnect:
connection_manager.disconnect(user.id)
else:
Expand Down

0 comments on commit 9ba8836

Please sign in to comment.