From 061a1dc79dd1a9866dcb8e8e19327516bf2b3b79 Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 15 Jul 2021 14:49:01 +0800 Subject: [PATCH 01/10] fix --- src/server/pegasus_server_write.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 9f4fc4aaff..0a1ac404cb 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -90,6 +90,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } else { dfatal_f("rpc code not handled: {}", rpc_code.to_string()); } + local_err = -1; } if (!err && local_err) { From fbaaf3edc852e5362c294ca3ff7a6e1199282b28 Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 15 Jul 2021 15:55:19 +0800 Subject: [PATCH 02/10] fix --- src/server/pegasus_read_service.h | 8 ++++- src/server/pegasus_server_write.cpp | 51 +++++++++++++++++++---------- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h index 67b6a8027f..8c8e6d1d91 100644 --- a/src/server/pegasus_read_service.h +++ b/src/server/pegasus_read_service.h @@ -43,7 +43,13 @@ class pegasus_read_service : public dsn::replication::replication_app_base, virtual ~pegasus_read_service() {} virtual int on_request(dsn::message_ex *request) override { - handle_request(request); + try { + handle_request(request); + } catch (TTransportException ex) { + derror_f("pegasus handle_request failed, from = {}, exception = {}", + request->header->from_address.to_string(), + ex.what()); + } return 0; } diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 0a1ac404cb..b219da74a6 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -51,11 +51,19 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, return _write_svc->empty_put(_decree); } - auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code()); - if (iter != _non_batch_write_handlers.end()) { - dassert_f(count == 1, "count = {}", count); - return iter->second(requests[0]); + try { + auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code()); + if (iter != _non_batch_write_handlers.end()) { + dassert_f(count == 1, "count = {}", count); + return iter->second(requests[0]); + } + } catch (TTransportException ex) { + derror_f("pegasus on_batched_write_requests failed, from = {}, exception = {}", + requests[0]->header->from_address.to_string(), + ex.what()); + return -1; } + return on_batched_writes(requests, count); } @@ -73,23 +81,30 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun // Make sure all writes are batched even if they are failed, // since we need to record the total qps and rpc latencies, // and respond for all RPCs regardless of their result. - int local_err = 0; - dsn::task_code rpc_code(requests[i]->rpc_code()); - if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) { - auto rpc = put_rpc::auto_reply(requests[i]); - local_err = on_single_put_in_batch(rpc); - _put_rpc_batch.emplace_back(std::move(rpc)); - } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { - auto rpc = remove_rpc::auto_reply(requests[i]); - local_err = on_single_remove_in_batch(rpc); - _remove_rpc_batch.emplace_back(std::move(rpc)); - } else { - if (_non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end()) { - dfatal_f("rpc code not allow batch: {}", rpc_code.to_string()); + try { + dsn::task_code rpc_code(requests[i]->rpc_code()); + if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) { + auto rpc = put_rpc::auto_reply(requests[i]); + local_err = on_single_put_in_batch(rpc); + _put_rpc_batch.emplace_back(std::move(rpc)); + } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { + auto rpc = remove_rpc::auto_reply(requests[i]); + local_err = on_single_remove_in_batch(rpc); + _remove_rpc_batch.emplace_back(std::move(rpc)); } else { - dfatal_f("rpc code not handled: {}", rpc_code.to_string()); + if (_non_batch_write_handlers.find(rpc_code) != + _non_batch_write_handlers.end()) { + dfatal_f("rpc code not allow batch: {}", rpc_code.to_string()); + } else { + dfatal_f("rpc code not handled: {}", rpc_code.to_string()); + } + local_err = -1; } + } catch (TTransportException ex) { + derror_f("pegasus on_batched_write_requests failed, from = {}, exception = {}", + requests[i]->header->from_address.to_string(), + ex.what()); local_err = -1; } From 285bf3370f1ee49516197551108e4542118dde4e Mon Sep 17 00:00:00 2001 From: levy Date: Thu, 15 Jul 2021 18:48:08 +0800 Subject: [PATCH 03/10] fix --- rdsn | 2 +- src/server/pegasus_server_write.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rdsn b/rdsn index 1275b1bfe8..0f45921d1f 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 1275b1bfe886289900f29a7091f6697257a11a6f +Subproject commit 0f45921d1f8382aa90869955d5728a4c3c4df6d7 diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index b219da74a6..a409b2aa10 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -58,7 +58,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, return iter->second(requests[0]); } } catch (TTransportException ex) { - derror_f("pegasus on_batched_write_requests failed, from = {}, exception = {}", + derror_f("pegasus not batch write handler failed, from = {}, exception = {}", requests[0]->header->from_address.to_string(), ex.what()); return -1; @@ -102,7 +102,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun local_err = -1; } } catch (TTransportException ex) { - derror_f("pegasus on_batched_write_requests failed, from = {}, exception = {}", + derror_f("pegasus batch writes handler failed, from = {}, exception = {}", requests[i]->header->from_address.to_string(), ex.what()); local_err = -1; From bf4665792c7ea52987ce1b07374d5fa4f78bb3e2 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 20 Jul 2021 17:30:28 +0800 Subject: [PATCH 04/10] fix --- src/server/pegasus_server_write.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index a409b2aa10..5ecdd18907 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -61,7 +61,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, derror_f("pegasus not batch write handler failed, from = {}, exception = {}", requests[0]->header->from_address.to_string(), ex.what()); - return -1; + return 0; } return on_batched_writes(requests, count); @@ -99,13 +99,11 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } else { dfatal_f("rpc code not handled: {}", rpc_code.to_string()); } - local_err = -1; } } catch (TTransportException ex) { derror_f("pegasus batch writes handler failed, from = {}, exception = {}", requests[i]->header->from_address.to_string(), ex.what()); - local_err = -1; } if (!err && local_err) { From 680d7eaf43e1b784f4131eb968fb071e972fc6ba Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 21 Jul 2021 17:11:04 +0800 Subject: [PATCH 05/10] fix --- src/server/pegasus_server_write.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 5ecdd18907..f41ca79f02 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -111,10 +111,12 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } } - if (err == 0) { - err = _write_svc->batch_commit(_decree); - } else { - _write_svc->batch_abort(_decree, err); + if (dsn_likely(_put_rpc_batch.size() + _remove_rpc_batch.size() != 0)) { + if (err == 0) { + err = _write_svc->batch_commit(_decree); + } else { + _write_svc->batch_abort(_decree, err); + } } } From fcdbe50598821c7dc259c58492ed198bbfeb6a21 Mon Sep 17 00:00:00 2001 From: levy Date: Wed, 21 Jul 2021 18:28:58 +0800 Subject: [PATCH 06/10] fix --- src/server/config.min.ini | 2 +- src/server/pegasus_read_service.h | 8 +------- src/server/pegasus_server_write.cpp | 17 +++++++++++------ src/server/pegasus_server_write.h | 2 ++ src/server/pegasus_write_service.h | 1 - 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/server/config.min.ini b/src/server/config.min.ini index d7bd9085aa..4cfeeb69b2 100644 --- a/src/server/config.min.ini +++ b/src/server/config.min.ini @@ -126,7 +126,7 @@ partition_count = 4 [pegasus.server] - perf_counter_enable_logging = false + perf_counter_enable_logging = true # Where the metrics are collected. If no value is given, no sink is used. # Options: # - falcon diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h index 8c8e6d1d91..67b6a8027f 100644 --- a/src/server/pegasus_read_service.h +++ b/src/server/pegasus_read_service.h @@ -43,13 +43,7 @@ class pegasus_read_service : public dsn::replication::replication_app_base, virtual ~pegasus_read_service() {} virtual int on_request(dsn::message_ex *request) override { - try { - handle_request(request); - } catch (TTransportException ex) { - derror_f("pegasus handle_request failed, from = {}, exception = {}", - request->header->from_address.to_string(), - ex.what()); - } + handle_request(request); return 0; } diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index f41ca79f02..65c2be323b 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -33,6 +33,11 @@ namespace server { pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log) : replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log) { + char name[256]; + snprintf(name, 255, "recent_corrupt_write_count@%s", get_gpid().to_string()); + _pfc_recent_corrupt_write_count.init_app_counter("app.pegasus", + name, COUNTER_TYPE_VOLATILE_NUMBER, "statistic the recent corrupt write count"); + init_non_batch_write_handlers(); } @@ -58,6 +63,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, return iter->second(requests[0]); } } catch (TTransportException ex) { + _pfc_recent_corrupt_write_count->increment(); derror_f("pegasus not batch write handler failed, from = {}, exception = {}", requests[0]->header->from_address.to_string(), ex.what()); @@ -101,6 +107,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } } } catch (TTransportException ex) { + _pfc_recent_corrupt_write_count->increment(); derror_f("pegasus batch writes handler failed, from = {}, exception = {}", requests[i]->header->from_address.to_string(), ex.what()); @@ -111,12 +118,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } } - if (dsn_likely(_put_rpc_batch.size() + _remove_rpc_batch.size() != 0)) { - if (err == 0) { - err = _write_svc->batch_commit(_decree); - } else { - _write_svc->batch_abort(_decree, err); - } + if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + _remove_rpc_batch.size() == 0)) { + _write_svc->batch_abort(_decree, err == 0 ? -1 : err); + } else { + err = _write_svc->batch_commit(_decree); } } diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index c75d469a66..c73fc0d50d 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -89,6 +89,8 @@ class pegasus_server_write : public dsn::replication::replica_base typedef std::map> non_batch_writes_map; non_batch_writes_map _non_batch_write_handlers; + + ::dsn::perf_counter_wrapper _pfc_recent_corrupt_write_count; }; } // namespace server diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 6852a4cfdb..4bfda3f810 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -173,7 +173,6 @@ class pegasus_write_service : dsn::replication::replica_base void set_default_ttl(uint32_t ttl); -private: void clear_up_batch_states(); private: From 79a8dbaee438f315d68dcfdbd45ca4ebca773ac8 Mon Sep 17 00:00:00 2001 From: levy Date: Fri, 23 Jul 2021 10:14:07 +0800 Subject: [PATCH 07/10] fix --- rdsn | 2 +- src/server/config.min.ini | 2 +- src/server/pegasus_write_service.h | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/rdsn b/rdsn index 0f45921d1f..18d953b9f9 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 0f45921d1f8382aa90869955d5728a4c3c4df6d7 +Subproject commit 18d953b9f9d306844fa82fb047e07b0c0b39b7c6 diff --git a/src/server/config.min.ini b/src/server/config.min.ini index 4cfeeb69b2..d7bd9085aa 100644 --- a/src/server/config.min.ini +++ b/src/server/config.min.ini @@ -126,7 +126,7 @@ partition_count = 4 [pegasus.server] - perf_counter_enable_logging = true + perf_counter_enable_logging = false # Where the metrics are collected. If no value is given, no sink is used. # Options: # - falcon diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 4bfda3f810..6852a4cfdb 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -173,6 +173,7 @@ class pegasus_write_service : dsn::replication::replica_base void set_default_ttl(uint32_t ttl); +private: void clear_up_batch_states(); private: From 615f345281f364903b433252b7e05d585d314bfd Mon Sep 17 00:00:00 2001 From: levy5307 Date: Sun, 25 Jul 2021 18:11:28 +0800 Subject: [PATCH 08/10] update rdsn --- rdsn | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rdsn b/rdsn index 18d953b9f9..7e3eab786f 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 18d953b9f9d306844fa82fb047e07b0c0b39b7c6 +Subproject commit 7e3eab786fb5e0675d84eac22d5508b03538d5f4 From 43fdfecee74ddccfd489b2ca991819744bcf9018 Mon Sep 17 00:00:00 2001 From: levy Date: Mon, 26 Jul 2021 11:54:12 +0800 Subject: [PATCH 09/10] fix --- src/server/pegasus_server_write.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 65c2be323b..efb89dd6b7 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -36,7 +36,9 @@ pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool ver char name[256]; snprintf(name, 255, "recent_corrupt_write_count@%s", get_gpid().to_string()); _pfc_recent_corrupt_write_count.init_app_counter("app.pegasus", - name, COUNTER_TYPE_VOLATILE_NUMBER, "statistic the recent corrupt write count"); + name, + COUNTER_TYPE_VOLATILE_NUMBER, + "statistic the recent corrupt write count"); init_non_batch_write_handlers(); } From aa6fad43a3bc42a0092186bff51da01f3eabf99f Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 27 Jul 2021 12:01:26 +0800 Subject: [PATCH 10/10] update --- src/server/pegasus_server_write.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index efb89dd6b7..ef41b2b534 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -66,9 +66,9 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, } } catch (TTransportException ex) { _pfc_recent_corrupt_write_count->increment(); - derror_f("pegasus not batch write handler failed, from = {}, exception = {}", - requests[0]->header->from_address.to_string(), - ex.what()); + derror_replica("pegasus not batch write handler failed, from = {}, exception = {}", + requests[0]->header->from_address.to_string(), + ex.what()); return 0; } @@ -110,9 +110,9 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } } catch (TTransportException ex) { _pfc_recent_corrupt_write_count->increment(); - derror_f("pegasus batch writes handler failed, from = {}, exception = {}", - requests[i]->header->from_address.to_string(), - ex.what()); + derror_replica("pegasus batch writes handler failed, from = {}, exception = {}", + requests[i]->header->from_address.to_string(), + ex.what()); } if (!err && local_err) {