Skip to content

Commit

Permalink
Refactor WebSocketWorker class and add user connection management
Browse files Browse the repository at this point in the history
  • Loading branch information
crushr3sist committed Apr 29, 2024
1 parent a4270f1 commit 3e209bd
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 81 deletions.
21 changes: 21 additions & 0 deletions queue_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from queue import Queue

q = Queue()

q.put(1)
q.put(2)
q.put(3)

print("the size of our queue: ", q.qsize())

first_in_queue = q.get()

print(first_in_queue)

print("the size of our queue: ", q.qsize())

second_in_queue = q.get()

print(second_in_queue)

print("the size of our queue: ", q.qsize())
Binary file modified r3almX_backend/auth_service/__pycache__/auth_routes.cpython-312.pyc
Binary file not shown.
Binary file not shown.
Binary file not shown.
166 changes: 85 additions & 81 deletions r3almX_backend/chat_service/chat_service_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,113 +1,117 @@
from queue import Queue

from fastapi import Depends, WebSocket, WebSocketDisconnect

from r3almX_backend.auth_service.user_handler_utils import get_db
from fastapi.responses import JSONResponse
from jose import JWTError, jwt
from sqlalchemy.orm import sessionmaker

from r3almX_backend.auth_service.auth_utils import TokenData
from r3almX_backend.auth_service.Config import UsersConfig
from r3almX_backend.auth_service.user_handler_utils import get_db, get_user_by_username
from r3almX_backend.auth_service.user_models import User
from r3almX_backend.chat_service.main import chat_service

"""
this is where we connect users to channels
we need to write a **websocket handler** so multiple users
can poll messages in and out to certain channels
- write a feed websocket handler (client receiver)
- write a broadcast digest websocket handler (client and message log updater)
- we need to specify which users to pool into which websocket
- we already specify the room id so the data knows where to be committed,
- we'll set a 50 message limit to be committed to the database
and stored in memory until committed
- and we'll poll whats being stored
"""


class RoomHandler:
def __init__(self, room_socket: WebSocket) -> None:
self.queue: Queue = []
def __init__(
self,
room_id: str,
) -> None:

self.queue: Queue = Queue()
self.room_id: str
self.room_socket: WebSocket = room_socket
self.users: WebSocket = []

def enqueue(self, message: str, user: str):
def enqueue(self, message: str, user: str) -> None:
"""this method exposes our queue so that we can enqueue messages being received"""
...
self.queue.put((message, user))
self.broadcast()

def broadcast(self):
async def broadcast(self) -> None:
"""this method goes through our queue and pushes messages to the room and every user inside that room"""
...

def digest(self):
"""this method pushes those messages to that specific rooms database"""
...
for user in self.users:
message_to_broadcast = self.queue.get()
await user.send_text(message_to_broadcast[0])

def get_room_users(self):
"""aggregator for all users that are part of the room"""
def digest(self) -> None:
"""limit of 10 messages in the queue, once queue is 10, we dispatch to db"""
...

def connect_user(self, user_socket: WebSocket) -> None:
self.users.append(user_socket)

def disconnect_user(self, user_socket: WebSocket) -> None:
self.users.remove(user_socket)


class WebSocketWorker:
def __init__(self) -> None:
self.room_sockets: RoomHandler = []
# need to keep track of all the rooms that are currently active
# @future need to implement room_socket into redis
self.room_instances: dict[str, RoomHandler] = {}

def spawn(self):
def spawn(self, room_id, websocket: WebSocket) -> None:
"""once a room is created, we spawn an instance for that room specifically"""
new_socket = RoomHandler()
self.room_sockets.append(new_socket)

def intake(self):
"""feafawef"""
...
new_socket = RoomHandler(room_id)
new_socket.connect_user(websocket)
self.room_instances[room_id] = new_socket

def intake(self, room_id: str, message: str, user: str) -> None:
"""call the enqueue function"""
self.room_instances[room_id].enqueue(message, user)

class ConnectionManager:
def __init__(self):
self.active_connections = []
def has(self, room_id: str) -> bool:
try:
if self.room_instances[room_id] != None:
return True
except KeyError:
return False

async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect_surface(self, websocket, room_id):
self.room_instances[room_id].disconnect_user(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)

def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
ws_worker = WebSocketWorker()


manager = ConnectionManager()
def get_user_from_token(token: str, db) -> User:
payload = jwt.decode(
token, UsersConfig.SECRET_KEY, algorithms=[UsersConfig.ALGORITHM]
)
username: str = payload.get("sub")
token_data = TokenData(username=username)
user = get_user_by_username(db, username=token_data.username)
return user


@chat_service.websocket("/feed/{room_id}/ws")
@chat_service.websocket("/message/{room_id}")
async def broadcast_(
websocket: WebSocket,
room_id: str,
token: str,
db=Depends(get_db),
): ...


@chat_service.websocket("/feed/{room_id}/ws")
async def feed_ws_endpoint(
websocket: WebSocket,
room_id: str,
token: str,
db=Depends(get_db),
): ...


@chat_service.websocket("/feed/{room_id}/ws")
async def feed_ws_endpoint(
websocket: WebSocket,
room_id: str,
token: str,
db=Depends(get_db),
websocket: WebSocket, room_id: str, token: str, db=Depends(get_db)
):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"Message text was: {data}", websocket)

except WebSocketDisconnect:

await manager.send_personal_message("Bye !!!", websocket)
manager.disconnect(websocket)
print(token)
user = get_user_from_token(token, db)
if user:
await websocket.accept()
try:
while True:
if not ws_worker.has(room_id):
ws_worker.spawn(room_id, websocket)
text_received = await websocket.receive_text()
print(text_received)
ws_worker.intake(room_id, text_received, user.id)
except WebSocketDisconnect:
ws_worker.disconnect_surface(websocket, room_id)
else:
return {"status": 500, "message": "there was an issue"}


@chat_service.get("/message/rooms/")
def get_all_connections():
return {
"connections:": {
str(list(ws_worker.room_sockets.keys())): str(
list(ws_worker.room_sockets.values())
)
}
}

0 comments on commit 3e209bd

Please sign in to comment.