-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a7a9f36
commit c4fe201
Showing
4 changed files
with
316 additions
and
306 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,280 @@ | ||
import asyncio | ||
import math | ||
import zlib | ||
|
||
from core import utils | ||
|
||
|
||
class Client: | ||
|
||
def __init__(self, reader, writer, core): | ||
self.reader = reader | ||
self.writer = writer | ||
self.down_rw = (None, None) | ||
self.log = utils.get_logger("client(None:0)") | ||
self.addr = writer.get_extra_info("sockname") | ||
self.loop = asyncio.get_event_loop() | ||
self.Core = core | ||
self.cid = -1 | ||
self.key = None | ||
self.nick = None | ||
self.roles = None | ||
self.guest = True | ||
self.alive = True | ||
self.ready = False | ||
|
||
def _update_logger(self): | ||
self.log = utils.get_logger(f"{self.nick}:{self.cid})") | ||
self.log.debug(f"Update logger") | ||
|
||
def is_disconnected(self): | ||
if not self.alive: | ||
return True | ||
res = self.writer.is_closing() | ||
if res: | ||
self.log.debug(f"Client Disconnected") | ||
self.alive = False | ||
return True | ||
else: | ||
self.log.debug(f"Client Alive") | ||
self.alive = True | ||
return False | ||
|
||
async def kick(self, reason): | ||
if not self.alive: | ||
self.log.debug(f"Kick({reason}) skipped;") | ||
return | ||
self.log.info(f"Kicked with reason: \"{reason}\"") | ||
await self.tcp_send(b"K" + bytes(reason, "utf-8")) | ||
self.alive = False | ||
# await self.remove_me() | ||
|
||
async def tcp_send(self, data, to_all=False, writer=None): | ||
|
||
# TNetwork.cpp; Line: 383 | ||
# BeamMP TCP protocol sends a header of 4 bytes, followed by the data. | ||
# [][][][][][]...[] | ||
# ^------^^---...-^ | ||
# size data | ||
|
||
if writer is None: | ||
writer = self.writer | ||
|
||
if to_all: | ||
for client in self.Core.clients: | ||
if not client: | ||
continue | ||
await client.tcp_send(data) | ||
return | ||
|
||
# self.log.debug(f"tcp_send({data})") | ||
if len(data) == 10: | ||
data += b"." | ||
header = len(data).to_bytes(4, "little", signed=True) | ||
self.log.debug(f'len: {len(data)}; send: {header + data}') | ||
try: | ||
writer.write(header + data) | ||
await writer.drain() | ||
except ConnectionError: | ||
self.log.debug('tcp_send: Disconnected') | ||
self.alive = False | ||
|
||
async def recv(self): | ||
try: | ||
header = await self.reader.read(4) # header: 4 bytes | ||
|
||
int_header = 0 | ||
for i in range(len(header)): | ||
int_header += header[i] | ||
|
||
if int_header <= 0: | ||
await asyncio.sleep(0.1) | ||
self.is_disconnected() | ||
if self.alive: | ||
self.log.debug(f"Header: {header}") | ||
await self.kick("Invalid packet - header negative") | ||
return b"" | ||
|
||
if int_header > 100 * MB: | ||
await self.kick("Header size limit exceeded") | ||
self.log.warn(f"Client {self.nick}:{self.cid} sent header of >100MB - " | ||
f"assuming malicious intent and disconnecting the client.") | ||
return b"" | ||
|
||
data = await self.reader.read(100 * MB) | ||
self.log.debug(f"header: `{header}`; int_header: `{int_header}`; data: `{data}`;") | ||
|
||
if len(data) != int_header: | ||
self.log.debug(f"WARN Expected to read {int_header} bytes, instead got {len(data)}") | ||
|
||
abg = b"ABG:" | ||
if len(data) > len(abg) and data.startswith(abg): | ||
data = zlib.decompress(data[len(abg):]) | ||
self.log.debug(f"ABG: {data}") | ||
return data | ||
return data | ||
except ConnectionError: | ||
self.alive = False | ||
return b"" | ||
|
||
async def _split_load(self, start, end, d_sock, filename): | ||
real_size = end - start | ||
writer = self.down_rw[1] if d_sock else self.writer | ||
who = 'dwn' if d_sock else 'srv' | ||
self.log.debug(f"[{who}] Real size: {real_size / MB}mb; {real_size == end}, {real_size * 2 == end}") | ||
|
||
with open(filename, 'rb') as f: | ||
f.seek(start) | ||
data = f.read(end) | ||
try: | ||
writer.write(data) | ||
await writer.drain() | ||
self.log.debug(f"[{who}] File sent.") | ||
except ConnectionError: | ||
self.alive = False | ||
self.log.debug(f"[{who}] Disconnected.") | ||
# break | ||
return real_size | ||
|
||
# chunk_size = 125 * MB | ||
# if chunk_size > real_size: | ||
# chunk_size = real_size | ||
# chunks = math.floor(real_size / chunk_size) | ||
# self.log.debug(f"[{who}] s:{start}, e:{end}, c:{chunks}, cz:{chunk_size/MB}mb, rs:{real_size/MB}mb") | ||
# dw = 0 | ||
# for chunk in range(1, chunks + 1): | ||
# chunk_end = start + (chunk_size * chunk) | ||
# chunk_start = chunk_end - chunk_size | ||
# # if chunk_start != 0: | ||
# # chunk_start -= 1 | ||
# real_size -= chunk_size | ||
# if chunk_size > real_size: | ||
# chunk_end = real_size | ||
# self.log.debug(f"[{who}] Chunk: {chunk}; Start: {chunk_start}; End: {chunk_end/MB};") | ||
# with open(filename, 'rb') as f: | ||
# f.seek(chunk_start) | ||
# data = f.read(chunk_end) | ||
# try: | ||
# writer.write(data) | ||
# await writer.drain() | ||
# except ConnectionError: | ||
# self.alive = False | ||
# self.log.debug(f"[{who}] Disconnected") | ||
# break | ||
# dw += len(data) | ||
# del data | ||
# self.log.debug(f"[{who}] File sent.") | ||
# return dw | ||
|
||
async def sync_resources(self): | ||
while self.alive: | ||
data = await self.recv() | ||
self.log.debug(f"data: {data!r}") | ||
if data.startswith(b"f"): | ||
file = data[1:].decode("utf-8") | ||
self.log.debug(f"Sending File: {file}") | ||
size = -1 | ||
for mod in self.Core.mods_list: | ||
if type(mod) == int: | ||
continue | ||
if mod.get('path') == file: | ||
size = mod['size'] | ||
self.log.debug("File is accept.") | ||
break | ||
if size == -1: | ||
await self.tcp_send(b"CO") | ||
await self.kick(f"Not allowed mod: " + file) | ||
return | ||
await self.tcp_send(b"AG") | ||
t = 0 | ||
while not self.down_rw[0]: | ||
await asyncio.sleep(0.1) | ||
t += 1 | ||
if t > 50: | ||
await self.kick("Missing download socket") | ||
return | ||
self.log.info(f"Requested mode: {file!r}") | ||
self.log.debug(f"Mode size: {size / MB}") | ||
|
||
msize = math.floor(size / 2) | ||
# uploads = [ | ||
# asyncio.create_task(self._split_load(0, msize, False, file)), # SplitLoad_0 | ||
# asyncio.create_task(self._split_load(msize, size, True, file)) # SplitLoad_1 | ||
# ] | ||
# await asyncio.wait(uploads) | ||
uploads = [ | ||
self._split_load(0, msize, False, file), | ||
self._split_load(msize, size, True, file) | ||
] | ||
sl0, sl1 = await asyncio.gather(*uploads) | ||
sent = sl0 + sl1 | ||
ok = sent == size | ||
lost = size - sent | ||
self.log.debug(f"SplitLoad_0: {sl0}; SplitLoad_1: {sl1}; At all ({ok}): Sent: {sent}; Lost: {lost}") | ||
self.log.debug(f"SplitLoad_0: {sl0 / MB}mb; " | ||
f"SplitLoad_1: {sl1 / MB}MB; At all ({ok}): Sent: {sent / MB}mb; Lost: {lost / MB}mb") | ||
if not ok: | ||
self.alive = False | ||
self.log.error(f"Error while sending.") | ||
return | ||
elif data.startswith(b"SR"): | ||
path_list = '' | ||
size_list = '' | ||
for mod in self.Core.mods_list: | ||
if type(mod) == int: | ||
continue | ||
path_list += f"{mod['path']};" | ||
size_list += f"{mod['size']};" | ||
mod_list = path_list + size_list | ||
self.log.debug(f"Mods List: {mod_list}") | ||
if len(mod_list) == 0: | ||
await self.tcp_send(b"-") | ||
else: | ||
await self.tcp_send(bytes(mod_list, "utf-8")) | ||
elif data == b"Done": | ||
await self.tcp_send(b"M/levels/" + bytes(config.Game['map'], 'utf-8') + b"/info.json") | ||
break | ||
return | ||
|
||
async def looper(self): | ||
await self.tcp_send(b"P" + bytes(f"{self.cid}", "utf-8")) # Send clientID | ||
await self.sync_resources() | ||
while self.alive: | ||
data = await self.recv() | ||
if data == b"": | ||
if not self.alive: | ||
break | ||
else: | ||
await asyncio.sleep(.2) | ||
self.is_disconnected() | ||
continue | ||
code = data.decode()[0] | ||
self.log.debug(f"Received code: {code}, data: {data}") | ||
match code: | ||
case "H": | ||
# Client connected | ||
self.ready = True | ||
await self.tcp_send(b"Sn" + bytes(self.nick, "utf-8"), to_all=True) | ||
case "C": | ||
# Chat | ||
await self.tcp_send(data, to_all=True) | ||
|
||
async def remove_me(self): | ||
await asyncio.sleep(0.3) | ||
self.alive = False | ||
if (self.cid > 0 or self.nick is not None) and \ | ||
self.Core.clients_by_nick.get(self.nick): | ||
# if self.ready: | ||
# await self.tcp_send(b"", to_all=True) # I'm disconnected. | ||
self.log.debug(f"Removing client {self.nick}:{self.cid}") | ||
self.log.info("Disconnected") | ||
self.Core.clients[self.cid] = None | ||
self.Core.clients_by_id.pop(self.cid) | ||
self.Core.clients_by_nick.pop(self.nick) | ||
else: | ||
self.log.debug(f"Removing client; Closing connection...") | ||
if not self.writer.is_closing(): | ||
self.writer.close() | ||
_, down_w = self.down_rw | ||
if down_w and not down_w.is_closing(): | ||
down_w.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
import asyncio | ||
from asyncio import StreamReader, StreamWriter | ||
from typing import Tuple | ||
|
||
from core import Core, utils | ||
|
||
|
||
class Client: | ||
|
||
def __init__(self, reader: StreamReader, writer: StreamWriter, core: Core) -> "Client": | ||
self.reader = reader | ||
self.writer = writer | ||
self.down_rw: Tuple[StreamReader, StreamWriter] | Tuple[None, None] = (None, None) | ||
self.log = utils.get_logger("client(id: )") | ||
self.addr = writer.get_extra_info("sockname") | ||
self.loop = asyncio.get_event_loop() | ||
self.Core = core | ||
self.cid: int = -1 | ||
self.key: str = None | ||
self.nick: str = None | ||
self.roles: str = None | ||
self.guest = True | ||
self.alive = True | ||
self.ready = False | ||
def is_disconnected(self) -> bool: ... | ||
async def kick(self, reason: str) -> None: ... | ||
async def tcp_send(self, data: bytes, to_all:bool = False, writer: StreamWriter = None) -> None: ... | ||
async def sync_resources(self) -> None: ... | ||
async def recv(self) -> bytes: ... | ||
async def _split_load(self, start: int, end: int, d_sock: bool, filename: str) -> None: ... | ||
async def looper(self) -> None: ... | ||
def _update_logger(self) -> None: ... | ||
async def remove_me(self) -> None: ... |
Oops, something went wrong.