Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deliver commit results for auto-forwarding in async mode #350

Merged
merged 1 commit into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/libnuraft/async.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ public:

public:

/**
* Clear all internal data.
*/
void reset() {
std::lock_guard<std::mutex> guard(lock_);
err_ = TE();
code_ = cmd_result_code::OK;
has_result_ = false;
accepted_ = false;
handler_ = nullptr;
handler2_ = nullptr;
result_ = T();
}

/**
* Install a handler that will be invoked when
* we get the result of replication.
Expand Down
46 changes: 41 additions & 5 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,13 @@ class rpc_session
{
if (err) {
p_er( "session %zu failed to read rpc header from socket %s:%u "
"due to error %d, %s",
"due to error %d, %s, ref count %u",
session_id_,
cached_address_.c_str(),
cached_port_,
err.value(),
err.message().c_str() );
err.message().c_str(),
self.use_count() );
this->stop();
return;
}
Expand Down Expand Up @@ -560,11 +561,46 @@ class rpc_session
return;
}

if (resp->has_cb()) {
// If callback function exists, get new response message.
resp = resp->call_cb(resp);
if (resp->has_async_cb()) {
// Response will be ready later, setup a callback function
// (only for auto-forwarding with `client_request` type
// in async handling mode).
ptr< cmd_result< ptr<buffer> > > ret = resp->call_async_cb();

// WARNING: `self` should be captured to avoid releasing this `rpc_session`.
ret->when_ready(
[this, self, req, resp]
( cmd_result<ptr<buffer>, ptr<std::exception>>& res,
ptr<std::exception>& exp ) {
resp->set_ctx(res.get());
on_resp_ready(req, resp);
// This is needed to avoid circular reference.
res.reset();
}
);

} else {
// Response should already be ready when we reach here.
if (resp->has_cb()) {
// If callback function exists, get new response message.
resp = resp->call_cb(resp);
}
on_resp_ready(req, resp);
}

} catch (std::exception& ex) {
p_er( "session %zu failed to process request message "
"due to error: %s",
this->session_id_,
ex.what() );
this->stop();
}
}

void on_resp_ready(ptr<req_msg> req, ptr<resp_msg> resp) {
ptr<rpc_session> self = this->shared_from_this();

try {
ptr<buffer> resp_ctx = resp->get_ctx();
int32 resp_ctx_size = (resp_ctx) ? resp_ctx->size() : 0;

Expand Down
17 changes: 17 additions & 0 deletions tests/unit/asio_service_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/

#include "buffer_serializer.hxx"
#include "debugging_options.hxx"
#include "raft_package_asio.hxx"

Expand Down Expand Up @@ -1467,6 +1468,22 @@ int auto_forwarding_test(bool async) {
CHK_GT(s1.getTestSm()->isCommitted(test_msg), 0);
}

// All handlers should have the result.
{
std::set<uint64_t> commit_results;
std::lock_guard<std::mutex> l(handlers_lock);
for (auto& handler: handlers) {
ptr<buffer> h_result = handler->get();
CHK_NONNULL(h_result);
CHK_EQ(8, h_result->size());
buffer_serializer bs(h_result);
uint64_t val = bs.get_u64();
commit_results.insert(val);
}
// All messages should have delivered their results.
CHK_EQ(NUM_PARALLEL_MSGS, commit_results.size());
}

// State machine should be identical.
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );
Expand Down