Skip to content

Commit

Permalink
refactor: return Status::code instead of meanless integer (#1417)
Browse files Browse the repository at this point in the history
#1383

To handle the return code of read and write requests, it would be great
to refactor the return code of the related functions.
This patch change to use rocksdb::Status::code insteadn of meanless integer,
and left some TODOs to be dealt in follow up patchs.
  • Loading branch information
acelyc111 authored Mar 27, 2023
1 parent 5761893 commit 58bad74
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 43 deletions.
2 changes: 2 additions & 0 deletions src/common/storage_serverlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,13 @@ class storage_serverlet
return nullptr;
}

// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
int handle_request(dsn::message_ex *request)
{
dsn::task_code t = request->rpc_code();
const rpc_handler *ptr = find_handler(t);
if (ptr != nullptr) {
// TODO(yingchun): add return value
(*ptr)(static_cast<T *>(this), request);
} else {
LOG_WARNING("recv message with unhandled rpc name {} from {}, trace_id = {:#018x} ",
Expand Down
1 change: 1 addition & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)

uint64_t start_time_ns = dsn_now_ns();
CHECK(_app, "");
// TODO(yingchun): check the return value.
_app->on_request(request);

// If the corresponding perf counter exist, count the duration of this operation.
Expand Down
14 changes: 7 additions & 7 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <alloca.h>
#include <fcntl.h>
#include <rocksdb/status.h>
#include <algorithm>
#include <fstream>
#include <memory>
Expand Down Expand Up @@ -373,12 +374,11 @@ int replication_app_base::on_batched_write_requests(int64_t decree,
message_ex **requests,
int request_length)
{
int storage_error = 0;
int storage_error = rocksdb::Status::kOk;
for (int i = 0; i < request_length; ++i) {
// TODO(yingchun): better to return error_code
int e = on_request(requests[i]);
if (e != 0) {
LOG_ERROR_PREFIX("got storage error when handler request({})",
if (e != rocksdb::Status::kOk) {
LOG_ERROR_PREFIX("got storage engine error when handler request({})",
requests[i]->header->rpc_name);
storage_error = e;
}
Expand Down Expand Up @@ -425,16 +425,16 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
}
}

int perror = on_batched_write_requests(
int storage_error = on_batched_write_requests(
mu->data.header.decree, mu->data.header.timestamp, batched_requests, batched_count);

// release faked requests
for (int i = 0; i < faked_count; i++) {
faked_requests[i]->release_ref();
}

if (perror != 0) {
LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), perror);
if (storage_error != rocksdb::Status::kOk) {
LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), storage_error);
// For normal write requests, if got rocksdb error, this replica will be set error and evoke
// learn.
// For ingestion requests, should not do as normal write requests, there are two reasons:
Expand Down
5 changes: 3 additions & 2 deletions src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class replication_app_base : public replica_base
//
virtual replication::decree last_durable_decree() const = 0;
virtual replication::decree last_flushed_decree() const { return last_durable_decree(); }
// return the error generated by storage engine
// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) = 0;

//
Expand All @@ -225,8 +225,9 @@ class replication_app_base : public replica_base
// The base class gives a naive implementation that just call on_request
// repeatedly. Storage engine may override this function to get better performance.
//
// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_batched_write_requests(int64_t decree,
uint64_t timstamp,
uint64_t timestamp,
message_ex **requests,
int request_length);

Expand Down
8 changes: 2 additions & 6 deletions src/server/pegasus_read_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ class pegasus_read_service : public dsn::replication::replication_app_base,
pegasus_read_service(dsn::replication::replica *r) : dsn::replication::replication_app_base(r)
{
}
virtual ~pegasus_read_service() {}
virtual int on_request(dsn::message_ex *request) override
{
handle_request(request);
return 0;
}

int on_request(dsn::message_ex *request) override { return handle_request(request); }

protected:
// all service handlers to be implemented further
Expand Down
14 changes: 8 additions & 6 deletions src/server/pegasus_server_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/

#include <rocksdb/status.h>
#include <stdio.h>
#include <thrift/transport/TTransportException.h>
#include <algorithm>
Expand Down Expand Up @@ -84,7 +85,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, exception = {}",
requests[0]->header->from_address.to_string(),
ex.what());
return 0;
return rocksdb::Status::kOk;
}

return on_batched_writes(requests, count);
Expand All @@ -94,7 +95,7 @@ void pegasus_server_write::set_default_ttl(uint32_t ttl) { _write_svc->set_defau

int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int count)
{
int err = 0;
int err = rocksdb::Status::kOk;
{
_write_svc->batch_prepare(_decree);

Expand All @@ -104,7 +105,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
// Make sure all writes are batched even if they are failed,
// since we need to record the total qps and rpc latencies,
// and respond for all RPCs regardless of their result.
int local_err = 0;
int local_err = rocksdb::Status::kOk;
try {
dsn::task_code rpc_code(requests[i]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
Expand All @@ -130,13 +131,14 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
ex.what());
}

if (!err && local_err) {
if (err == rocksdb::Status::kOk && local_err != rocksdb::Status::kOk) {
err = local_err;
}
}

if (dsn_unlikely(err != 0 || _put_rpc_batch.size() + _remove_rpc_batch.size() == 0)) {
_write_svc->batch_abort(_decree, err == 0 ? -1 : err);
if (dsn_unlikely(err != rocksdb::Status::kOk ||
(_put_rpc_batch.empty() && _remove_rpc_batch.empty()))) {
_write_svc->batch_abort(_decree, err == rocksdb::Status::kOk ? -1 : err);
} else {
err = _write_svc->batch_commit(_decree);
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class pegasus_server_write : public dsn::replication::replica_base
/// Error returned is regarded as the failure of replica, thus will trigger
/// cluster membership changes. Make sure no error is returned because of
/// invalid user argument.
/// As long as the returned error is 0, the operation is guaranteed to be
/// As long as the returned error is rocksdb::Status::kOk, the operation is guaranteed to be
/// successfully applied into rocksdb, which means an empty_put will be called
/// even if there's no write.
int on_batched_write_requests(dsn::message_ex **requests,
Expand Down
4 changes: 2 additions & 2 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ int pegasus_write_service::duplicate(int64_t decree,
remove_rpc remove;
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
int err = 0;
int err = rocksdb::Status::kOk;
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
put = put_rpc(write);
err = _impl->batch_put(ctx, put.request(), put.response());
Expand Down Expand Up @@ -422,7 +422,7 @@ int pegasus_write_service::ingest_files(int64_t decree,
resp.err = dsn::ERR_OK;
// write empty put to flush decree
resp.rocksdb_error = empty_put(decree);
if (resp.rocksdb_error != 0) {
if (resp.rocksdb_error != rocksdb::Status::kOk) {
resp.err = dsn::ERR_TRY_AGAIN;
return resp.rocksdb_error;
}
Expand Down
8 changes: 4 additions & 4 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,20 @@ class pegasus_write_service : dsn::replication::replica_base
void batch_prepare(int64_t decree);

// Add PUT record in batch write.
// \returns 0 if success, non-0 if failure.
// \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp);

// Add REMOVE record in batch write.
// \returns 0 if success, non-0 if failure.
// \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp);

// Commit batch write.
// \returns 0 if success, non-0 if failure.
// NOTE that if the batch contains no updates, 0 is returned.
// \returns rocksdb::Status::Code.
// NOTE that if the batch contains no updates, rocksdb::Status::kOk is returned.
int batch_commit(int64_t decree);

// Abort batch write.
Expand Down
22 changes: 11 additions & 11 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace pegasus {
namespace server {

/// internal error codes used for fail injection
// TODO(yingchun): Use real rocksdb::Status::code.
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
Expand Down Expand Up @@ -99,12 +100,11 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
int err =
_rocksdb_wrapper->write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0);
auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
if (err) {
if (err != rocksdb::Status::kOk) {
return err;
}

err = _rocksdb_wrapper->write(decree);
return err;
return _rocksdb_wrapper->write(decree);
}

int multi_put(const db_write_context &ctx,
Expand Down Expand Up @@ -170,7 +170,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
}

resp.error = _rocksdb_wrapper->write(decree);
if (resp.error == 0) {
if (resp.error == rocksdb::Status::kOk) {
resp.count = update.sort_keys.size();
}
return resp.error;
Expand All @@ -188,7 +188,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
uint32_t new_expire_ts = 0;
db_get_context get_ctx;
int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
if (err != 0) {
if (err != rocksdb::Status::kOk) {
resp.error = err;
return err;
}
Expand Down Expand Up @@ -252,7 +252,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
}

resp.error = _rocksdb_wrapper->write(decree);
if (resp.error == 0) {
if (resp.error == rocksdb::Status::kOk) {
resp.new_value = new_value;
}
return resp.error;
Expand Down Expand Up @@ -283,7 +283,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
db_get_context get_context;
dsn::string_view check_raw_key(check_key.data(), check_key.length());
int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
if (err != 0) {
if (err != rocksdb::Status::kOk) {
// read check value failed
LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndSet decree: {}, hash_key: {}, "
"check_sort_key: {}",
Expand Down Expand Up @@ -352,7 +352,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
}

return 0;
return rocksdb::Status::kOk;
}

int check_and_mutate(int64_t decree,
Expand Down Expand Up @@ -404,7 +404,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
db_get_context get_context;
dsn::string_view check_raw_key(check_key.data(), check_key.length());
int err = _rocksdb_wrapper->get(check_raw_key, &get_context);
if (err != 0) {
if (err != rocksdb::Status::kOk) {
// read check value failed
LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndMutate decree: {}, hash_key: {}, "
"check_sort_key: {}",
Expand Down Expand Up @@ -475,7 +475,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
resp.error =
invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
}
return 0;
return rocksdb::Status::kOk;
}

// \return ERR_INVALID_VERSION: replay or commit out-date ingest request
Expand Down Expand Up @@ -508,7 +508,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base

// ingest external files
if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, req.ingest_behind) !=
0)) {
rocksdb::Status::kOk)) {
return dsn::ERR_INGESTION_FAILED;
}
return dsn::ERR_OK;
Expand Down
2 changes: 1 addition & 1 deletion src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,

db_get_context get_ctx;
int err = get(raw_key, &get_ctx);
if (dsn_unlikely(err != 0)) {
if (dsn_unlikely(err != rocksdb::Status::kOk)) {
return err;
}
// if record exists and is not expired.
Expand Down
7 changes: 4 additions & 3 deletions src/server/rocksdb_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ class rocksdb_wrapper : public dsn::replication::replica_base
rocksdb_wrapper(pegasus_server_impl *server);

/// Calls RocksDB Get and store the result into `db_get_context`.
/// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
/// \result ctx.expired=true if record expired. Still 0 is returned.
/// \result ctx.found=false if record is not found. Still 0 is returned.
/// \returns rocksdb::Status::kOk if Get succeeded. On failure, a non-zero rocksdb status code
/// is returned.
/// \result ctx.expired=true if record expired. Still rocksdb::Status::kOk is returned.
/// \result ctx.found=false if record is not found. Still rocksdb::Status::kOk is returned.
int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);

int write_batch_put(int64_t decree,
Expand Down

0 comments on commit 58bad74

Please sign in to comment.