Skip to content

Commit

Permalink
Add result code to response messages
Browse files Browse the repository at this point in the history
In some cases, the response is marked as accepted even if there was an
error. Therefore, including the accepted tag in the response message is
not enough: a follower may believe that an auto forwarded request was
successful when it wasn't.

Signed-off-by: Alex Michon <[email protected]>
  • Loading branch information
amichon-kalray committed Jun 19, 2024
1 parent 8731295 commit 3b2d3b3
Showing 1 changed file with 32 additions and 3 deletions.
35 changes: 32 additions & 3 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ limitations under the License.
// If set, each log entry will contain a CRC on the payload.
#define CRC_ON_PAYLOAD (0x10)

// If set, RPC message (response) includes result code
#define INCLUDE_RESULT_CODE (0x20)

// =======================

namespace nuraft {
Expand Down Expand Up @@ -735,6 +738,7 @@ class rpc_session
try {
ptr<buffer> resp_ctx = resp->get_ctx();
int32 resp_ctx_size = (resp_ctx) ? resp_ctx->size() : 0;
int32 result_code_size = sizeof(cmd_result_code);

uint32_t flags = 0x0;
size_t resp_meta_size = 0;
Expand All @@ -759,6 +763,13 @@ class rpc_session

size_t carried_data_size = resp_meta_size + resp_hint_size + resp_ctx_size;

if (req->get_type() == msg_type::client_request ||
req->get_type() == msg_type::add_server_request ||
req->get_type() == msg_type::remove_server_request) {
flags |= INCLUDE_RESULT_CODE;
carried_data_size += result_code_size;
}

int buf_size = RPC_RESP_HEADER_SIZE + carried_data_size;
ptr<buffer> resp_buf = buffer::alloc(buf_size);
buffer_serializer bs(resp_buf);
Expand Down Expand Up @@ -798,6 +809,11 @@ class rpc_session
bs.put_buffer(*resp_ctx);
}

/* Put result code at the end to avoid breaking backward compatibility */
if (flags & INCLUDE_RESULT_CODE) {
bs.put_i32(resp->get_result_code());
}

aa::write( ssl_enabled_, ssl_socket_, socket_,
asio::buffer(resp_buf->data_begin(), resp_buf->size()),
[this, self, resp_buf]
Expand Down Expand Up @@ -1688,8 +1704,9 @@ class asio_rpc_client
size_t bytes_transferred)
{
if ( !(flags & INCLUDE_META) &&
!(flags & INCLUDE_HINT) ) {
// Neither meta nor hint exists,
!(flags & INCLUDE_HINT) &&
!(flags & INCLUDE_RESULT_CODE)) {
// Neither meta nor hint nor result code exists,
// just use the buffer as it is for ctx.
ctx_buf->pos(0);
rsp->set_ctx(ctx_buf);
Expand Down Expand Up @@ -1739,9 +1756,21 @@ class asio_rpc_client
assert(remaining_len >= 0);
if (remaining_len) {
// It has context, read it.
ptr<buffer> actual_ctx = buffer::alloc(remaining_len);
size_t ctx_len = remaining_len;
if (flags & INCLUDE_RESULT_CODE) {
ctx_len -= sizeof(int32_t);
}
ptr<buffer> actual_ctx = buffer::alloc(ctx_len);
bs.get_buffer(actual_ctx);
rsp->set_ctx(actual_ctx);
remaining_len -= ctx_len;
}

// 4) Result code
if (flags & INCLUDE_RESULT_CODE) {
assert((size_t)remaining_len >= sizeof(int32_t));
cmd_result_code res = static_cast<cmd_result_code>(bs.get_i32());
rsp->set_result_code(res);
}

operation_timer_.cancel();
Expand Down

0 comments on commit 3b2d3b3

Please sign in to comment.