Skip to content

Commit

Permalink
feat: reduce duplicate code in aio MessageBus (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Dec 4, 2023
1 parent e30c477 commit 502ab0d
Showing 1 changed file with 14 additions and 29 deletions.
43 changes: 14 additions & 29 deletions src/dbus_fast/aio/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import socket
from collections import deque
from copy import copy
from functools import partial
from typing import Any, Callable, List, Optional, Set, Tuple

from .. import introspection as intr
Expand Down Expand Up @@ -288,13 +289,7 @@ async def introspect(
"""
future = self._loop.create_future()

def reply_handler(reply: Any, err: Exception) -> None:
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)

super().introspect(bus_name, path, reply_handler)
super().introspect(bus_name, path, partial(self._reply_handler, future))

timer_handle = self._loop.call_later(
timeout, _future_set_exception, future, asyncio.TimeoutError
Expand Down Expand Up @@ -326,13 +321,7 @@ async def request_name(
"""
future = self._loop.create_future()

def reply_handler(reply, err):
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)

super().request_name(name, flags, reply_handler)
super().request_name(name, flags, partial(self._reply_handler, future))

return await future

Expand All @@ -354,13 +343,7 @@ async def release_name(self, name: str) -> ReleaseNameReply:
"""
future = self._loop.create_future()

def reply_handler(reply, err):
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)

super().release_name(name, reply_handler)
super().release_name(name, partial(self._reply_handler, future))

return await future

Expand All @@ -387,14 +370,7 @@ async def call(self, msg: Message) -> Optional[Message]:

future = self._loop.create_future()

def reply_handler(reply, err):
if not future.done():
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)

self._call(msg, reply_handler)
self._call(msg, partial(self._reply_handler, future))

await future

Expand Down Expand Up @@ -546,3 +522,12 @@ def _finalize(self, err: Optional[Exception] = None) -> None:
_future_set_exception(self._disconnect_future, err)
else:
_future_set_result(self._disconnect_future, None)

def _reply_handler(
self, future: asyncio.Future, reply: Optional[Any], err: Optional[Exception]
) -> None:
"""The reply handler for method calls."""
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)

0 comments on commit 502ab0d

Please sign in to comment.