diff --git a/src/core/Client.py b/src/core/Client.py new file mode 100644 index 0000000..1ab511e --- /dev/null +++ b/src/core/Client.py @@ -0,0 +1,280 @@ +import asyncio +import math +import zlib + +from core import utils + + +class Client: + + def __init__(self, reader, writer, core): + self.reader = reader + self.writer = writer + self.down_rw = (None, None) + self.log = utils.get_logger("client(None:0)") + self.addr = writer.get_extra_info("sockname") + self.loop = asyncio.get_event_loop() + self.Core = core + self.cid = -1 + self.key = None + self.nick = None + self.roles = None + self.guest = True + self.alive = True + self.ready = False + + def _update_logger(self): + self.log = utils.get_logger(f"{self.nick}:{self.cid})") + self.log.debug(f"Update logger") + + def is_disconnected(self): + if not self.alive: + return True + res = self.writer.is_closing() + if res: + self.log.debug(f"Client Disconnected") + self.alive = False + return True + else: + self.log.debug(f"Client Alive") + self.alive = True + return False + + async def kick(self, reason): + if not self.alive: + self.log.debug(f"Kick({reason}) skipped;") + return + self.log.info(f"Kicked with reason: \"{reason}\"") + await self.tcp_send(b"K" + bytes(reason, "utf-8")) + self.alive = False + # await self.remove_me() + + async def tcp_send(self, data, to_all=False, writer=None): + + # TNetwork.cpp; Line: 383 + # BeamMP TCP protocol sends a header of 4 bytes, followed by the data. + # [][][][][][]...[] + # ^------^^---...-^ + # size data + + if writer is None: + writer = self.writer + + if to_all: + for client in self.Core.clients: + if not client: + continue + await client.tcp_send(data) + return + + # self.log.debug(f"tcp_send({data})") + if len(data) == 10: + data += b"." + header = len(data).to_bytes(4, "little", signed=True) + self.log.debug(f'len: {len(data)}; send: {header + data}') + try: + writer.write(header + data) + await writer.drain() + except ConnectionError: + self.log.debug('tcp_send: Disconnected') + self.alive = False + + async def recv(self): + try: + header = await self.reader.read(4) # header: 4 bytes + + int_header = 0 + for i in range(len(header)): + int_header += header[i] + + if int_header <= 0: + await asyncio.sleep(0.1) + self.is_disconnected() + if self.alive: + self.log.debug(f"Header: {header}") + await self.kick("Invalid packet - header negative") + return b"" + + if int_header > 100 * MB: + await self.kick("Header size limit exceeded") + self.log.warn(f"Client {self.nick}:{self.cid} sent header of >100MB - " + f"assuming malicious intent and disconnecting the client.") + return b"" + + data = await self.reader.read(100 * MB) + self.log.debug(f"header: `{header}`; int_header: `{int_header}`; data: `{data}`;") + + if len(data) != int_header: + self.log.debug(f"WARN Expected to read {int_header} bytes, instead got {len(data)}") + + abg = b"ABG:" + if len(data) > len(abg) and data.startswith(abg): + data = zlib.decompress(data[len(abg):]) + self.log.debug(f"ABG: {data}") + return data + return data + except ConnectionError: + self.alive = False + return b"" + + async def _split_load(self, start, end, d_sock, filename): + real_size = end - start + writer = self.down_rw[1] if d_sock else self.writer + who = 'dwn' if d_sock else 'srv' + self.log.debug(f"[{who}] Real size: {real_size / MB}mb; {real_size == end}, {real_size * 2 == end}") + + with open(filename, 'rb') as f: + f.seek(start) + data = f.read(end) + try: + writer.write(data) + await writer.drain() + self.log.debug(f"[{who}] File sent.") + except ConnectionError: + self.alive = False + self.log.debug(f"[{who}] Disconnected.") + # break + return real_size + + # chunk_size = 125 * MB + # if chunk_size > real_size: + # chunk_size = real_size + # chunks = math.floor(real_size / chunk_size) + # self.log.debug(f"[{who}] s:{start}, e:{end}, c:{chunks}, cz:{chunk_size/MB}mb, rs:{real_size/MB}mb") + # dw = 0 + # for chunk in range(1, chunks + 1): + # chunk_end = start + (chunk_size * chunk) + # chunk_start = chunk_end - chunk_size + # # if chunk_start != 0: + # # chunk_start -= 1 + # real_size -= chunk_size + # if chunk_size > real_size: + # chunk_end = real_size + # self.log.debug(f"[{who}] Chunk: {chunk}; Start: {chunk_start}; End: {chunk_end/MB};") + # with open(filename, 'rb') as f: + # f.seek(chunk_start) + # data = f.read(chunk_end) + # try: + # writer.write(data) + # await writer.drain() + # except ConnectionError: + # self.alive = False + # self.log.debug(f"[{who}] Disconnected") + # break + # dw += len(data) + # del data + # self.log.debug(f"[{who}] File sent.") + # return dw + + async def sync_resources(self): + while self.alive: + data = await self.recv() + self.log.debug(f"data: {data!r}") + if data.startswith(b"f"): + file = data[1:].decode("utf-8") + self.log.debug(f"Sending File: {file}") + size = -1 + for mod in self.Core.mods_list: + if type(mod) == int: + continue + if mod.get('path') == file: + size = mod['size'] + self.log.debug("File is accept.") + break + if size == -1: + await self.tcp_send(b"CO") + await self.kick(f"Not allowed mod: " + file) + return + await self.tcp_send(b"AG") + t = 0 + while not self.down_rw[0]: + await asyncio.sleep(0.1) + t += 1 + if t > 50: + await self.kick("Missing download socket") + return + self.log.info(f"Requested mode: {file!r}") + self.log.debug(f"Mode size: {size / MB}") + + msize = math.floor(size / 2) + # uploads = [ + # asyncio.create_task(self._split_load(0, msize, False, file)), # SplitLoad_0 + # asyncio.create_task(self._split_load(msize, size, True, file)) # SplitLoad_1 + # ] + # await asyncio.wait(uploads) + uploads = [ + self._split_load(0, msize, False, file), + self._split_load(msize, size, True, file) + ] + sl0, sl1 = await asyncio.gather(*uploads) + sent = sl0 + sl1 + ok = sent == size + lost = size - sent + self.log.debug(f"SplitLoad_0: {sl0}; SplitLoad_1: {sl1}; At all ({ok}): Sent: {sent}; Lost: {lost}") + self.log.debug(f"SplitLoad_0: {sl0 / MB}mb; " + f"SplitLoad_1: {sl1 / MB}MB; At all ({ok}): Sent: {sent / MB}mb; Lost: {lost / MB}mb") + if not ok: + self.alive = False + self.log.error(f"Error while sending.") + return + elif data.startswith(b"SR"): + path_list = '' + size_list = '' + for mod in self.Core.mods_list: + if type(mod) == int: + continue + path_list += f"{mod['path']};" + size_list += f"{mod['size']};" + mod_list = path_list + size_list + self.log.debug(f"Mods List: {mod_list}") + if len(mod_list) == 0: + await self.tcp_send(b"-") + else: + await self.tcp_send(bytes(mod_list, "utf-8")) + elif data == b"Done": + await self.tcp_send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json") + break + return + + async def looper(self): + await self.tcp_send(b"P" + bytes(f"{self.cid}", "utf-8")) # Send clientID + await self.sync_resources() + while self.alive: + data = await self.recv() + if data == b"": + if not self.alive: + break + else: + await asyncio.sleep(.2) + self.is_disconnected() + continue + code = data.decode()[0] + self.log.debug(f"Received code: {code}, data: {data}") + match code: + case "H": + # Client connected + self.ready = True + await self.tcp_send(b"Sn" + bytes(self.nick, "utf-8"), to_all=True) + case "C": + # Chat + await self.tcp_send(data, to_all=True) + + async def remove_me(self): + await asyncio.sleep(0.3) + self.alive = False + if (self.cid > 0 or self.nick is not None) and \ + self.Core.clients_by_nick.get(self.nick): + # if self.ready: + # await self.tcp_send(b"", to_all=True) # I'm disconnected. + self.log.debug(f"Removing client {self.nick}:{self.cid}") + self.log.info("Disconnected") + self.Core.clients[self.cid] = None + self.Core.clients_by_id.pop(self.cid) + self.Core.clients_by_nick.pop(self.nick) + else: + self.log.debug(f"Removing client; Closing connection...") + if not self.writer.is_closing(): + self.writer.close() + _, down_w = self.down_rw + if down_w and not down_w.is_closing(): + down_w.close() diff --git a/src/core/Client.pyi b/src/core/Client.pyi new file mode 100644 index 0000000..6e0fccb --- /dev/null +++ b/src/core/Client.pyi @@ -0,0 +1,33 @@ +import asyncio +from asyncio import StreamReader, StreamWriter +from typing import Tuple + +from core import Core, utils + + +class Client: + + def __init__(self, reader: StreamReader, writer: StreamWriter, core: Core) -> "Client": + self.reader = reader + self.writer = writer + self.down_rw: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None) + self.log = utils.get_logger("client(id: )") + self.addr = writer.get_extra_info("sockname") + self.loop = asyncio.get_event_loop() + self.Core = core + self.cid: int = -1 + self.key: str = None + self.nick: str = None + self.roles: str = None + self.guest = True + self.alive = True + self.ready = False + def is_disconnected(self) -> bool: ... + async def kick(self, reason: str) -> None: ... + async def tcp_send(self, data: bytes, to_all:bool = False, writer: StreamWriter = None) -> None: ... + async def sync_resources(self) -> None: ... + async def recv(self) -> bytes: ... + async def _split_load(self, start: int, end: int, d_sock: bool, filename: str) -> None: ... + async def looper(self) -> None: ... + def _update_logger(self) -> None: ... + async def remove_me(self) -> None: ... diff --git a/src/core/core.py b/src/core/core.py index 3d94e20..8176906 100644 --- a/src/core/core.py +++ b/src/core/core.py @@ -5,296 +5,20 @@ # Licence: FPA # (c) kuitoi.su 2023 import asyncio -import math import os import random -import zlib from threading import Thread import aiohttp import uvicorn from core import utils +from core.Client import Client from core.tcp_server import TCPServer from core.udp_server import UDPServer from modules.WebAPISystem import app as webapp -class Client: - - def __init__(self, reader, writer, core): - self.reader = reader - self.writer = writer - self.down_rw = (None, None) - self.log = utils.get_logger("client(None:0)") - self.addr = writer.get_extra_info("sockname") - self.loop = asyncio.get_event_loop() - self.Core = core - self.cid = -1 - self.key = None - self.nick = None - self.roles = None - self.guest = True - self.alive = True - self.ready = False - - def _update_logger(self): - self.log = utils.get_logger(f"{self.nick}:{self.cid})") - self.log.debug(f"Update logger") - - def is_disconnected(self): - if not self.alive: - return True - res = self.writer.is_closing() - if res: - self.log.debug(f"Client Disconnected") - self.alive = False - return True - else: - self.log.debug(f"Client Alive") - self.alive = True - return False - - async def kick(self, reason): - if not self.alive: - self.log.debug(f"Kick({reason}) skipped;") - return - self.log.info(f"Kicked with reason: \"{reason}\"") - await self.tcp_send(b"K" + bytes(reason, "utf-8")) - self.alive = False - # await self.remove_me() - - async def tcp_send(self, data, to_all=False, writer=None): - - # TNetwork.cpp; Line: 383 - # BeamMP TCP protocol sends a header of 4 bytes, followed by the data. - # [][][][][][]...[] - # ^------^^---...-^ - # size data - - if writer is None: - writer = self.writer - - if to_all: - for client in self.Core.clients: - if not client: - continue - await client.tcp_send(data) - return - - # self.log.debug(f"tcp_send({data})") - if len(data) == 10: - data += b"." - header = len(data).to_bytes(4, "little", signed=True) - self.log.debug(f'len: {len(data)}; send: {header + data}') - try: - writer.write(header + data) - await writer.drain() - except ConnectionError: - self.log.debug('tcp_send: Disconnected') - self.alive = False - - async def recv(self): - try: - header = await self.reader.read(4) # header: 4 bytes - - int_header = 0 - for i in range(len(header)): - int_header += header[i] - - if int_header <= 0: - await asyncio.sleep(0.1) - self.is_disconnected() - if self.alive: - self.log.debug(f"Header: {header}") - await self.kick("Invalid packet - header negative") - return b"" - - if int_header > 100 * MB: - await self.kick("Header size limit exceeded") - self.log.warn(f"Client {self.nick}:{self.cid} sent header of >100MB - " - f"assuming malicious intent and disconnecting the client.") - return b"" - - data = await self.reader.read(100 * MB) - self.log.debug(f"header: `{header}`; int_header: `{int_header}`; data: `{data}`;") - - if len(data) != int_header: - self.log.debug(f"WARN Expected to read {int_header} bytes, instead got {len(data)}") - - abg = b"ABG:" - if len(data) > len(abg) and data.startswith(abg): - data = zlib.decompress(data[len(abg):]) - self.log.debug(f"ABG: {data}") - return data - return data - except ConnectionError: - self.alive = False - return b"" - - async def _split_load(self, start, end, d_sock, filename): - real_size = end - start - writer = self.down_rw[1] if d_sock else self.writer - who = 'dwn' if d_sock else 'srv' - self.log.debug(f"[{who}] Real size: {real_size/MB}mb; {real_size == end}, {real_size*2 == end}") - - with open(filename, 'rb') as f: - f.seek(start) - data = f.read(end) - try: - writer.write(data) - await writer.drain() - self.log.debug(f"[{who}] File sent.") - except ConnectionError: - self.alive = False - self.log.debug(f"[{who}] Disconnected.") - # break - return real_size - - # chunk_size = 125 * MB - # if chunk_size > real_size: - # chunk_size = real_size - # chunks = math.floor(real_size / chunk_size) - # self.log.debug(f"[{who}] s:{start}, e:{end}, c:{chunks}, cz:{chunk_size/MB}mb, rs:{real_size/MB}mb") - # dw = 0 - # for chunk in range(1, chunks + 1): - # chunk_end = start + (chunk_size * chunk) - # chunk_start = chunk_end - chunk_size - # # if chunk_start != 0: - # # chunk_start -= 1 - # real_size -= chunk_size - # if chunk_size > real_size: - # chunk_end = real_size - # self.log.debug(f"[{who}] Chunk: {chunk}; Start: {chunk_start}; End: {chunk_end/MB};") - # with open(filename, 'rb') as f: - # f.seek(chunk_start) - # data = f.read(chunk_end) - # try: - # writer.write(data) - # await writer.drain() - # except ConnectionError: - # self.alive = False - # self.log.debug(f"[{who}] Disconnected") - # break - # dw += len(data) - # del data - # self.log.debug(f"[{who}] File sent.") - # return dw - - async def sync_resources(self): - while self.alive: - data = await self.recv() - self.log.debug(f"data: {data!r}") - if data.startswith(b"f"): - file = data[1:].decode("utf-8") - self.log.debug(f"Sending File: {file}") - size = -1 - for mod in self.Core.mods_list: - if type(mod) == int: - continue - if mod.get('path') == file: - size = mod['size'] - self.log.debug("File is accept.") - break - if size == -1: - await self.tcp_send(b"CO") - await self.kick(f"Not allowed mod: " + file) - return - await self.tcp_send(b"AG") - t = 0 - while not self.down_rw[0]: - await asyncio.sleep(0.1) - t += 1 - if t > 50: - await self.kick("Missing download socket") - return - self.log.info(f"Requested mode: {file!r}") - self.log.debug(f"Mode size: {size/MB}") - - msize = math.floor(size / 2) - # uploads = [ - # asyncio.create_task(self._split_load(0, msize, False, file)), # SplitLoad_0 - # asyncio.create_task(self._split_load(msize, size, True, file)) # SplitLoad_1 - # ] - # await asyncio.wait(uploads) - uploads = [ - self._split_load(0, msize, False, file), - self._split_load(msize, size, True, file) - ] - sl0, sl1 = await asyncio.gather(*uploads) - sent = sl0 + sl1 - ok = sent == size - lost = size - sent - self.log.debug(f"SplitLoad_0: {sl0}; SplitLoad_1: {sl1}; At all ({ok}): Sent: {sent}; Lost: {lost}") - self.log.debug(f"SplitLoad_0: {sl0/MB}mb; " - f"SplitLoad_1: {sl1/MB}MB; At all ({ok}): Sent: {sent/MB}mb; Lost: {lost/MB}mb") - if not ok: - self.alive = False - self.log.error(f"Error while sending.") - return - elif data.startswith(b"SR"): - path_list = '' - size_list = '' - for mod in self.Core.mods_list: - if type(mod) == int: - continue - path_list += f"{mod['path']};" - size_list += f"{mod['size']};" - mod_list = path_list + size_list - self.log.debug(f"Mods List: {mod_list}") - if len(mod_list) == 0: - await self.tcp_send(b"-") - else: - await self.tcp_send(bytes(mod_list, "utf-8")) - elif data == b"Done": - await self.tcp_send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json") - break - return - - async def looper(self): - await self.tcp_send(b"P" + bytes(f"{self.cid}", "utf-8")) # Send clientID - await self.sync_resources() - while self.alive: - data = await self.recv() - if data == b"": - if not self.alive: - break - else: - await asyncio.sleep(.2) - self.is_disconnected() - continue - code = data.decode()[0] - self.log.debug(f"Received code: {code}, data: {data}") - match code: - case "H": - # Client connected - self.ready = True - await self.tcp_send(b"Sn" + bytes(self.nick, "utf-8"), to_all=True) - case "C": - # Chat - await self.tcp_send(data, to_all=True) - - async def remove_me(self): - await asyncio.sleep(0.3) - self.alive = False - if (self.cid > 0 or self.nick is not None) and \ - self.Core.clients_by_nick.get(self.nick): - # if self.ready: - # await self.tcp_send(b"", to_all=True) # I'm disconnected. - self.log.debug(f"Removing client {self.nick}:{self.cid}") - self.log.info("Disconnected") - self.Core.clients[self.cid] = None - self.Core.clients_by_id.pop(self.cid) - self.Core.clients_by_nick.pop(self.nick) - else: - self.log.debug(f"Removing client; Closing connection...") - if not self.writer.is_closing(): - self.writer.close() - _, down_w = self.down_rw - if down_w and not down_w.is_closing(): - down_w.close() - - class Core: def __init__(self): diff --git a/src/core/core.pyi b/src/core/core.pyi index d041217..164bf16 100644 --- a/src/core/core.pyi +++ b/src/core/core.pyi @@ -5,42 +5,15 @@ # Licence: FPA # (c) kuitoi.su 2023 import asyncio -from asyncio import StreamWriter, StreamReader from threading import Thread -from typing import Callable, List, Dict, Tuple +from typing import Callable, List, Dict from core import utils +from .Client import Client from .tcp_server import TCPServer from .udp_server import UDPServer -class Client: - - def __init__(self, reader: StreamReader, writer: StreamWriter, core: Core) -> "Client": - self.reader = reader - self.writer = writer - self.down_rw: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None) - self.log = utils.get_logger("client(id: )") - self.addr = writer.get_extra_info("sockname") - self.loop = asyncio.get_event_loop() - self.Core = core - self.cid: int = -1 - self.key: str = None - self.nick: str = None - self.roles: str = None - self.guest = True - self.alive = True - self.ready = False - def is_disconnected(self) -> bool: ... - async def kick(self, reason: str) -> None: ... - async def tcp_send(self, data: bytes, to_all:bool = False, writer: StreamWriter = None) -> None: ... - async def sync_resources(self) -> None: ... - async def recv(self) -> bytes: ... - async def _split_load(self, start: int, end: int, d_sock: bool, filename: str) -> None: ... - async def looper(self) -> None: ... - def _update_logger(self) -> None: ... - async def remove_me(self) -> None: ... - class Core: def __init__(self): self.log = utils.get_logger("core")