diff --git a/src/dbus_fast/aio/message_bus.py b/src/dbus_fast/aio/message_bus.py index 9f1dca77..71b97eb3 100644 --- a/src/dbus_fast/aio/message_bus.py +++ b/src/dbus_fast/aio/message_bus.py @@ -4,7 +4,7 @@ import socket from collections import deque from copy import copy -from typing import Any, Optional +from typing import Any, List, Optional, Tuple from .. import introspection as intr from ..auth import Authenticator, AuthExternal @@ -37,28 +37,38 @@ def _future_set_result(fut: asyncio.Future, result: Any) -> None: class _MessageWriter: + """A class to handle writing messages to the message bus.""" + def __init__(self, bus: "MessageBus") -> None: - self.messages = deque() + """A class to handle writing messages to the message bus.""" + self.messages: deque[ + Tuple[bytearray, Optional[List[int]], Optional[asyncio.Future]] + ] = deque() self.negotiate_unix_fd = bus._negotiate_unix_fd self.bus = bus self.sock = bus._sock self.loop = bus._loop - self.buf = None + self.buf: Optional[memoryview] = None self.fd = bus._fd self.offset = 0 - self.unix_fds = None + self.unix_fds: Optional[List[int]] = None self.fut: Optional[asyncio.Future] = None def write_callback(self, remove_writer: bool = True) -> None: + """The callback to write messages to the message bus.""" + sock = self.sock try: while True: if self.buf is None: + # If there is no buffer, get the next message if not self.messages: # nothing more to write if remove_writer: self.loop.remove_writer(self.fd) return - buf, unix_fds, fut = self.messages.pop() + + # Get the next message + buf, unix_fds, fut = self.messages.popleft() self.unix_fds = unix_fds self.buf = memoryview(buf) self.offset = 0 @@ -72,18 +82,18 @@ def write_callback(self, remove_writer: bool = True) -> None: array.array("i", self.unix_fds), ) ] - self.offset += self.sock.sendmsg([self.buf[self.offset :]], ancdata) + self.offset += sock.sendmsg([self.buf[self.offset :]], ancdata) self.unix_fds = None else: - self.offset += self.sock.send(self.buf[self.offset :]) + self.offset += sock.send(self.buf[self.offset :]) - if self.offset >= len(self.buf): - # finished writing - self.buf = None - _future_set_result(self.fut, None) - else: + if self.offset < len(self.buf): # wait for writable return + + # finished writing + self.buf = None + _future_set_result(self.fut, None) except Exception as e: if self.bus._user_disconnect: _future_set_result(self.fut, None) @@ -91,11 +101,15 @@ def write_callback(self, remove_writer: bool = True) -> None: _future_set_exception(self.fut, e) self.bus._finalize(e) - def buffer_message(self, msg: Message, future=None) -> None: + def buffer_message( + self, msg: Message, future: Optional[asyncio.Future] = None + ) -> None: + """Buffer a message to be sent later.""" + unix_fds = msg.unix_fds self.messages.append( ( msg._marshall(self.negotiate_unix_fd), - copy(msg.unix_fds), + copy(unix_fds) if unix_fds else None, future, ) ) @@ -104,10 +118,14 @@ def _write_without_remove_writer(self) -> None: """Call the write callback without removing the writer.""" self.write_callback(remove_writer=False) - def schedule_write(self, msg: Optional[Message] = None, future=None) -> None: + def schedule_write( + self, msg: Optional[Message] = None, future: Optional[asyncio.Future] = None + ) -> None: + """Schedule a message to be written.""" queue_is_empty = not self.messages if msg is not None: self.buffer_message(msg, future) + if self.bus.unique_name: # Optimization: try to send now if the queue # is empty. With bleak this usually means we @@ -115,6 +133,7 @@ def schedule_write(self, msg: Optional[Message] = None, future=None) -> None: # is a huge improvement in latency. if queue_is_empty: self._write_without_remove_writer() + if ( self.buf is not None or self.messages