From c43d5b435d3bbe0aef67fdf6c7f6c71af8814c39 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 13 Jan 2021 11:19:35 +0800 Subject: [PATCH 1/9] refactor: reimplement pegasus_server_write::on_batched_write_requests --- rdsn | 2 +- src/server/pegasus_server_write.cpp | 124 ++++++++++++++++++---------- src/server/pegasus_server_write.h | 12 +++ 3 files changed, 94 insertions(+), 44 deletions(-) diff --git a/rdsn b/rdsn index ec34633e46..ab9520df9b 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit ec34633e4695e881209973460469b2b069c0f52e +Subproject commit ab9520df9bb47be2eb7d5720098ff463cd83ec36 diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index f2544bbc8b..ea54fe846f 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -50,44 +50,13 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, return _write_svc->empty_put(_decree); } - dsn::task_code rpc_code(requests[0]->rpc_code()); - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { - dassert(count == 1, "count = %d", count); - auto rpc = multi_put_rpc::auto_reply(requests[0]); - return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response()); + auto iter = _single_put_methods.find(requests[0]->rpc_code()); + if (iter != _single_put_methods.end()) { + dassert_f(count == 1, "count = {}", count); + return iter->second(this, requests[0]); + } else { + return on_batched_writes(requests, count); } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { - dassert(count == 1, "count = %d", count); - auto rpc = multi_remove_rpc::auto_reply(requests[0]); - return _write_svc->multi_remove(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR) { - dassert(count == 1, "count = %d", count); - auto rpc = incr_rpc::auto_reply(requests[0]); - return _write_svc->incr(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { - dassert(count == 1, "count = %d", count); - auto rpc = duplicate_rpc::auto_reply(requests[0]); - return _write_svc->duplicate(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { - dassert(count == 1, "count = %d", count); - auto rpc = check_and_set_rpc::auto_reply(requests[0]); - return _write_svc->check_and_set(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { - dassert(count == 1, "count = %d", count); - auto rpc = check_and_mutate_rpc::auto_reply(requests[0]); - return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response()); - } - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) { - dassert(count == 1, "count = %d", count); - auto rpc = ingestion_rpc::auto_reply(requests[0]); - return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response()); - } - - return on_batched_writes(requests, count); } void pegasus_server_write::set_default_ttl(uint32_t ttl) { _write_svc->set_default_ttl(ttl); } @@ -116,13 +85,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun local_err = on_single_remove_in_batch(rpc); _remove_rpc_batch.emplace_back(std::move(rpc)); } else { - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT || - rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE || - rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR || - rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { - dfatal("rpc code not allow batch: %s", rpc_code.to_string()); + if (_single_put_methods.find(rpc_code) != _single_put_methods.end()) { + dfatal_f("rpc code not allow batch: {}", rpc_code.to_string()); } else { - dfatal("rpc code not handled: %s", rpc_code.to_string()); + dfatal_f("rpc code not handled: {}", rpc_code.to_string()); } } @@ -170,5 +136,77 @@ void pegasus_server_write::request_key_check(int64_t decree, } } +pegasus_server_write::single_put_rpc_map pegasus_server_write::_single_put_methods = { + {dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, + [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { + return server_write->multi_put(request); + }}, + {dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE, + [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { + return server_write->multi_remove(request); + }}, + {dsn::apps::RPC_RRDB_RRDB_INCR, + [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { + return server_write->incr(request); + }}, + {dsn::apps::RPC_RRDB_RRDB_DUPLICATE, + [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { + return server_write->duplicate(request); + }}, + {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, + [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { + return server_write->check_and_set(request); + }}, + {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE, + [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { + return server_write->check_and_mutate(request); + }}, + {dsn::apps::RPC_RRDB_RRDB_BULK_LOAD, + [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { + return server_write->ingestion_files(request); + }}, +}; + +int pegasus_server_write::multi_put(dsn::message_ex *request) +{ + auto rpc = multi_put_rpc::auto_reply(request); + return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response()); +} + +int pegasus_server_write::multi_remove(dsn::message_ex *request) +{ + auto rpc = multi_remove_rpc::auto_reply(request); + return _write_svc->multi_remove(_decree, rpc.request(), rpc.response()); +} + +int pegasus_server_write::incr(dsn::message_ex *request) +{ + auto rpc = incr_rpc::auto_reply(request); + return _write_svc->incr(_decree, rpc.request(), rpc.response()); +} + +int pegasus_server_write::duplicate(dsn::message_ex *request) +{ + auto rpc = duplicate_rpc::auto_reply(request); + return _write_svc->duplicate(_decree, rpc.request(), rpc.response()); +} + +int pegasus_server_write::check_and_set(dsn::message_ex *request) +{ + auto rpc = check_and_set_rpc::auto_reply(request); + return _write_svc->check_and_set(_decree, rpc.request(), rpc.response()); +} + +int pegasus_server_write::check_and_mutate(dsn::message_ex *request) +{ + auto rpc = check_and_mutate_rpc::auto_reply(request); + return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response()); +} + +int pegasus_server_write::ingestion_files(dsn::message_ex *request) +{ + auto rpc = ingestion_rpc::auto_reply(request); + return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response()); +} } // namespace server } // namespace pegasus diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index 33386cfd88..be4c975381 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -71,6 +71,14 @@ class pegasus_server_write : public dsn::replication::replica_base void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key); private: + int multi_put(dsn::message_ex *request); + int multi_remove(dsn::message_ex *request); + int incr(dsn::message_ex *request); + int duplicate(dsn::message_ex *request); + int check_and_set(dsn::message_ex *request); + int check_and_mutate(dsn::message_ex *request); + int ingestion_files(dsn::message_ex *request); + friend class pegasus_server_write_test; friend class pegasus_write_service_test; friend class pegasus_write_service_impl_test; @@ -84,6 +92,10 @@ class pegasus_server_write : public dsn::replication::replica_base int64_t _decree; const bool _verbose_log; + + typedef std::map> + single_put_rpc_map; + static single_put_rpc_map _single_put_methods; }; } // namespace server From 8d57d190d6364308a76b5f33b0dd996cf4277dbe Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 13 Jan 2021 11:22:12 +0800 Subject: [PATCH 2/9] fix --- src/server/pegasus_server_write.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index be4c975381..e41bbe43cf 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -71,6 +71,8 @@ class pegasus_server_write : public dsn::replication::replica_base void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key); private: + /// TODO(zlw): using member functions of pegasus_write_service to refactor these functions + /// below, in order to remove class pegasus_write_service in the later int multi_put(dsn::message_ex *request); int multi_remove(dsn::message_ex *request); int incr(dsn::message_ex *request); From ea9f3304936ca5e3644f5d8a92c87debd736adad Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 13 Jan 2021 15:32:43 +0800 Subject: [PATCH 3/9] fix --- src/server/pegasus_server_write.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index ea54fe846f..6c92328d12 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -52,7 +52,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, auto iter = _single_put_methods.find(requests[0]->rpc_code()); if (iter != _single_put_methods.end()) { - dassert_f(count == 1, "count = {}", count); + dcheck_eq(count, 1); return iter->second(this, requests[0]); } else { return on_batched_writes(requests, count); From b8f07fb7ee7fc075c0a6de4773aa78ac4f159702 Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Fri, 15 Jan 2021 15:10:10 +0800 Subject: [PATCH 4/9] Update src/server/pegasus_server_write.cpp Co-authored-by: Wu Tao --- src/server/pegasus_server_write.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 6c92328d12..3b2496c86b 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -54,9 +54,8 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, if (iter != _single_put_methods.end()) { dcheck_eq(count, 1); return iter->second(this, requests[0]); - } else { - return on_batched_writes(requests, count); } + return on_batched_writes(requests, count); } void pegasus_server_write::set_default_ttl(uint32_t ttl) { _write_svc->set_default_ttl(ttl); } From bd0d5633e4d7f80feda7000780116c0d88a941e1 Mon Sep 17 00:00:00 2001 From: levy Date: Fri, 15 Jan 2021 15:36:51 +0800 Subject: [PATCH 5/9] fix --- src/server/pegasus_server_write.cpp | 58 ++++++++++++----------------- src/server/pegasus_server_write.h | 6 +-- 2 files changed, 27 insertions(+), 37 deletions(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 6c92328d12..44ea425193 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -33,6 +33,7 @@ namespace server { pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log) : replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log) { + init_non_batch_write_handler(); } int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, @@ -50,10 +51,10 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, return _write_svc->empty_put(_decree); } - auto iter = _single_put_methods.find(requests[0]->rpc_code()); - if (iter != _single_put_methods.end()) { + auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code()); + if (iter != _non_batch_write_handlers.end()) { dcheck_eq(count, 1); - return iter->second(this, requests[0]); + return iter->second(requests[0]); } else { return on_batched_writes(requests, count); } @@ -85,7 +86,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun local_err = on_single_remove_in_batch(rpc); _remove_rpc_batch.emplace_back(std::move(rpc)); } else { - if (_single_put_methods.find(rpc_code) != _single_put_methods.end()) { + if (_non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end()) { dfatal_f("rpc code not allow batch: {}", rpc_code.to_string()); } else { dfatal_f("rpc code not handled: {}", rpc_code.to_string()); @@ -136,36 +137,25 @@ void pegasus_server_write::request_key_check(int64_t decree, } } -pegasus_server_write::single_put_rpc_map pegasus_server_write::_single_put_methods = { - {dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, - [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { - return server_write->multi_put(request); - }}, - {dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE, - [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { - return server_write->multi_remove(request); - }}, - {dsn::apps::RPC_RRDB_RRDB_INCR, - [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { - return server_write->incr(request); - }}, - {dsn::apps::RPC_RRDB_RRDB_DUPLICATE, - [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { - return server_write->duplicate(request); - }}, - {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, - [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { - return server_write->check_and_set(request); - }}, - {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE, - [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { - return server_write->check_and_mutate(request); - }}, - {dsn::apps::RPC_RRDB_RRDB_BULK_LOAD, - [](pegasus_server_write *server_write, dsn::message_ex *request) -> int { - return server_write->ingestion_files(request); - }}, -}; +void pegasus_server_write::init_non_batch_write_handler() +{ + _non_batch_write_handlers = { + {dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, + [this](dsn::message_ex *request) -> int { return multi_put(request); }}, + {dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE, + [this](dsn::message_ex *request) -> int { return multi_remove(request); }}, + {dsn::apps::RPC_RRDB_RRDB_INCR, + [this](dsn::message_ex *request) -> int { return incr(request); }}, + {dsn::apps::RPC_RRDB_RRDB_DUPLICATE, + [this](dsn::message_ex *request) -> int { return duplicate(request); }}, + {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, + [this](dsn::message_ex *request) -> int { return check_and_set(request); }}, + {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE, + [this](dsn::message_ex *request) -> int { return check_and_mutate(request); }}, + {dsn::apps::RPC_RRDB_RRDB_BULK_LOAD, + [this](dsn::message_ex *request) -> int { return ingestion_files(request); }}, + }; +} int pegasus_server_write::multi_put(dsn::message_ex *request) { diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index e41bbe43cf..3613b365f4 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -80,6 +80,7 @@ class pegasus_server_write : public dsn::replication::replica_base int check_and_set(dsn::message_ex *request); int check_and_mutate(dsn::message_ex *request); int ingestion_files(dsn::message_ex *request); + void init_non_batch_write_handler(); friend class pegasus_server_write_test; friend class pegasus_write_service_test; @@ -95,9 +96,8 @@ class pegasus_server_write : public dsn::replication::replica_base const bool _verbose_log; - typedef std::map> - single_put_rpc_map; - static single_put_rpc_map _single_put_methods; + typedef std::map> non_batch_writes_map; + non_batch_writes_map _non_batch_write_handlers; }; } // namespace server From 5df840a4bbf646190a8bd13725eb57b9983a6470 Mon Sep 17 00:00:00 2001 From: levy Date: Fri, 15 Jan 2021 15:42:13 +0800 Subject: [PATCH 6/9] fix --- src/server/pegasus_server_write.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index 3613b365f4..b59f9bf687 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -71,8 +71,6 @@ class pegasus_server_write : public dsn::replication::replica_base void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key); private: - /// TODO(zlw): using member functions of pegasus_write_service to refactor these functions - /// below, in order to remove class pegasus_write_service in the later int multi_put(dsn::message_ex *request); int multi_remove(dsn::message_ex *request); int incr(dsn::message_ex *request); From 30c31c7f3269e938d695752c67ae2ca7ebf855a9 Mon Sep 17 00:00:00 2001 From: levy Date: Fri, 15 Jan 2021 15:45:56 +0800 Subject: [PATCH 7/9] fix --- src/server/pegasus_server_write.cpp | 4 ++-- src/server/pegasus_server_write.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 7ab5a23f86..30274eb100 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -33,7 +33,7 @@ namespace server { pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log) : replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log) { - init_non_batch_write_handler(); + init_non_batch_write_handlers(); } int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, @@ -136,7 +136,7 @@ void pegasus_server_write::request_key_check(int64_t decree, } } -void pegasus_server_write::init_non_batch_write_handler() +void pegasus_server_write::init_non_batch_write_handlers() { _non_batch_write_handlers = { {dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index b59f9bf687..d7cd5d0bd3 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -78,7 +78,7 @@ class pegasus_server_write : public dsn::replication::replica_base int check_and_set(dsn::message_ex *request); int check_and_mutate(dsn::message_ex *request); int ingestion_files(dsn::message_ex *request); - void init_non_batch_write_handler(); + void init_non_batch_write_handlers(); friend class pegasus_server_write_test; friend class pegasus_write_service_test; From 7856fcc205f67986e3f7b0e7d70a17914f5f2e06 Mon Sep 17 00:00:00 2001 From: levy Date: Fri, 15 Jan 2021 18:38:09 +0800 Subject: [PATCH 8/9] fix --- src/server/pegasus_server_write.cpp | 77 +++++++++++------------------ src/server/pegasus_server_write.h | 7 --- 2 files changed, 28 insertions(+), 56 deletions(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 30274eb100..b42ee9ae7b 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -140,62 +140,41 @@ void pegasus_server_write::init_non_batch_write_handlers() { _non_batch_write_handlers = { {dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, - [this](dsn::message_ex *request) -> int { return multi_put(request); }}, + [this](dsn::message_ex *request) -> int { + auto rpc = multi_put_rpc::auto_reply(request); + return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response()); + }}, {dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE, - [this](dsn::message_ex *request) -> int { return multi_remove(request); }}, + [this](dsn::message_ex *request) -> int { + auto rpc = multi_remove_rpc::auto_reply(request); + return _write_svc->multi_remove(_decree, rpc.request(), rpc.response()); + }}, {dsn::apps::RPC_RRDB_RRDB_INCR, - [this](dsn::message_ex *request) -> int { return incr(request); }}, + [this](dsn::message_ex *request) -> int { + auto rpc = incr_rpc::auto_reply(request); + return _write_svc->incr(_decree, rpc.request(), rpc.response()); + }}, {dsn::apps::RPC_RRDB_RRDB_DUPLICATE, - [this](dsn::message_ex *request) -> int { return duplicate(request); }}, + [this](dsn::message_ex *request) -> int { + auto rpc = duplicate_rpc::auto_reply(request); + return _write_svc->duplicate(_decree, rpc.request(), rpc.response()); + }}, {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, - [this](dsn::message_ex *request) -> int { return check_and_set(request); }}, + [this](dsn::message_ex *request) -> int { + auto rpc = check_and_set_rpc::auto_reply(request); + return _write_svc->check_and_set(_decree, rpc.request(), rpc.response()); + }}, {dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE, - [this](dsn::message_ex *request) -> int { return check_and_mutate(request); }}, + [this](dsn::message_ex *request) -> int { + auto rpc = check_and_mutate_rpc::auto_reply(request); + return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response()); + }}, {dsn::apps::RPC_RRDB_RRDB_BULK_LOAD, - [this](dsn::message_ex *request) -> int { return ingestion_files(request); }}, + [this](dsn::message_ex *request) -> int { + auto rpc = ingestion_rpc::auto_reply(request); + return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response()); + }}, }; } - -int pegasus_server_write::multi_put(dsn::message_ex *request) -{ - auto rpc = multi_put_rpc::auto_reply(request); - return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response()); -} - -int pegasus_server_write::multi_remove(dsn::message_ex *request) -{ - auto rpc = multi_remove_rpc::auto_reply(request); - return _write_svc->multi_remove(_decree, rpc.request(), rpc.response()); -} - -int pegasus_server_write::incr(dsn::message_ex *request) -{ - auto rpc = incr_rpc::auto_reply(request); - return _write_svc->incr(_decree, rpc.request(), rpc.response()); -} - -int pegasus_server_write::duplicate(dsn::message_ex *request) -{ - auto rpc = duplicate_rpc::auto_reply(request); - return _write_svc->duplicate(_decree, rpc.request(), rpc.response()); -} - -int pegasus_server_write::check_and_set(dsn::message_ex *request) -{ - auto rpc = check_and_set_rpc::auto_reply(request); - return _write_svc->check_and_set(_decree, rpc.request(), rpc.response()); -} - -int pegasus_server_write::check_and_mutate(dsn::message_ex *request) -{ - auto rpc = check_and_mutate_rpc::auto_reply(request); - return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response()); -} - -int pegasus_server_write::ingestion_files(dsn::message_ex *request) -{ - auto rpc = ingestion_rpc::auto_reply(request); - return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response()); -} } // namespace server } // namespace pegasus diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index d7cd5d0bd3..c75d469a66 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -71,13 +71,6 @@ class pegasus_server_write : public dsn::replication::replica_base void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key); private: - int multi_put(dsn::message_ex *request); - int multi_remove(dsn::message_ex *request); - int incr(dsn::message_ex *request); - int duplicate(dsn::message_ex *request); - int check_and_set(dsn::message_ex *request); - int check_and_mutate(dsn::message_ex *request); - int ingestion_files(dsn::message_ex *request); void init_non_batch_write_handlers(); friend class pegasus_server_write_test; From 4bb671571c7908800949b3bcc7d145a979c7a62d Mon Sep 17 00:00:00 2001 From: levy Date: Fri, 15 Jan 2021 18:39:44 +0800 Subject: [PATCH 9/9] fix --- src/server/pegasus_server_write.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index b42ee9ae7b..9f4fc4aaff 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -53,7 +53,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code()); if (iter != _non_batch_write_handlers.end()) { - dcheck_eq(count, 1); + dassert_f(count == 1, "count = {}", count); return iter->second(requests[0]); } return on_batched_writes(requests, count);