Skip to content

Commit

Permalink
UPD: add TCPClient, move old version to examples
Browse files Browse the repository at this point in the history
  • Loading branch information
desty2k committed May 26, 2022
1 parent 9e61c2a commit bc49717
Show file tree
Hide file tree
Showing 37 changed files with 389 additions and 150 deletions.
2 changes: 1 addition & 1 deletion QtPyNetwork/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.6.1"
__version__ = "0.7.0"
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ def write(self, client_id: int, data: bytes):
data = pack(HEADER, len(data)) + data
socket.write(data)
socket.flush()
print(f"Written {len(data)} bytes to {socket.objectName()}")
return
print(f"{client_id} - {data}")
print([socket.objectName() for socket in self.sockets])
self.client_error.emit(client_id, Exception(f"Client {client_id} not found"))

@Slot(bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
from qtpy.QtNetwork import QAbstractSocket, QUdpSocket

import logging
from struct import unpack, calcsize

from .AbstractBalancer import AbstractBalancer, HEADER, HEADER_SIZE
from QtPyNetwork.common import read, write
from .AbstractBalancer import AbstractBalancer


class _Worker(QObject):
disconnected = Signal()
connected = Signal(str, int)
readyRead = Signal(bytes)
error = Signal(Exception)
closed = Signal()
disconnected = Signal(int)
connected = Signal(int, str, int)
ready_read = Signal(int, bytes)
error = Signal(int, Exception)
closed = Signal(int)

close_signal = Signal()

write = Signal(bytes)
write_signal = Signal(bytes)

def __init__(self, client_id, socket_type: type, socket_descriptor: int):
super(_Worker, self).__init__()
Expand All @@ -29,6 +29,9 @@ def __init__(self, client_id, socket_type: type, socket_descriptor: int):
self.size_left = 0
self.data = b""

self.close_signal.connect(self.__on_close_signal)
self.write_signal.connect(self.__on_write_signal)

@Slot()
def start(self):
socket: QAbstractSocket = self.socket_type()
Expand All @@ -50,49 +53,35 @@ def __on_socket_ready_read(self):
Note:
Emits message signal.
"""

while self.socket.bytesAvailable():
if self.size_left > 0:
data = self.socket.read(self.size_left)
self.size_left = self.size_left - len(data)
if self.size_left > 0:
self.data = self.data + data
else:
self.readyRead.emit(self.client_id, self.data)
self.data = b""
self.size_left = 0
else:
header = self.socket.read(HEADER_SIZE)
data_size = unpack(HEADER, header)[0]
data = self.socket.read(data_size)
if len(data) < data_size:
self.data = data
self.size_left = data_size - len(data)
else:
self.readyRead.emit(self.client_id, data)

# if self.client_id in self.data:
# size_left = self.data.get(client_id).get("size_left")
# data = socket.read(size_left)
# size_left = size_left - len(data)
# if size_left > 0:
# self.data[client_id]["size_left"] = size_left
# self.data[client_id]["data"] += data
# else:
# data = self.data.get(client_id).get("data") + data
# del self.data[client_id]
# self.message.emit(client_id, data)
#
# else:
# header = socket.read(HEADER_SIZE)
# data_size = unpack(HEADER, header)[0]
# message = socket.read(data_size)
#
# if len(message) < data_size:
# data_size = data_size - len(message)
# self.data[client_id] = {"data": message, "size_left": data_size}
# else:
# self.message.emit(client_id, message)
data, size_left = read(self.socket, self.data, self.size_left)
if size_left == 0:
self._size_left = 0
self._data = b""
self.ready_read.emit(self.client_id, data)
else:
self._data = data
self._size_left = size_left


# while self.socket.bytesAvailable():
# if self.size_left > 0:
# data = self.socket.read(self.size_left)
# self.size_left = self.size_left - len(data)
# if self.size_left > 0:
# self.data = self.data + data
# else:
# self.ready_read.emit(self.client_id, self.data)
# self.data = b""
# self.size_left = 0
# else:
# header = self.socket.read(HEADER_SIZE)
# data_size = unpack(HEADER, header)[0]
# data = self.socket.read(data_size)
# if len(data) < data_size:
# self.data = data
# self.size_left = data_size - len(data)
# else:
# self.ready_read.emit(self.client_id, data)

@Slot()
def __on_socket_disconnected(self):
Expand Down Expand Up @@ -123,24 +112,38 @@ def __on_socket_error(self):
error = self.socket.errorString()
self.error.emit(self.client_id, Exception(error))

@Slot()
def __on_close_signal(self):
"""Close socket.
class ThreadBalancer(AbstractBalancer):
Note:
Emits closed signal.
"""
try:
self.socket.close()
except RuntimeError:
pass
self.closed.emit(self.client_id)

@Slot(bytes)
def write_all(self, message: bytes):
pass
def __on_write_signal(self, data: bytes):
"""Write data to socket.
@Slot(int)
def disconnect(self, client_id: int):
pass
Args:
data (bytes): Data to write.
@Slot()
def close(self):
pass
Note:
Emits written signal.
"""
write(self.socket, data)
self.socket.flush()

# data = pack(HEADER, len(data)) + data
# self.socket.write(data)
# self.socket.flush()

@Slot(int, bytes)
def write(self, client_id: int, message: bytes):
pass

class ThreadBalancer(AbstractBalancer):

def __init__(self):
super(ThreadBalancer, self).__init__()
Expand All @@ -153,40 +156,48 @@ def balance(self, socket_type: type, socket_descriptor: int):
worker = _Worker(client_id, socket_type, socket_descriptor)
worker.setObjectName(str(client_id))
# worker.connected.connect(self.__on_worker_socket_connected)
# worker.readyRead.connect(self.__on_worker_socket_readyRead)
# worker.ready_read.connect(self.__on_worker_socket_readyRead)
# worker.disconnected.connect()
worker.connected.connect(self.connected.emit)
worker.disconnected.connect(self.disconnected.emit)
worker.readyRead.connect(self.message.emit)
worker.ready_read.connect(self.message.emit)
worker.error.connect(self.client_error.emit)

thread = QThread()
worker.moveToThread(thread)
thread.started.connect(worker.run)
thread.started.connect(worker.start)
self.workers.append((worker, thread))
thread.start()

# @Slot(str, int)
# def __on_worker_socket_connected(self, ip: str, port: int):
# client_id = int(self.sender().objectName())
# self.connected.emit(client_id, ip, port)
#
# @Slot(bytes)
# def __on_worker_socket_readyRead(self, data: bytes):
# client_id = int(self.sender().objectName())
# self.connected.emit(client_id, data)
#
# @Slot()
# def __on_worker_socket_disconnected(self):
# client_id = int(self.sender().objectName())
# self.disconnected.emit(client_id)



@Slot(int, bytes)
def write(self, client_id: int, message: bytes):
worker = self.__get_worker_by_client_id(client_id)
print(f"Writing to {client_id}: {message}")
if worker:
worker.write_signal.emit(message)
else:
self.client_error.emit(client_id, Exception("Client not found"))

@Slot(bytes)
def write_all(self, message: bytes):
for worker in self.workers:
worker.write_signal.emit(message)

@Slot(int)
def disconnect(self, client_id: int):
worker = self.__get_worker_by_client_id(client_id)
if worker:
worker.close_signal.emit()

@Slot()
def close(self):
pass

@Slot(int)
def __get_worker_by_client_id(self, client_id: int):
for worker, thread in self.workers:
if worker.objectName() == str(client_id):
return worker



Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .AbstractBalancer import AbstractBalancer
from .NoBalancer import NoBalancer
from .ThreadBalancer import ThreadBalancer
96 changes: 96 additions & 0 deletions QtPyNetwork/client/AbstractClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from qtpy.QtCore import Slot, Signal, QObject

from abc import abstractmethod


class AbstractClient(QObject):

connected = Signal(str, int)
disconnected = Signal()
message = Signal(bytes)
error = Signal(Exception)
closed = Signal()
failed_to_connect = Signal()

def __init__(self, timeout: int = 5):
super(AbstractClient, self).__init__()
self._socket = None
self._timeout = timeout

@abstractmethod
@Slot(str, int)
def start(self, ip: str, port: int):
"""Start client thread and connect to server."""
pass

@abstractmethod
@Slot(bytes)
def write(self, data: bytes):
"""Write data to server.
Args:
data (bytes): Data to write.
"""
pass

@Slot(str, int)
def on_connected(self, ip, port):
"""Called when client connects to server.
Emits connected signal.
Args:
ip (str): Client ip address.
port (int): Client port.
"""
self.connected.emit(ip, port)

def on_message(self, message: bytes):
"""Called when client receives message from server.
Emits message signal.
Args:
message (bytes): Message.
"""
self.message.emit(message)

@Slot()
def on_disconnected(self):
"""Called when device disconnects from server.
Emits disconnected signal."""
self.disconnected.emit()

@Slot(str)
def on_error(self, error: str):
"""Called when a socket error occurs.
Emits error signal.
Args:
error (str): Error string.
"""
self.error.emit(Exception, error)

@Slot()
def on_failed_to_connect(self):
"""Called when client fails to connect to server.
Emits failed_to_connect signal.
"""
self.failed_to_connect.emit()

@Slot()
def on_closed(self):
"""Called when the socket is closed.
Emits closed signal."""
self.closed.emit()

@Slot()
def close(self):
"""Disconnect from server and close socket."""
if self._socket:
self._socket.close()
self._socket = None
self.closed.emit()

@Slot()
def is_running(self):
"""Check if client is running."""
return self._socket is not None
6 changes: 6 additions & 0 deletions QtPyNetwork/client/QThreadedClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ def _disconnect(self):

@Slot()
def close(self):
print("Pre _disconnect")
self._disconnect()
print("Pre tcpsocket.close")
self.tcpSocket.close()
print("Pre closed.emit")
self.closed.emit()


Expand Down Expand Up @@ -257,8 +260,11 @@ def write(self, data: bytes):
def close(self):
"""Disconnect from server and close socket."""
if self.__client and self.__client_thread:
print("__client and __client_thread")
self.__client.close_signal.emit()
print("Signal emited")
self.__client_thread.quit()
print("_client_thread quited")
else:
self.error.emit("Client not running")

Expand Down
Loading

0 comments on commit bc49717

Please sign in to comment.