so5extra 1.5 Composite Mbox
Suppose a producer agent sends messages of different types (e.g. M1 and M2) to a single mbox, and we want a different delivery strategy for each type. For example, we want a round_robin mbox for M1 messages and a retained_msg mbox for M2 messages.
The problem is that instances of round_robin and retained_msg mboxes will be separate mboxes, but our producer sends all messages to just one mbox.
So we need some kind of facade mbox that will hide our round_robin and retained_msg mboxes under the hood.
The composite-mbox, added in v1.5.2, solves that problem. It can be seen as a dictionary of several independent mboxes with the message type as the key.
For example, we can have an instance of round_robin mbox and set that mbox as the destination for messages of type M1. And we can also have an instance of retained_msg mbox that is used as the destination for messages of type M2.
When a message of type M1 is sent to a composite-mbox, the composite-mbox looks up the destination for type M1, finds the round_robin mbox instance, and redirects the message to it. The same happens for messages of type M2: the retained_msg mbox instance is found and the message is redirected to it.
A composite-mbox works as a simple conductor: it holds no subscribers or delivery filters of its own, only references to the actual mboxes, and it delegates all actions to the appropriate destinations.
A very simple usage of composite-mbox can look like this:
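The "dictionary keyed by message type" idea can be illustrated with a small standalone model. This is only a sketch in plain C++ and not the so5extra implementation: `toy_mbox`, `toy_composite` and the empty `M1`/`M2`/`M3` types are hypothetical names, and a string payload stands in for a real message.

```cpp
#include <cassert>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a destination mbox: it only records what it received.
struct toy_mbox {
    std::string name;
    std::vector<std::string> log;
};

// Toy facade: maps a message type to exactly one destination and
// keeps no subscribers of its own.
class toy_composite {
    std::unordered_map<std::type_index, toy_mbox*> destinations_;
public:
    template <typename Msg>
    void add(toy_mbox& dest) {
        destinations_[std::type_index(typeid(Msg))] = &dest;
    }

    // Looks up the destination by message type and forwards the payload.
    // Returns false when no destination is registered for Msg.
    template <typename Msg>
    bool send(const std::string& payload) {
        const auto it = destinations_.find(std::type_index(typeid(Msg)));
        if (it == destinations_.end()) return false;
        it->second->log.push_back(payload);
        return true;
    }
};

// Hypothetical message types used only as dictionary keys.
struct M1 {};
struct M2 {};
struct M3 {};
```

With two `toy_mbox` instances registered for `M1` and `M2`, `send<M1>(...)` lands only in the first log, `send<M2>(...)` only in the second, and `send<M3>(...)` returns false because nothing is registered for that type.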
#include <so_5_extra/mboxes/composite.hpp>
#include <so_5_extra/mboxes/round_robin.hpp>
#include <so_5_extra/mboxes/retained_msg.hpp>
#include <so_5/all.hpp>
namespace rr_ns = so_5::extra::mboxes::round_robin;
namespace retained_ns = so_5::extra::mboxes::retained_msg;
namespace composite_ns = so_5::extra::mboxes::composite;
//
// Data producer part.
//
// Message to be used for data spreading.
struct msg_data final : public so_5::message_t {...};
// Message to be used for reporting statistics.
struct msg_generation_stats final : public so_5::message_t {...};
// Agent that produces the data.
class data_producer final : public so_5::agent_t
{
// Signal for data generation.
struct msg_generate final : public so_5::signal_t {};
// Mbox for data distribution.
const so_5::mbox_t distribution_mbox_;
// Timer for data generation.
so_5::timer_id_t generation_timer_;
public:
data_producer(context_t ctx, so_5::mbox_t distribution_mbox)
: so_5::agent_t{std::move(ctx)}
, distribution_mbox_{std::move(distribution_mbox)}
{}
...
void so_evt_start() override
{
// send_periodic also requires the initial pause and the period.
generation_timer_ = so_5::send_periodic<msg_generate>(*this, ...);
...
}
private:
...
void evt_generate(mhood_t<msg_generate>)
{
// Generate the next portion of data and related messages.
// NOTE: all messages are sent to the same mbox.
...
so_5::send<msg_data>(distribution_mbox_, ...);
...
so_5::send<msg_generation_stats>(distribution_mbox_, ...);
}
};
//
// Data consumer part
//
class data_consumer final : public so_5::agent_t
{
const so_5::mbox_t data_mbox_;
public:
data_consumer(context_t ctx, so_5::mbox_t data_mbox)
: so_5::agent_t{std::move(ctx)}
, data_mbox_{std::move(data_mbox)}
{}
...
void so_define_agent() override
{
so_subscribe(data_mbox_).event([](mhood_t<msg_data> cmd) { ... });
}
...
};
class stats_consumer final : public so_5::agent_t
{
const so_5::mbox_t stats_mbox_;
public:
stats_consumer(context_t ctx, so_5::mbox_t stats_mbox)
: so_5::agent_t{std::move(ctx)}
, stats_mbox_{std::move(stats_mbox)}
{}
...
void so_define_agent() override
{
so_subscribe(stats_mbox_).event([](mhood_t<msg_generation_stats>) {...});
}
};
...
// Creation of producer and consumers.
env.introduce_coop([](so_5::coop_t & coop) {
// Create actual mboxes.
auto data_mbox = rr_ns::make_mbox<>(coop.environment());
auto stats_mbox = retained_ns::make_mbox<>(coop.environment());
// Mbox to be used for data distribution.
auto distribution_mbox =
// Use MPMC mbox because all messages are sent as immutable.
composite_ns::multi_consumer_builder(composite_ns::throw_if_not_found())
// Destinations.
.add<msg_data>(data_mbox)
.add<msg_generation_stats>(stats_mbox)
// Build the mbox.
.make(coop.environment());
// Create agents. Note that we use distribution_mbox for all agents.
coop.make_agent<data_consumer>(distribution_mbox);
coop.make_agent<stats_consumer>(distribution_mbox);
coop.make_agent<data_producer>(distribution_mbox);
});
Composite-mbox has a major limitation: every message type served by a composite-mbox has to be registered explicitly, and only one destination can be set for each message type.
This means that an attempt to add a second destination for the same message type raises a run-time error in the form of so_5::exception_t at point (1):
auto mbox = so_5::extra::mboxes::composite::multi_consumer_builder(
so_5::extra::mboxes::composite::throw_if_not_found())
// Define a destination for M1 message type.
.add<M1>(first_mbox)
// Try to add another destination for the M1 message type.
.add<M1>(second_mbox) // (1) An exception will be thrown here.
...
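The one-destination-per-type rule can be modelled in a few lines of standard C++. This is a hedged sketch, not the library's code: `toy_builder` is a hypothetical name, destinations are represented by plain strings, and `std::logic_error` stands in for so_5::exception_t.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>

// Toy builder: remembers one destination name per message type.
class toy_builder {
    std::unordered_map<std::type_index, std::string> destinations_;
public:
    template <typename Msg>
    toy_builder& add(std::string dest_name) {
        // emplace leaves the existing entry untouched when the key is present.
        const auto result =
            destinations_.emplace(std::type_index(typeid(Msg)), std::move(dest_name));
        if (!result.second)
            throw std::logic_error("destination already defined for this message type");
        return *this;
    }

    std::size_t size() const noexcept { return destinations_.size(); }
};

// Hypothetical message types.
struct M1 {};
struct M2 {};
```

In this model a second `add<M1>(...)` throws and leaves the first destination in place, which mirrors the behaviour described for point (1).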
Because all message types to be handled by a composite mbox have to be registered, there is a question: what to do with a message of an unknown type?
There are several policies that can be applied. A user has to specify the policy when the composite-mbox is defined. Once specified, the policy can't be changed.
At the moment composite-mbox supports the following policies.
redirect_to_if_not_found: the user specifies an mbox that will be used as the default destination. All messages of unknown types will be redirected to that mbox (subscription attempts and attempts to set a delivery filter will be redirected as well).
A composite-mbox with the default destination can be seen this way:
Usage example:
auto default_dest = env.create_mbox();
auto mbox = so_5::extra::mboxes::composite::multi_consumer_builder(
so_5::extra::mboxes::composite::redirect_to_if_not_found(default_dest))
.add<M1>(first_mbox)
.add<M2>(second_mbox)
.make(env);
In this example, an attempt to subscribe to a message of type M3, to set a delivery filter for a message of type M3, or to send a message of type M3 will be redirected to the default_dest mbox.
throw_if_not_found: an exception will be thrown on an attempt to subscribe to messages of an unknown type, to set a delivery filter for messages of an unknown type, or to send a message of an unknown type.
Usage example:
auto mbox = so_5::extra::mboxes::composite::multi_consumer_builder(
so_5::extra::mboxes::composite::throw_if_not_found())
.add<M1>(first_mbox)
.add<M2>(second_mbox)
.make(env);
drop_if_not_found: an attempt to subscribe to messages of an unknown type or to set a delivery filter for messages of an unknown type will silently be ignored.
A message of an unknown type will silently be ignored during delivery (sending).
Usage example:
auto mbox = so_5::extra::mboxes::composite::multi_consumer_builder(
so_5::extra::mboxes::composite::drop_if_not_found())
.add<M1>(first_mbox)
.add<M2>(second_mbox)
.make(env);
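The three reactions to an unknown message type can be compared side by side in a small standalone model. This is a sketch under stated assumptions rather than the library's code: `not_found_reaction`, `toy_composite` and `route` are hypothetical names, destinations are plain strings, and `std::runtime_error` stands in for so_5::exception_t.

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>

// Hypothetical reactions mirroring the three policies described above.
enum class not_found_reaction { redirect, raise, drop };

class toy_composite {
    std::unordered_map<std::type_index, std::string> destinations_;
    not_found_reaction reaction_;
    std::string default_dest_;  // Used only with not_found_reaction::redirect.
public:
    explicit toy_composite(not_found_reaction reaction, std::string default_dest = {})
        : reaction_{reaction}, default_dest_{std::move(default_dest)} {}

    template <typename Msg>
    toy_composite& add(std::string dest) {
        destinations_[std::type_index(typeid(Msg))] = std::move(dest);
        return *this;
    }

    // Returns the name of the destination that would receive Msg,
    // or std::nullopt when the message is silently dropped.
    template <typename Msg>
    std::optional<std::string> route() const {
        const auto it = destinations_.find(std::type_index(typeid(Msg)));
        if (it != destinations_.end()) return it->second;
        switch (reaction_) {
            case not_found_reaction::redirect: return default_dest_;
            case not_found_reaction::raise:
                throw std::runtime_error("no destination for this message type");
            case not_found_reaction::drop: return std::nullopt;
        }
        return std::nullopt;  // Not reached for valid enum values.
    }
};

// Hypothetical message types: M1 is registered, M3 plays the unknown type.
struct M1 {};
struct M3 {};
```

The policy is fixed in the constructor and there is no setter, which reflects the rule that the policy cannot be changed after the composite-mbox is defined.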
An instance of composite mbox can be created as an MPMC or MPSC mbox. The type is specified by a parameter to the mbox_builder function. Alternatively, multi_consumer_builder/single_consumer_builder can be used as more convenient variants:
// MPMC mbox.
auto mpmc = so_5::extra::mboxes::composite::multi_consumer_builder(...)
.add<M1>(first_mbox)
.add<M2>(second_mbox)
.make(env);
// MPSC mbox.
auto mpsc = so_5::extra::mboxes::composite::single_consumer_builder(...)
.add<M1>(first_mbox)
.add<M2>(second_mbox)
.make(env);
An MPSC mbox allows delivery of mutable messages. It means that an MPSC composite-mbox allows code like this:
// MPSC mbox.
auto mpsc = so_5::extra::mboxes::composite::single_consumer_builder(...)
.add<M1>(first_mbox)
// Note that M1 and mutable_msg<M1> are different message types.
.add< so_5::mutable_msg<M1> >(first_mbox)
.add<M2>(second_mbox)
.add< so_5::mutable_msg<M2> >(third_mbox)
.make(env);
If a composite-mbox is created as an MPSC mbox, then MPMC mboxes can be added as destinations, but for immutable messages only. For example:
auto mpmc_dest = env.create_mbox(); // MPMC mbox created.
// MPSC composite-mbox.
auto mpsc = so_5::extra::mboxes::composite::single_consumer_builder(...)
.add<M1>(mpmc_dest) // OK, M1 is an immutable message.
.add< so_5::mutable_msg<M1> >(mpmc_dest) // ERROR! An exception will be thrown.
.make(env);
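The check behind that error can be sketched as a standalone model. It covers only the rule stated above for an MPSC composite-mbox and is not the so5extra implementation: `toy_mbox_type`, `toy_mutable`, `is_toy_mutable` and `toy_mpsc_builder` are hypothetical names, with `toy_mutable<T>` playing the role of so_5::mutable_msg<T> and `std::logic_error` standing in for so_5::exception_t.

```cpp
#include <cassert>
#include <stdexcept>
#include <type_traits>

// Hypothetical tag for the kind of a destination mbox.
enum class toy_mbox_type { mpmc, mpsc };

// Hypothetical marker playing the role of so_5::mutable_msg<T>.
template <typename T> struct toy_mutable {};

template <typename T> struct is_toy_mutable : std::false_type {};
template <typename T> struct is_toy_mutable<toy_mutable<T>> : std::true_type {};

// Toy builder for an MPSC composite: it only validates destinations.
class toy_mpsc_builder {
    int accepted_ = 0;
public:
    template <typename Msg>
    toy_mpsc_builder& add(toy_mbox_type dest_type) {
        if constexpr (is_toy_mutable<Msg>::value) {
            // A mutable message may have only one receiver,
            // so an MPMC destination is rejected.
            if (dest_type == toy_mbox_type::mpmc)
                throw std::logic_error("MPMC destination can't receive a mutable message");
        }
        ++accepted_;
        return *this;
    }

    int accepted() const noexcept { return accepted_; }
};

// Hypothetical message type.
struct M1 {};
```

Because `M1` and `toy_mutable<M1>` are distinct types, the check is resolved at compile time per registration, while the error itself is still reported at run time, as in the example above.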
Because composite-mbox doesn't process subscriptions and delivery filters itself and delegates the corresponding calls to destination mboxes, the support of delivery filters depends on the destination. If a destination mbox supports delivery filters, then an attempt to set a delivery filter will be successful. Otherwise, an exception will be thrown by the destination mbox.
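The delegation described above can be modelled as follows. This is an illustrative sketch only: `toy_dest` and `toy_composite` are hypothetical names, the filter itself is omitted, and `std::runtime_error` stands in for whatever exception a real destination mbox throws.

```cpp
#include <cassert>
#include <stdexcept>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>

// Hypothetical destination: some mbox kinds accept delivery filters, some don't.
struct toy_dest {
    bool supports_filters;
    int filters_set = 0;

    void set_delivery_filter() {
        if (!supports_filters)
            throw std::runtime_error("delivery filters are not supported by this mbox");
        ++filters_set;
    }
};

// Toy facade: it performs no checks of its own and just forwards the call.
class toy_composite {
    std::unordered_map<std::type_index, toy_dest*> destinations_;
public:
    template <typename Msg>
    void add(toy_dest& dest) {
        destinations_[std::type_index(typeid(Msg))] = &dest;
    }

    template <typename Msg>
    void set_delivery_filter() {
        // at() throws std::out_of_range for an unregistered type in this toy model.
        destinations_.at(std::type_index(typeid(Msg)))->set_delivery_filter();
    }
};

// Hypothetical message types.
struct M1 {};
struct M2 {};
```

Any exception seen by the caller originates in the destination, so the same composite can accept a filter for one message type and reject it for another.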
There are two main functions for the creation of composite mbox builder:
// Creates a builder for multi-producer/multi-consumer mbox.
[[nodiscard]] mbox_builder_t
multi_consumer_builder(
so_5::extra::mboxes::composite::type_not_found_reaction_t type_not_found_reaction);
// Creates a builder for multi-producer/single-consumer mbox.
[[nodiscard]] mbox_builder_t
single_consumer_builder(
so_5::extra::mboxes::composite::type_not_found_reaction_t type_not_found_reaction);
Both are just thin wrappers around the mbox_builder function that is actually used for creation of a builder. It has the following signature:
[[nodiscard]] mbox_builder_t
mbox_builder(
so_5::mbox_type_t mbox_type,
so_5::extra::mboxes::composite::type_not_found_reaction_t type_not_found_reaction);
An instance of the mbox_builder_t type returned by the mbox_builder function (or by the multi_consumer_builder/single_consumer_builder functions) can be stored for some (not so long) time. For example:
auto builder = so_5::extra::mboxes::composite::multi_consumer_builder(...);
...
builder.add<M1>(first_mbox);
...
if(some_condition)
builder.add<M2>(second_mbox);
...
if(third_mbox_available)
builder.add<M3>(third_mbox);
...
auto mbox = builder.make(env);
The mbox_builder_t isn't a thread-safe type. It's assumed that the instance of mbox_builder_t will be modified from just one thread.
NOTE. An instance of composite-mbox is thread-safe.