Skip to content

Commit

Permalink
feat: Add notification system for new messages in chat rooms
Browse files Browse the repository at this point in the history
  • Loading branch information
crushr3sist committed Jun 1, 2024
1 parent 69ec2c8 commit 53854ef
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 19 deletions.
8 changes: 6 additions & 2 deletions r3almX_backend/realtime_service/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from r3almX_backend.auth_service.Config import UsersConfig
from r3almX_backend.auth_service.user_handler_utils import get_db, get_user_by_username
from r3almX_backend.auth_service.user_models import User
from r3almX_backend.realtime_service.connection_service import NotificationSystem
from r3almX_backend.realtime_service.main import realtime


Expand Down Expand Up @@ -102,13 +103,13 @@ async def disconnect_user(self, room_id: str, websocket: WebSocket):


room_manager = RoomManager()
notification_system = NotificationSystem()


@realtime.websocket("/message/{room_id}")
async def websocket_endpoint(
websocket: WebSocket, room_id: str, token: str, db=Depends(get_db)
):

user = get_user_from_token(token, db)
if user:
await websocket.accept()
Expand All @@ -118,10 +119,13 @@ async def websocket_endpoint(
data = await websocket.receive_text()
room_manager.add_message_to_queue(room_id, data, user.id)
await room_manager.start_broadcast_task(room_id)
await notification_system.send_notification_to_user(
user.id, f"New message in room {room_id}: {data}"
)
except WebSocketDisconnect:
await room_manager.disconnect_user(room_id, websocket)
else:
await websocket.close(code=1008) # Unsupported data
await websocket.close(code=1008)


@realtime.get("/message/rooms/")
Expand Down
57 changes: 40 additions & 17 deletions r3almX_backend/realtime_service/connection_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,29 @@
import redis
from fastapi import Depends, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
from jose import JWTError, jwt

from r3almX_backend.auth_service.user_handler_utils import get_db
from r3almX_backend.realtime_service.chat_service import get_user_from_token
from r3almX_backend.auth_service.auth_utils import TokenData
from r3almX_backend.auth_service.Config import UsersConfig
from r3almX_backend.auth_service.user_handler_utils import get_db, get_user_by_username
from r3almX_backend.auth_service.user_models import User
from r3almX_backend.realtime_service.main import realtime


def get_user_from_token(token: str, db) -> User:

try:
payload = jwt.decode(
token, UsersConfig.SECRET_KEY, algorithms=[UsersConfig.ALGORITHM]
)
username: str = payload.get("sub")
token_data = TokenData(username=username)
user = get_user_by_username(db, username=token_data.username)
return user
except JWTError as j:
return j


class Connection:
def __init__(self):
self.redis_client = redis.Redis().from_url(
Expand Down Expand Up @@ -57,10 +74,13 @@ def set_status(self, user_id, status):
def set_status_cache(self, user_id, status):
self.redis_client.hset("user_status", str(user_id), status)

async def send_notification(self, user_id, message):
websocket = self.connection_sockets.get(user_id)
if websocket:
await websocket.send_text(message)

connection_manager = Connection()

# write an endpoint to read all of redis and return it as json
connection_manager = Connection()


class NotificationSystem:
Expand All @@ -76,22 +96,22 @@ def __init__(self):
def return_user(self, user_id):
return self.connections.connection_cache_list.get(user_id)

async def send_notification_to_user(self, user_id, message):
await self.connections.send_notification(user_id, message)


@realtime.get("/redis")
async def read_redis(token: str, db=Depends(get_db)):
user = get_user_from_token(token, db)

cached_data = connection_manager.get_status_cache(user.id)
return JSONResponse(cached_data)


@realtime.websocket("/connection")
async def connect(websocket: WebSocket, token: str, db=Depends(get_db)):

user = get_user_from_token(token, db)
if user:
await websocket.accept()

connection_manager.connect(user.id)
connection_manager.connection_sockets[user.id] = websocket
last_activity = datetime.datetime.now()
Expand All @@ -101,36 +121,39 @@ async def connect(websocket: WebSocket, token: str, db=Depends(get_db)):
while True:
try:
if connection_manager.is_connected(user.id) is False:
connection_manager.connection_socket[str(user.id)] = websocket
connection_manager.connection_sockets[str(user.id)] = websocket
connection_manager.set_status_cache(
connection_manager.get_status(user.id)
user.id, connection_manager.get_status(user.id)
)
connection_change_request = await websocket.receive_json()

# through this we can just check for keys inside of the change request
if connection_change_request["status"]:
if "status" in connection_change_request:
connection_manager.set_status(
user.id, connection_change_request["status"]
)

except asyncio.TimeoutError:
try:
await websocket.send_text("ping")
await asyncio.wait_for(
websocket.receive_text(), timeout=heartbeat_interval / 2
)
last_activity = datetime.now()

last_activity = datetime.datetime.now()
except (asyncio.TimeoutError, WebSocketDisconnect):
if (
datetime.now() - last_activity
datetime.datetime.now() - last_activity
).total_seconds() > expiry_timeout:
print(f"disconnecting user: {user.id} ")
await websocket.close()
connection_manager.disconnect(user.id)
break

except WebSocketDisconnect:
connection_manager.disconnect(user.id)
else:
return websocket.close(1001)


# Now to use the NotificationSystem to send a notification to a user:
notification_system = NotificationSystem()


async def notify_user(user_id, message):
await notification_system.send_notification_to_user(user_id, message)

0 comments on commit 53854ef

Please sign in to comment.