diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h index ccacd5ba0c..6943fff994 100644 --- a/src/base/pegasus_value_schema.h +++ b/src/base/pegasus_value_schema.h @@ -21,6 +21,12 @@ namespace pegasus { #define PEGASUS_DATA_VERSION_MAX 0u +/// Generates timetag in host endian. +inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool delete_tag) +{ + return timestamp << 8u | cluster_id << 1u | delete_tag; +} + /// Extracts expire_ts from rocksdb value with given version. /// The value schema must be in v0. /// \return expire_ts in host endian @@ -46,7 +52,7 @@ pegasus_extract_user_data(uint32_t version, std::string &&raw_value, ::dsn::blob version, PEGASUS_DATA_VERSION_MAX); - std::string *s = new std::string(std::move(raw_value)); + auto *s = new std::string(std::move(raw_value)); dsn::data_input input(*s); input.skip(sizeof(uint32_t)); dsn::string_view view = input.read_str(); @@ -127,7 +133,7 @@ class pegasus_value_generator _write_slices.emplace_back(user_data.data(), user_data.length()); } - return rocksdb::SliceParts(&_write_slices[0], static_cast(_write_slices.size())); + return {&_write_slices[0], static_cast(_write_slices.size())}; } private: diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index ef9e9a08c9..78adfd8797 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -4281,6 +4281,12 @@ void duplicate_request::__set_cluster_id(const int8_t val) __isset.cluster_id = true; } +void duplicate_request::__set_verify_timetag(const bool val) +{ + this->verify_timetag = val; + __isset.verify_timetag = true; +} + uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -4332,6 +4338,14 @@ uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; + case 5: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->verify_timetag); + this->__isset.verify_timetag = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -4370,6 +4384,11 @@ uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeByte(this->cluster_id); xfer += oprot->writeFieldEnd(); } + if (this->__isset.verify_timetag) { + xfer += oprot->writeFieldBegin("verify_timetag", ::apache::thrift::protocol::T_BOOL, 5); + xfer += oprot->writeBool(this->verify_timetag); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4382,6 +4401,7 @@ void swap(duplicate_request &a, duplicate_request &b) swap(a.task_code, b.task_code); swap(a.raw_message, b.raw_message); swap(a.cluster_id, b.cluster_id); + swap(a.verify_timetag, b.verify_timetag); swap(a.__isset, b.__isset); } @@ -4391,6 +4411,7 @@ duplicate_request::duplicate_request(const duplicate_request &other126) task_code = other126.task_code; raw_message = other126.raw_message; cluster_id = other126.cluster_id; + verify_timetag = other126.verify_timetag; __isset = other126.__isset; } duplicate_request::duplicate_request(duplicate_request &&other127) @@ -4399,6 +4420,7 @@ duplicate_request::duplicate_request(duplicate_request &&other127) task_code = std::move(other127.task_code); raw_message = std::move(other127.raw_message); cluster_id = std::move(other127.cluster_id); + verify_timetag = std::move(other127.verify_timetag); __isset = std::move(other127.__isset); } duplicate_request &duplicate_request::operator=(const duplicate_request &other128) @@ -4407,6 +4429,7 @@ duplicate_request &duplicate_request::operator=(const duplicate_request &other12 task_code = other128.task_code; raw_message = other128.raw_message; cluster_id = other128.cluster_id; + verify_timetag = other128.verify_timetag; __isset = other128.__isset; return *this; } @@ -4416,6 +4439,7 @@ duplicate_request &duplicate_request::operator=(duplicate_request &&other129) task_code = std::move(other129.task_code); raw_message = std::move(other129.raw_message); cluster_id = std::move(other129.cluster_id); + verify_timetag = std::move(other129.verify_timetag); __isset = std::move(other129.__isset); return *this; } @@ -4434,6 +4458,9 @@ void duplicate_request::printTo(std::ostream &out) const out << ", " << "cluster_id="; (__isset.cluster_id ? (out << to_string(cluster_id)) : (out << "")); + out << ", " + << "verify_timetag="; + (__isset.verify_timetag ? (out << to_string(verify_timetag)) : (out << "")); out << ")"; } @@ -4445,6 +4472,12 @@ void duplicate_response::__set_error(const int32_t val) __isset.error = true; } +void duplicate_response::__set_error_hint(const std::string &val) +{ + this->error_hint = val; + __isset.error_hint = true; +} + uint32_t duplicate_response::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -4472,6 +4505,14 @@ uint32_t duplicate_response::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->error_hint); + this->__isset.error_hint = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -4495,6 +4536,11 @@ uint32_t duplicate_response::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeI32(this->error); xfer += oprot->writeFieldEnd(); } + if (this->__isset.error_hint) { + xfer += oprot->writeFieldBegin("error_hint", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->error_hint); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4504,28 +4550,33 @@ void swap(duplicate_response &a, duplicate_response &b) { using ::std::swap; swap(a.error, b.error); + swap(a.error_hint, b.error_hint); swap(a.__isset, b.__isset); } duplicate_response::duplicate_response(const duplicate_response &other130) { error = other130.error; + error_hint = other130.error_hint; __isset = other130.__isset; } duplicate_response::duplicate_response(duplicate_response &&other131) { error = std::move(other131.error); + error_hint = std::move(other131.error_hint); __isset = std::move(other131.__isset); } duplicate_response &duplicate_response::operator=(const duplicate_response &other132) { error = other132.error; + error_hint = other132.error_hint; __isset = other132.__isset; return *this; } duplicate_response &duplicate_response::operator=(duplicate_response &&other133) { error = std::move(other133.error); + error_hint = std::move(other133.error_hint); __isset = std::move(other133.__isset); return *this; } @@ -4535,6 +4586,9 @@ void duplicate_response::printTo(std::ostream &out) const out << "duplicate_response("; out << "error="; (__isset.error ? (out << to_string(error)) : (out << "")); + out << ", " + << "error_hint="; + (__isset.error_hint ? (out << to_string(error_hint)) : (out << "")); out << ")"; } } diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index 5718c95f9f..ae17330149 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -264,11 +264,17 @@ struct duplicate_request // ID of the cluster where this write comes from. 4: optional byte cluster_id + + // Whether to compare the timetag of old value with the new write's. + 5: optional bool verify_timetag } struct duplicate_response { 1: optional i32 error; + + // hints on the reason why this duplicate failed. + 2: optional string error_hint; } service rrdb diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index b9ed126f5a..0baf6d40b9 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -1799,13 +1799,18 @@ inline std::ostream &operator<<(std::ostream &out, const scan_response &obj) typedef struct _duplicate_request__isset { _duplicate_request__isset() - : timestamp(false), task_code(false), raw_message(false), cluster_id(false) + : timestamp(false), + task_code(false), + raw_message(false), + cluster_id(false), + verify_timetag(false) { } bool timestamp : 1; bool task_code : 1; bool raw_message : 1; bool cluster_id : 1; + bool verify_timetag : 1; } _duplicate_request__isset; class duplicate_request @@ -1815,13 +1820,14 @@ class duplicate_request duplicate_request(duplicate_request &&); duplicate_request &operator=(const duplicate_request &); duplicate_request &operator=(duplicate_request &&); - duplicate_request() : timestamp(0), cluster_id(0) {} + duplicate_request() : timestamp(0), cluster_id(0), verify_timetag(0) {} virtual ~duplicate_request() throw(); int64_t timestamp; ::dsn::task_code task_code; ::dsn::blob raw_message; int8_t cluster_id; + bool verify_timetag; _duplicate_request__isset __isset; @@ -1833,6 +1839,8 @@ class duplicate_request void __set_cluster_id(const int8_t val); + void __set_verify_timetag(const bool val); + bool operator==(const duplicate_request &rhs) const { if (__isset.timestamp != rhs.__isset.timestamp) @@ -1851,6 +1859,10 @@ class duplicate_request return false; else if (__isset.cluster_id && !(cluster_id == rhs.cluster_id)) return false; + if (__isset.verify_timetag != rhs.__isset.verify_timetag) + return false; + else if (__isset.verify_timetag && !(verify_timetag == rhs.verify_timetag)) + return false; return true; } bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); } @@ -1873,8 +1885,9 @@ inline std::ostream &operator<<(std::ostream &out, const duplicate_request &obj) typedef struct _duplicate_response__isset { - _duplicate_response__isset() : error(false) {} + _duplicate_response__isset() : error(false), error_hint(false) {} bool error : 1; + bool error_hint : 1; } _duplicate_response__isset; class duplicate_response @@ -1884,21 +1897,28 @@ class duplicate_response duplicate_response(duplicate_response &&); duplicate_response &operator=(const duplicate_response &); duplicate_response &operator=(duplicate_response &&); - duplicate_response() : error(0) {} + duplicate_response() : error(0), error_hint() {} virtual ~duplicate_response() throw(); int32_t error; + std::string error_hint; _duplicate_response__isset __isset; void __set_error(const int32_t val); + void __set_error_hint(const std::string &val); + bool operator==(const duplicate_response &rhs) const { if (__isset.error != rhs.__isset.error) return false; else if (__isset.error && !(error == rhs.error)) return false; + if (__isset.error_hint != rhs.__isset.error_hint) + return false; + else if (__isset.error_hint && !(error_hint == rhs.error_hint)) + return false; return true; } bool operator!=(const duplicate_response &rhs) const { return !(*this == rhs); } diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 7b77542387..f7d91d58fd 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -135,6 +135,8 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, err == dsn::ERR_OK ? _client->get_error_string(perr) : err.to_string(), rpc.request().timestamp); } + // duplicating an illegal write to server is unacceptable, fail fast. + dassert_replica(perr != PERR_INVALID_ARGUMENT, rpc.response().error_hint); } else { _shipped_ops->increment(); _total_shipped_size += diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index be398e8b27..7645c9d68f 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -3,20 +3,21 @@ // can be found in the LICENSE file in the root directory of this source tree. #include +#include +#include -#include "base/pegasus_utils.h" #include "base/pegasus_key_schema.h" #include "pegasus_server_write.h" #include "pegasus_server_impl.h" #include "logging_utils.h" +#include "pegasus_mutation_duplicator.h" namespace pegasus { namespace server { pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log) - : replica_base(*server), _verbose_log(verbose_log) + : replica_base(server), _write_svc(new pegasus_write_service(server)), _verbose_log(verbose_log) { - _write_svc = dsn::make_unique(server); } int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, @@ -24,6 +25,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, int64_t decree, uint64_t timestamp) { + _write_ctx = db_write_context::create(decree, timestamp); _decree = decree; // Write down empty record (RPC_REPLICATION_WRITE_EMPTY) to update @@ -37,7 +39,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { dassert(count == 1, "count = %d", count); auto rpc = multi_put_rpc::auto_reply(requests[0]); - return _write_svc->multi_put(_decree, rpc.request(), rpc.response()); + return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response()); } if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { dassert(count == 1, "count = %d", count); @@ -49,6 +51,11 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, auto rpc = incr_rpc::auto_reply(requests[0]); return _write_svc->incr(_decree, rpc.request(), rpc.response()); } + if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { + dassert(count == 1, "count = %d", count); + auto rpc = duplicate_rpc::auto_reply(requests[0]); + return _write_svc->duplicate(_decree, rpc.request(), rpc.response()); + } if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { dassert(count == 1, "count = %d", count); auto rpc = check_and_set_rpc::auto_reply(requests[0]); @@ -91,7 +98,8 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } else { if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT || rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE || - rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR) { + rpc_code == dsn::apps::RPC_RRDB_RRDB_INCR || + rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { dfatal("rpc code not allow batch: %s", rpc_code.to_string()); } else { dfatal("rpc code not handled: %s", rpc_code.to_string()); @@ -117,10 +125,10 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } void pegasus_server_write::request_key_check(int64_t decree, - dsn::message_ex *m, + dsn::message_ex *msg, const dsn::blob &key) { - auto msg = (dsn::message_ex *)m; + // TODO(wutao1): server should not assert when client's hash is incorrect. if (msg->header->client.partition_hash != 0) { uint64_t partition_hash = pegasus_key_hash(key); dassert(msg->header->client.partition_hash == partition_hash, diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index 1fefde1f2d..0d4a36ec0a 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -39,7 +39,7 @@ class pegasus_server_write : public dsn::replication::replica_base int on_single_put_in_batch(put_rpc &rpc) { - int err = _write_svc->batch_put(_decree, rpc.request(), rpc.response()); + int err = _write_svc->batch_put(_write_ctx, rpc.request(), rpc.response()); request_key_check(_decree, rpc.dsn_request(), rpc.request().key); return err; } @@ -63,6 +63,7 @@ class pegasus_server_write : public dsn::replication::replica_base std::vector _put_rpc_batch; std::vector _remove_rpc_batch; + db_write_context _write_ctx; int64_t _decree; const bool _verbose_log; diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 7772ddaa17..09e50e12cb 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -2,10 +2,13 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. +#include "base/pegasus_rpc_types.h" #include "pegasus_write_service.h" #include "pegasus_write_service_impl.h" #include "capacity_unit_calculator.h" +#include + namespace pegasus { namespace server { @@ -95,19 +98,24 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) name.c_str(), COUNTER_TYPE_NUMBER_PERCENTILES, "statistic the latency of CHECK_AND_MUTATE request"); + + _pfc_duplicate_qps.init_app_counter("app.pegasus", + fmt::format("duplicate_qps@{}", str_gpid).c_str(), + COUNTER_TYPE_RATE, + "statistic the qps of DUPLICATE requests"); } pegasus_write_service::~pegasus_write_service() {} int pegasus_write_service::empty_put(int64_t decree) { return _impl->empty_put(decree); } -int pegasus_write_service::multi_put(int64_t decree, +int pegasus_write_service::multi_put(const db_write_context &ctx, const dsn::apps::multi_put_request &update, dsn::apps::update_response &resp) { uint64_t start_time = dsn_now_ns(); _pfc_multi_put_qps->increment(); - int err = _impl->multi_put(decree, update, resp); + int err = _impl->multi_put(ctx, update, resp); if (_server->is_primary()) { _cu_calculator->add_multi_put_cu(resp.error, update.kvs); @@ -189,7 +197,7 @@ void pegasus_write_service::batch_prepare(int64_t decree) _batch_start_time = dsn_now_ns(); } -int pegasus_write_service::batch_put(int64_t decree, +int pegasus_write_service::batch_put(const db_write_context &ctx, const dsn::apps::update_request &update, dsn::apps::update_response &resp) { @@ -197,7 +205,7 @@ int pegasus_write_service::batch_put(int64_t decree, _batch_qps_perfcounters.push_back(_pfc_put_qps.get()); _batch_latency_perfcounters.push_back(_pfc_put_latency.get()); - int err = _impl->batch_put(decree, update, resp); + int err = _impl->batch_put(ctx, update, resp); if (_server->is_primary()) { _cu_calculator->add_put_cu(resp.error, update.key, update.value); @@ -256,5 +264,62 @@ void pegasus_write_service::clear_up_batch_states() _batch_start_time = 0; } +int pegasus_write_service::duplicate(int64_t decree, + const dsn::apps::duplicate_request &request, + dsn::apps::duplicate_response &resp) +{ + // Verifies the cluster_id. + if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) { + resp.__set_error(rocksdb::Status::kInvalidArgument); + resp.__set_error_hint("request cluster id is unconfigured"); + return empty_put(decree); + } + if (request.cluster_id == get_current_cluster_id()) { + resp.__set_error(rocksdb::Status::kInvalidArgument); + resp.__set_error_hint("self-duplicating"); + return empty_put(decree); + } + + _pfc_duplicate_qps->increment(); + dsn::message_ex *write = dsn::from_blob_to_received_msg(request.task_code, request.raw_message); + bool is_delete = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE || + request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE; + auto remote_timetag = generate_timetag(request.timestamp, request.cluster_id, is_delete); + auto ctx = db_write_context::create_duplicate(decree, remote_timetag, request.verify_timetag); + + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { + multi_put_rpc rpc(write); + resp.__set_error(_impl->multi_put(ctx, rpc.request(), rpc.response())); + return resp.error; + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { + multi_remove_rpc rpc(write); + resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(), rpc.response())); + return resp.error; + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT || + request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { + int err = 0; + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) { + put_rpc rpc(write); + err = _impl->batch_put(ctx, rpc.request(), rpc.response()); + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { + remove_rpc rpc(write); + err = _impl->batch_remove(ctx.decree, rpc.request(), rpc.response()); + } + if (!err) { + err = _impl->batch_commit(ctx.decree); + } else { + _impl->batch_abort(ctx.decree, err); + } + resp.__set_error(err); + return resp.error; + } + resp.__set_error(rocksdb::Status::kInvalidArgument); + resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code)); + return empty_put(ctx.decree); +} + } // namespace server } // namespace pegasus diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 284ad23195..1a3b47b03d 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -23,6 +23,52 @@ inline uint8_t get_current_cluster_id() return cluster_id; } +// The context of an mutation to the database. +struct db_write_context +{ + // the mutation decree + int64_t decree{0}; + + // The time when this mutation is generated. + // This is used to calculate the new timetag. + uint64_t timestamp{0}; + + // timetag of the remote write, 0 if it's not from remote. + uint64_t remote_timetag{0}; + + // Whether to compare the timetag of old value with the new write's. + // - If true, it requires a read to the DB before write. If the old record has a larger timetag + // than the `remote_timetag`, the write will be ignored, otherwise it will be applied using + // the new timetag generated by local cluster. + // - If false, no overhead for the write but the eventual consistency on duplication + // is not guaranteed. + bool verfiy_timetag{false}; + + static inline db_write_context empty(int64_t d) { return create(d, 0); } + + // Creates a context for normal write. + static inline db_write_context create(int64_t decree, uint64_t timestamp) + { + db_write_context ctx; + ctx.decree = decree; + ctx.timestamp = timestamp; + return ctx; + } + + // Creates a context for duplicated write. + static inline db_write_context + create_duplicate(int64_t decree, uint64_t remote_timetag, bool verify_timetag) + { + db_write_context ctx; + ctx.decree = decree; + ctx.remote_timetag = remote_timetag; + ctx.verfiy_timetag = verify_timetag; + return ctx; + } + + bool is_duplicated_write() const { return remote_timetag > 0; } +}; + class pegasus_server_impl; class capacity_unit_calculator; @@ -43,7 +89,7 @@ class pegasus_write_service int empty_put(int64_t decree); // Write MULTI_PUT record. - int multi_put(int64_t decree, + int multi_put(const db_write_context &ctx, const dsn::apps::multi_put_request &update, dsn::apps::update_response &resp); @@ -65,11 +111,12 @@ class pegasus_write_service const dsn::apps::check_and_mutate_request &update, dsn::apps::check_and_mutate_response &resp); + // Handles DUPLICATE duplicated from remote. + int duplicate(int64_t decree, + const dsn::apps::duplicate_request &update, + dsn::apps::duplicate_response &resp); + /// For batch write. - /// NOTE: A batch write may incur a database read for consistency check of timetag. - /// (see pegasus::pegasus_value_generator::generate_value_v1 for more info about timetag) - /// To disable the consistency check, unset `verify_timetag` under `pegasus.server` section - /// in configuration. // Prepare batch write. void batch_prepare(int64_t decree); @@ -77,7 +124,7 @@ class pegasus_write_service // Add PUT record in batch write. // \returns 0 if success, non-0 if failure. // NOTE that `resp` should not be moved or freed while the batch is not committed. - int batch_put(int64_t decree, + int batch_put(const db_write_context &ctx, const dsn::apps::update_request &update, dsn::apps::update_response &resp); @@ -119,6 +166,7 @@ class pegasus_write_service ::dsn::perf_counter_wrapper _pfc_incr_qps; ::dsn::perf_counter_wrapper _pfc_check_and_set_qps; ::dsn::perf_counter_wrapper _pfc_check_and_mutate_qps; + ::dsn::perf_counter_wrapper _pfc_duplicate_qps; ::dsn::perf_counter_wrapper _pfc_put_latency; ::dsn::perf_counter_wrapper _pfc_multi_put_latency; diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index cb93d2a0d9..cf5ab10f52 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -25,7 +25,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base { public: explicit impl(pegasus_server_impl *server) - : replica_base(*server), + : replica_base(server), _primary_address(server->_primary_address), _pegasus_data_version(server->_pegasus_data_version), _db(server->_db), @@ -50,10 +50,11 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return err; } - int multi_put(int64_t decree, + int multi_put(const db_write_context &ctx, const dsn::apps::multi_put_request &update, dsn::apps::update_response &resp) { + int64_t decree = ctx.decree; resp.app_id = get_gpid().get_app_id(); resp.partition_index = get_gpid().get_partition_index(); resp.decree = decree; @@ -69,10 +70,10 @@ class pegasus_write_service::impl : public dsn::replication::replica_base } for (auto &kv : update.kvs) { - resp.error = db_write_batch_put(decree, - composite_raw_key(update.hash_key, kv.key), - kv.value, - static_cast(update.expire_ts_seconds)); + resp.error = db_write_batch_put_ctx(ctx, + composite_raw_key(update.hash_key, kv.key), + kv.value, + static_cast(update.expire_ts_seconds)); if (resp.error) { clear_up_batch_states(decree, resp.error); return resp.error; @@ -460,12 +461,12 @@ class pegasus_write_service::impl : public dsn::replication::replica_base /// For batch write. - int batch_put(int64_t decree, + int batch_put(const db_write_context &ctx, const dsn::apps::update_request &update, dsn::apps::update_response &resp) { - resp.error = db_write_batch_put( - decree, update.key, update.value, static_cast(update.expire_ts_seconds)); + resp.error = db_write_batch_put_ctx( + ctx, update.key, update.value, static_cast(update.expire_ts_seconds)); _update_responses.emplace_back(&resp); return resp.error; } @@ -499,10 +500,22 @@ class pegasus_write_service::impl : public dsn::replication::replica_base dsn::string_view raw_key, dsn::string_view value, uint32_t expire_sec) + { + return db_write_batch_put_ctx(db_write_context::empty(decree), raw_key, value, expire_sec); + } + + int db_write_batch_put_ctx(const db_write_context &ctx, + dsn::string_view raw_key, + dsn::string_view value, + uint32_t expire_sec) { FAIL_POINT_INJECT_F("db_write_batch_put", [](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; }); + if (ctx.verfiy_timetag) { + // TBD(wutao1) + } + rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key); rocksdb::SliceParts skey_parts(&skey, 1); rocksdb::SliceParts svalue = @@ -514,7 +527,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base derror_rocksdb("WriteBatchPut", s.ToString(), "decree: {}, hash_key: {}, sort_key: {}, expire_ts: {}", - decree, + ctx.decree, utils::c_escape_string(hash_key), utils::c_escape_string(sort_key), expire_sec); @@ -574,7 +587,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base _batch.Clear(); } - dsn::blob composite_raw_key(dsn::string_view hash_key, dsn::string_view sort_key) + static dsn::blob composite_raw_key(dsn::string_view hash_key, dsn::string_view sort_key) { dsn::blob raw_key; pegasus_generate_key(raw_key, hash_key, sort_key); @@ -582,7 +595,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base } // return true if the check type is supported - bool is_check_type_supported(::dsn::apps::cas_check_type::type check_type) + static bool is_check_type_supported(::dsn::apps::cas_check_type::type check_type) { return check_type >= ::dsn::apps::cas_check_type::CT_NO_CHECK && check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER; diff --git a/src/server/test/pegasus_write_service_test.cpp b/src/server/test/pegasus_write_service_test.cpp index 7b36ae93de..dbd8403b02 100644 --- a/src/server/test/pegasus_write_service_test.cpp +++ b/src/server/test/pegasus_write_service_test.cpp @@ -32,11 +32,12 @@ class pegasus_write_service_test : public pegasus_server_test_base dsn::apps::update_response response; int64_t decree = 10; + auto ctx = db_write_context::empty(decree); std::string hash_key = "hash_key"; // alarm for empty request request.hash_key = dsn::blob(hash_key.data(), 0, hash_key.size()); - int err = _write_svc->multi_put(decree, request, response); + int err = _write_svc->multi_put(ctx, request, response); ASSERT_EQ(err, 0); verify_response(response, rocksdb::Status::kInvalidArgument, decree); @@ -57,20 +58,20 @@ class pegasus_write_service_test : public pegasus_server_test_base { dsn::fail::cfg("db_write_batch_put", "100%1*return()"); - err = _write_svc->multi_put(decree, request, response); + err = _write_svc->multi_put(ctx, request, response); ASSERT_EQ(err, FAIL_DB_WRITE_BATCH_PUT); verify_response(response, err, decree); } { dsn::fail::cfg("db_write", "100%1*return()"); - err = _write_svc->multi_put(decree, request, response); + err = _write_svc->multi_put(ctx, request, response); ASSERT_EQ(err, FAIL_DB_WRITE); verify_response(response, err, decree); } { // success - err = _write_svc->multi_put(decree, request, response); + err = _write_svc->multi_put(ctx, request, response); ASSERT_EQ(err, 0); verify_response(response, 0, decree); } @@ -153,7 +154,7 @@ class pegasus_write_service_test : public pegasus_server_test_base for (int i = 0; i < kv_num; i++) { dsn::apps::update_request req; req.key = key[i]; - _write_svc->batch_put(decree, req, responses[i]); + _write_svc->batch_put(db_write_context::empty(decree), req, responses[i]); } for (int i = 0; i < kv_num; i++) { _write_svc->batch_remove(decree, key[i], responses[i]);