Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reduce latency to process messages #161

Merged
merged 11 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
poetry install --only=main,dev
fi
- name: Test with Pytest
run: export $(dbus-launch); poetry run pytest --cov-report=xml --timeout=5
run: export $(dbus-launch); poetry run pytest -vvvvs --cov-report=xml --timeout=5
bdraco marked this conversation as resolved.
Show resolved Hide resolved
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
Expand Down
1 change: 1 addition & 0 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def build(setup_kwargs):
[
"src/dbus_fast/aio/message_reader.py",
"src/dbus_fast/message.py",
"src/dbus_fast/message_bus.py",
"src/dbus_fast/signature.py",
"src/dbus_fast/unpack.py",
"src/dbus_fast/_private/marshaller.py",
Expand Down
27 changes: 27 additions & 0 deletions src/dbus_fast/message_bus.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import cython

from .message cimport Message


cdef object MessageType
cdef object DBusError
cdef object MessageFlag


cdef class BaseMessageBus:

cdef public object unique_name
cdef object _disconnected
cdef object _user_disconnect
cdef object _method_return_handlers
cdef object _serial
cdef object _user_message_handlers
cdef object _name_owners
cdef object _bus_address
cdef object _name_owner_match_rule
cdef object _match_rules
cdef object _high_level_client_initialized
cdef object _ProxyObject
cdef object _machine_id

cpdef _process_message(self, msg: Message)
31 changes: 24 additions & 7 deletions src/dbus_fast/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ class BaseMessageBus:
:vartype connected: bool
"""

__slots__ = (
"unique_name",
"_disconnected",
"_user_disconnect",
"_method_return_handlers",
"_serial",
"_user_message_handlers",
"_name_owners",
"_bus_address",
"_name_owner_match_rule",
"_match_rules",
"_high_level_client_initialized",
"_ProxyObject",
"_machine_id",
)

def __init__(
self,
bus_address: Optional[str] = None,
Expand Down Expand Up @@ -620,7 +636,7 @@ def _setup_socket(self) -> None:
if "path" in options:
filename = options["path"]
elif "abstract" in options:
filename = f'\0{options["abstract"]}'
filename = b"\0" + options["abstract"].encode()
else:
raise InvalidAddressError(
"got unix transport with unknown path specifier"
Expand All @@ -629,7 +645,7 @@ def _setup_socket(self) -> None:
try:
self._sock.connect(filename)
self._sock.setblocking(False)
break
return
bdraco marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
err = e

Expand All @@ -646,7 +662,7 @@ def _setup_socket(self) -> None:
try:
self._sock.connect((ip_addr, ip_port))
self._sock.setblocking(False)
break
return
bdraco marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
err = e

Expand Down Expand Up @@ -770,8 +786,9 @@ def send_error(self, exc: Exception) -> None:

return SendReply()

def _process_message(self, msg: Message) -> None:
def _process_message(self, msg) -> None:
bdraco marked this conversation as resolved.
Show resolved Hide resolved
handled = False
message_type = msg.message_type

for user_handler in self._user_message_handlers:
try:
Expand All @@ -782,7 +799,7 @@ def _process_message(self, msg: Message) -> None:
handled = True
break
except DBusError as e:
if msg.message_type == MessageType.METHOD_CALL:
if message_type == MessageType.METHOD_CALL:
self.send(e._as_message(msg))
handled = True
break
Expand All @@ -805,7 +822,7 @@ def _process_message(self, msg: Message) -> None:
handled = True
break

if msg.message_type == MessageType.SIGNAL:
if message_type == MessageType.SIGNAL:
if (
msg.member == "NameOwnerChanged"
and msg.sender == "org.freedesktop.DBus"
Expand All @@ -818,7 +835,7 @@ def _process_message(self, msg: Message) -> None:
elif name in self._name_owners:
del self._name_owners[name]

elif msg.message_type == MessageType.METHOD_CALL:
elif message_type == MessageType.METHOD_CALL:
if not handled:
handler = self._find_message_handler(msg)

Expand Down