Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Adding rabbitmq based RPC for IPC between services #3909

Merged
merged 36 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ed19ad0
first version of the RPC client via rabbitmq
Feb 22, 2023
ec3b363
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 22, 2023
46e84d3
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 23, 2023
7c37064
added a server namespace
Feb 23, 2023
7b24567
moved robust rpc to rabbitmq
Feb 23, 2023
eeb30de
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 23, 2023
a91bd89
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 23, 2023
8b3a0a3
rabbitmq rpc refactor
Feb 24, 2023
b7a7742
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 27, 2023
b4890bd
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 28, 2023
bebdbdb
added extra test
Feb 28, 2023
6a5805d
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 1, 2023
d1cfab7
fix error when closing channel
Mar 1, 2023
dce584f
add missing type
Mar 1, 2023
6d7ad01
some more progress
Mar 1, 2023
8616e3b
remove extension module
Mar 2, 2023
26f4708
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 2, 2023
1241850
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 2, 2023
c0a1d8a
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 3, 2023
b929d93
added get_namespace
Mar 3, 2023
839b84e
refactor
Mar 7, 2023
4ab92e0
added registration helper
Mar 7, 2023
9eeb9bf
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 7, 2023
4f1d38b
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 8, 2023
d5f0eb5
refactor import
Mar 8, 2023
fddef59
refactor
Mar 8, 2023
993cef8
refactor
Mar 8, 2023
ca3bdc2
refactor
Mar 8, 2023
d7cf000
moved to helpers
Mar 8, 2023
13fef45
refactor names
Mar 8, 2023
d8f7bcb
refactor
Mar 8, 2023
50a9c17
fix utility
Mar 8, 2023
58b292e
replaced helpers with pydantic models
Mar 8, 2023
0603de9
using hostname
Mar 9, 2023
ede5a09
Merge branch 'master' into pr-osparc-aiopika-solidrpc
GitHK Mar 9, 2023
5a79662
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 114 additions & 5 deletions packages/service-library/src/servicelib/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@
import socket
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Final, Optional
from uuid import uuid4

import aio_pika
from aio_pika import MessageProcessError, RobustChannel, RobustConnection
GitHK marked this conversation as resolved.
Show resolved Hide resolved
from aio_pika.exceptions import ChannelClosed
from aio_pika.patterns import RPC
from pydantic import PositiveInt
from servicelib.logging_utils import log_context
from settings_library.rabbit import RabbitSettings

from .rabbitmq_errors import RemoteMethodNotRegisteredError, RPCNotInitializedError
from .rabbitmq_utils import RPCNamespace, get_namespace

log = logging.getLogger(__name__)


Expand All @@ -28,6 +36,8 @@ def _channel_close_callback(sender: Any, exc: Optional[BaseException]) -> None:
if exc:
if isinstance(exc, asyncio.CancelledError):
log.info("Rabbit channel was cancelled")
elif isinstance(exc, ChannelClosed):
log.info("%s", exc)
GitHK marked this conversation as resolved.
Show resolved Hide resolved
else:
log.error(
"Rabbit channel closed with exception from %s:%s",
Expand Down Expand Up @@ -57,13 +67,21 @@ async def _get_connection(
_RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S: Final[int] = 15 * _MINUTE


def _get_namespaced_method_name(namespace: RPCNamespace, handler_name: str) -> str:
GitHK marked this conversation as resolved.
Show resolved Hide resolved
return f"{namespace}.{handler_name}"


@dataclass
class RabbitMQClient:
client_name: str
settings: RabbitSettings
_connection_pool: Optional[aio_pika.pool.Pool] = field(init=False, default=None)
_channel_pool: Optional[aio_pika.pool.Pool] = field(init=False, default=None)

_rpc_connection: Optional[RobustConnection] = None
_rpc_channel: Optional[RobustChannel] = None
_rpc: Optional[RPC] = None

def __post_init__(self):
# recommendations are 1 connection per process
self._connection_pool = aio_pika.pool.Pool(
Expand All @@ -72,13 +90,29 @@ def __post_init__(self):
# channels are not thread safe, what about python?
self._channel_pool = aio_pika.pool.Pool(self._get_channel, max_size=10)

async def rpc_initialize(self) -> None:
# TODO: to SAN: not sure that we always want to setup RPC connection
GitHK marked this conversation as resolved.
Show resolved Hide resolved
self._rpc_connection = await aio_pika.connect_robust(
self.settings.dsn, client_properties={"connection_name": f"rpc.{uuid4()}"}
GitHK marked this conversation as resolved.
Show resolved Hide resolved
)
self._rpc_channel = await self._rpc_connection.channel()
self._rpc = await RPC.create(self._rpc_channel)

async def close(self) -> None:
with log_context(log, logging.INFO, msg="Closing connection to RabbitMQ"):
assert self._channel_pool # nosec
await self._channel_pool.close()
assert self._connection_pool # nosec
await self._connection_pool.close()

# rpc is not always initialized
if self._rpc is not None:
GitHK marked this conversation as resolved.
Show resolved Hide resolved
await self._rpc.close()
if self._rpc_channel is not None:
await self._rpc_channel.close()
if self._rpc_connection is not None:
await self._rpc_connection.close()

async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
assert self._connection_pool # nosec
async with self._connection_pool.acquire() as connection:
Expand Down Expand Up @@ -118,11 +152,11 @@ async def subscribe(
# consumer/publisher must set the same configuration for same queue
# exclusive means that the queue is only available for THIS very client
# and will be deleted when the client disconnects
queue_parameters = dict(
durable=True,
exclusive=exclusive_queue,
arguments={"x-message-ttl": _RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S},
)
queue_parameters = {
"durable": True,
"exclusive": exclusive_queue,
"arguments": {"x-message-ttl": _RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S},
}
if not exclusive_queue:
# NOTE: setting a name will ensure multiple instance will take their data here
queue_parameters |= {"name": exchange_name}
Expand Down Expand Up @@ -152,3 +186,78 @@ async def publish(self, exchange_name: str, message: Message) -> None:
aio_pika.Message(message.encode()),
routing_key="",
)

async def rpc_request(
self,
namespace: RPCNamespace,
method_name: str,
GitHK marked this conversation as resolved.
Show resolved Hide resolved
*,
timeout: Optional[PositiveInt] = 5,
GitHK marked this conversation as resolved.
Show resolved Hide resolved
**kwargs: dict[str, Any],
) -> Any:
"""
Call a remote registered `handler` by providing it's `namespace`, `method_name`
and list of expected arguments.

NOTE: `namespace` should always be composed via `get_namespace`
GitHK marked this conversation as resolved.
Show resolved Hide resolved
"""

if not self._rpc:
raise RPCNotInitializedError()

namespaced_method_name = _get_namespaced_method_name(namespace, method_name)
try:
queue_expiration_timeout = timeout
awaitable = self._rpc.call(
namespaced_method_name,
expiration=queue_expiration_timeout,
kwargs=kwargs,
)
return await asyncio.wait_for(awaitable, timeout=timeout)
except MessageProcessError as e:
if e.args[0] == "Message has been returned":
GitHK marked this conversation as resolved.
Show resolved Hide resolved
raise RemoteMethodNotRegisteredError(
method_name=namespaced_method_name, incoming_message=e.args[1]
) from e
raise e

async def rpc_register_for(self, entries: dict[str, str], handler: Awaitable):
GitHK marked this conversation as resolved.
Show resolved Hide resolved
"""
Bind a local `handler` to a `namespace` derived from the provided `entries`
dictionary.

NOTE: This is a helper enforce the pattern defined in `rpc_register`'s
docstring.
"""
await self.rpc_register(
get_namespace(entries), method_name=handler.__name__, handler=handler
)

async def rpc_register(
GitHK marked this conversation as resolved.
Show resolved Hide resolved
self, namespace: RPCNamespace, method_name: str, handler: Awaitable
) -> None:
"""
Bind a local `handler` to a `namespace` and `method_name`.
The handler can be remotely called by providing the `namespace` and `method_name`

NOTE: method_name could be computed from the handler, but by design, it is
left to the caller to do so.
NOTE: `namespace` should always be composed via `get_namespace`
"""
GitHK marked this conversation as resolved.
Show resolved Hide resolved

if self._rpc is None:
raise RPCNotInitializedError()

await self._rpc.register(
_get_namespaced_method_name(namespace, method_name),
handler,
auto_delete=True,
)

async def rpc_unregister(self, handler: Awaitable) -> None:
GitHK marked this conversation as resolved.
Show resolved Hide resolved
"""Unbind a locally added `handler`"""

if self._rpc is None:
raise RPCNotInitializedError()

await self._rpc.unregister(handler)
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
38 changes: 38 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Final

from pydantic.errors import PydanticErrorMixin

_ERROR_PREFIX: Final[str] = "rabbitmq_error"


class BaseRPCError(PydanticErrorMixin, RuntimeError):
...


class RPCNotInitializedError(BaseRPCError):
code = f"{_ERROR_PREFIX}.not_started"
msg_template = "Please check that the RPC backend was initialized!"
GitHK marked this conversation as resolved.
Show resolved Hide resolved


class RemoteMethodNotRegisteredError(BaseRPCError):
code = f"{_ERROR_PREFIX}.remote_not_registered"
msg_template = (
"Could not find a remote method named: '{method_name}'. "
"Message from remote server was returned: {incoming_message}. "
)


class RPCNamespaceTooLongError(BaseRPCError):
code = f"{_ERROR_PREFIX}.rpc_namespace_error"
msg_template = (
"The generated namespace {namespace} is too long. "
"It contains {namespace_length} characters it is limited to {char_limit}."
)


class RPCNamespaceInvalidCharsError(BaseRPCError):
code = f"{_ERROR_PREFIX}.rpc_namespace_error"
msg_template = (
"Generated namespace {namespace} contains not allowed characters."
"Allowed chars must match {match_regex}."
)
40 changes: 34 additions & 6 deletions packages/service-library/src/servicelib/rabbitmq_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# FIXME: move to settings-library or refactor

import logging
import re
from typing import Final, Optional

import aio_pika
Expand All @@ -10,11 +11,38 @@
from tenacity.wait import wait_fixed

from .logging_utils import log_context
from .rabbitmq_errors import RPCNamespaceInvalidCharsError, RPCNamespaceTooLongError

log = logging.getLogger(__file__)


_MINUTE: Final[int] = 60
_NAMESPACE_CHAR_LIMIT: Final[int] = 100

REGEX_VALIDATE_RABBIT_QUEUE_NAME: Final[str] = r"^[\w\-\.]{1,255}$"

RPCNamespace = str

sanderegg marked this conversation as resolved.
Show resolved Hide resolved

def get_namespace(entries: dict[str, str]) -> RPCNamespace:
"""
Given a list of entries creates a namespace to be used in declaring the rabbitmq queue.
Keeping this to a predefined length
"""

namespace = "-".join(f"{k}_{v}" for k, v in sorted(entries.items()))
if len(namespace) > _NAMESPACE_CHAR_LIMIT:
raise RPCNamespaceTooLongError(
namespace=namespace,
namespace_length=len(namespace),
char_limit=_NAMESPACE_CHAR_LIMIT,
)

if not re.compile(REGEX_VALIDATE_RABBIT_QUEUE_NAME).match(namespace):
raise RPCNamespaceInvalidCharsError(
namespace=namespace, match_regex=REGEX_VALIDATE_RABBIT_QUEUE_NAME
)
return namespace


class RabbitMQRetryPolicyUponInitialization:
Expand All @@ -23,12 +51,12 @@ class RabbitMQRetryPolicyUponInitialization:
def __init__(self, logger: Optional[logging.Logger] = None):
logger = logger or log

self.kwargs = dict(
wait=wait_fixed(2),
stop=stop_after_delay(3 * _MINUTE),
before_sleep=before_sleep_log(logger, logging.WARNING),
reraise=True,
)
self.kwargs = {
"wait": wait_fixed(2),
"stop": stop_after_delay(3 * _MINUTE),
"before_sleep": before_sleep_log(logger, logging.WARNING),
"reraise": True,
}


@retry(**RabbitMQRetryPolicyUponInitialization().kwargs)
Expand Down
Loading