Skip to content

Commit

Permalink
fix: catch exception if unmarshall the rpc request encounters error (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Jul 28, 2021
1 parent fa5bfbb commit 8039277
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 21 deletions.
65 changes: 44 additions & 21 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ namespace server {
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log)
: replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log)
{
char name[256];
snprintf(name, 255, "recent_corrupt_write_count@%s", get_gpid().to_string());
_pfc_recent_corrupt_write_count.init_app_counter("app.pegasus",
name,
COUNTER_TYPE_VOLATILE_NUMBER,
"statistic the recent corrupt write count");

init_non_batch_write_handlers();
}

Expand All @@ -51,11 +58,20 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return _write_svc->empty_put(_decree);
}

auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
if (iter != _non_batch_write_handlers.end()) {
dassert_f(count == 1, "count = {}", count);
return iter->second(requests[0]);
try {
auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
if (iter != _non_batch_write_handlers.end()) {
dassert_f(count == 1, "count = {}", count);
return iter->second(requests[0]);
}
} catch (TTransportException ex) {
_pfc_recent_corrupt_write_count->increment();
derror_replica("pegasus not batch write handler failed, from = {}, exception = {}",
requests[0]->header->from_address.to_string(),
ex.what());
return 0;
}

return on_batched_writes(requests, count);
}

Expand All @@ -73,34 +89,41 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
// Make sure all writes are batched even if they are failed,
// since we need to record the total qps and rpc latencies,
// and respond for all RPCs regardless of their result.

int local_err = 0;
dsn::task_code rpc_code(requests[i]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto rpc = put_rpc::auto_reply(requests[i]);
local_err = on_single_put_in_batch(rpc);
_put_rpc_batch.emplace_back(std::move(rpc));
} else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
auto rpc = remove_rpc::auto_reply(requests[i]);
local_err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (_non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end()) {
dfatal_f("rpc code not allow batch: {}", rpc_code.to_string());
try {
dsn::task_code rpc_code(requests[i]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto rpc = put_rpc::auto_reply(requests[i]);
local_err = on_single_put_in_batch(rpc);
_put_rpc_batch.emplace_back(std::move(rpc));
} else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
auto rpc = remove_rpc::auto_reply(requests[i]);
local_err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
dfatal_f("rpc code not handled: {}", rpc_code.to_string());
if (_non_batch_write_handlers.find(rpc_code) !=
_non_batch_write_handlers.end()) {
dfatal_f("rpc code not allow batch: {}", rpc_code.to_string());
} else {
dfatal_f("rpc code not handled: {}", rpc_code.to_string());
}
}
} catch (TTransportException ex) {
_pfc_recent_corrupt_write_count->increment();
derror_replica("pegasus batch writes handler failed, from = {}, exception = {}",
requests[i]->header->from_address.to_string(),
ex.what());
}

if (!err && local_err) {
err = local_err;
}
}

if (err == 0) {
err = _write_svc->batch_commit(_decree);
if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + _remove_rpc_batch.size() == 0)) {
_write_svc->batch_abort(_decree, err == 0 ? -1 : err);
} else {
_write_svc->batch_abort(_decree, err);
err = _write_svc->batch_commit(_decree);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class pegasus_server_write : public dsn::replication::replica_base

typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> non_batch_writes_map;
non_batch_writes_map _non_batch_write_handlers;

::dsn::perf_counter_wrapper _pfc_recent_corrupt_write_count;
};

} // namespace server
Expand Down

0 comments on commit 8039277

Please sign in to comment.