Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: catch exception if unmarshalling the rpc request encounters an error #790

Merged
merged 14 commits into from
Jul 28, 2021
65 changes: 44 additions & 21 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ namespace server {
// Constructs the writer on top of `server`: the base class and a freshly allocated
// pegasus_write_service are both initialised from it, and `verbose_log` is stored as-is.
// The body then registers the corrupt-write perf counter and finally calls
// init_non_batch_write_handlers().
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log)
    : replica_base(server),
      _write_svc(new pegasus_write_service(server)),
      _verbose_log(verbose_log)
{
    // The counter name carries the string form of get_gpid() as a suffix, so the
    // name differs per gpid. snprintf is limited to 255 bytes of the 256-byte
    // buffer, which keeps the result NUL-terminated even if the name is truncated.
    char counter_name[256];
    snprintf(counter_name, 255, "recent_corrupt_write_count@%s", get_gpid().to_string());

    _pfc_recent_corrupt_write_count.init_app_counter(
        "app.pegasus",
        counter_name,
        COUNTER_TYPE_VOLATILE_NUMBER,
        "statistic the recent corrupt write count");

    init_non_batch_write_handlers();
}

Expand All @@ -51,11 +58,20 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return _write_svc->empty_put(_decree);
}

auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
if (iter != _non_batch_write_handlers.end()) {
dassert_f(count == 1, "count = {}", count);
return iter->second(requests[0]);
try {
Smityz marked this conversation as resolved.
Show resolved Hide resolved
auto iter = _non_batch_write_handlers.find(requests[0]->rpc_code());
if (iter != _non_batch_write_handlers.end()) {
dassert_f(count == 1, "count = {}", count);
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
return iter->second(requests[0]);
}
} catch (TTransportException ex) {
_pfc_recent_corrupt_write_count->increment();
derror_replica("pegasus not batch write handler failed, from = {}, exception = {}",
requests[0]->header->from_address.to_string(),
ex.what());
return 0;
}

return on_batched_writes(requests, count);
}

Expand All @@ -73,34 +89,41 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
// Make sure all writes are batched even if they fail,
// since we need to record the total qps and rpc latencies,
// and respond to all RPCs regardless of their result.

int local_err = 0;
dsn::task_code rpc_code(requests[i]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto rpc = put_rpc::auto_reply(requests[i]);
local_err = on_single_put_in_batch(rpc);
_put_rpc_batch.emplace_back(std::move(rpc));
} else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
auto rpc = remove_rpc::auto_reply(requests[i]);
local_err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (_non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end()) {
dfatal_f("rpc code not allow batch: {}", rpc_code.to_string());
try {
dsn::task_code rpc_code(requests[i]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto rpc = put_rpc::auto_reply(requests[i]);
local_err = on_single_put_in_batch(rpc);
_put_rpc_batch.emplace_back(std::move(rpc));
} else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
auto rpc = remove_rpc::auto_reply(requests[i]);
local_err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
dfatal_f("rpc code not handled: {}", rpc_code.to_string());
if (_non_batch_write_handlers.find(rpc_code) !=
_non_batch_write_handlers.end()) {
dfatal_f("rpc code not allow batch: {}", rpc_code.to_string());
} else {
dfatal_f("rpc code not handled: {}", rpc_code.to_string());
}
}
} catch (TTransportException ex) {
_pfc_recent_corrupt_write_count->increment();
derror_replica("pegasus batch writes handler failed, from = {}, exception = {}",
requests[i]->header->from_address.to_string(),
ex.what());
}

if (!err && local_err) {
err = local_err;
}
}

if (err == 0) {
err = _write_svc->batch_commit(_decree);
if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + _remove_rpc_batch.size() == 0)) {
_write_svc->batch_abort(_decree, err == 0 ? -1 : err);
} else {
_write_svc->batch_abort(_decree, err);
err = _write_svc->batch_commit(_decree);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class pegasus_server_write : public dsn::replication::replica_base

typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> non_batch_writes_map;
non_batch_writes_map _non_batch_write_handlers;

::dsn::perf_counter_wrapper _pfc_recent_corrupt_write_count;
};

} // namespace server
Expand Down