Skip to content

Commit

Permalink
[SKV-636] fix: Fault-tolerant storage engine errors for read operatio…
Browse files Browse the repository at this point in the history
…ns (apache#1447)

对应社区commit: https://github.com/apache/incubator-pegasus/pull/1447/files

注: 单测部分变更较大,本次未合入

apache#1383

ReplicaServer doesn't handle the error returned from storage engine, thus
even if the storage engine is corrupted, the server doesn't recognize these
situactions, and still running happily. However, the client always gets an
error status.
This situaction will not recover automatically except stopping the server
and moving away the corrupted RocksDB directories manually.

This patch handle the kCorruption error returned from storage engine, then
close the replcia, move the directory to ".err" trash path. The replica is
able to recover automatically (if RF > 1).
  • Loading branch information
acelyc111 authored and 王聃 committed May 5, 2023
1 parent 4c24c98 commit 072c022
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class replication_app_base : public replica_base
//
virtual replication::decree last_durable_decree() const = 0;
// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) = 0;
virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0;

//
// Parameters:
Expand Down
35 changes: 28 additions & 7 deletions src/rdsn/include/dsn/dist/replication/storage_serverlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
#include <unordered_map>
#include <functional>

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/simple_kv/simple_kv.code.definition.h>
#include <dsn/service_api_cpp.h>
#include <rrdb/rrdb.code.definition.h>

namespace dsn {
namespace replication {
template <typename T>
class storage_serverlet
{
protected:
typedef std::function<void(T *, dsn::message_ex *req)> rpc_handler;
typedef std::function<int(T *, dsn::message_ex *req)> rpc_handler;
static std::unordered_map<std::string, rpc_handler> s_handlers;
static std::vector<rpc_handler> s_vhandlers;

Expand All @@ -48,11 +51,19 @@ class storage_serverlet
const char *name,
void (*handler)(T *svc, const TReq &req, rpc_replier<TResp> &resp))
{
// Only allowed to register simple.kv rpc handler.
dassert_f(dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_READ == rpc_code ||
dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_WRITE == rpc_code ||
dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_APPEND == rpc_code,
"Not allowed to register with rpc_code {}",
rpc_code);
rpc_handler h = [handler](T *p, dsn::message_ex *r) {
TReq req;
::dsn::unmarshall(r, req);
rpc_replier<TResp> replier(r->create_response());
handler(p, req, replier);
// For simple.kv, always return 0 which means success.
return 0;
};

return register_async_rpc_handler(rpc_code, name, h);
Expand All @@ -64,7 +75,9 @@ class storage_serverlet
void (*handler)(T *svc, TRpcHolder))
{
rpc_handler h = [handler](T *p, dsn::message_ex *request) {
handler(p, TRpcHolder::auto_reply(request));
auto rh = TRpcHolder::auto_reply(request);
handler(p, rh);
return rh.response().error;
};

return register_async_rpc_handler(rpc_code, name, h);
Expand All @@ -75,10 +88,14 @@ class storage_serverlet
const char *name,
void (*handler)(T *svc, const TReq &req))
{
// Only allowed to register RPC_RRDB_RRDB_CLEAR_SCANNER handler.
dcheck_eq(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER, rpc_code);
rpc_handler h = [handler](T *p, dsn::message_ex *r) {
TReq req;
::dsn::unmarshall(r, req);
handler(p, req);
// For RPC_RRDB_RRDB_CLEAR_SCANNER, always return 0 which means success.
return 0;
};

return register_async_rpc_handler(rpc_code, name, h);
Expand All @@ -102,11 +119,15 @@ class storage_serverlet

static const rpc_handler *find_handler(dsn::task_code rpc_code)
{
if (rpc_code < s_vhandlers.size() && s_vhandlers[rpc_code] != nullptr)
if (rpc_code < s_vhandlers.size() && s_vhandlers[rpc_code] != nullptr) {
return &s_vhandlers[rpc_code];
}

auto iter = s_handlers.find(rpc_code.to_string());
if (iter != s_handlers.end())
if (iter != s_handlers.end()) {
return &(iter->second);
}

return nullptr;
}

Expand All @@ -116,16 +137,16 @@ class storage_serverlet
dsn::task_code t = request->rpc_code();
const rpc_handler *ptr = find_handler(t);
if (ptr != nullptr) {
// TODO(yingchun): add return value
(*ptr)(static_cast<T *>(this), request);
return (*ptr)(static_cast<T *>(this), request);
} else {
dwarn("recv message with unhandled rpc name %s from %s, trace_id = %016" PRIx64,
t.to_string(),
request->header->from_address.to_string(),
request->header->trace_id);
dsn_rpc_reply(request->create_response(), ::dsn::ERR_HANDLER_NOT_FOUND);
// TODO(yingchun): return a non-zero value
return 0;
}
return 0;
}
};

Expand Down
16 changes: 14 additions & 2 deletions src/rdsn/src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <dsn/utility/string_conv.h>
#include <dsn/utility/strings.h>
#include <dsn/tool-api/rpc_message.h>
#include <rocksdb/status.h>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -257,8 +258,19 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)

uint64_t start_time_ns = dsn_now_ns();
dassert(_app != nullptr, "");
// TODO(yingchun): check the return value.
_app->on_request(request);
auto storage_error = _app->on_request(request);
if (dsn_unlikely(storage_error != ERR_OK)) {
switch (storage_error) {
// TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage
// engine errors.
case rocksdb::Status::kCorruption:
handle_local_failure(ERR_RDB_CORRUPTION);
break;
default:
derror_replica("client read encountered an unhandled error: {}", storage_error);
}
return;
}

// If the corresponding perf counter exist, count the duration of this operation.
// rpc code of request is already checked in message_ex::rpc_code, so it will always be legal
Expand Down
2 changes: 1 addition & 1 deletion src/rdsn/src/replica/storage/simple_kv/simple_kv.client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <dsn/utility/optional.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/dist/replication/partition_resolver.h>
#include "simple_kv.code.definition.h"
#include <dsn/dist/replication/simple_kv/simple_kv.code.definition.h>
#include "simple_kv_types.h"

namespace dsn {
Expand Down
8 changes: 6 additions & 2 deletions src/rdsn/src/replica/storage/simple_kv/simple_kv.server.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/replication/storage_serverlet.h>

#include "simple_kv.code.definition.h"
#include <dsn/dist/replication/simple_kv/simple_kv.code.definition.h>
#include "simple_kv_types.h"

namespace dsn {
Expand All @@ -51,7 +51,11 @@ class simple_kv_service : public replication_app_base, public storage_serverlet<
simple_kv_service(replica *r) : replication_app_base(r) {}
virtual ~simple_kv_service() {}

virtual int on_request(dsn::message_ex *request) override { return handle_request(request); }
virtual int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
{
return handle_request(request);
}

protected:
// all service handlers to be implemented further
// RPC_SIMPLE_KV_SIMPLE_KV_READ
Expand Down
2 changes: 1 addition & 1 deletion src/rdsn/src/replica/test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class mock_replication_app_base : public replication_app_base
utils::filesystem::create_file(fmt::format("{}/checkpoint.file", checkpoint_dir));
return ERR_OK;
}
int on_request(message_ex *request) override { return 0; }
int on_request(message_ex *request) override WARN_UNUSED_RESULT { return 0; }
std::string query_compact_state() const { return ""; };

// we mock the followings
Expand Down
5 changes: 4 additions & 1 deletion src/server/pegasus_read_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ class pegasus_read_service : public dsn::replication::replication_app_base,
{
}

int on_request(dsn::message_ex *request) override { return handle_request(request); }
int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
{
return handle_request(request);
}

protected:
// all service handlers to be implemented further
Expand Down
10 changes: 10 additions & 0 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ DSN_DEFINE_int32("pegasus.server",
hotkey_analyse_time_interval_s,
10,
"hotkey analyse interval in seconds");
DSN_DEFINE_int32("pegasus.server",
inject_read_error_for_test,
0,
"Which error code to inject in read path, 0 means no error. Only for test.");
DSN_TAG_VARIABLE(inject_read_error_for_test, FT_MUTABLE);

static std::string chkpt_get_dir_name(int64_t decree)
{
Expand Down Expand Up @@ -283,6 +288,11 @@ void pegasus_server_impl::on_get(get_rpc rpc)
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;

if (dsn_unlikely(FLAGS_inject_read_error_for_test != rocksdb::Status::kOk)) {
resp.error = FLAGS_inject_read_error_for_test;
return;
}

if (!_read_size_throttling_controller->available()) {
rpc.error() = dsn::ERR_BUSY;
_counter_recent_read_throttling_reject_count->increment();
Expand Down

0 comments on commit 072c022

Please sign in to comment.