Skip to content

Commit

Permalink
fix: messages could be sent out of order if they had to queue (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Aug 17, 2023
1 parent 8cb0a62 commit 4051cf2
Showing 1 changed file with 34 additions and 15 deletions.
49 changes: 34 additions & 15 deletions src/dbus_fast/aio/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import socket
from collections import deque
from copy import copy
from typing import Any, Optional
from typing import Any, List, Optional, Tuple

from .. import introspection as intr
from ..auth import Authenticator, AuthExternal
Expand Down Expand Up @@ -37,28 +37,38 @@ def _future_set_result(fut: asyncio.Future, result: Any) -> None:


class _MessageWriter:
"""A class to handle writing messages to the message bus."""

def __init__(self, bus: "MessageBus") -> None:
self.messages = deque()
"""A class to handle writing messages to the message bus."""
self.messages: deque[
Tuple[bytearray, Optional[List[int]], Optional[asyncio.Future]]
] = deque()
self.negotiate_unix_fd = bus._negotiate_unix_fd
self.bus = bus
self.sock = bus._sock
self.loop = bus._loop
self.buf = None
self.buf: Optional[memoryview] = None
self.fd = bus._fd
self.offset = 0
self.unix_fds = None
self.unix_fds: Optional[List[int]] = None
self.fut: Optional[asyncio.Future] = None

def write_callback(self, remove_writer: bool = True) -> None:
"""The callback to write messages to the message bus."""
sock = self.sock
try:
while True:
if self.buf is None:
# If there is no buffer, get the next message
if not self.messages:
# nothing more to write
if remove_writer:
self.loop.remove_writer(self.fd)
return
buf, unix_fds, fut = self.messages.pop()

# Get the next message
buf, unix_fds, fut = self.messages.popleft()
self.unix_fds = unix_fds
self.buf = memoryview(buf)
self.offset = 0
Expand All @@ -72,30 +82,34 @@ def write_callback(self, remove_writer: bool = True) -> None:
array.array("i", self.unix_fds),
)
]
self.offset += self.sock.sendmsg([self.buf[self.offset :]], ancdata)
self.offset += sock.sendmsg([self.buf[self.offset :]], ancdata)
self.unix_fds = None
else:
self.offset += self.sock.send(self.buf[self.offset :])
self.offset += sock.send(self.buf[self.offset :])

if self.offset >= len(self.buf):
# finished writing
self.buf = None
_future_set_result(self.fut, None)
else:
if self.offset < len(self.buf):
# wait for writable
return

# finished writing
self.buf = None
_future_set_result(self.fut, None)
except Exception as e:
if self.bus._user_disconnect:
_future_set_result(self.fut, None)
else:
_future_set_exception(self.fut, e)
self.bus._finalize(e)

def buffer_message(self, msg: Message, future=None) -> None:
def buffer_message(
self, msg: Message, future: Optional[asyncio.Future] = None
) -> None:
"""Buffer a message to be sent later."""
unix_fds = msg.unix_fds
self.messages.append(
(
msg._marshall(self.negotiate_unix_fd),
copy(msg.unix_fds),
copy(unix_fds) if unix_fds else None,
future,
)
)
Expand All @@ -104,17 +118,22 @@ def _write_without_remove_writer(self) -> None:
"""Call the write callback without removing the writer."""
self.write_callback(remove_writer=False)

def schedule_write(self, msg: Optional[Message] = None, future=None) -> None:
def schedule_write(
self, msg: Optional[Message] = None, future: Optional[asyncio.Future] = None
) -> None:
"""Schedule a message to be written."""
queue_is_empty = not self.messages
if msg is not None:
self.buffer_message(msg, future)

if self.bus.unique_name:
# Optimization: try to send now if the queue
# is empty. With bleak this usually means we
# can send right away 99% of the time which
# is a huge improvement in latency.
if queue_is_empty:
self._write_without_remove_writer()

if (
self.buf is not None
or self.messages
Expand Down

0 comments on commit 4051cf2

Please sign in to comment.