Skip to content

Commit

Permalink
UPD: add/update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
desty2k committed Jun 30, 2021
1 parent e4fb661 commit c647037
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 33 deletions.
30 changes: 30 additions & 0 deletions QtPyNetwork/client/QThreadedClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,26 +190,56 @@ def start(self, ip: str, port: int):

@Slot(str, int)
def on_connected(self, ip, port):
"""Called when client connects to server.
Emits connected signal.
Args:
ip (str): Client ip address.
port (int): Client port.
"""
self.connected.emit(ip, port)

@Slot(str, int)
def on_failed_to_connect(self, ip, port):
"""Called when client fails to connect to server.
Emits failed_to_connect signal.
Args:
ip (str): Client ip address.
port (int): Client port.
"""
self.failed_to_connect.emit(ip, port)

@Slot(bytes)
def on_message(self, message: bytes):
"""Called when client receives message from server.
Emits message signal.
Args:
message (bytes): Message.
"""
self.message.emit(message)

@Slot()
def on_disconnected(self):
"""Called when device disconnects from server.
Emits disconnected signal."""
self.disconnected.emit()

@Slot(str)
def on_error(self, error: str):
"""Called when a socket error occurs.
Emits error signal.
Args:
error (str): Error string.
"""
self.error.emit(error)

@Slot()
def on_closed(self):
"""Called when when the socket is closed.
Emits closed signal."""
self.closed.emit()

@Slot(bytes)
Expand Down
31 changes: 29 additions & 2 deletions QtPyNetwork/server/BaseServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def __init__(self, loggerName=None):

@Slot(str, int, bytes)
def start(self, ip: str, port: int):
"""Start server on IP:Port and decrypt incomming and
outcomming messages with encryption key."""
"""Start server."""
if self.__handlerClass:
ip = QHostAddress(ip)
self.__ip = ip
Expand Down Expand Up @@ -110,18 +109,46 @@ def __on_handler_device_error(self, device_id, error):

@Slot(Device, str, int)
def on_connected(self, device: Device, ip: str, port: int):
"""Called when new client connects to server.
Emits connected signal.
Args:
device (Device): Device object.
ip (str): Client ip address.
port (int): Client port.
"""
self.connected.emit(device, ip, port)

@Slot(Device, bytes)
def on_message(self, device: Device, message: bytes):
"""Called when server receives message from client.
Emits message signal.
Args:
device (Device): Message sender.
message (bytes): Message.
"""
self.message.emit(device, message)

@Slot(Device)
def on_disconnected(self, device: Device):
"""Called when device disconnects from server.
Emits disconnected signal.
Args:
device (Device): Disconnected device.
"""
self.disconnected.emit(device)

@Slot(Device, str)
def on_error(self, device: Device, error: str):
"""Called when a socket error occurs.
Emits error signal.
Args:
device (Device): Device object.
error (str): Error string.
"""
self.error.emit(device, error)

@Slot()
Expand Down
61 changes: 44 additions & 17 deletions QtPyNetwork/server/QBalancedServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@
class SocketWorker(QObject):
"""SocketWorker manages sockets and handles messages.
Signals:
Outgoing signals:
- disconnected (device_id: int): Client disconnected.
- connected (device_id: int, ip: str, port: int): Client connected
- connected (device_id: int, ip: str, port: int): Client connected.
- message (device_id: int, message: bytes): Message from client.
- error (device_id: int, error: str): Error occured.
- closed (): Closed successfully.
- send (device_id: int, message: bytes): Emit to send message
to client with ID in this thread.
- send (message: bytes): Emit to send message
to all clients in this thread.
Incomming signals:
- write (device_id: int, message: bytes): Emit to send message
to client with ID in this worker.
- write_all (message: bytes): Emit to send message
to all clients in this worker.
- kick (device_id: int): Emit to kick device with ID
in this worker.
- connection (device_id: int, socket_descriptor: int): Emit to create new socket
object in this worker.
- close_signal (): Emit to close all connections.
"""

disconnected = Signal(int)
Expand Down Expand Up @@ -54,6 +62,11 @@ def start(self) -> None:

@Slot(int)
def _kick(self, device_id):
"""Executed inside worker thread after emitting kick signal.
Args:
device_id (int): ID of device to kick.
"""
socket = self.get_socket_by_id(device_id)
if socket:
socket.close()
Expand Down Expand Up @@ -102,12 +115,12 @@ def get_socket_by_id(self, device_id: int):

@Slot()
def socket_count(self):
"""Returns amount of sockets."""
"""Returns amount of active sockets."""
return len(self.sockets)

@Slot()
def used_ids(self):
"""Returns used IDs."""
"""Returns IDs used by this worker."""
return [int(x.objectName()) for x in self.sockets]

@Slot(int)
Expand Down Expand Up @@ -234,19 +247,27 @@ def close(self):


class BalancedSocketHandler(QObject):
"""Creates socket handlers threads. New connections
are passed to SocketWorker with least load.
"""Creates socket handlers threads. New sockets
are passed to worker with least load.
Signals:
Outgoing signals:
- started (): Handler started.
- closed (): Handler closed.
- message (device_id: int, message: bytes): Message received.
- close_signal (): Emit this to close handler from another thread.
- connection (device_id: int): New connection.
- closed (): Handler closed all connections.
- message (client_id: int, message: bytes): Message received.
- error (client_id: int, error: str): Socket error.
- disconnected (client_id: int). Client disconnected.
Incomming signals:
- write (device_id: int, message: bytes): Emit to send message
to client with ID.
- write_all (message: bytes): Emit to send message
to all clients.
- kick (device_id: int): Emit to kick client with ID.
- close_signal (): Emit to close all connections.
"""
started = Signal()
closed = Signal()
close_signal = Signal()

connected = Signal(int, str, int)
message = Signal(int, bytes)
Expand All @@ -256,14 +277,15 @@ class BalancedSocketHandler(QObject):
write = Signal(int, bytes)
write_all = Signal(bytes)
kick = Signal(int)
close_signal = Signal()

def __init__(self, cores=None):
super(BalancedSocketHandler, self).__init__(None)
self.cores = cores

@Slot()
def start(self):
"""Start server and create socket handlers."""
"""Start server and create socket workers."""
self.__logger = logging.getLogger(self.__class__.__name__) # noqa
self.workers = [] # noqa
self.threads = [] # noqa
Expand All @@ -290,6 +312,7 @@ def start(self):

@Slot()
def create_worker(self):
"""Creates new socket worker in thread."""
thread = QThread()
worker = SocketWorker()
worker.moveToThread(thread)
Expand Down Expand Up @@ -390,6 +413,10 @@ def close(self) -> None:


class QBalancedServer(QBaseServer):
"""TCP server with constant amount of threads. When new client connects, server
checks which worker has the least amount of active sockets and passes socket
descriptor to that thread."""

def __init__(self, *args, **kwargs):
super(QBalancedServer, self).__init__(*args, **kwargs)
self.set_handler_class(BalancedSocketHandler)
54 changes: 41 additions & 13 deletions QtPyNetwork/server/QThreadedServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,40 @@


class SocketClient(QObject):
"""Keeps socket in separate thread.
Outgoing signals:
- disconnected (device_id: int): Client disconnected.
- connected (device_id: int, ip: str, port: int): Client connected.
- message (device_id: int, message: bytes): Message from client.
- error (device_id: int, error: str): Error occured.
- closed (): Closed successfully.
Incomming signals:
- close_signal (): Emit to close connection.
- write (device_id: int, message: bytes): Emit to send message
to client with ID in this thread.
"""

disconnected = Signal(int)
connected = Signal(int, str, int)
message = Signal(int, bytes)
error = Signal(int, str)
closed = Signal()

connection = Signal(int, int)
close_signal = Signal()

write = Signal(bytes)

json_encoder = None
json_decoder = None

def __init__(self, socket_descriptor, client_id):
super(SocketClient, self).__init__(None)
self.socket_descriptor = socket_descriptor
self.id = int(client_id)

@Slot()
def run(self) -> None:
"""Run socket manager"""
"""Run socket manager."""
self._logger = logging.getLogger(self.__class__.__name__) # noqa
self.data = {"size_left": 0, "data": b""} # noqa
self.socket = QTcpSocket() # noqa
Expand All @@ -48,7 +60,8 @@ def run(self) -> None:
self.write.connect(self._write)

@Slot()
def get_id(self):
def get_id(self) -> int:
"""Get ID of client."""
return self.id

@Slot(bytes)
Expand Down Expand Up @@ -140,18 +153,26 @@ def close(self):


class ThreadedSocketHandler(QObject):
"""Creates client threads.
"""Creates and manages socket client threads.
Signals:
Outgoing signals:
- started (): Handler started.
- finished (): Handler finished.
- closed (): Handler closed all connections.
- message (client_id: int, message: bytes): Message received.
- close_signal (): Emit this to close handler from another thread.
- connection (client_id: int): New connection.
- error (client_id: int, error: str): Socket error.
- disconnected (client_id: int). Client disconnected.
Incomming signals:
- write (device_id: int, message: bytes): Emit to send message
to client with ID.
- write_all (message: bytes): Emit to send message
to all clients.
- kick (device_id: int): Emit to kick client with ID.
- close_signal (): Emit to close all connections.
"""
started = Signal()
closed = Signal()
close_signal = Signal()

connected = Signal(int, str, int)
message = Signal(int, bytes)
Expand All @@ -161,6 +182,7 @@ class ThreadedSocketHandler(QObject):
write = Signal(int, bytes)
write_all = Signal(bytes)
kick = Signal(int)
close_signal = Signal()

def __init__(self):
super(ThreadedSocketHandler, self).__init__(None)
Expand Down Expand Up @@ -212,11 +234,17 @@ def on_incoming_connection(self, socket_descriptor: int) -> None:

@Slot(int)
def on_client_disconnected(self, client_id: int):
"""Remove client object after disconnection.
Args:
client_id (int): Client ID
"""
self.clients.remove(self.get_client_by_id(client_id))
self.disconnected.emit(client_id)

@Slot()
def on_thread_finished(self):
"""Remove all finished threads."""
self.threads = [thread for thread in self.threads if thread.isRunning()]

@Slot(int)
Expand Down Expand Up @@ -300,7 +328,7 @@ def close(self) -> None:

class QThreadedServer(QBaseServer):
"""Socket server with dynamic amount of threads. When client connects,
handler creates new thread and passes socket descriptor to that thread. """
handler creates new thread and passes socket descriptor to that thread."""

def __init__(self, *args, **kwargs):
super(QThreadedServer, self).__init__(*args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ There are two servers available:

The first one has constant amount of threads.
When new client connects, server checks which worker has the least amount of active sockets and passes socket
descriptor to this thread. QThreadedServer creates new thread for each connected device.
descriptor to that thread. QThreadedServer creates new thread for each connected device.

QThreadedClient is socket client, that keeps socket in separate thread.

Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Welcome to QtPyNetwork's documentation!
:caption: Contents:

readme.rst
reference/modules.rst
license.rst
changes.rst

Expand Down
Loading

0 comments on commit c647037

Please sign in to comment.