Skip to content
This repository has been archived by the owner on May 8, 2023. It is now read-only.

Commit

Permalink
Fix protocol error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfrbcsgo committed May 7, 2023
1 parent ae867a4 commit cca020e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
DESCRIPTION = "A single threaded, non blocking TCP server for WoT mods which makes use of the `async` / `await`."

RELEASE_DEPENDENCIES = [
"https://github.com/lgfrbcsgo/wot-async/releases/download/v0.3.2/lgfrbcsgo.async_0.3.2.wotmod",
"https://github.com/lgfrbcsgo/wot-async/releases/download/v0.3.3/lgfrbcsgo.async_0.3.3.wotmod",
]


Expand Down
44 changes: 29 additions & 15 deletions src/mod_async_server/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ def _park(parked, sock):


class StreamClosed(Exception):
pass
def __init__(self, stream):
self.stream = stream


class Stream(object):
def __init__(self, parking_lot, sock):
self._parking_lot = parking_lot
self._sock = sock
self._sock = sock # type: Optional[socket.socket]
self._write_mutex = AsyncMutex()
self._addr = self._sock.getsockname()[:2]
self._peer_addr = self._sock.getpeername()[:2]
Expand All @@ -95,29 +96,41 @@ def addr(self):
def peer_addr(self):
return self._peer_addr

@property
def closed(self):
return self._sock is None

def close(self):
if self.closed:
return

try:
self._parking_lot.close_socket(self._sock)
except Exception:
LOG_WARNING("Error closing stream.")
LOG_CURRENT_EXCEPTION()
finally:
self._sock = None

@async_task
def receive(self, max_length):
while True:
if self.closed:
raise StreamClosed(self)

try:
data = self._sock.recv(max_length)
except socket.error as e:
if e.args[0] in BLOCKS:
# socket not ready, wait until socket is ready
yield self._parking_lot.park_read(self._sock)
elif e.args[0] in DISCONNECTED:
raise StreamClosed()
self._sock = None
else:
raise
else:
if not data:
raise StreamClosed()
self._sock = None
else:
raise Return(data)

Expand All @@ -133,14 +146,17 @@ def send(self, data):
def _do_send(self, data):
data = memoryview(data)
while data:
if self.closed:
raise StreamClosed(self)

try:
bytes_sent = self._sock.send(data[:512])
except socket.error as e:
if e.args[0] in BLOCKS:
# socket not ready, wait until socket is ready
yield self._parking_lot.park_write(self._sock)
elif e.args[0] in DISCONNECTED:
raise StreamClosed()
self._sock = None
else:
raise
else:
Expand Down Expand Up @@ -197,8 +213,8 @@ def close(self):
self._listening_sock.close()
self._listening_sock = None

for sock in self._connections.itervalues():
sock.close()
for stream in self._connections.itervalues():
stream.close()

# wake up waiting futures to clean up protocol instances
self._parking_lot.close()
Expand Down Expand Up @@ -227,8 +243,8 @@ def _start_accepting(self):
@async_task
def _accept_connection(self, sock):
sock_fd = sock.fileno()
self._connections[sock_fd] = sock
stream = Stream(self._parking_lot, sock)
self._connections[sock_fd] = stream

host, port = stream.peer_addr
LOG_NOTE(
Expand All @@ -237,18 +253,16 @@ def _accept_connection(self, sock):

try:
yield self._protocol(self, stream)
except StreamClosed:
pass
except Exception:
LOG_WARNING("Protocol error")
LOG_CURRENT_EXCEPTION()
except Exception as e:
if not isinstance(e, StreamClosed) or e.stream != stream:
LOG_WARNING("Protocol error")
LOG_CURRENT_EXCEPTION()
finally:
LOG_NOTE(
"TCP: [{host}]:{port} disconnected.".format(host=host, port=port)
)

del self._connections[sock_fd]
sock.close()
stream.close()


def create_listening_socket(host, port):
Expand Down

0 comments on commit cca020e

Please sign in to comment.