From fa62e0d8e7070924c2f54da7d493ac7a7133b3e3 Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Mon, 18 Jan 2021 18:24:00 +0800 Subject: [PATCH] refactor: reimplement pegasus_server_write::on_batched_write_requests (#680) --- rdsn | 2 +- src/server/pegasus_server_write.cpp | 90 +++++++++++++++-------------- src/server/pegasus_server_write.h | 5 ++ 3 files changed, 54 insertions(+), 43 deletions(-) diff --git a/rdsn b/rdsn index ec34633e46..ab9520df9b 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit ec34633e4695e881209973460469b2b069c0f52e +Subproject commit ab9520df9bb47be2eb7d5720098ff463cd83ec36 diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index f2544bbc8b..9f4fc4aaff 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -33,6 +33,7 @@ namespace server { pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log) : replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log) { + init_non_batch_write_handlers(); } int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, @@ -50,43 +51,11 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, return _write_svc->empty_put(_decree); } - dsn::task_code rpc_code(requests[0]->rpc_code()); - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { - dassert(count == 1, "count = %d", count); - auto rpc = multi_put_rpc::auto_reply(requests[0]); - return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response()); + auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code()); + if (iter != _non_batch_write_handlers.end()) { + dassert_f(count == 1, "count = {}", count); + return iter->second(requests[0]); } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { - dassert(count == 1, "count = %d", count); - auto rpc = multi_remove_rpc::auto_reply(requests[0]); - return _write_svc->multi_remove(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR) { - dassert(count == 1, "count = %d", count); - auto rpc = incr_rpc::auto_reply(requests[0]); - return _write_svc->incr(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { - dassert(count == 1, "count = %d", count); - auto rpc = duplicate_rpc::auto_reply(requests[0]); - return _write_svc->duplicate(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { - dassert(count == 1, "count = %d", count); - auto rpc = check_and_set_rpc::auto_reply(requests[0]); - return _write_svc->check_and_set(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { - dassert(count == 1, "count = %d", count); - auto rpc = check_and_mutate_rpc::auto_reply(requests[0]); - return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) { - dassert(count == 1, "count = %d", count); - auto rpc = ingestion_rpc::auto_reply(requests[0]); - return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response()); - } - return on_batched_writes(requests, count); } @@ -116,13 +85,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun local_err = on_single_remove_in_batch(rpc); _remove_rpc_batch.emplace_back(std::move(rpc)); } else { - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT || - rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE || - rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR || - rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { - dfatal("rpc code not allow batch: %s", rpc_code.to_string()); + if (_non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end()) { + dfatal_f("rpc code not allow batch: {}", rpc_code.to_string()); } else { - dfatal("rpc code not handled: %s", rpc_code.to_string()); + dfatal_f("rpc code not handled: {}", rpc_code.to_string()); } } @@ -170,5 +136,45 @@ void pegasus_server_write::request_key_check(int64_t decree, } } +void pegasus_server_write::init_non_batch_write_handlers() +{ + _non_batch_write_handlers = { + {dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, + [this](dsn::message_ex *request) -> int { + auto rpc = multi_put_rpc::auto_reply(request); + return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response()); + }}, + {dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE, + [this](dsn::message_ex *request) -> int { + auto rpc = multi_remove_rpc::auto_reply(request); + return _write_svc->multi_remove(_decree, rpc.request(), rpc.response()); + }}, + {dsn::apps::RPC_RRDB_RRDB_INCR, + [this](dsn::message_ex *request) -> int { + auto rpc = incr_rpc::auto_reply(request); + return _write_svc->incr(_decree, rpc.request(), rpc.response()); + }}, + {dsn::apps::RPC_RRDB_RRDB_DUPLICATE, + [this](dsn::message_ex *request) -> int { + auto rpc = duplicate_rpc::auto_reply(request); + return _write_svc->duplicate(_decree, rpc.request(), rpc.response()); + }}, + {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, + [this](dsn::message_ex *request) -> int { + auto rpc = check_and_set_rpc::auto_reply(request); + return _write_svc->check_and_set(_decree, rpc.request(), rpc.response()); + }}, + {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE, + [this](dsn::message_ex *request) -> int { + auto rpc = check_and_mutate_rpc::auto_reply(request); + return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response()); + }}, + {dsn::apps::RPC_RRDB_RRDB_BULK_LOAD, + [this](dsn::message_ex *request) -> int { + auto rpc = ingestion_rpc::auto_reply(request); + return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response()); + }}, + }; +} } // namespace server } // namespace pegasus diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index 33386cfd88..c75d469a66 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -71,6 +71,8 @@ class pegasus_server_write : public dsn::replication::replica_base void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key); private: + void init_non_batch_write_handlers(); + friend class pegasus_server_write_test; friend class pegasus_write_service_test; friend class pegasus_write_service_impl_test; @@ -84,6 +86,9 @@ class pegasus_server_write : public dsn::replication::replica_base int64_t _decree; const bool _verbose_log; + + typedef std::map> non_batch_writes_map; + non_batch_writes_map _non_batch_write_handlers; }; } // namespace server