Skip to content

Commit

Permalink
fix: ensure the underlying socket is closed on disconnect (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Sep 9, 2022
1 parent 2355fa1 commit 6770a65
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions src/dbus_fast/aio/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import socket
from asyncio import Queue
from copy import copy
from typing import Optional
from typing import Any, Optional

from .. import introspection as intr
from .._private.unmarshaller import Unmarshaller
Expand All @@ -24,18 +24,18 @@
from .proxy_object import ProxyObject


def _future_set_exception(fut, exc):
def _future_set_exception(fut: asyncio.Future, exc: Exception) -> None:
if fut is not None and not fut.done():
fut.set_exception(exc)


def _future_set_result(fut, result):
def _future_set_result(fut: asyncio.Future, result: Any) -> None:
if fut is not None and not fut.done():
fut.set_result(result)


class _MessageWriter:
def __init__(self, bus):
def __init__(self, bus: "MessageBus") -> None:
self.messages = Queue()
self.negotiate_unix_fd = bus._negotiate_unix_fd
self.bus = bus
Expand Down Expand Up @@ -82,7 +82,10 @@ def write_callback(self):
# wait for writable
return
except Exception as e:
_future_set_exception(self.fut, e)
if self.bus._user_disconnect:
_future_set_result(self.fut, None)
else:
_future_set_exception(self.fut, e)
self.bus._finalize(e)

def buffer_message(self, msg: Message, future=None):
Expand Down Expand Up @@ -229,7 +232,7 @@ async def introspect(
"""
future = self._loop.create_future()

def reply_handler(reply, err):
def reply_handler(reply: Any, err: Exception) -> None:
if err:
_future_set_exception(future, err)
else:
Expand Down Expand Up @@ -436,13 +439,24 @@ async def _authenticate(self):
if response == "BEGIN":
break

def _create_unmarshaller(self):
def disconnect(self):
"""Disconnect the message bus by closing the underlying connection asynchronously.
All pending and future calls will error with a connection error.
"""
super().disconnect()
try:
self._sock.close()
except Exception:
logging.warning("could not close socket", exc_info=True)

def _create_unmarshaller(self) -> Unmarshaller:
sock = None
if self._negotiate_unix_fd:
sock = self._sock
return Unmarshaller(self._stream, sock)

def _finalize(self, err=None):
def _finalize(self, err: Optional[Exception] = None) -> None:
try:
self._loop.remove_reader(self._fd)
except Exception:
Expand Down

0 comments on commit 6770a65

Please sign in to comment.