-
Notifications
You must be signed in to change notification settings - Fork 5
so5extra 1.5 First Last Subscriber Notify Mbox
Suppose we have an agent that gathers and processes some data from an external source. The data processing is a heavy task as we want to avoid spending resources when no one receives the processed data at the moment.
An obvious way to do that is to force data consumers to inform about arrival and leaving. For example, a consumer have to send msg_register_consumer
when the consumer starts and msg_deregister_consumer
when the consumer exits. The data processor has to receive msg_register_consumer
and msg_deregister_consumer
and hold a list of live consumers. When new data is ready, the data processor sends a message with new data to every registered consumer.
It's a working scheme, but it has some drawbacks:
- it makes the producer more complex because it has to hold a list of live consumers;
- it makes consumer more complex because they have to make a subscription to a message with data, but also they have to send two additional messages.
A new mbox added in v1.5.2 provides another solution to that problem. It's first-last-subscriber-notification mbox that sends:
-
msg_first_subscriber
signal when the first subscriber arrives; -
msg_last_subscriber
signal when the last subscriber leaves.
It means that consumers only have to subscribe to a mbox. The corresponding notifications will be automatically sent to data producer.
It also means that the data producer has no need to hold a list of subscriber. An instance of first-last-subscriber-notification mbox can be used as multi-producer/multi-consumer mbox: a message sent to that mbox will be automatically delivered to every subscriber.
Very simple usage of unique-subscribers mbox can look like this:
#include <so_5_extra/mboxes/first_last_subscriber_notification.hpp>
#include <so_5/all.hpp>
namespace notifications_ns = so_5::extra::mboxes::first_last_subscriber_notification;
//
// Data producer part.
//
// Message to be used for data spreading.
struct msg_data final : public so_5::message_t {...};
// Agent that produces the data.
class data_producer final : public so_5::agent_t
{
// State in that the agent waits the first consumer.
state_t st_wait_consumers{this};
// State in that the agent produces data.
state_t st_consumers_connected{this};
// Mbox for data spreading. Will be created in the constructor.
const so_5::mbox_t data_mbox_;
public:
data_producer(context_t & ctx)
: so_5::agent_t{std::move(ctx)}
, data_mbox_t{ // A new mbox has to be created.
// New mbox will be used for msg_data messages...
notifications_ns::make_mbox<msg_data>(
so_environment(),
// Agent's direct mbox for notifications.
so_direct_mbox(),
// This is MPMC mbox.
so_5::mbox_type_t::multi_producer_multi_consumer )
}
{}
// Access to data mbox.
[[nodiscard]]
const so_5::mbox_t &
data_mbox() const noexcept { return data_mbox_; }
void so_define_agent() override
{
st_wait_consumers
.event([this](mhood_t<notifications_ns::msg_first_subscriber>) {
st_consumers_connected.activate();
});
st_consumers_connected
.on_enter([]{ ... /* turn data acquisition on */ })
.on_exit([]{ ... /* turn data acquisition off */ })
.event([this](mhood_t<notifications_ns::msg_last_subscriber>) {
st_no_consumers.activate();
});
}
...
};
Delivery filters are supported, even if first-last-subscriber-notification mbox is used as MPSC mbox.
Please note, that so_5::agent_t::so_set_delivery_filter_for_mutable_msg
has to be used for filtering mutable messages.
make_mbox
function that is used for creation of first-last-subscriber-notification mbox instance is a template function with the following signature:
template< typename Msg_Type, typename Lock_Type = std::mutex >
mbox_t
make_mbox(
environment_t & env,
const so_5::mbox_t & notification_mbox,
so_5::mbox_type_t mbox_type );
Template parameter Msg_Type
specifies a message type to be used with the created mbox. Please note that if a first-last-subscriber-notification mbox is being created for mutable message then so_5::mutable_msg
should be used, for example:
using namespace so_5::extra::mboxes::first_last_subscriber_notification;
auto my_mpsc_mbox = make_mbox< so_5::mutable_msg<my_msg> >(...);
Template parameter Lock_Type
specifies a type of lock object to protect mbox's internals in a multithreaded application. By default, this type is std::mutex
. But it can be changed to something that more appropriate for the user's environment.
For example, a custom implementation of spinlock can be used as Lock_Type
. Or it can so_5::null_mutex_t
for single-threaded environments.