Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reduce latency to process messages #161

Merged
merged 11 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def build(setup_kwargs):
[
"src/dbus_fast/aio/message_reader.py",
"src/dbus_fast/message.py",
"src/dbus_fast/message_bus.py",
"src/dbus_fast/signature.py",
"src/dbus_fast/unpack.py",
"src/dbus_fast/_private/marshaller.py",
Expand Down
29 changes: 29 additions & 0 deletions src/dbus_fast/message_bus.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import cython

from .message cimport Message


cdef object MessageType
cdef object DBusError
cdef object MessageFlag

cdef object MESSAGE_TYPE_CALL
cdef object MESSAGE_TYPE_SIGNAL

cdef class BaseMessageBus:

cdef public object unique_name
cdef public object _disconnected
cdef public object _user_disconnect
cdef public object _method_return_handlers
cdef public object _serial
cdef public cython.list _user_message_handlers
cdef public object _name_owners
cdef public object _bus_address
cdef public object _name_owner_match_rule
cdef public object _match_rules
cdef public object _high_level_client_initialized
cdef public object _ProxyObject
cdef public object _machine_id

cpdef _process_message(self, Message msg)
47 changes: 33 additions & 14 deletions src/dbus_fast/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
from .signature import Variant
from .validators import assert_bus_name_valid, assert_object_path_valid

MESSAGE_TYPE_CALL = MessageType.METHOD_CALL
MESSAGE_TYPE_SIGNAL = MessageType.SIGNAL


class BaseMessageBus:
"""An abstract class to manage a connection to a DBus message bus.
Expand Down Expand Up @@ -56,6 +59,22 @@ class BaseMessageBus:
:vartype connected: bool
"""

__slots__ = (
"unique_name",
"_disconnected",
"_user_disconnect",
"_method_return_handlers",
"_serial",
"_user_message_handlers",
"_name_owners",
"_bus_address",
"_name_owner_match_rule",
"_match_rules",
"_high_level_client_initialized",
"_ProxyObject",
"_machine_id",
)

def __init__(
self,
bus_address: Optional[str] = None,
Expand Down Expand Up @@ -620,7 +639,7 @@ def _setup_socket(self) -> None:
if "path" in options:
filename = options["path"]
elif "abstract" in options:
filename = f'\0{options["abstract"]}'
filename = b"\0" + options["abstract"].encode()
else:
raise InvalidAddressError(
"got unix transport with unknown path specifier"
Expand Down Expand Up @@ -770,9 +789,8 @@ def send_error(self, exc: Exception) -> None:

return SendReply()

def _process_message(self, msg: Message) -> None:
def _process_message(self, msg) -> None:
bdraco marked this conversation as resolved.
Show resolved Hide resolved
handled = False

for user_handler in self._user_message_handlers:
try:
result = user_handler(msg)
Expand All @@ -782,7 +800,7 @@ def _process_message(self, msg: Message) -> None:
handled = True
break
except DBusError as e:
if msg.message_type == MessageType.METHOD_CALL:
if msg.message_type is MESSAGE_TYPE_CALL:
self.send(e._as_message(msg))
handled = True
break
Expand All @@ -794,7 +812,7 @@ def _process_message(self, msg: Message) -> None:
logging.error(
f"A message handler raised an exception: {e}.\n{traceback.format_exc()}"
)
if msg.message_type == MessageType.METHOD_CALL:
if msg.message_type is MESSAGE_TYPE_CALL:
self.send(
Message.new_error(
msg,
Expand All @@ -805,7 +823,7 @@ def _process_message(self, msg: Message) -> None:
handled = True
break

if msg.message_type == MessageType.SIGNAL:
if msg.message_type is MESSAGE_TYPE_SIGNAL:
if (
msg.member == "NameOwnerChanged"
and msg.sender == "org.freedesktop.DBus"
Expand All @@ -817,8 +835,9 @@ def _process_message(self, msg: Message) -> None:
self._name_owners[name] = new_owner
elif name in self._name_owners:
del self._name_owners[name]
return

elif msg.message_type == MessageType.METHOD_CALL:
if msg.message_type is MESSAGE_TYPE_CALL:
if not handled:
handler = self._find_message_handler(msg)

Expand All @@ -835,14 +854,14 @@ def _process_message(self, msg: Message) -> None:
f'{msg.interface}.{msg.member} with signature "{msg.signature}" could not be found',
)
)
return

else:
# An ERROR or a METHOD_RETURN
if msg.reply_serial in self._method_return_handlers:
if not handled:
return_handler = self._method_return_handlers[msg.reply_serial]
return_handler(msg, None)
del self._method_return_handlers[msg.reply_serial]
# An ERROR or a METHOD_RETURN
if msg.reply_serial in self._method_return_handlers:
if not handled:
return_handler = self._method_return_handlers[msg.reply_serial]
return_handler(msg, None)
del self._method_return_handlers[msg.reply_serial]

def _make_method_handler(
self, interface: ServiceInterface, method: _Method
Expand Down