diff --git a/src/rdsn/include/dsn/dist/replication/replication_app_base.h b/src/rdsn/include/dsn/dist/replication/replication_app_base.h index 65835b9311..55d307efdb 100644 --- a/src/rdsn/include/dsn/dist/replication/replication_app_base.h +++ b/src/rdsn/include/dsn/dist/replication/replication_app_base.h @@ -211,7 +211,7 @@ class replication_app_base : public replica_base // virtual replication::decree last_durable_decree() const = 0; // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK. - virtual int on_request(message_ex *request) = 0; + virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0; // // Parameters: diff --git a/src/rdsn/src/replica/storage/simple_kv/simple_kv.code.definition.h b/src/rdsn/include/dsn/dist/replication/simple_kv/simple_kv.code.definition.h similarity index 100% rename from src/rdsn/src/replica/storage/simple_kv/simple_kv.code.definition.h rename to src/rdsn/include/dsn/dist/replication/simple_kv/simple_kv.code.definition.h diff --git a/src/rdsn/include/dsn/dist/replication/storage_serverlet.h b/src/rdsn/include/dsn/dist/replication/storage_serverlet.h index 98e140b689..384a88330b 100644 --- a/src/rdsn/include/dsn/dist/replication/storage_serverlet.h +++ b/src/rdsn/include/dsn/dist/replication/storage_serverlet.h @@ -30,7 +30,10 @@ #include <unordered_map> #include <functional> +#include <dsn/dist/fmt_logging.h> +#include <dsn/dist/replication/simple_kv/simple_kv.code.definition.h> #include <dsn/service_api_cpp.h> +#include <rrdb/rrdb.code.definition.h> namespace dsn { namespace replication { @@ -38,7 +41,7 @@ template <typename T> class storage_serverlet { protected: - typedef std::function<void(T *, dsn::message_ex *req)> rpc_handler; + typedef std::function<int(T *, dsn::message_ex *req)> rpc_handler; static std::unordered_map<std::string, rpc_handler> s_handlers; static std::vector<rpc_handler> s_vhandlers; @@ -48,11 +51,19 @@ class storage_serverlet const char *name, void 
(*handler)(T *svc, const TReq &req, rpc_replier<TResp> &resp)) { + // Only allowed to register simple.kv rpc handler. + dassert_f(dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_READ == rpc_code || + dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_WRITE == rpc_code || + dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_APPEND == rpc_code, + "Not allowed to register with rpc_code {}", + rpc_code); rpc_handler h = [handler](T *p, dsn::message_ex *r) { TReq req; ::dsn::unmarshall(r, req); rpc_replier<TResp> replier(r->create_response()); handler(p, req, replier); + // For simple.kv, always return 0 which means success. + return 0; }; return register_async_rpc_handler(rpc_code, name, h); @@ -64,7 +75,9 @@ class storage_serverlet void (*handler)(T *svc, TRpcHolder)) { rpc_handler h = [handler](T *p, dsn::message_ex *request) { - handler(p, TRpcHolder::auto_reply(request)); + auto rh = TRpcHolder::auto_reply(request); + handler(p, rh); + return rh.response().error; }; return register_async_rpc_handler(rpc_code, name, h); @@ -75,10 +88,14 @@ class storage_serverlet const char *name, void (*handler)(T *svc, const TReq &req)) { + // Only allowed to register RPC_RRDB_RRDB_CLEAR_SCANNER handler. + dcheck_eq(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER, rpc_code); rpc_handler h = [handler](T *p, dsn::message_ex *r) { TReq req; ::dsn::unmarshall(r, req); handler(p, req); + // For RPC_RRDB_RRDB_CLEAR_SCANNER, always return 0 which means success. 
+ return 0; }; return register_async_rpc_handler(rpc_code, name, h); @@ -102,11 +119,15 @@ class storage_serverlet static const rpc_handler *find_handler(dsn::task_code rpc_code) { - if (rpc_code < s_vhandlers.size() && s_vhandlers[rpc_code] != nullptr) + if (rpc_code < s_vhandlers.size() && s_vhandlers[rpc_code] != nullptr) { return &s_vhandlers[rpc_code]; + } + auto iter = s_handlers.find(rpc_code.to_string()); - if (iter != s_handlers.end()) + if (iter != s_handlers.end()) { return &(iter->second); + } + return nullptr; } @@ -116,16 +137,16 @@ class storage_serverlet dsn::task_code t = request->rpc_code(); const rpc_handler *ptr = find_handler(t); if (ptr != nullptr) { - // TODO(yingchun): add return value - (*ptr)(static_cast<T *>(this), request); + return (*ptr)(static_cast<T *>(this), request); } else { dwarn("recv message with unhandled rpc name %s from %s, trace_id = %016" PRIx64, t.to_string(), request->header->from_address.to_string(), request->header->trace_id); dsn_rpc_reply(request->create_response(), ::dsn::ERR_HANDLER_NOT_FOUND); + // TODO(yingchun): return a non-zero value + return 0; } - return 0; } }; diff --git a/src/rdsn/src/replica/replica.cpp b/src/rdsn/src/replica/replica.cpp index 3f215b2f67..17d9ddf808 100644 --- a/src/rdsn/src/replica/replica.cpp +++ b/src/rdsn/src/replica/replica.cpp @@ -47,6 +47,7 @@ #include <dsn/utility/string_conv.h> #include <dsn/utility/strings.h> #include <dsn/tool-api/rpc_message.h> +#include <rocksdb/status.h> namespace dsn { namespace replication { @@ -257,8 +258,19 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling) uint64_t start_time_ns = dsn_now_ns(); dassert(_app != nullptr, ""); - // TODO(yingchun): check the return value. 
- _app->on_request(request); + auto storage_error = _app->on_request(request); + if (dsn_unlikely(storage_error != ERR_OK)) { + switch (storage_error) { + // TODO(yingchun): Only kCorruption is handled for now; consider handling more storage + // engine errors. + case rocksdb::Status::kCorruption: + handle_local_failure(ERR_RDB_CORRUPTION); + break; + default: + derror_replica("client read encountered an unhandled error: {}", storage_error); + } + return; + } // If the corresponding perf counter exist, count the duration of this operation. // rpc code of request is already checked in message_ex::rpc_code, so it will always be legal diff --git a/src/rdsn/src/replica/storage/simple_kv/simple_kv.client.h b/src/rdsn/src/replica/storage/simple_kv/simple_kv.client.h index ff951c0b32..3c39db2383 100644 --- a/src/rdsn/src/replica/storage/simple_kv/simple_kv.client.h +++ b/src/rdsn/src/replica/storage/simple_kv/simple_kv.client.h @@ -29,7 +29,7 @@ #include <dsn/utility/optional.h> #include <dsn/tool-api/async_calls.h> #include <dsn/dist/replication/partition_resolver.h> -#include "simple_kv.code.definition.h" +#include <dsn/dist/replication/simple_kv/simple_kv.code.definition.h> #include "simple_kv_types.h" namespace dsn { diff --git a/src/rdsn/src/replica/storage/simple_kv/simple_kv.server.h b/src/rdsn/src/replica/storage/simple_kv/simple_kv.server.h index a994db29f2..70babb6cb8 100644 --- a/src/rdsn/src/replica/storage/simple_kv/simple_kv.server.h +++ b/src/rdsn/src/replica/storage/simple_kv/simple_kv.server.h @@ -39,7 +39,7 @@ #include <dsn/dist/replication/replication_app_base.h> #include <dsn/dist/replication/storage_serverlet.h> -#include "simple_kv.code.definition.h" +#include <dsn/dist/replication/simple_kv/simple_kv.code.definition.h> #include "simple_kv_types.h" namespace dsn { @@ -51,7 +51,11 @@ class simple_kv_service : public replication_app_base, public storage_serverlet< simple_kv_service(replica *r) : replication_app_base(r) {} virtual ~simple_kv_service() {} 
- virtual int on_request(dsn::message_ex *request) override { return handle_request(request); } + virtual int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT + { + return handle_request(request); + } + protected: // all service handlers to be implemented further // RPC_SIMPLE_KV_SIMPLE_KV_READ diff --git a/src/rdsn/src/replica/test/mock_utils.h b/src/rdsn/src/replica/test/mock_utils.h index 87eda8ba27..08d199ec78 100644 --- a/src/rdsn/src/replica/test/mock_utils.h +++ b/src/rdsn/src/replica/test/mock_utils.h @@ -74,7 +74,7 @@ class mock_replication_app_base : public replication_app_base utils::filesystem::create_file(fmt::format("{}/checkpoint.file", checkpoint_dir)); return ERR_OK; } - int on_request(message_ex *request) override { return 0; } + int on_request(message_ex *request) override WARN_UNUSED_RESULT { return 0; } std::string query_compact_state() const { return ""; }; // we mock the followings diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h index c124ff56ff..767b11aa0d 100644 --- a/src/server/pegasus_read_service.h +++ b/src/server/pegasus_read_service.h @@ -43,7 +43,10 @@ class pegasus_read_service : public dsn::replication::replication_app_base, { } - int on_request(dsn::message_ex *request) override { return handle_request(request); } + int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT + { + return handle_request(request); + } protected: // all service handlers to be implemented further diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 944f317173..d90412dc5e 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -55,6 +55,11 @@ DSN_DEFINE_int32("pegasus.server", hotkey_analyse_time_interval_s, 10, "hotkey analyse interval in seconds"); +DSN_DEFINE_int32("pegasus.server", + inject_read_error_for_test, + 0, + "Which error code to inject in read path, 0 means no error. 
Only for test."); +DSN_TAG_VARIABLE(inject_read_error_for_test, FT_MUTABLE); static std::string chkpt_get_dir_name(int64_t decree) { @@ -283,6 +288,11 @@ void pegasus_server_impl::on_get(get_rpc rpc) resp.partition_index = _gpid.get_partition_index(); resp.server = _primary_address; + if (dsn_unlikely(FLAGS_inject_read_error_for_test != rocksdb::Status::kOk)) { + resp.error = FLAGS_inject_read_error_for_test; + return; + } + if (!_read_size_throttling_controller->available()) { rpc.error() = dsn::ERR_BUSY; _counter_recent_read_throttling_reject_count->increment();