-
Notifications
You must be signed in to change notification settings - Fork 5
so5extra 1.6 Retained Mbox
Suppose there is an agent which collects some data from an external device (like a temperature sensor) and publishes this data on a regular basis. Something like that:
class temperature_sensor : public so_5::agent_t {
struct next_tick final : public so_5::signal_t {};
public:
struct data final : public so_5::message_t {
temperature value_;
...
};
...
private:
// MPMC mbox for publishing a new data.
const so_5::mbox_t data_delivery_mbox_;
...
void on_next_tick(mhood_t<next_tick>) {
... // Acquire data from sensor;
// Publish data for consumers.
so_5::send<data>(data_delivery_mbox_, ...);
// Schedule next data acquisition time.
so_5::send_delayed<next_tick>(*this, acquisition_timeout_);
}
};
A data consumer can subscribe to MPMC-mbox and will receive fresh data when the temperature_sensor publish it.
But what if temperature_sensor publishes data once per hour and a consumer subscribes just after the temperature_sensor published its data? For example, the temperature_sensor published data at 11:00, but a new consumer makes a subscription to MPMC-mbox at 11:05.
In this case, the new consumer won't receive temperature_sensor's data until 12:00. Sometimes it is not appropriate and a consumer wants to receive the current temperature_sensor's data as soon as possible.
There are at least two obvious solutions, but none of them is perfect:
There could be a temperature_sensor::publish_current_data signal. A new consumer sends this signal to MPMC-mbox (or to temperature_sensor's direct mbox):
class temperature_data_consumer : public so_5::agent_t {
public:
...
void so_evt_start() override {
// Make subscription to MPMC mbox.
so_subscribe(data_delivery_mbox_, &temperature_data_consumer::on_data);
// Request the current data.
so_5::send<temperature_sensor::publish_current_data>(data_delivery_mbox_);
...
}
...
};
and temperature_sensor will receive and handle this signal by publishing the current data to the MPMC-mbox:
class temperature_sensor : public so_5::agent_t {
struct next_tick final : public so_5::signal_t {};
public:
struct data final : public so_5::message_t {
temperature value_;
...
};
struct publish_current_data final : public so_5::signal_t {};
...
private:
// MPMC mbox for publishing a new data.
const so_5::mbox_t data_delivery_mbox_;
...
void on_next_tick(mhood_t<next_tick>) {
... // Acquire data from sensor;
// Publish data for consumers.
so_5::send<data>(data_delivery_mbox_, ...);
// Schedule next data acquisition time.
so_5::send_delayed<next_tick>(*this, acquisition_timeout_);
}
void on_publish_current_data(mhood_t<publish_current_data>) {
// Publish data for consumers.
so_5::send<data>(data_delivery_mbox_, ...);
}
};
The main drawback of this approach is the duplication of data. When the temperature_sensor handles publish_current_data signal it distributes the current data to all consumers. Even to those consumers which already received it. It can be inappropriate.
A consumer can ask the temperature_sensor to send the current data only to that consumer. Something like that:
class temperature_data_consumer : public so_5::agent_t {
public:
...
void so_evt_start() override {
// Make subscription to MPMC mbox.
so_subscribe(data_delivery_mbox_, &temperature_data_consumer::on_data);
// Make subscription to receive the current data.
so_subscribe_self().event(&temperature_data_consumer::on_data);
// Request the current data.
so_5::send<temperature_sensor::ask_current_data>(
data_delivery_mbox_, so_direct_mbox());
...
}
...
};
class temperature_sensor : public so_5::agent_t {
struct next_tick final : public so_5::signal_t {};
public:
struct data final : public so_5::message_t {
temperature value_;
...
};
struct ask_current_data final : public so_5::message_t {
const so_5::mbox_t reply_to_;
ask_current_data(so_5::mbox_t reply_to) : reply_to_{std::move(reply_to)} {}
};
};
...
private:
// MPMC mbox for publishing a new data.
const so_5::mbox_t data_delivery_mbox_;
...
void on_next_tick(mhood_t<next_tick>) {
... // Acquire data from sensor;
// Publish data for consumers.
so_5::send<data>(data_delivery_mbox_, ...);
// Schedule next data acquisition time.
so_5::send_delayed<next_tick>(*this, acquisition_timeout_);
}
void on_ask_current_data(mhood_t<ask_current_data> cmd) {
// Publish data for the specific consumer.
so_5::send<data>(cmd->reply_to_, ...);
}
};
This approach is better than the first one but it requires some additional work as for temperature_sensor as for data consumers.
The retained message mbox provides a different solution for that problem. There is such thing as retained_msg mbox. It is very similar to usual MPMC mbox but has a very important distinction: an instance of the last message of type T is stored inside retained_msg and will be automatically resent for every new subscriber to messages of type T.
It means that when temperature_sensor sends a new message to a retained_msg mbox this message is delivered to every already existed subscriber and stored inside the retained_msg mbox. When a new consumer subscribes to this retained_msg mbox then the last stored message will be automatically sent to that subscriber. It means that new consumer will receive the current data from a temperature sensor without asking for it. So it allows to write something like:
// Trivial implementation of temperature sensor agent.
class temperature_sensor : public so_5::agent_t {
struct next_tick final : public so_5::signal_t {};
public:
struct data final : public so_5::message_t {
temperature value_;
...
};
...
private:
// MPMC mbox for publishing a new data.
// It should be a retained_msg mbox.
const so_5::mbox_t data_delivery_mbox_;
...
void on_next_tick(mhood_t<next_tick>) {
... // Acquire data from sensor;
// Publish data for consumers.
so_5::send<data>(data_delivery_mbox_, ...);
// Schedule next data acquisition time.
so_5::send_delayed<next_tick>(*this, acquisition_timeout_);
}
};
// Trivial implementation of data consumer agent.
class temperature_data_consumer : public so_5::agent_t {
public:
...
void so_evt_start() override {
// Make subscription to MPMC mbox.
// NOTE: it is important to perform subscription in so_evt_start() method.
so_subscribe(data_delivery_mbox_, &temperature_data_consumer::on_data);
...
}
...
};
A retained_msg mbox contains a map where message type is used as a key and a message instance is stored as a value.
When a new message of type T is sent the retained_msg mbox stores this instance in this map and then delivers this message to all current subscribers.
When a new subscriber creates a subscription for a message of type T and there is a message instance of that type then this message instance is delivered to that subscriber (but only if subscriber's delivery filter allows it).
A retained_msg mbox stores the last message for every type. It means that if two messages of type T and three messages of type M are sent there will be two messages stored inside the retained_msg mbox: the last one of type T and the last one of type M.
A retained_msg mbox is created by make_mbox
template function from so_5::extra::mboxes::retained_msg
namespace. Usually it is done this way:
#include <so_5_extra/mboxes/retained_msg.hpp>
...
so_5::environment_t & env = ...;
auto retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<>(env);
A retained_msg mbox with std::mutex
inside will be created. An instance of std::mutex
will be used for protection of retained_msg mbox internals in a multithreaded environment.
Sometimes multithreading protection is not necessary. For example, when non-thread-safe environment infrastructure is used. In these cases, some kind of null-mutex can be used for retained_msg mbox. Something like so_5::null_mutex_t
which does nothing (it just a std::mutex
-like class with empty inline methods). It can be done this way:
so_5::environment_t & env = ...;
auto retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<
so_5::extra::mboxes::retained_msg::default_traits_t,
so_5::null_mutex_t>(env);
A user can specify own std::mutex
-like class to be used as a lock type. This type must be DefaultConstructible and its interface should be compatible with std::lock_guard
.
A retained_msg mbox respect delivery filters for subscribers. It means that if a subscriber installs a delivery filter for messages of type T then this delivery filter will be used for filtering messages to be delivered. For example:
struct data final : public so_5::message_t {
int data_;
data(int v) : data_{v} {}
};
class demo : public so_5::agent_t {
public:
...
void so_evt_start() override {
auto retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<>(
so_environment());
// Send a message which will be held by retained_mbox.
so_5::send<data>(retained_mbox, 42);
// Setup a delivery filter.
so_set_delivery_filter(retained_mbox,
[](const data & cmd) { return cmd.data_ > 40; });
// Make a subscription.
so_subscribe(retained_mbox).event([](mhood_t<data> cmd) {
std::cout << cmd->data_ << std::endl;
}
}
};
In this case, the retained message with data_=42
will be delivered to demo agent after subscription.
But if we change the delivery filter:
so_set_delivery_filter(retained_mbox,
[](const data & cmd) { return cmd.data_ > 42; });
then retained message won't be delivered because it will be blocked by the delivery filter.
Mutable messages can't be delivered via retained_msg mbox because it is MPMC-mbox and message deliverance via MPMC-mboxes is prohibited.
An instance of so_5::exception_t
will be thrown on an attempt to send a mutable message via retained_msg mbox.
If a subscriber makes a subscription to retained_msg mbox in so_define_agent
method then the retained message won't be delivered to the subscriber. It is because at the moment when so_defined_agent
works there is no event queue for the agent. So the delivery attempt from retained_msg mbox will be ignored.
It means that there is no sense to create subscription to retained_msg mbox in so_define_agent
. Subscription to retained_msg mbox should be performed only after the successful registration of the agent. For example: in so_evt_start
method or later.
If a message which is sent via retained_msg mbox holds a smart reference to that mbox then there will be a cyclic reference which will lead to a memory leak. For example:
struct demo_msg final : public so_5::message_t
{
...
const so_5::mbox_t mbox_;
...
demo_msg(..., so_5::mbox_t mbox, ...) : ..., mbox_{std::move(mbox)}, ... {}
};
...
auto retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<>(env);
// ATTENTION! A cyclic reference will be created here!
so_5::send<demo_msg>(retained_mbox, ..., retained_mbox, ...);
It is because retained_msg mbox holds a smart reference to the last message. And the last message holds the reference to the retained_msg mbox.