so5extra 1.5 Collecting Mbox
Sometimes an agent X wants to receive exactly N messages of type M. For example, a parent agent starts five child agents. Every child needs to perform some time-consuming actions at the start (loading configuration, establishing a connection to a database, initializing external devices and so on). When the startup actions are done, a child agent sends a special ready signal to the parent. The parent needs to collect all five ready signals. Only then can the parent do its own work.
Without collecting_mbox this task can be done via a separate counter inside the parent agent. Something like this:
// A ready signal from child.
struct child_started_t final : public so_5::signal_t {};
class parent_agent_t final : public so_5::agent_t
{
// How many children are already started.
unsigned int m_child_started = 0;
// Handler for ready signal.
void on_child_started(mhood_t<child_started_t>)
{
++m_child_started;
if( 5 == m_child_started )
... // We can do our main work.
}
...
};
There is no need to count received signals by hand if a collecting_mbox is used. We just create a collecting_mbox for child_started_t signals and tell it how many signals need to be collected. Then we pass this mbox to the child agents and they send child_started_t signals to that mbox. The mbox collects the signals and, only when all of them are received, sends a special messages_collected_t message to the parent agent.
This is a simple example of how a task like the one described above can be solved:
#include <so_5_extra/mboxes/collecting_mbox.hpp>
#include <so_5/all.hpp>
#include <iostream>
// Type of signal about readiness of child agent.
struct child_started_t final : public so_5::signal_t {};
// Type of a child agent.
class child_t final : public so_5::agent_t
{
public:
child_t(context_t ctx, so_5::mbox_t ready_mbox)
: so_5::agent_t(std::move(ctx))
, m_ready_mbox(std::move(ready_mbox))
{}
void so_evt_start() override
{
// Parent must be informed about child readiness.
so_5::send<child_started_t>(m_ready_mbox);
}
private:
const so_5::mbox_t m_ready_mbox;
};
// Type of example performer agent.
class sample_performer_t final : public so_5::agent_t
{
// Type of collecting mbox template for child_started_t signal.
using child_started_mbox_t =
so_5::extra::mboxes::collecting_mbox::mbox_template_t<child_started_t>;
public:
sample_performer_t(context_t ctx, std::size_t child_count)
: so_5::agent_t(std::move(ctx))
, m_child_count(child_count)
{
so_subscribe_self()
.event(&sample_performer_t::on_all_child_started);
}
void so_evt_start() override
{
// Create instance of collecting mbox.
auto ready_mbox = child_started_mbox_t::make(
so_direct_mbox(),
m_child_count);
// All children agents will work on the same thread_pool dispatcher.
auto tp_disp = so_5::disp::thread_pool::make_dispatcher(
so_environment(),
3 );
for(std::size_t i = 0; i != m_child_count; ++i)
{
introduce_child_coop(*this,
tp_disp.binder(),
[&ready_mbox](so_5::coop_t & coop) {
coop.make_agent<child_t>(ready_mbox);
});
}
std::cout << "All children agents are created" << std::endl;
}
private:
// Count of children agents to be created.
const std::size_t m_child_count;
void on_all_child_started(mhood_t<child_started_mbox_t::messages_collected_t>)
{
std::cout << "All children agents are started" << std::endl;
so_deregister_agent_coop_normally();
}
};
int main()
{
try
{
so_5::launch([](so_5::environment_t & env) {
// Make parent coop.
env.introduce_coop([&](so_5::coop_t & coop) {
// Example performer will work on the default dispatcher.
coop.make_agent<sample_performer_t>(25u);
});
});
}
catch(const std::exception & x)
{
std::cerr << "Exception caught: " << x.what() << std::endl;
}
}
An instance of collecting_mbox accepts messages of the specified type and collects them in its own internal buffer. When N messages are received, the collecting_mbox sends a message of the special type messages_collected_t to the target mbox (where N and the target mbox are specified by the programmer). An instance of messages_collected_t contains the N collected messages. After that, the collecting_mbox collects the next N messages, sends another instance of messages_collected_t, and so on.
A collecting_mbox can collect messages of just one type. An attempt to send a message of any other type will lead to an exception.
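The collect, deliver, repeat cycle described above can be modelled without SObjectizer. The following is a minimal standalone sketch and not the library's implementation: `batch_collector_t`, `push` and `collect_demo` are hypothetical names invented for this illustration, and a plain callback stands in for the target mbox.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical standalone model of the "collect N, deliver, start again" cycle.
// It is NOT part of so_5_extra.
template<typename Msg>
class batch_collector_t
{
public:
    using batch_t = std::vector<Msg>;
    using target_t = std::function<void(batch_t)>;

    batch_collector_t(std::size_t messages_to_collect, target_t target)
        : m_messages_to_collect(messages_to_collect)
        , m_target(std::move(target))
    {
        m_buffer.reserve(m_messages_to_collect);
    }

    // Store one message; deliver the whole batch once N messages are stored.
    void push(Msg msg)
    {
        m_buffer.push_back(std::move(msg));
        if(m_buffer.size() == m_messages_to_collect)
        {
            batch_t full;
            full.reserve(m_messages_to_collect);
            full.swap(m_buffer); // m_buffer is empty again and ready for the next batch.
            m_target(std::move(full));
        }
    }

private:
    const std::size_t m_messages_to_collect;
    target_t m_target;
    batch_t m_buffer;
};

// Feeds five messages into a collector with N == 2 and
// returns the sizes of the batches that were delivered.
std::vector<std::size_t> collect_demo()
{
    std::vector<std::size_t> delivered;
    batch_collector_t<std::string> collector(2,
        [&delivered](std::vector<std::string> batch) {
            delivered.push_back(batch.size());
        });
    for(const char * name : {"a", "b", "c", "d", "e"})
        collector.push(name);
    return delivered;
}
```

In this model five pushed messages produce two complete batches; the fifth message stays in the buffer until another one arrives.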
Two things need to be done to create a collecting_mbox.
The first one is tuning the mbox_template_t template from namespace so_5::extra::mboxes::collecting_mbox. This template has three template parameters. Just one of them is mandatory:
- Collecting_Msg specifies the type of messages/signals to be collected. In the example above it was child_started_t;
- Traits specifies additional traits which depend on the count of messages to be collected. By default the type runtime_size_traits_t is used. More about Traits below;
- Lock_Type specifies the type of the object that provides thread safety for the mbox's internal data. The type std::mutex is used by default. A so_5::null_mutex_t can be used for single-threaded SObjectizer environments. Or it can be any user-defined type which can be used with std::lock_guard.
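Since Lock_Type only has to work with std::lock_guard, any class that provides lock() and unlock() should qualify. The sketch below shows one such hypothetical user-defined lock; `spinlock_t` and `guarded_increment` are names invented for this illustration and are not part of SObjectizer or so_5_extra.

```cpp
#include <atomic>
#include <mutex>

// Hypothetical user-defined lock type.
// std::lock_guard needs only lock() and unlock().
class spinlock_t
{
public:
    void lock() noexcept
    {
        // Spin until the flag was clear before our test_and_set call.
        while(m_flag.test_and_set(std::memory_order_acquire))
            ;
    }

    void unlock() noexcept
    {
        m_flag.clear(std::memory_order_release);
    }

private:
    std::atomic_flag m_flag = ATOMIC_FLAG_INIT;
};

// Increments a counter under the protection of any lock
// that is usable with std::lock_guard.
template<typename Lock>
int guarded_increment(Lock & lock, int & counter)
{
    std::lock_guard<Lock> guard(lock);
    return ++counter;
}
```

Assuming the requirement stated above is the only one, such a type could be named as the third argument of mbox_template_t in place of std::mutex. Whether a spinlock is actually a good choice depends on the workload.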
Usually a precise collecting_mbox type is defined by a typedef. Something like this:
using my_msg_mbox_type = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
// The type of messages to be collected.
// All other parameters have default values.
my_msg>;
using another_mbox_type = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
// The type of messages to be collected.
another_msg_type,
// Count of messages will be known only at run-time.
so_5::extra::mboxes::collecting_mbox::runtime_size_traits_t,
// Null_mutex will be used instead of std::mutex.
// It means that this mbox type can be used only in single-threaded environment.
so_5::null_mutex_t>;
Such typedefs make mbox creation and subscription to the messages_collected_t type easier.
The second thing is the creation of an instance of collecting_mbox. The static method make of the mbox_template_t class is used for that. The parameter list for make depends on the Traits template parameter of mbox_template_t. If runtime_size_traits_t is used then make will have the following format:
so_5::mbox_t make(
// A target mbox for messages_collected_t.
const so_5::mbox_t & target_mbox,
// How many messages need to be collected.
std::size_t messages_to_collect);
If constexpr_size_traits_t is used then make will have the following format, without the messages_to_collect parameter:
so_5::mbox_t make(
// A target mbox for messages_collected_t.
const so_5::mbox_t & target_mbox);
If some other type is used as the Traits parameter then the format of make will depend on that type. At the time of writing, so_5_extra has only two types which can be used as the Traits template parameter: runtime_size_traits_t and constexpr_size_traits_t.
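One possible way for a single make to accept a Traits-dependent parameter list is to forward the extra arguments to the traits object. The sketch below is a standalone model under that assumption and does not claim to mirror the so_5_extra sources: `runtime_size_model_t`, `constexpr_size_model_t` and `collector_model_t` are hypothetical names, and the target mbox is left out to keep the example short.

```cpp
#include <cstddef>
#include <utility>

// Hypothetical traits: the size is supplied at run time.
struct runtime_size_model_t
{
    explicit runtime_size_model_t(std::size_t size) : m_size(size) {}
    std::size_t size() const { return m_size; }
private:
    std::size_t m_size;
};

// Hypothetical traits: the size is fixed at compile time.
template<std::size_t N>
struct constexpr_size_model_t
{
    constexpr std::size_t size() const { return N; }
};

// Hypothetical collector whose factory forwards extra arguments to Traits.
template<typename Traits>
class collector_model_t
{
public:
    // Accepts whatever the constructor of Traits accepts:
    // one std::size_t for runtime_size_model_t, nothing for constexpr_size_model_t.
    template<typename... Args>
    static collector_model_t make(Args &&... args)
    {
        return collector_model_t(Traits(std::forward<Args>(args)...));
    }

    std::size_t messages_to_collect() const { return m_traits.size(); }

private:
    explicit collector_model_t(Traits traits) : m_traits(traits) {}

    Traits m_traits;
};
```

With this model, `collector_model_t<runtime_size_model_t>::make(25u)` and `collector_model_t<constexpr_size_model_t<5>>::make()` both compile, while passing a size to the compile-time variant is rejected by the compiler.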
The type name messages_collected_t is defined inside the template class mbox_template_t. This is the type of the message that will be sent when a full batch of messages of type Collecting_Msg is collected. The actual type behind the messages_collected_t name depends on several factors:
If Collecting_Msg is a signal type then messages_collected_t will be a type with an interface like this:
class messages_collected_t final : public so_5::message_t
{
...
public:
std::size_t size() const;
};
It means that in the case of a signal there will be no information other than the count of collected signals.
If Collecting_Msg is a message type then messages_collected_t will be a type with an interface like this:
class messages_collected_t final : public so_5::message_t
{
some_container_t m_collected_messages;
...
public:
std::size_t size() const;
template< typename F >
decltype(auto) with_nth( std::size_t index, F && f ) const;
template< typename F >
void for_each( F && f ) const;
template< typename F >
void for_each_with_index( F && f ) const;
};
It means that in the case of a message all collected messages will be inside the instance of messages_collected_t. Access to these collected messages can be obtained via the with_nth, for_each and for_each_with_index methods.
The Traits template parameter is the trickiest one. The case where the count of messages to be collected is known at compile time and the case where it is known only at run time are handled separately in the implementation of mbox_template_t.
If the count of collected messages is known at compile time then messages_collected_t uses std::array as a container for the messages received. If the count is not known at compile time then messages_collected_t uses std::vector (which requires an additional memory allocation at run time).
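The choice between std::array and std::vector can be expressed as a nested alias template inside the traits type. The following is a minimal sketch of that technique, assuming nothing about the real so_5_extra internals: `runtime_storage_model_t`, `constexpr_storage_model_t` and `collected_storage_model_t` are hypothetical names.

```cpp
#include <array>
#include <cstddef>
#include <string>
#include <type_traits>
#include <vector>

// Hypothetical traits for a size known only at run time.
struct runtime_storage_model_t
{
    template<typename T>
    using container_t = std::vector<T>;
};

// Hypothetical traits for a size known at compile time.
template<std::size_t N>
struct constexpr_storage_model_t
{
    template<typename T>
    using container_t = std::array<T, N>;
};

// A holder of collected messages whose storage is selected by Traits.
template<typename Msg, typename Traits = runtime_storage_model_t>
struct collected_storage_model_t
{
    using storage_t = typename Traits::template container_t<Msg>;
    storage_t m_items;
};

// The default traits select std::vector.
static_assert(std::is_same<
    collected_storage_model_t<std::string>::storage_t,
    std::vector<std::string>>::value,
    "default traits select std::vector");

// The compile-time traits select std::array of the given size.
static_assert(std::is_same<
    collected_storage_model_t<std::string, constexpr_storage_model_t<5>>::storage_t,
    std::array<std::string, 5>>::value,
    "constexpr traits select std::array");
```

The std::array variant keeps its elements inside the holder itself, which is why no separate allocation for the container is needed in that case.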
By default runtime_size_traits_t is used as Traits. It means that the count of messages to be collected must be specified at run time during the creation of an mbox instance. It also means that std::vector will be used inside messages_collected_t.
If the count of messages to be collected is known at compile time then constexpr_size_traits_t can be used as Traits:
using my_mbox_type = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
// Type of messages to be collected.
my_msg_type,
// Count of messages is known at compile-time.
// Only 5 messages need to be collected.
so_5::extra::mboxes::collecting_mbox::constexpr_size_traits_t<5>>;
If a collecting_mbox is used for messages rather than signals then all collected messages are accessible from the messages_collected_t instance via the with_nth, for_each and for_each_with_index methods.
The with_nth method provides access to the i-th message. It receives two parameters. The first is an index (from 0 to size()-1) and the second is a lambda-function/functor. This lambda-function will be called inside with_nth and a single parameter of type mhood_t<Collecting_Msg> will be passed to it. The value returned by this lambda-function will be the return value of with_nth. An example:
void my_agent::on_collected_msg(mhood_t<my_mbox_type::messages_collected_t> cmd)
{
auto v = cmd->with_nth(0, [](mhood_t<my_msg_type> m) { return m->some_value(); });
...
}
The for_each method enumerates all collected messages. It has just one parameter: a lambda-function/functor with the same format as the lambda-function for the with_nth method. This lambda-function will be called for every collected message. The return value of the lambda-function is ignored because for_each returns void. An example:
void my_agent::on_collected_msg(mhood_t<my_mbox_type::messages_collected_t> cmd)
{
std::vector<std::string> names;
names.reserve(cmd->size());
cmd->for_each([&names](mhood_t<my_msg_type> m) { names.push_back(m->some_value()); });
...
}
The for_each_with_index method also enumerates all collected messages. But it accepts a lambda-function/functor with two parameters: the first has type std::size_t (the ordinal number of a message), the second has type mhood_t<Collecting_Msg> (a reference to the message). The return value of the lambda-function is ignored because for_each_with_index returns void. An example:
void my_agent::on_collected_msg(mhood_t<my_mbox_type::messages_collected_t> cmd)
{
std::vector<std::string> names(cmd->size());
cmd->for_each_with_index(
[&names](std::size_t index, mhood_t<my_msg_type> m) {
names[index] = m->some_value();
});
...
}
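The semantics of the three accessors can be tried out in isolation with a small standalone model. This is only a sketch: `collected_model_t` and `join_with_index` are hypothetical names, the container is always a std::vector, and plain const references stand in for mhood_t.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for messages_collected_t.
// Plain const references replace mhood_t in this model.
template<typename Msg>
class collected_model_t
{
public:
    explicit collected_model_t(std::vector<Msg> items)
        : m_items(std::move(items))
    {}

    std::size_t size() const { return m_items.size(); }

    // Returns whatever the functor returns for the item at the given index.
    template<typename F>
    decltype(auto) with_nth(std::size_t index, F && f) const
    {
        return f(m_items.at(index));
    }

    // Calls the functor for every item; its return value is ignored.
    template<typename F>
    void for_each(F && f) const
    {
        for(const auto & item : m_items)
            f(item);
    }

    // Calls the functor with the ordinal number and the item itself.
    template<typename F>
    void for_each_with_index(F && f) const
    {
        for(std::size_t i = 0; i != m_items.size(); ++i)
            f(i, m_items[i]);
    }

private:
    std::vector<Msg> m_items;
};

// Builds a string like "0=x;1=y;" from the collected items.
std::string join_with_index(const collected_model_t<std::string> & collected)
{
    std::string result;
    collected.for_each_with_index(
        [&result](std::size_t index, const std::string & item) {
            result += std::to_string(index) + "=" + item + ";";
        });
    return result;
}
```

The model makes the difference visible: with_nth propagates the functor's result to the caller, while the two enumeration methods discard it.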
A collecting_mbox can collect mutable messages but three conditions must be fulfilled:
The first one: the Collecting_Msg type must contain the mutable_msg marker:
using my_mutable_mbox_type = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
// Type of collected messages.
so_5::mutable_msg<my_msg_type>>;
The second one: the target mbox must be a multi-producer/single-consumer mbox. For example, it can be the direct mbox of an agent or an mchain. An attempt to specify a multi-producer/multi-consumer mbox as the target mbox will lead to an exception during the creation of the collecting_mbox.
The third one: messages_collected_t will be sent to the target mbox as a mutable_msg:
void my_agent::so_define_agent()
{
so_subscribe_self().event(&my_agent::on_collected_msg);
...
}
void my_agent::on_collected_msg(
mhood_t<so_5::mutable_msg<my_mutable_mbox_type::messages_collected_t>> cmd)
{
...
}
// Or:
void my_agent::on_collected_msg(
mutable_mhood_t<my_mutable_mbox_type::messages_collected_t> cmd)
{
...
}
Note: a lambda-function which is passed to with_nth, for_each and for_each_with_index will receive mhood_t<mutable_msg<Collecting_Msg>>:
void my_agent::on_collected_msg(
mutable_mhood_t<my_mutable_mbox_type::messages_collected_t> cmd)
{
cmd->with_nth(0, [](mutable_mhood_t<my_msg_type> m) {
...
});
}
Let's look at a more complex example of collecting_mbox usage. Suppose that we want to store info about books. But this info must be split between several shards: one shard-agent holds only the book's author, another shard-agent holds only the book's title, and a third shard-agent holds only the book's summary.
To store a book we should send a message to all shard-agents, then we should wait for an acknowledgment from every shard. To retrieve the book's info, we should send another message to all shard-agents and wait for reply messages with parts of the book's info.
This is what that task can look like in code.
We start with basic things and tools:
// Which field from book's description will be stored by shard-agent.
enum class field_id_t
{
author,
title,
summary
};
// Type for book's description.
struct book_description_t
{
std::string m_author;
std::string m_title;
std::string m_summary;
};
// Helpers for extraction of fields from book's description.
template<field_id_t Field>
std::string get(const book_description_t &);
template<>
std::string get<field_id_t::author>(const book_description_t & b)
{
return b.m_author;
}
template<>
std::string get<field_id_t::title>(const book_description_t & b)
{
return b.m_title;
}
template<>
std::string get<field_id_t::summary>(const book_description_t & b)
{
return b.m_summary;
}
Now the messages which will be used for interaction between agents:
// This message will be sent when description of a new book must be stored.
struct store_book_t : public so_5::message_t
{
// Unique book ID.
const int m_key;
// Description of a new book.
const book_description_t m_book;
// Mbox for store_book_ack_t message.
const so_5::mbox_t m_ack_to;
store_book_t(int key, book_description_t book, so_5::mbox_t ack_to)
: m_key(key), m_book(std::move(book)), m_ack_to(std::move(ack_to))
{}
};
// This message will be sent as acknowledgement for store_book_t.
struct store_book_ack_t : public so_5::message_t
{
// Unique book ID.
const int m_key;
store_book_ack_t(int key)
: m_key(key)
{}
};
// This message will be sent when someone wants to get book's description.
struct request_data_t : public so_5::message_t
{
// Unique book ID.
const int m_key;
// Mbox for data_t message.
const so_5::mbox_t m_reply_to;
request_data_t(int key, so_5::mbox_t reply_to)
: m_key(key), m_reply_to(std::move(reply_to))
{}
};
// This message will be sent by shard-agents as reply to request_data_t message.
struct data_t : public so_5::message_t
{
const int m_key;
const field_id_t m_field;
const std::string m_data;
data_t(int key, field_id_t field, std::string data)
: m_key(key), m_field(field), m_data(std::move(data))
{}
};
Now we can define shard-agents. It will be a template agent:
// Type of shard-agent.
template<
// Which part of book's description is stored by shard.
field_id_t Field>
class shard_t final : public so_5::agent_t
{
public:
shard_t(context_t ctx, so_5::mbox_t command_mbox)
: so_5::agent_t(std::move(ctx))
{
so_subscribe(command_mbox)
.event(&shard_t::on_store_book)
.event(&shard_t::on_request_data);
}
private:
using map_t = std::map<int, std::string>;
map_t m_data;
void on_store_book(mhood_t<store_book_t> cmd)
{
m_data[cmd->m_key] = get<Field>(cmd->m_book);
so_5::send<store_book_ack_t>(cmd->m_ack_to, cmd->m_key);
}
void on_request_data(mhood_t<request_data_t> cmd)
{
so_5::send<data_t>(cmd->m_reply_to, cmd->m_key, Field, m_data[cmd->m_key]);
}
};
Now we can write an agent which interacts with shard-agents.
An agent of type sample_performer_t sends several store_book_t messages at the start. When it receives an ack for a store-book command (in the form of store_book_ack_t messages) it requests the description of that book (by sending a request_data_t message). When it has received descriptions for all books it finishes the example.
There are two things which require some explanation:
- we know the count of shard-agents at compile time and this count can't be changed. Because of that, we use constexpr_size_traits_t;
- the reply messages store_book_ack_t and data_t can easily be mixed. For example, a shard-agent can receive two store_book_t requests and process them before these messages are received by the other shard-agents. It means that we should not only collect replies but also group them by the unique book ID. If we don't do that we can receive a messages_collected_t which contains replies for different books. We will use a very simple solution for that problem: we will create a new collecting_mbox for every new request. It means that replies will go to different mboxes and so won't be mixed. The creation of a new collecting_mbox is a rather cheap procedure and a collecting_mbox that is no longer in use will be destroyed automatically.
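The mixing problem and the effect of a separate collector per request can be reproduced with a small standalone simulation. This is only an illustrative model: `reply_t`, `deliver` and `is_uniform` are hypothetical names, and a std::map of buffers stands in for the set of collecting mboxes.

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical reply tagged with the book's key.
struct reply_t
{
    int m_key;
};

// Keys of the replies that ended up in one delivered batch.
using batch_t = std::vector<int>;

// Routes every reply to the collector chosen by route(reply) and
// returns the batches in the order in which they were completed.
template<typename Route>
std::vector<batch_t> deliver(
    const std::vector<reply_t> & arrivals,
    std::size_t batch_size,
    Route route)
{
    std::map<int, batch_t> collectors;
    std::vector<batch_t> delivered;
    for(const auto & r : arrivals)
    {
        batch_t & buffer = collectors[route(r)];
        buffer.push_back(r.m_key);
        if(buffer.size() == batch_size)
        {
            delivered.push_back(buffer);
            buffer.clear();
        }
    }
    return delivered;
}

// True when every reply in the batch belongs to the same book.
bool is_uniform(const batch_t & batch)
{
    return std::all_of(batch.begin(), batch.end(),
        [&batch](int key) { return key == batch.front(); });
}
```

If replies for books 0 and 1 arrive interleaved (0, 1, 0, 1, 0, 1), routing everything to one collector of size 3 yields two mixed batches, whereas routing by `m_key` yields one uniform batch per book.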
This is the implementation of the sample_performer_t agent:
// Type of example performer agent.
class sample_performer_t final : public so_5::agent_t
{
// Count of books is known at the compile-time.
static constexpr std::size_t total_books = 3;
public:
sample_performer_t(context_t ctx, so_5::mbox_t command_mbox)
: so_5::agent_t(std::move(ctx))
, m_command_mbox(std::move(command_mbox))
{
so_subscribe_self()
.event(&sample_performer_t::on_store_ack)
.event(&sample_performer_t::on_data);
}
virtual void so_evt_start() override
{
// Store all example books at the start of work.
std::array<book_description_t, total_books> books{{
{ "Miguel De Cervantes", "Don Quixote",
"The story of the gentle knight and his servant Sancho Panza has "
"entranced readers for centuries. " },
{ "Jonathan Swift", "Gulliver's Travels",
"A wonderful satire that still works for all ages, despite the "
"savagery of Swift's vision." },
{ "Stendhal", "The Charterhouse of Parma",
"Penetrating and compelling chronicle of life in an Italian "
"court in post-Napoleonic France." }
}};
int key = 0;
for( const auto & b : books )
{
so_5::send<store_book_t>(m_command_mbox,
key,
b,
// Create new collecting mbox for every book.
// This prevents acks for different books from being collected
// in one messages_collected instance.
store_ack_mbox_t::make(so_direct_mbox()));
++key;
}
}
private:
// Type of collecting mbox for store_book_ack_t messages.
using store_ack_mbox_t = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
store_book_ack_t,
so_5::extra::mboxes::collecting_mbox::constexpr_size_traits_t<3>>;
// Type of collecting mbox for data_t messages.
using data_mbox_t = so_5::extra::mboxes::collecting_mbox::mbox_template_t<
data_t,
so_5::extra::mboxes::collecting_mbox::constexpr_size_traits_t<3>>;
// Mbox to be used for communication with shard-agents.
const so_5::mbox_t m_command_mbox;
// Count of processed books.
// It is necessary for shutdown of example.
std::size_t m_books_received = 0;
void on_store_ack(mhood_t<store_ack_mbox_t::messages_collected_t> cmd)
{
// Key will be same for every message.
// So we extract it from the first message.
const auto key = cmd->with_nth(0, [](auto m) { return m->m_key; });
std::cout << "Book with key=" << key << " is stored" << std::endl;
// Initiate request_data_t for this book.
so_5::send<request_data_t>(m_command_mbox,
key,
// Create a new collecting mbox for this request.
data_mbox_t::make(so_direct_mbox()));
}
void on_data(mhood_t<data_mbox_t::messages_collected_t> cmd)
{
// Key will be same for every message.
// So we extract it from the first message.
const auto key = cmd->with_nth(0, [](auto m) { return m->m_key; });
// A full book's description must be constructed from different parts.
// Use for_each to handle every collected message.
book_description_t book;
cmd->for_each([&book](auto m) {
if(field_id_t::author == m->m_field)
book.m_author = m->m_data;
else if(field_id_t::title == m->m_field)
book.m_title = m->m_data;
else if(field_id_t::summary == m->m_field)
book.m_summary = m->m_data;
});
std::cout << "Book with key=" << key << " is {"
<< book.m_author << ", '" << book.m_title << "', "
<< book.m_summary << "}" << std::endl;
++m_books_received;
if(total_books == m_books_received)
so_deregister_agent_coop_normally();
}
};