-
Notifications
You must be signed in to change notification settings - Fork 0
/
socket_server.py
134 lines (109 loc) · 4.86 KB
/
socket_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
socket_file_transfer/socket_server.py
Main script for accessing the server classes.
"""
import ast
import os
import socket
from datetime import datetime
from abc import ABCMeta, abstractmethod, ABC

import tqdm

from validator import Validator
from init_handler import InitEnv
from cryptography import CryptoHandler
from python_datalogger import DataLogger
class ISocketServer(metaclass=ABCMeta):
    """Abstract interface that requires subclasses to provide ``print_data``."""

    @abstractmethod
    def print_data(self):
        """Implemented in child class"""
class SocketServer(ISocketServer, ABC):
    """Singleton TCP server that receives one encrypted file from a client.

    The constructor stores the new object in a class-level slot, so a second
    direct construction raises; ``get_instance`` returns the stored object
    (creating it with built-in defaults when none exists yet).
    ``receive_data`` accepts a single connection, reads file name, file size
    and hashes, then the encrypted payload terminated by ``<END>``, decrypts
    it and writes the result below ``receive/``.
    """

    __instance = None
    # In-band terminator expected at the very end of the encrypted payload.
    __END_MARKER = b"<END>"
    # Upper bound on the number of bytes requested per recv() call.
    __CHUNK_SIZE = 1024

    @staticmethod
    def get_instance():
        """Return the singleton, creating it with hard-coded defaults if needed.

        A lazily created instance keeps ``receive`` at its default (``False``).
        """
        if SocketServer.__instance is None:
            SocketServer(key=b"SocketClientsPWD", nonce=b"SocketClientsNCE", file="test.txt")
        return SocketServer.__instance

    def __init__(self, key: bytes = b'DefaultSampleKey', nonce: bytes = b'DefaultSampleNCE',
                 receive: bool = False, file: str = "test.txt"):
        """Create the validator, cipher and server socket for one transfer.

        Args:
            key: Key passed to ``Validator`` and ``CryptoHandler``.
            nonce: Nonce passed to ``Validator`` and ``CryptoHandler``.
            receive: Must be ``True`` for ``receive_data`` to accept a client.
            file: File name; stored as ``receive/<file>`` and passed to
                ``Validator``.

        Raises:
            Exception: If an instance already exists, or if
                ``Validator.validate_length()`` returns a falsy value.
        """
        InitEnv.init_env()
        self.__logger = DataLogger(name="SocketServer", propagate=True, level="DEBUG")
        if SocketServer.__instance is not None:
            raise Exception("Cannot be instantiated more than once")

        self.__key = key
        self.__nonce = nonce
        self.__receive = receive
        self.__file = f"receive/{file}"
        self.__validator = Validator(key=self.__key, nonce=self.__nonce, file=self.__file)
        self.__hashes = self.__validator.get_hashes()
        self.__server = self.__get_server()
        self.__cipher = CryptoHandler(key=self.__key, nonce=self.__nonce)
        self.__progress = None

        if not self.__validator.validate_length():
            # The socket was already created above; release it so a rejected
            # key/nonce pair does not leak a file descriptor.
            if self.__server is not None:
                self.__server.close()
            raise Exception("\nPassword and Nonce must be exactly 16 characters long !\n")

        self.__validity = True
        SocketServer.__instance = self

    @staticmethod
    def print_data(**kwargs):
        """Print the ``receive`` flag of the stored singleton instance.

        Keyword arguments are accepted and ignored.  Raises ``AttributeError``
        when no instance has been created yet.
        """
        print(SocketServer.__instance.__receive)

    @staticmethod
    def __get_time():
        """Return the current local time formatted as ``HH:MM:SS``."""
        return datetime.now().strftime('%H:%M:%S')

    @staticmethod
    @DataLogger.logger
    def __get_server():
        """Create a new IPv4 TCP socket."""
        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    @staticmethod
    def __get_progress(file_size: str):
        """Build a tqdm byte progress bar whose total is ``float(file_size)``."""
        return tqdm.tqdm(unit="B", unit_scale=True, unit_divisor=1000, total=float(file_size))

    @DataLogger.logger
    def receive_data(self, port: int = 8999):
        """Accept one client on ``localhost:port`` and receive a single file.

        Does nothing except log a critical message when the instance was
        created with ``receive=False``.  Otherwise the client socket and the
        server socket are always closed before returning, including when an
        exception propagates.

        Args:
            port: TCP port to bind on ``localhost``.
        """
        if not self.__receive:
            self.__logger.log_critical("(receive) parameter must be True to call this method")
            return

        client = None
        try:
            self.__server.bind(('localhost', port))
            self.__server.listen()
            client, _addr = self.__server.accept()
            if self.__validity:
                self.__receive_file(client)
        finally:
            # Release both sockets even if the transfer failed part-way.
            if client is not None:
                client.close()
            self.__server.close()
            self.__logger.log_info("Client, Server connections terminated")

    def __receive_file(self, client):
        """Read header and payload from *client* and store the decrypted file."""
        # NOTE(review): each header field is read with a single recv(); TCP does
        # not preserve message boundaries, so this relies on the client sending
        # the three fields as separately delivered messages - confirm against
        # the client implementation.
        file_name = client.recv(self.__CHUNK_SIZE).decode()
        self.__logger.log_info(f"Received file name - {datetime.now().time()} : ({file_name})")
        file_size = client.recv(self.__CHUNK_SIZE).decode()
        self.__logger.log_info(f"Received file size - {datetime.now().time()} : ({file_size}B)")
        hashes = client.recv(self.__CHUNK_SIZE).decode()
        # literal_eval only evaluates Python literals, never arbitrary code.
        clear_hash = ast.literal_eval(hashes)
        self.__logger.log_info(f"Received hashes - {datetime.now().time()}\n")

        if not self.__validator.check_hashes(hashes=clear_hash):
            self.__logger.log_critical("Hashes doesn't match, terminating the server")
            return

        # The name comes from the network: keep only its last path component so
        # a value such as "../../x" cannot escape the receive/ directory.
        safe_name = os.path.basename(file_name.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            self.__logger.log_critical(f"Rejected unsafe file name ({file_name})")
            return

        self.__progress = self.__get_progress(file_size=file_size)
        try:
            encrypted_data = self.__read_payload(client)
        finally:
            # Close the bar explicitly and keep the attribute defined.
            self.__progress.close()
            self.__progress = None

        if encrypted_data is None:
            self.__logger.log_critical("Connection closed before the <END> marker was received")
            return

        data = self.__cipher.decrypt(encrypted_data)
        # Open the file only once the plaintext is ready, so a failed transfer
        # does not leave an empty file behind; `with` guarantees it is closed.
        with open(f"receive/{safe_name}", 'wb') as file:
            file.write(data)
        print(" ")
        self.__logger.log_info(f"File received - {datetime.now().time()}")
        # TODO: post-transfer verification is disabled; the original snippet
        # (needs `os`) was:
        #   if self.__validator.verify_file(file_hash=clear_hash["file_hash"], path=self.__file):
        #       self.__logger.log_info(f"File verified - {datetime.now().time()}")
        #   else:
        #       try:
        #           os.remove(self.__file)
        #           self.__logger.log_info(f"File deleted - {datetime.now().time()}")
        #       except FileNotFoundError:
        #           pass

    def __read_payload(self, client):
        """Collect bytes from *client* until the stream ends with ``<END>``.

        Returns:
            The received bytes without the trailing marker, or ``None`` when
            the peer closed the connection before the marker arrived.
        """
        marker = self.__END_MARKER
        buffer = bytearray()
        while True:
            chunk = client.recv(self.__CHUNK_SIZE)
            if not chunk:
                # recv() returning b"" means the peer closed the connection.
                return None
            buffer += chunk
            self.__progress.update(len(chunk))
            # Test the accumulated buffer rather than the single chunk so a
            # marker split across two recv() calls is still recognised, and
            # payload bytes sharing the final chunk with the marker are kept.
            if buffer.endswith(marker):
                del buffer[-len(marker):]
                return bytes(buffer)

    def __repr__(self):
        """Return the string form of the hashes obtained from the validator."""
        return f"{self.__hashes}"