so5extra 1.4 Synchronous Interaction
Versions 5.3-5.5 of SObjectizer supported synchronous interaction between agents. This feature was removed in SObjectizer-5.6, and the SObjectizer core no longer supports synchronous interaction at all.
The so_5::extra::sync submodule provides support for synchronous interaction between agents, or between worker threads if only mchains are used, in the form of the request_reply_t message type and the request_value and request_opt_value free functions.
Everything related to synchronous interaction is defined in the so_5_extra/sync/pub.hpp header file. To use this functionality, include that file together with so_5/all.hpp:
#include <so_5_extra/sync/pub.hpp>
#include <so_5/all.hpp>
The recommended way of issuing a synchronous request is the following:
// Define a short name for request-reply pair.
using my_request_reply = so_5::extra::sync::request_reply_t<my_request, my_reply>;
...
// Define a subscription for synchronous request in some agent.
class request_handler final : public so_5::agent_t {
    ...
    void on_request(typename my_request_reply::request_mhood_t cmd) {
        ... // Handling of the request.
        // The content of the request is available via cmd->request() method.
        // Sending the reply for the request.
        cmd->make_reply(...); // All arguments are going to my_reply's constructor.
    }
    ...
    void so_define_agent() override {
        so_subscribe_self().event(&request_handler::on_request);
    }
};
...
// Issuing a request to the direct mbox of some request handler.
so_5::mbox_t handler_mbox = ...;
// Sending a request and waiting for the reply for no more than 15s.
// An exception will be thrown if the reply is not received.
my_reply reply = my_request_reply::ask_value(handler_mbox, 15s,
    ...); // All other arguments are going to my_request's constructor.
There is also the ask_opt_value method, which doesn't throw but returns a std::optional<Reply> instance:
// Sending a request and waiting for the reply for no more than 15s.
// An empty optional will be returned if the reply is not received.
std::optional<my_reply> reply = my_request_reply::ask_opt_value(handler_mbox, 15s,
    ...); // All other arguments are going to my_request's constructor.
if(reply) {
    ... // Usage of *reply value.
}
Note. The make_reply method can be called only once. An attempt to call make_reply several times will lead to an exception.
There is no need to use an alias like my_request_reply as shown above. A request can also be issued with the request_value() or request_opt_value() free functions:
// Sending a request and waiting for the reply for no more than 15s.
// An exception will be thrown if the reply is not received.
my_reply reply = so_5::extra::sync::request_value<my_request, my_reply>(handler_mbox, 15s,
    ...); // All other arguments are going to my_request's constructor.
// Sending a request and waiting for the reply for no more than 15s.
// An empty optional will be returned if the reply is not received.
std::optional<my_reply> opt_reply =
    so_5::extra::sync::request_opt_value<my_request, my_reply>(handler_mbox, 15s,
        ...); // All other arguments are going to my_request's constructor.
if(opt_reply) {
    ... // Usage of *opt_reply value.
}
An event handler for a request can be defined this way:
void request_handler::on_request(
    mutable_mhood_t<so_5::extra::sync::request_reply_t<my_request, my_reply>> cmd) {
    ...
}
Or, in a simpler form:
void request_handler::on_request(
    typename so_5::extra::sync::request_reply_t<my_request, my_reply>::request_mhood_t cmd) {
    ...
}
Or, in an even simpler form:
void request_handler::on_request(
    so_5::extra::sync::request_mhood_t<my_request, my_reply> cmd) {
    ...
}
Synchronous interaction can also be used with mchains:
// Define a short name for request-reply pair.
using my_request_reply = so_5::extra::sync::request_reply_t<my_request, my_reply>;
...
// A worker thread that will handle incoming requests.
void request_handler(so_5::mchain_t in_chain) {
    so_5::receive(from(in_chain).handle_all(),
        [](typename my_request_reply::request_mhood_t cmd) {
            ... // Handling of the request.
            // cmd->request() returns a reference to my_request object.
            // Make the reply.
            cmd->make_reply(...); // All arguments are going to my_reply's constructor.
        });
}
// A worker thread that will issue requests.
void request_producer(so_5::mchain_t out_chain) {
    // Issue a request and wait no more than 15s for the reply.
    auto reply = my_request_reply::ask_value(out_chain, 15s,
        ...); // All other arguments are going to my_request's constructor.
}
When the ask_value() or ask_opt_value() method of request_reply_t<Q,A> is called:
- a new mchain is created. This mchain will be used for sending and receiving the reply;
- a new instance of request_reply_t<Q,A> is created. That instance holds the reply mchain and an instance of type Q. Note that additional arguments passed to ask_value() or ask_opt_value() are forwarded to the constructor of that Q instance;
- this new message is sent as a mutable message to the specified destination;
- ask_value()/ask_opt_value() sleeps on the reply mchain waiting for the reply. If the reply is received, it is returned. If not, ask_value() throws an exception and ask_opt_value() returns an empty std::optional;
- when make_reply() is called on the request_reply_t<Q,A> object, a new instance of type A is created. This instance is then sent to the reply mchain.
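The steps above can be modelled with the standard library alone. The following is only a rough sketch of the idea, not the so5extra implementation: reply_channel and request_message are hypothetical stand-ins for the reply mchain and for request_reply_t<Q,A>, the destination is assumed to be a plain callable run on its own thread, and the functions merely borrow the names ask_value and ask_opt_value.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <thread>
#include <utility>

// Hypothetical stand-in for the reply mchain: carries at most one reply.
template <typename A>
class reply_channel {
    std::mutex lock_;
    std::condition_variable cv_;
    std::optional<A> value_;

public:
    void put(A v) {
        {
            std::lock_guard<std::mutex> guard{lock_};
            value_ = std::move(v);
        }
        cv_.notify_all();
    }

    // Waits up to `timeout`; an empty optional means no reply arrived.
    std::optional<A> take(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> guard{lock_};
        cv_.wait_for(guard, timeout, [this] { return value_.has_value(); });
        std::optional<A> out = std::move(value_);
        value_.reset();
        return out;
    }
};

// Hypothetical stand-in for request_reply_t<Q,A>: the request payload plus
// the channel on which the reply is expected.
template <typename Q, typename A>
class request_message {
    Q request_;
    std::shared_ptr<reply_channel<A>> reply_to_;
    bool replied_ = false;

public:
    request_message(Q q, std::shared_ptr<reply_channel<A>> reply_to)
        : request_{std::move(q)}, reply_to_{std::move(reply_to)} {}

    const Q & request() const { return request_; }

    // Builds an A from the arguments and sends it; a second call throws.
    template <typename... Args>
    void make_reply(Args &&... args) {
        if (replied_) throw std::logic_error{"reply was already sent"};
        replied_ = true;
        reply_to_->put(A(std::forward<Args>(args)...));
    }
};

// Creates the reply channel and the request, hands the request to `handler`
// (assumed to be a callable taking request_message<Q,A>&) and waits.
template <typename Q, typename A, typename Handler, typename... Args>
std::optional<A> ask_opt_value(Handler handler,
                               std::chrono::milliseconds timeout,
                               Args &&... args) {
    auto reply_ch = std::make_shared<reply_channel<A>>();
    request_message<Q, A> msg{Q(std::forward<Args>(args)...), reply_ch};
    // The worker thread plays the role of the destination.
    std::thread worker{
        [h = std::move(handler), m = std::move(msg)]() mutable { h(m); }};
    std::optional<A> reply = reply_ch->take(timeout);
    worker.join(); // Simplification of this sketch only.
    return reply;
}

// Same as ask_opt_value, but a missing reply becomes an exception.
template <typename Q, typename A, typename Handler, typename... Args>
A ask_value(Handler handler, std::chrono::milliseconds timeout, Args &&... args) {
    auto reply = ask_opt_value<Q, A>(std::move(handler), timeout,
                                     std::forward<Args>(args)...);
    if (!reply) throw std::runtime_error{"no reply received"};
    return std::move(*reply);
}
```

In this model a handler such as [](auto & cmd) { cmd.make_reply(cmd.request() * 2); } yields a value from ask_value, while a handler that never calls make_reply makes ask_opt_value return an empty optional once the timeout expires.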
Note that the destructor of request_reply_t<Q,A> closes the reply mchain. This means that if the request_reply_t message is never handled, the reply mchain is closed automatically, which wakes up ask_value()/ask_opt_value().
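The effect of closing the reply channel in the destructor can be illustrated with a small standard-library model. This is a sketch under assumptions, not the library's code: closable_channel and closing_request are hypothetical names, and the sketch assumes that a reply placed in the channel before it is closed can still be read afterwards.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical stand-in for a reply mchain that can be closed.
template <typename A>
class closable_channel {
    std::mutex lock_;
    std::condition_variable cv_;
    std::optional<A> value_;
    bool closed_ = false;

public:
    void put(A v) {
        {
            std::lock_guard<std::mutex> guard{lock_};
            if (closed_) return; // Nothing can be added after close.
            value_ = std::move(v);
        }
        cv_.notify_all();
    }

    void close() {
        {
            std::lock_guard<std::mutex> guard{lock_};
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Returns when a value arrives, the channel is closed, or the timeout
    // expires. A value stored before close is still returned (assumption).
    std::optional<A> take(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> guard{lock_};
        cv_.wait_for(guard, timeout,
                     [this] { return value_.has_value() || closed_; });
        std::optional<A> out = std::move(value_);
        value_.reset();
        return out;
    }
};

// Hypothetical request object that closes its reply channel when destroyed,
// whether or not it was ever handled.
template <typename Q, typename A>
class closing_request {
    Q request_;
    std::shared_ptr<closable_channel<A>> reply_to_;

public:
    closing_request(Q q, std::shared_ptr<closable_channel<A>> reply_to)
        : request_{std::move(q)}, reply_to_{std::move(reply_to)} {}
    closing_request(const closing_request &) = delete;
    closing_request & operator=(const closing_request &) = delete;
    ~closing_request() { reply_to_->close(); }

    const Q & request() const { return request_; }
    void make_reply(A value) { reply_to_->put(std::move(value)); }
};
```

With this model, a waiter calling take() with a long timeout returns almost immediately with an empty optional if the request object was destroyed without a reply, instead of sleeping for the whole timeout.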
Note that request_value() and request_opt_value() are just shortcuts for the ask_value() and ask_opt_value() methods, so they work as described above.
Because a request_reply_t<Q,A> instance is sent as an ordinary message, it can easily be forwarded to another destination:
class load_balancer final : public so_5::agent_t {
    void on_request(typename some_request::request_mhood_t cmd) {
        const auto new_dest = select_appropriate_destination();
        // Forward the request to a new destination.
        so_5::send(new_dest, std::move(cmd));
    }
    ...
};
It's also possible to store a request_reply_t<Q, A> instance and process it later:
class bunch_processor final : public so_5::agent_t {
    // Container for holding pending requests.
    std::vector<typename some_request::holder_t> pending_requests_;
    // Reaction to the new incoming request.
    void on_request(typename some_request::request_mhood_t cmd) {
        // Just store the request to process it later.
        pending_requests_.push_back(cmd.make_holder());
    }
    // Time to process all pending requests.
    void on_processing_time(mhood_t<some_timer_msg>) {
        for(auto & r : pending_requests_)
            r->make_reply(...);
        // make_reply can be called only once per request,
        // so drop the requests that have been answered.
        pending_requests_.clear();
    }
};
The ask_value() and ask_opt_value() methods (as well as the request_value()/request_opt_value() free functions) create a new mchain and place it into request_reply_t to be used by the make_reply() method. Sometimes this is inappropriate: you may want to use an already existing destination for the reply.
There are initiate_with_custom_reply_to methods in request_reply_t that allow issuing a request with a specific destination for the reply.
For example, here is how to issue a request whose reply goes to the direct mbox of the issuing agent:
class request_issuer final : public so_5::agent_t {
    // The mbox of request handler.
    const so_5::mbox_t service_;
    ...
    void on_some_event(mhood_t<some_event> cmd) {
        ...
        // We want to issue a request here.
        // Create a subscription for the reply.
        so_subscribe_self().event(&request_issuer::on_reply);
        // Issuing a new request.
        some_request::initiate_with_custom_reply_to(
            // The destination for the request.
            service_,
            // The destination for the reply.
            *this,
            ...);
    }
    void on_reply(typename some_request::reply_mhood_t cmd) {
        ... // Handling of the reply.
    }
};
Another example: redirecting replies from two separate requests to one mchain and handling them in a single receive call:
using first_dialog = so_5::extra::sync::request_reply_t<first_request, first_reply>;
using second_dialog = so_5::extra::sync::request_reply_t<second_request, second_reply>;
// The single mchain for both replies.
auto reply_ch = create_mchain(env);
// Issuing requests. Please note the usage of do_not_close_reply_chain.
first_dialog::initiate_with_custom_reply_to(
    service, reply_ch, so_5::extra::sync::do_not_close_reply_chain,
    ...);
second_dialog::initiate_with_custom_reply_to(
    service, reply_ch, so_5::extra::sync::do_not_close_reply_chain,
    ...);
// Waiting and handling of replies.
receive(from(reply_ch).handle_n(2).empty_timeout(15s),
    [](typename first_dialog::reply_mhood_t cmd) {...},
    [](typename second_dialog::reply_mhood_t cmd) {...});
It is important to use do_not_close_reply_chain here because otherwise the reply chain would be closed after the first request is processed.
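Why the flag matters when one reply chain is shared can be shown with a single-threaded model. This is an illustrative sketch only: shared_channel, dialog_request, close_policy and run_two_dialogs are hypothetical names, and the enumerators merely mirror the idea behind do_not_close_reply_chain rather than reproduce the library's types.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>

// Hypothetical single-threaded model of a reply mchain shared by requests.
class shared_channel {
    std::deque<std::string> items_;
    bool closed_ = false;

public:
    // Returns false when the channel is already closed and the reply is lost.
    bool put(std::string v) {
        if (closed_) return false;
        items_.push_back(std::move(v));
        return true;
    }
    void close() { closed_ = true; }
    std::size_t size() const { return items_.size(); }
};

// Hypothetical policy that mirrors the choice made by the issuer.
enum class close_policy { close_reply_chain, do_not_close_reply_chain };

// Hypothetical request: may close the reply channel when it is destroyed.
class dialog_request {
    std::shared_ptr<shared_channel> reply_to_;
    close_policy policy_;

public:
    dialog_request(std::shared_ptr<shared_channel> reply_to, close_policy policy)
        : reply_to_{std::move(reply_to)}, policy_{policy} {}
    dialog_request(const dialog_request &) = delete;
    dialog_request & operator=(const dialog_request &) = delete;
    ~dialog_request() {
        if (policy_ == close_policy::close_reply_chain) reply_to_->close();
    }

    bool make_reply(std::string text) { return reply_to_->put(std::move(text)); }
};

// Handles two requests one after another against the same reply channel
// and reports how many replies actually reached it.
inline std::size_t run_two_dialogs(close_policy policy) {
    auto reply_ch = std::make_shared<shared_channel>();
    {
        dialog_request first{reply_ch, policy};
        first.make_reply("first reply");
    } // The first request is destroyed here.
    {
        dialog_request second{reply_ch, policy};
        second.make_reply("second reply");
    }
    return reply_ch->size();
}
```

In this model run_two_dialogs(close_policy::close_reply_chain) delivers only one reply, because the channel is closed before the second request is handled, while run_two_dialogs(close_policy::do_not_close_reply_chain) delivers both.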