From 1c8874f6c2f4f8976c2267c21d8512d85f150627 Mon Sep 17 00:00:00 2001 From: Jung-Sang Ahn Date: Wed, 13 Apr 2022 21:19:25 -0700 Subject: [PATCH] Deliver commit results for auto-forwarding in async mode * In async mode, the commit result will be ready later. If the request is come from the other server through auto-forwarding, the request should wait for the result before returning the response. --- include/libnuraft/async.hxx | 14 ++++++++++ src/asio_service.cxx | 46 ++++++++++++++++++++++++++++---- tests/unit/asio_service_test.cxx | 17 ++++++++++++ 3 files changed, 72 insertions(+), 5 deletions(-) diff --git a/include/libnuraft/async.hxx b/include/libnuraft/async.hxx index a6ffe785..654743d2 100644 --- a/include/libnuraft/async.hxx +++ b/include/libnuraft/async.hxx @@ -117,6 +117,20 @@ public: public: + /** + * Clear all internal data. + */ + void reset() { + std::lock_guard guard(lock_); + err_ = TE(); + code_ = cmd_result_code::OK; + has_result_ = false; + accepted_ = false; + handler_ = nullptr; + handler2_ = nullptr; + result_ = T(); + } + /** * Install a handler that will be invoked when * we get the result of replication. diff --git a/src/asio_service.cxx b/src/asio_service.cxx index 8e0d2e8e..d16645ab 100644 --- a/src/asio_service.cxx +++ b/src/asio_service.cxx @@ -313,12 +313,13 @@ class rpc_session { if (err) { p_er( "session %zu failed to read rpc header from socket %s:%u " - "due to error %d, %s", + "due to error %d, %s, ref count %u", session_id_, cached_address_.c_str(), cached_port_, err.value(), - err.message().c_str() ); + err.message().c_str(), + self.use_count() ); this->stop(); return; } @@ -560,11 +561,46 @@ class rpc_session return; } - if (resp->has_cb()) { - // If callback function exists, get new response message. - resp = resp->call_cb(resp); + if (resp->has_async_cb()) { + // Response will be ready later, setup a callback function + // (only for auto-forwarding with `client_request` type + // in async handling mode). + ptr< cmd_result< ptr > > ret = resp->call_async_cb(); + + // WARNING: `self` should be captured to avoid releasing this `rpc_session`. + ret->when_ready( + [this, self, req, resp] + ( cmd_result, ptr>& res, + ptr& exp ) { + resp->set_ctx(res.get()); + on_resp_ready(req, resp); + // This is needed to avoid circular reference. + res.reset(); + } + ); + + } else { + // Response should already be ready when we reach here. + if (resp->has_cb()) { + // If callback function exists, get new response message. + resp = resp->call_cb(resp); + } + on_resp_ready(req, resp); } + } catch (std::exception& ex) { + p_er( "session %zu failed to process request message " + "due to error: %s", + this->session_id_, + ex.what() ); + this->stop(); + } + } + + void on_resp_ready(ptr req, ptr resp) { + ptr self = this->shared_from_this(); + + try { ptr resp_ctx = resp->get_ctx(); int32 resp_ctx_size = (resp_ctx) ? resp_ctx->size() : 0; diff --git a/tests/unit/asio_service_test.cxx b/tests/unit/asio_service_test.cxx index 7975e529..157c4fe8 100644 --- a/tests/unit/asio_service_test.cxx +++ b/tests/unit/asio_service_test.cxx @@ -15,6 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. **************************************************************************/ +#include "buffer_serializer.hxx" #include "debugging_options.hxx" #include "raft_package_asio.hxx" @@ -1467,6 +1468,22 @@ int auto_forwarding_test(bool async) { CHK_GT(s1.getTestSm()->isCommitted(test_msg), 0); } + // All handlers should have the result. + { + std::set commit_results; + std::lock_guard l(handlers_lock); + for (auto& handler: handlers) { + ptr h_result = handler->get(); + CHK_NONNULL(h_result); + CHK_EQ(8, h_result->size()); + buffer_serializer bs(h_result); + uint64_t val = bs.get_u64(); + commit_results.insert(val); + } + // All messages should have delivered their results. + CHK_EQ(NUM_PARALLEL_MSGS, commit_results.size()); + } + // State machine should be identical. CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) ); CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );