Skip to content

Commit

Permalink
Deliver commit results for auto-forwarding in async mode (#350)
Browse files Browse the repository at this point in the history
* In async mode, the commit result will be ready later. If the request
is come from the other server through auto-forwarding, the request
should wait for the result before returning the response.
  • Loading branch information
greensky00 authored Apr 14, 2022
1 parent 262e90b commit 877a60a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
14 changes: 14 additions & 0 deletions include/libnuraft/async.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ public:

public:

/**
* Clear all internal data.
*/
void reset() {
std::lock_guard<std::mutex> guard(lock_);
err_ = TE();
code_ = cmd_result_code::OK;
has_result_ = false;
accepted_ = false;
handler_ = nullptr;
handler2_ = nullptr;
result_ = T();
}

/**
* Install a handler that will be invoked when
* we get the result of replication.
Expand Down
46 changes: 41 additions & 5 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,13 @@ class rpc_session
{
if (err) {
p_er( "session %zu failed to read rpc header from socket %s:%u "
"due to error %d, %s",
"due to error %d, %s, ref count %u",
session_id_,
cached_address_.c_str(),
cached_port_,
err.value(),
err.message().c_str() );
err.message().c_str(),
self.use_count() );
this->stop();
return;
}
Expand Down Expand Up @@ -560,11 +561,46 @@ class rpc_session
return;
}

if (resp->has_cb()) {
// If callback function exists, get new response message.
resp = resp->call_cb(resp);
if (resp->has_async_cb()) {
// Response will be ready later, setup a callback function
// (only for auto-forwarding with `client_request` type
// in async handling mode).
ptr< cmd_result< ptr<buffer> > > ret = resp->call_async_cb();

// WARNING: `self` should be captured to avoid releasing this `rpc_session`.
ret->when_ready(
[this, self, req, resp]
( cmd_result<ptr<buffer>, ptr<std::exception>>& res,
ptr<std::exception>& exp ) {
resp->set_ctx(res.get());
on_resp_ready(req, resp);
// This is needed to avoid circular reference.
res.reset();
}
);

} else {
// Response should already be ready when we reach here.
if (resp->has_cb()) {
// If callback function exists, get new response message.
resp = resp->call_cb(resp);
}
on_resp_ready(req, resp);
}

} catch (std::exception& ex) {
p_er( "session %zu failed to process request message "
"due to error: %s",
this->session_id_,
ex.what() );
this->stop();
}
}

void on_resp_ready(ptr<req_msg> req, ptr<resp_msg> resp) {
ptr<rpc_session> self = this->shared_from_this();

try {
ptr<buffer> resp_ctx = resp->get_ctx();
int32 resp_ctx_size = (resp_ctx) ? resp_ctx->size() : 0;

Expand Down
17 changes: 17 additions & 0 deletions tests/unit/asio_service_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/

#include "buffer_serializer.hxx"
#include "debugging_options.hxx"
#include "raft_package_asio.hxx"

Expand Down Expand Up @@ -1467,6 +1468,22 @@ int auto_forwarding_test(bool async) {
CHK_GT(s1.getTestSm()->isCommitted(test_msg), 0);
}

// All handlers should have the result.
{
std::set<uint64_t> commit_results;
std::lock_guard<std::mutex> l(handlers_lock);
for (auto& handler: handlers) {
ptr<buffer> h_result = handler->get();
CHK_NONNULL(h_result);
CHK_EQ(8, h_result->size());
buffer_serializer bs(h_result);
uint64_t val = bs.get_u64();
commit_results.insert(val);
}
// All messages should have delivered their results.
CHK_EQ(NUM_PARALLEL_MSGS, commit_results.size());
}

// State machine should be identical.
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );
Expand Down

0 comments on commit 877a60a

Please sign in to comment.