From a1a3e7556cc8ad3c09487a873c0211c5ce759136 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 20 Sep 2019 13:56:22 +0800 Subject: [PATCH 01/17] shell: add debugging commands for hex and escaped-bytes conversion --- src/shell/commands.h | 7 +++- src/shell/commands/debugger.cpp | 58 +++++++++++++++++++++++++++++++-- src/shell/main.cpp | 18 ++++++++++ 3 files changed, 80 insertions(+), 3 deletions(-) diff --git a/src/shell/commands.h b/src/shell/commands.h index 6515a0352e..3a7319c3b2 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include @@ -206,3 +205,9 @@ bool sst_dump(command_executor *e, shell_context *sc, arguments args); bool mlog_dump(command_executor *e, shell_context *sc, arguments args); bool local_get(command_executor *e, shell_context *sc, arguments args); + +bool rdb_key_hex2str(command_executor *e, shell_context *sc, arguments args); + +bool rdb_key_str2hex(command_executor *e, shell_context *sc, arguments args); + +bool rdb_value_hex2str(command_executor *e, shell_context *sc, arguments args); diff --git a/src/shell/commands/debugger.cpp b/src/shell/commands/debugger.cpp index 5d70b43811..7121cfb01b 100644 --- a/src/shell/commands/debugger.cpp +++ b/src/shell/commands/debugger.cpp @@ -3,6 +3,9 @@ // can be found in the LICENSE file in the root directory of this source tree. #include "shell/commands.h" +#include +#include +#include bool sst_dump(command_executor *e, shell_context *sc, arguments args) { @@ -67,8 +70,10 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args) std::function callback; if (detailed) { - callback = [&os, sc]( - int64_t decree, int64_t timestamp, dsn::message_ex **requests, int count) mutable { + callback = [&os, sc](int64_t decree, + int64_t timestamp, + dsn::message_ex **requests, + int count) mutable { for (int i = 0; i < count; ++i) { dsn::message_ex *request = requests[i]; dassert(request != nullptr, ""); @@ -189,3 +194,52 @@ bool local_get(command_executor *e, shell_context *sc, arguments args) delete db; return true; } + +bool rdb_key_str2hex(command_executor *e, shell_context *sc, arguments args) +{ + if (args.argc != 3) { + return false; + } + std::string hash_key = sds_to_string(args.argv[1]); + std::string sort_key = sds_to_string(args.argv[2]); + ::dsn::blob key; + pegasus::pegasus_generate_key(key, hash_key, sort_key); + rocksdb::Slice skey(key.data(), key.length()); + fprintf(stderr, "\"%s\"\n", skey.ToString(true).c_str()); + return true; +} + +bool rdb_key_hex2str(command_executor *e, shell_context *sc, arguments args) +{ + if (args.argc != 2) { + return false; + } + std::string hex_rdb_key = sds_to_string(args.argv[1]); + dsn::blob key = dsn::blob::create_from_bytes(rocksdb::LDBCommand::HexToString(hex_rdb_key)); + std::string hash_key, sort_key; + pegasus::pegasus_restore_key(key, hash_key, sort_key); + fmt::print( + stderr, "\nhash key: \"{}\"\n", pegasus::utils::c_escape_string(hash_key, sc->escape_all)); + fmt::print( + stderr, "\nsort key: \"{}\"\n", pegasus::utils::c_escape_string(sort_key, sc->escape_all)); + return true; +} + +bool rdb_value_hex2str(command_executor *e, shell_context *sc, arguments args) +{ + if (args.argc != 2) { + return false; + } + std::string hex_rdb_value = sds_to_string(args.argv[1]); + std::string pegasus_value = rocksdb::LDBCommand::HexToString(hex_rdb_value); + auto expire_ts = static_cast(pegasus::pegasus_extract_expire_ts(0, pegasus_value)) + + pegasus::utils::epoch_begin; // TODO(wutao): pass user specified version + fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n", *std::localtime(&expire_ts)); + + dsn::blob user_data; + pegasus::pegasus_extract_user_data(0, std::move(pegasus_value), user_data); + fprintf(stderr, + "user_data:\n \"%s\"\n", + pegasus::utils::c_escape_string(user_data.to_string(), sc->escape_all).c_str()); + return true; +} diff --git a/src/shell/main.cpp b/src/shell/main.cpp index e28810fdd7..46f0b9cd96 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -325,6 +325,24 @@ static command_executor commands[] = { { "local_get", "get value from local db", " ", local_get, }, + { + "rdb_key_str2hex", + "transform the given hashkey and sortkey to rocksdb raw key in hex representation", + " ", + rdb_key_str2hex, + }, + { + "rdb_key_hex2str", + "transform the given rocksdb raw key in hex representation to hash key and sort key", + "", + rdb_key_hex2str, + }, + { + "rdb_value_hex2str", + "parse the given rocksdb raw value in hex representation", + "", + rdb_value_hex2str, + }, { "sst_dump", "dump sstable dir or files", From 64cdf424c1ee556b5987ceaf29a610e711b89ed8 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 20 Sep 2019 15:10:24 +0800 Subject: [PATCH 02/17] fix format --- src/shell/commands/debugger.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/shell/commands/debugger.cpp b/src/shell/commands/debugger.cpp index 7121cfb01b..d174643057 100644 --- a/src/shell/commands/debugger.cpp +++ b/src/shell/commands/debugger.cpp @@ -70,10 +70,8 @@ bool mlog_dump(command_executor *e, shell_context *sc, arguments args) std::function callback; if (detailed) { - callback = [&os, sc](int64_t decree, - int64_t timestamp, - dsn::message_ex **requests, - int count) mutable { + callback = [&os, sc]( + int64_t decree, int64_t timestamp, dsn::message_ex **requests, int count) mutable { for (int i = 0; i < count; ++i) { dsn::message_ex *request = requests[i]; dassert(request != nullptr, ""); From 6feda05cce0bc236031c3efbb013c6e72ed5dc01 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 23 Sep 2019 17:40:32 +0800 Subject: [PATCH 03/17] feat(dup): implement pegasus_mutation_duplicator --- CMakeLists.txt | 0 src/base/pegasus_key_schema.h | 6 + src/base/pegasus_rpc_types.h | 2 + src/base/pegasus_value_schema.h | 19 + src/client_lib/pegasus_client_impl.h | 13 +- src/include/rrdb/rrdb.client.h | 12 + src/include/rrdb/rrdb.code.definition.h | 1 + src/include/rrdb/rrdb_types.h | 116 ++++++ src/server/pegasus_mutation_duplicator.cpp | 238 +++++++++++ src/server/pegasus_mutation_duplicator.h | 60 +++ src/server/pegasus_write_service.h | 9 + .../test/pegasus_mutation_duplicator_test.cpp | 383 ++++++++++++++++++ 12 files changed, 856 insertions(+), 3 deletions(-) create mode 100644 CMakeLists.txt create mode 100644 src/server/pegasus_mutation_duplicator.cpp create mode 100644 src/server/pegasus_mutation_duplicator.h create mode 100644 src/server/test/pegasus_mutation_duplicator_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h index 3222fcd0de..135b858f93 100644 --- a/src/base/pegasus_key_schema.h +++ b/src/base/pegasus_key_schema.h @@ -150,4 +150,10 @@ inline uint64_t pegasus_key_hash(const ::dsn::blob &key) } } +/// Calculate hash value from hash key. +inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key) +{ + return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0); +} + } // namespace pegasus diff --git a/src/base/pegasus_rpc_types.h b/src/base/pegasus_rpc_types.h index 3d7373413d..ac1eff49cd 100644 --- a/src/base/pegasus_rpc_types.h +++ b/src/base/pegasus_rpc_types.h @@ -22,6 +22,8 @@ using incr_rpc = dsn::rpc_holder; +using duplicate_rpc = dsn::apps::duplicate_rpc; + using check_and_mutate_rpc = dsn::rpc_holder; diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h index ccacd5ba0c..d5ac9eeece 100644 --- a/src/base/pegasus_value_schema.h +++ b/src/base/pegasus_value_schema.h @@ -21,6 +21,25 @@ namespace pegasus { #define PEGASUS_DATA_VERSION_MAX 0u +/// Generates timetag in host endian. +/// \see comment on pegasus_value_generator::generate_value_v1 +inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool delete_tag) +{ + return timestamp << 8 | cluster_id << 1 | delete_tag; +} + +inline uint8_t extract_cluster_id_from_timetag(uint64_t timetag) +{ + // 7bit: 0x7F + return static_cast((timetag >> 1) & 0x7F); +} + +inline uint64_t extract_timestamp_from_timetag(uint64_t timetag) +{ + // 56bit: 0xFFFFFFFFFFFFFFL + return static_cast((timetag >> 8) & 0xFFFFFFFFFFFFFFL); +} + /// Extracts expire_ts from rocksdb value with given version. /// The value schema must be in v0. /// \return expire_ts in host endian diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h index b4edd0e1cb..1d11a84bc7 100644 --- a/src/client_lib/pegasus_client_impl.h +++ b/src/client_lib/pegasus_client_impl.h @@ -221,6 +221,13 @@ class pegasus_client_impl : public pegasus_client const scan_options &options, async_get_unordered_scanners_callback_t &&callback) override; + /// \internal + /// This is an internal function for duplication. + /// \see pegasus::server::pegasus_mutation_duplicator + void async_duplicate(dsn::apps::duplicate_rpc rpc, + std::function &&callback, + dsn::task_tracker *tracker); + virtual const char *get_error_string(int error_code) const override; static void init_error(); @@ -279,6 +286,9 @@ class pegasus_client_impl : public pegasus_client static const ::dsn::blob _max; }; + static int get_client_error(int server_error); + static int get_rocksdb_server_error(int rocskdb_error); + private: class pegasus_scanner_impl_wrapper : public abstract_pegasus_scanner { @@ -298,9 +308,6 @@ class pegasus_client_impl : public pegasus_client } }; - static int get_client_error(int server_error); - static int get_rocksdb_server_error(int rocskdb_error); - private: std::string _cluster_name; std::string _app_name; diff --git a/src/include/rrdb/rrdb.client.h b/src/include/rrdb/rrdb.client.h index 553817e705..438eefa6cb 100644 --- a/src/include/rrdb/rrdb.client.h +++ b/src/include/rrdb/rrdb.client.h @@ -8,6 +8,9 @@ namespace dsn { namespace apps { + +typedef rpc_holder duplicate_rpc; + class rrdb_client { public: @@ -405,6 +408,15 @@ class rrdb_client partition_hash); } + // ---------- call RPC_RRDB_RRDB_DUPLICATE ------------ + + // - asynchronous with on-stack duplicate_request and duplicate_response + template + task_ptr duplicate(duplicate_rpc &rpc, TCallback &&callback, dsn::task_tracker *tracker) + { + return rpc.call(_resolver, tracker, std::forward(callback)); + } + private: dsn::replication::partition_resolver_ptr _resolver; dsn::task_tracker _tracker; diff --git a/src/include/rrdb/rrdb.code.definition.h b/src/include/rrdb/rrdb.code.definition.h index 4d4a1bbd9f..621b49497a 100644 --- a/src/include/rrdb/rrdb.code.definition.h +++ b/src/include/rrdb/rrdb.code.definition.h @@ -10,6 +10,7 @@ DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_MULTI_REMOVE, NOT_ALLOW_BATCH, IS_ID DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_INCR, NOT_ALLOW_BATCH, NOT_IDEMPOTENT) DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_CHECK_AND_SET, NOT_ALLOW_BATCH, NOT_IDEMPOTENT) DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_CHECK_AND_MUTATE, NOT_ALLOW_BATCH, NOT_IDEMPOTENT) +DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_DUPLICATE, NOT_ALLOW_BATCH, IS_IDEMPOTENT) DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_GET) DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_MULTI_GET) DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_SORTKEY_COUNT) diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index 563019414c..e04d524361 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -113,6 +113,10 @@ class scan_request; class scan_response; +class duplicate_request; + +class duplicate_response; + typedef struct _update_request__isset { _update_request__isset() : key(false), value(false), expire_ts_seconds(false) {} @@ -1791,6 +1795,118 @@ inline std::ostream &operator<<(std::ostream &out, const scan_response &obj) obj.printTo(out); return out; } + +typedef struct _duplicate_request__isset +{ + _duplicate_request__isset() : timetag(false), task_code(false), raw_message(false), hash(false) + { + } + bool timetag : 1; + bool task_code : 1; + bool raw_message : 1; + bool hash : 1; +} _duplicate_request__isset; + +class duplicate_request +{ +public: + duplicate_request(const duplicate_request &); + duplicate_request(duplicate_request &&); + duplicate_request &operator=(const duplicate_request &); + duplicate_request &operator=(duplicate_request &&); + duplicate_request() : timetag(0), hash(0) {} + + virtual ~duplicate_request() throw(); + int64_t timetag; + ::dsn::task_code task_code; + ::dsn::blob raw_message; + int64_t hash; + + _duplicate_request__isset __isset; + + void __set_timetag(const int64_t val); + + void __set_task_code(const ::dsn::task_code &val); + + void __set_raw_message(const ::dsn::blob &val); + + void __set_hash(const int64_t val); + + bool operator==(const duplicate_request &rhs) const + { + if (!(timetag == rhs.timetag)) + return false; + if (!(task_code == rhs.task_code)) + return false; + if (!(raw_message == rhs.raw_message)) + return false; + if (!(hash == rhs.hash)) + return false; + return true; + } + bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); } + + bool operator<(const duplicate_request &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(duplicate_request &a, duplicate_request &b); + +inline std::ostream &operator<<(std::ostream &out, const duplicate_request &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _duplicate_response__isset +{ + _duplicate_response__isset() : error(false) {} + bool error : 1; +} _duplicate_response__isset; + +class duplicate_response +{ +public: + duplicate_response(const duplicate_response &); + duplicate_response(duplicate_response &&); + duplicate_response &operator=(const duplicate_response &); + duplicate_response &operator=(duplicate_response &&); + duplicate_response() : error(0) {} + + virtual ~duplicate_response() throw(); + int32_t error; + + _duplicate_response__isset __isset; + + void __set_error(const int32_t val); + + bool operator==(const duplicate_response &rhs) const + { + if (!(error == rhs.error)) + return false; + return true; + } + bool operator!=(const duplicate_response &rhs) const { return !(*this == rhs); } + + bool operator<(const duplicate_response &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(duplicate_response &a, duplicate_response &b); + +inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj) +{ + obj.printTo(out); + return out; +} } } // namespace diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp new file mode 100644 index 0000000000..04e91b5f9b --- /dev/null +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -0,0 +1,238 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "pegasus_mutation_duplicator.h" +#include "pegasus_server_impl.h" +#include "base/pegasus_rpc_types.h" + +#include +#include +#include +#include + +namespace dsn { +namespace replication { + +/// static definition of mutation_duplicator::creator. +/*static*/ std::function( + replica_base *, string_view, string_view)> + mutation_duplicator::creator = [](replica_base *r, string_view remote, string_view app) { + return make_unique(r, remote, app); + }; + +} // namespace replication +} // namespace dsn + +namespace pegasus { +namespace server { + +using namespace dsn::literals::chrono_literals; + +/*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob &data) +{ + if (tc == dsn::apps::RPC_RRDB_RRDB_PUT) { + dsn::apps::update_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_key_hash(thrift_request.key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_REMOVE) { + dsn::blob raw_key; + dsn::from_blob_to_thrift(data, raw_key); + return pegasus_key_hash(raw_key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { + dsn::apps::multi_put_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { + dsn::apps::multi_remove_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + dfatal("unexpected task code: %s", tc.to_string()); + __builtin_unreachable(); +} + +pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::replica_base *r, + dsn::string_view remote_cluster, + dsn::string_view app) + : mutation_duplicator(r), _remote_cluster(remote_cluster) +{ + static bool _dummy = pegasus_client_factory::initialize(nullptr); + + pegasus_client *client = pegasus_client_factory::get_client(remote_cluster.data(), app.data()); + _client = static_cast(client); + + auto ret = dsn::replication::get_duplication_cluster_id(remote_cluster.data()); + dassert_replica(ret.is_ok(), // never possible, meta server disallows such remote_cluster. + "invalid remote cluster: {}, err_ret: {}", + remote_cluster, + ret.get_error().description()); + _remote_cluster_id = static_cast(ret.get_value()); + + ddebug_replica("initialize mutation duplicator for local cluster [id:{}], " + "remote cluster [id:{}, addr:{}]", + get_current_cluster_id(), + _remote_cluster_id, + remote_cluster); + dassert_replica(get_current_cluster_id() != _remote_cluster_id, + "invalid remote cluster: {} {}", + remote_cluster, + _remote_cluster_id); + + std::string str_gpid = fmt::format("{}", get_gpid()); + _shipped_ops.init_app_counter("app.pegasus", + fmt::format("dup_shipped_ops@{}", str_gpid).c_str(), + COUNTER_TYPE_RATE, + "the total ops of DUPLICATE requests sent from this app"); + _failed_shipping_ops.init_app_counter( + "app.pegasus", + fmt::format("dup_failed_shipping_ops@{}", str_gpid).c_str(), + COUNTER_TYPE_RATE, + "the qps of failed DUPLICATE requests sent from this app"); +} + +static bool is_delete_operation(dsn::task_code code) +{ + return code == dsn::apps::RPC_RRDB_RRDB_REMOVE || code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE; +} + +void pegasus_mutation_duplicator::send(uint64_t hash, callback cb) +{ + uint64_t start = dsn_now_ns(); + + duplicate_rpc rpc; + { + dsn::zauto_lock _(_lock); + rpc = _inflights[hash].front(); + _inflights[hash].pop_front(); + } + + _client->async_duplicate(rpc, + [cb, rpc, start, this](dsn::error_code err) mutable { + on_duplicate_reply(std::move(cb), std::move(rpc), start, err); + }, + _env.__conf.tracker); +} + +void pegasus_mutation_duplicator::on_duplicate_reply(mutation_duplicator::callback cb, + duplicate_rpc rpc, + uint64_t start_ns, + dsn::error_code err) +{ + int perr = PERR_OK; + if (err == dsn::ERR_OK) { + perr = client::pegasus_client_impl::get_client_error( + client::pegasus_client_impl::get_rocksdb_server_error(rpc.response().error)); + } + + if (perr != PERR_OK || err != dsn::ERR_OK) { + _failed_shipping_ops->increment(); + + // randomly log the 1% of the failed duplicate rpc. + if (dsn::rand::next_double01() <= 0.01) { + derror_replica("duplicate_rpc failed: {} [code:{}, cluster_id:{}, timestamp:{}]", + err == dsn::ERR_OK ? _client->get_error_string(perr) : err.to_string(), + rpc.request().task_code, + extract_cluster_id_from_timetag(rpc.request().timetag), + extract_timestamp_from_timetag(rpc.request().timetag)); + } + } else { + _shipped_ops->increment(); + _total_shipped_size += + rpc.dsn_request()->header->body_length + rpc.dsn_request()->header->hdr_length; + } + + auto hash = static_cast(rpc.request().hash); + { + dsn::zauto_lock _(_lock); + if (perr != PERR_OK || err != dsn::ERR_OK) { + // retry this rpc + _inflights[hash].push_front(rpc); + _env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s); + return; + } + if (_inflights[hash].empty()) { + _inflights.erase(hash); + if (_inflights.empty()) { + // move forward to the next step. + cb(_total_shipped_size); + } + } else { + // start next rpc immediately + _env.schedule([hash, cb, this]() { send(hash, cb); }); + return; + } + } +} + +void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb) +{ + _total_shipped_size = 0; + + for (auto mut : muts) { + uint64_t timestamp = std::get<0>(mut); + dsn::task_code rpc_code = std::get<1>(mut); + dsn::blob data = std::get<2>(mut); + uint64_t hash; + + // a mutation must be a write + dsn::task_spec *task = dsn::task_spec::get(rpc_code); + dassert_replica(task != nullptr && task->rpc_request_is_write_operation, + "invalid rpc type({})", + rpc_code); + + // extract the rpc wrapped inside if this is a DUPLICATE rpc + if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { + dsn::apps::duplicate_request dreq; + dsn::from_blob_to_thrift(data, dreq); + + auto timetag = static_cast(dreq.timetag); + uint8_t from_cluster_id = extract_cluster_id_from_timetag(timetag); + if (from_cluster_id == _remote_cluster_id) { + // ignore this mutation to prevent infinite duplication loop. + continue; + } + if (!dsn::replication::is_cluster_id_configured(from_cluster_id)) { + derror_replica( + "illegal duplicate request [from_cluster_id: {}, remote_cluster_id:{}]", + from_cluster_id, + _remote_cluster_id); + continue; + } + + hash = static_cast(dreq.hash); + data = std::move(dreq.raw_message); + rpc_code = dreq.task_code; + timestamp = extract_timestamp_from_timetag(timetag); + } else { + hash = get_hash_from_request(rpc_code, data); + } + + auto dreq = dsn::make_unique(); + dreq->task_code = rpc_code; + dreq->hash = hash; + dreq->raw_message = std::move(data); + dreq->timetag = + generate_timetag(timestamp, get_current_cluster_id(), is_delete_operation(rpc_code)); + duplicate_rpc rpc(std::move(dreq), + dsn::apps::RPC_RRDB_RRDB_DUPLICATE, + 10_s, // TODO(wutao1): configurable timeout. + hash); + _inflights[hash].push_back(std::move(rpc)); + } + + if (_inflights.empty()) { + cb(0); + return; + } + auto inflights = _inflights; + for (const auto &kv : inflights) { + send(kv.first, cb); + } +} + +} // namespace server +} // namespace pegasus diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h new file mode 100644 index 0000000000..0144308136 --- /dev/null +++ b/src/server/pegasus_mutation_duplicator.h @@ -0,0 +1,60 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include + +#include "client_lib/pegasus_client_factory_impl.h" + +namespace pegasus { +namespace server { + +using namespace dsn::literals::chrono_literals; + +// Duplicates the loaded mutations to the remote pegasus cluster. +class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator +{ + using mutation_tuple_set = dsn::replication::mutation_tuple_set; + using mutation_tuple = dsn::replication::mutation_tuple; + using duplicate_rpc = dsn::apps::duplicate_rpc; + +public: + pegasus_mutation_duplicator(dsn::replication::replica_base *r, + dsn::string_view remote_cluster, + dsn::string_view app); + + void duplicate(mutation_tuple_set muts, callback cb) override; + + ~pegasus_mutation_duplicator() override { _env.__conf.tracker->wait_outstanding_tasks(); } + +private: + void send(uint64_t hash, callback cb); + + void on_duplicate_reply(callback, duplicate_rpc, uint64_t start_ns, dsn::error_code err); + +private: + friend class pegasus_mutation_duplicator_test; + + client::pegasus_client_impl *_client; + + uint8_t _remote_cluster_id{0}; + std::string _remote_cluster; + + // hash -> duplicate_rpc + std::map> _inflights; + dsn::zlock _lock; + + size_t _total_shipped_size{0}; + + dsn::perf_counter_wrapper _shipped_ops; + dsn::perf_counter_wrapper _failed_shipping_ops; +}; + +extern uint64_t get_hash_from_request(dsn::task_code rpc_code, const dsn::blob &request_data); + +} // namespace server +} // namespace pegasus diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 6f48cffa0f..284ad23195 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -6,6 +6,7 @@ #include #include +#include #include "base/pegasus_value_schema.h" #include "base/pegasus_utils.h" @@ -14,6 +15,14 @@ namespace pegasus { namespace server { +inline uint8_t get_current_cluster_id() +{ + static const uint8_t cluster_id = + dsn::replication::get_duplication_cluster_id(dsn::replication::get_current_cluster_name()) + .get_value(); + return cluster_id; +} + class pegasus_server_impl; class capacity_unit_calculator; diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp new file mode 100644 index 0000000000..a829dad4b5 --- /dev/null +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -0,0 +1,383 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "server/pegasus_mutation_duplicator.h" +#include "base/pegasus_rpc_types.h" +#include "pegasus_server_test_base.h" + +#include +#include +#include +#include + +namespace pegasus { +namespace server { + +using namespace dsn::replication; + +class pegasus_mutation_duplicator_test : public pegasus_server_test_base +{ + dsn::task_tracker _tracker; + dsn::pipeline::environment _env; + +public: + pegasus_mutation_duplicator_test() + { + _env.thread_pool(LPC_REPLICATION_LOW).task_tracker(&_tracker); + } + + void test_duplicate() + { + replica_base replica(dsn::gpid(1, 1), "fake_replica"); + auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); + duplicator->set_task_environment(&_env); + + mutation_tuple_set muts; + for (uint64_t i = 0; i < 100; i++) { + uint64_t ts = 200 + i; + dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; + + dsn::apps::update_request request; + pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort")); + dsn::message_ptr msg = + dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT); + auto data = dsn::move_message_to_blob(msg.get()); + + muts.insert(std::make_tuple(ts, code, data)); + } + + size_t total_shipped_size = 0; + auto duplicator_impl = dynamic_cast(duplicator.get()); + RPC_MOCKING(duplicate_rpc) + { + duplicator->duplicate(muts, [](size_t) {}); + + size_t total_size = 100; + while (total_size > 0) { + // ensure mutations having the same hash are sending sequentially. + ASSERT_EQ(duplicator_impl->_inflights.size(), 1); + ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); + + total_size--; + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), total_size); + + auto rpc = duplicate_rpc::mail_box().back(); + duplicate_rpc::mail_box().pop_back(); + + total_shipped_size += + rpc.dsn_request()->body_size() + rpc.dsn_request()->header->hdr_length; + duplicator_impl->on_duplicate_reply( + [total_shipped_size](size_t final_size) { + ASSERT_EQ(total_shipped_size, final_size); + }, + rpc, + dsn_now_ns(), + dsn::ERR_OK); + + // schedule next round + _tracker.wait_outstanding_tasks(); + } + + ASSERT_EQ(duplicator_impl->_total_shipped_size, total_shipped_size); + ASSERT_EQ(duplicator_impl->_inflights.size(), 0); + ASSERT_EQ(duplicate_rpc::mail_box().size(), 0); + } + } + + void test_duplicate_failed() + { + replica_base replica(dsn::gpid(1, 1), "fake_replica"); + auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); + duplicator->set_task_environment(&_env); + + mutation_tuple_set muts; + for (uint64_t i = 0; i < 10; i++) { + uint64_t ts = 200 + i; + dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; + + dsn::apps::update_request request; + pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort")); + dsn::message_ptr msg = + dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT); + auto data = dsn::move_message_to_blob(msg.get()); + + muts.insert(std::make_tuple(ts, code, data)); + } + + auto duplicator_impl = dynamic_cast(duplicator.get()); + RPC_MOCKING(duplicate_rpc) + { + duplicator->duplicate(muts, [](size_t) {}); + + auto rpc = duplicate_rpc::mail_box().back(); + duplicate_rpc::mail_box().pop_back(); + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); + + // failed + duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn_now_ns(), dsn::ERR_TIMEOUT); + + // schedule next round + _tracker.wait_outstanding_tasks(); + + // retry infinitely + ASSERT_EQ(duplicator_impl->_inflights.size(), 1); + ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); + duplicate_rpc::mail_box().clear(); + + // with other error + rpc.response().error = PERR_INVALID_ARGUMENT; + duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn_now_ns(), dsn::ERR_OK); + _tracker.wait_outstanding_tasks(); + ASSERT_EQ(duplicator_impl->_inflights.size(), 1); + ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); + duplicate_rpc::mail_box().clear(); + + // with other error + rpc.response().error = PERR_OK; + duplicator_impl->on_duplicate_reply( + [](size_t) {}, rpc, dsn_now_ns(), dsn::ERR_IO_PENDING); + _tracker.wait_outstanding_tasks(); + ASSERT_EQ(duplicator_impl->_inflights.size(), 1); + ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); + ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); + duplicate_rpc::mail_box().clear(); + } + } + + void test_duplicate_isolated_hashkeys() + { + replica_base replica(dsn::gpid(1, 1), "fake_replica"); + auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); + duplicator->set_task_environment(&_env); + + size_t total_size = 3000; + mutation_tuple_set muts; + for (uint64_t i = 0; i < total_size; i++) { + uint64_t ts = 200 + i; + dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT; + + dsn::apps::update_request request; + pegasus::pegasus_generate_key( + request.key, std::string("hash") + std::to_string(i), std::string("sort")); + dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(request, code); + auto data = dsn::move_message_to_blob(msg.get()); + + muts.insert(std::make_tuple(ts, code, data)); + } + + auto duplicator_impl = dynamic_cast(duplicator.get()); + RPC_MOCKING(duplicate_rpc) + { + duplicator->duplicate(muts, [](size_t) {}); + + // ensure each bucket has only 1 request and each request is + // isolated with others. + ASSERT_EQ(duplicator_impl->_inflights.size(), total_size); + ASSERT_EQ(duplicate_rpc::mail_box().size(), total_size); + for (const auto &ents : duplicator_impl->_inflights) { + ASSERT_EQ(ents.second.size(), 0); + } + + // reply with success + auto rpc_list = std::move(duplicate_rpc::mail_box()); + for (const auto &rpc : rpc_list) { + rpc.response().error = dsn::ERR_OK; + duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn_now_ns(), dsn::ERR_OK); + } + _tracker.wait_outstanding_tasks(); + ASSERT_EQ(duplicate_rpc::mail_box().size(), 0); + ASSERT_EQ(duplicator_impl->_inflights.size(), 0); + } + } + + void test_all_mutations_are_duplicated_from_master() + { + replica_base replica(dsn::gpid(1, 1), "fake_replica"); + auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); + duplicator->set_task_environment(&_env); + + mutation_tuple_set muts; + for (uint64_t i = 0; i < 3000; i++) { + uint64_t ts = 200 + i; + dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_DUPLICATE; + dsn::apps::duplicate_request request; + + dsn::apps::update_request duplicated_request; + pegasus::pegasus_generate_key(duplicated_request.key, + std::string("hash") + std::to_string(i), + std::string("sort")); + request.timetag = generate_timetag(100, 2, false); // master(onebox2)'s cluster_id = 2 + request.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; + request.hash = pegasus_key_hash(duplicated_request.key); + + dsn::message_ptr msg = + dsn::from_thrift_request_to_received_message(duplicated_request, request.task_code); + request.raw_message = dsn::move_message_to_blob(msg.get()); + + msg = dsn::from_thrift_request_to_received_message(request, code); + auto data = dsn::move_message_to_blob(msg.get()); + muts.insert(std::make_tuple(ts, code, data)); + } + + auto duplicator_impl = dynamic_cast(duplicator.get()); + RPC_MOCKING(duplicate_rpc) + { + duplicator->duplicate(muts, [](size_t total) { ASSERT_EQ(total, 0); }); + + // all mutation duplicated from master will not be duplicated + // again to prevent infinite loop. + ASSERT_EQ(duplicator_impl->_inflights.size(), 0); + ASSERT_EQ(duplicate_rpc::mail_box().size(), 0); + ASSERT_EQ(duplicator_impl->_total_shipped_size, 0); + } + } + + void test_mutation_is_duplicated_from_nowhere() + { + replica_base replica(dsn::gpid(1, 1), "fake_replica"); + auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); + duplicator->set_task_environment(&_env); + + mutation_tuple_set muts; + for (uint64_t i = 0; i < 3000; i++) { + uint64_t ts = 200 + i; + dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_DUPLICATE; + dsn::apps::duplicate_request request; + + dsn::apps::update_request duplicated_request; + pegasus::pegasus_generate_key(duplicated_request.key, + std::string("hash") + std::to_string(i), + std::string("sort")); + request.timetag = generate_timetag(100, 130, false); // cluster_id=130(nowhere) + request.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; + request.hash = pegasus_key_hash(duplicated_request.key); + + dsn::message_ptr msg = + dsn::from_thrift_request_to_received_message(duplicated_request, request.task_code); + request.raw_message = dsn::move_message_to_blob(msg.get()); + + msg = dsn::from_thrift_request_to_received_message(request, code); + auto data = dsn::move_message_to_blob(msg.get()); + muts.insert(std::make_tuple(ts, code, data)); + } + + auto duplicator_impl = dynamic_cast(duplicator.get()); + RPC_MOCKING(duplicate_rpc) + { + duplicator->duplicate(muts, [](size_t total) { ASSERT_EQ(total, 0); }); + + // ignore those mutations that are duplicated from nowhere + ASSERT_EQ(duplicator_impl->_inflights.size(), 0); + ASSERT_EQ(duplicate_rpc::mail_box().size(), 0); + ASSERT_EQ(duplicator_impl->_total_shipped_size, 0); + } + } + + void test_create_duplicator() + { + replica_base replica(dsn::gpid(1, 1), "fake_replica"); + auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); + duplicator->set_task_environment(&_env); + auto duplicator_impl = dynamic_cast(duplicator.get()); + ASSERT_EQ(duplicator_impl->_remote_cluster_id, 2); + ASSERT_EQ(duplicator_impl->_remote_cluster, "onebox2"); + ASSERT_EQ(get_current_cluster_id(), 1); + } +}; + +TEST_F(pegasus_mutation_duplicator_test, get_hash_from_request) +{ + std::string hash_key("hash"); + std::string sort_key("sort"); + uint64_t hash = + pegasus::pegasus_hash_key_hash(dsn::blob(hash_key.data(), 0, hash_key.length())); + + { + dsn::apps::multi_put_request request; + request.hash_key.assign(hash_key.data(), 0, hash_key.length()); + dsn::message_ptr msg = dsn::from_thrift_request_to_received_message( + request, dsn::apps::RPC_RRDB_RRDB_MULTI_PUT); + auto data = dsn::move_message_to_blob(msg.get()); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, data)); + } + + { + dsn::apps::multi_remove_request request; + request.hash_key.assign(hash_key.data(), 0, hash_key.length()); + dsn::message_ptr msg = dsn::from_thrift_request_to_received_message( + request, dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE); + + auto data = dsn::move_message_to_blob(msg.get()); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE, data)); + } + + { + dsn::apps::update_request request; + pegasus::pegasus_generate_key(request.key, hash_key, sort_key); + dsn::message_ptr msg = + dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT); + auto data = dsn::move_message_to_blob(msg.get()); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_PUT, data)); + } + + { + dsn::blob key; + pegasus::pegasus_generate_key(key, hash_key, sort_key); + dsn::message_ptr msg = + dsn::from_thrift_request_to_received_message(key, dsn::apps::RPC_RRDB_RRDB_REMOVE); + auto data = dsn::move_message_to_blob(msg.get()); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_REMOVE, data)); + } +} + +// Verifies that calls on `get_hash_key_from_request` won't make +// message unable to read. (if `get_hash_key_from_request` doesn't +// use copy the message internally, it will.) +TEST_F(pegasus_mutation_duplicator_test, read_after_get_hash_key) +{ + std::string hash_key("hash"); + std::string sort_key("sort"); + uint64_t hash = + pegasus::pegasus_hash_key_hash(dsn::blob(hash_key.data(), 0, hash_key.length())); + + dsn::message_ex *msg; + { + dsn::apps::update_request request; + pegasus::pegasus_generate_key(request.key, hash_key, sort_key); + msg = dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT); + } + auto data = dsn::move_message_to_blob(msg); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_PUT, data)); + + pegasus::put_rpc rpc(msg); + dsn::blob raw_key; + pegasus::pegasus_generate_key(raw_key, hash_key, sort_key); + ASSERT_EQ(rpc.request().key.to_string(), raw_key.to_string()); +} + +TEST_F(pegasus_mutation_duplicator_test, duplicate) { test_duplicate(); } + +TEST_F(pegasus_mutation_duplicator_test, duplicate_failed) { test_duplicate_failed(); } + +TEST_F(pegasus_mutation_duplicator_test, duplicate_isolated_hashkeys) +{ + test_duplicate_isolated_hashkeys(); +} + +TEST_F(pegasus_mutation_duplicator_test, all_mutations_are_duplicated_from_master) +{ + test_all_mutations_are_duplicated_from_master(); +} + +TEST_F(pegasus_mutation_duplicator_test, create_duplicator) { test_create_duplicator(); } + +TEST_F(pegasus_mutation_duplicator_test, mutation_is_duplicated_from_nowhere) +{ + test_mutation_is_duplicated_from_nowhere(); +} + +} // namespace server +} // namespace pegasus From b9233ef64454b899e0cc13ccce9283f4c83b2d09 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 24 Sep 2019 09:28:45 +0800 Subject: [PATCH 04/17] add idl --- src/idl/rrdb.thrift | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index 07f55ba94b..e8ea98cad2 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -251,6 +251,19 @@ struct scan_response 6:string server; } +struct duplicate_request +{ + 1:i64 timetag; + 2:dsn.task_code task_code; + 3:dsn.blob raw_message; + 4:i64 hash; +} + +struct duplicate_response +{ + 1:i32 error; +} + service rrdb { update_response put(1:update_request update); From 7eecda5132ae933ef7b73363b777c18cc27bf4ac Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 24 Sep 2019 09:58:01 +0800 Subject: [PATCH 05/17] add value schema test --- src/base/test/pegasus_value_schema_test.cpp | 47 +++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 src/base/test/pegasus_value_schema_test.cpp diff --git a/src/base/test/pegasus_value_schema_test.cpp b/src/base/test/pegasus_value_schema_test.cpp new file mode 100644 index 0000000000..c6d83b47bd --- /dev/null +++ b/src/base/test/pegasus_value_schema_test.cpp @@ -0,0 +1,47 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "base/pegasus_value_schema.h" + +#include + +using namespace pegasus; + +TEST(value_schema, generate_and_extract_timetag) +{ + struct test_case + { + uint64_t timestamp; + bool delete_tag; + + uint64_t wtimestamp; + } tests[] = { + {1000, true, 1000}, + {1000, false, 1000}, + + {std::numeric_limits::max() >> 8, + true, + std::numeric_limits::max() >> 8}, + + {std::numeric_limits::max() >> 8, + false, + std::numeric_limits::max() >> 8}, + + {std::numeric_limits::max(), false, std::numeric_limits::max() >> 8}, + + {std::numeric_limits::max(), false, std::numeric_limits::max() >> 8}, + + // Wed, 12 Dec 2018 09:48:48 GMT + {1544583472297055, false, 1544583472297055}, + }; + + for (auto &t : tests) { + for (uint8_t cluster_id = 1; cluster_id <= 0x7F; cluster_id++) { + uint64_t timetag = generate_timetag(t.timestamp, cluster_id, t.delete_tag); + ASSERT_EQ(cluster_id, extract_cluster_id_from_timetag(timetag)); + ASSERT_EQ(t.wtimestamp, extract_timestamp_from_timetag(timetag)); + } + } +} + From 60b4d03a8c175afe307decf06ccef1610be65b1d Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 24 Sep 2019 12:59:28 +0800 Subject: [PATCH 06/17] fix travis --- src/base/test/pegasus_value_schema_test.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/base/test/pegasus_value_schema_test.cpp b/src/base/test/pegasus_value_schema_test.cpp index c6d83b47bd..28db232c69 100644 --- a/src/base/test/pegasus_value_schema_test.cpp +++ b/src/base/test/pegasus_value_schema_test.cpp @@ -44,4 +44,3 @@ TEST(value_schema, generate_and_extract_timetag) } } } - From 9c1b474442d537a6f3825308d73943f920704ff1 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 24 Sep 2019 16:17:03 +0800 Subject: [PATCH 07/17] fix travis --- src/base/rrdb_types.cpp | 253 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 253 insertions(+) diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index 441b1f9d9d..ba376a8c8b 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -4254,5 +4254,258 @@ void scan_response::printTo(std::ostream &out) const << "server=" << to_string(server); out << ")"; } + +duplicate_request::~duplicate_request() throw() {} + +void duplicate_request::__set_timetag(const int64_t val) { this->timetag = val; } + +void duplicate_request::__set_task_code(const ::dsn::task_code &val) { this->task_code = val; } + +void duplicate_request::__set_raw_message(const ::dsn::blob &val) { this->raw_message = val; } + +void duplicate_request::__set_hash(const int64_t val) { this->hash = val; } + +uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->timetag); + this->__isset.timetag = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->task_code.read(iprot); + this->__isset.task_code = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->raw_message.read(iprot); + this->__isset.raw_message = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->hash); + this->__isset.hash = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("duplicate_request"); + + xfer += oprot->writeFieldBegin("timetag", ::apache::thrift::protocol::T_I64, 1); + xfer += oprot->writeI64(this->timetag); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("task_code", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->task_code.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("raw_message", ::apache::thrift::protocol::T_STRUCT, 3); + xfer += this->raw_message.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("hash", ::apache::thrift::protocol::T_I64, 4); + xfer += oprot->writeI64(this->hash); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(duplicate_request &a, duplicate_request &b) +{ + using ::std::swap; + swap(a.timetag, b.timetag); + swap(a.task_code, b.task_code); + swap(a.raw_message, b.raw_message); + swap(a.hash, b.hash); + swap(a.__isset, b.__isset); +} + +duplicate_request::duplicate_request(const duplicate_request &other126) +{ + timetag = other126.timetag; + task_code = other126.task_code; + raw_message = other126.raw_message; + hash = other126.hash; + __isset = other126.__isset; +} +duplicate_request::duplicate_request(duplicate_request &&other127) +{ + timetag = std::move(other127.timetag); + task_code = std::move(other127.task_code); + raw_message = std::move(other127.raw_message); + hash = std::move(other127.hash); + __isset = std::move(other127.__isset); +} +duplicate_request &duplicate_request::operator=(const duplicate_request &other128) +{ + timetag = other128.timetag; + task_code = other128.task_code; + raw_message = other128.raw_message; + hash = other128.hash; + __isset = other128.__isset; + return *this; +} +duplicate_request &duplicate_request::operator=(duplicate_request &&other129) +{ + timetag = std::move(other129.timetag); + task_code = std::move(other129.task_code); + raw_message = std::move(other129.raw_message); + hash = std::move(other129.hash); + __isset = std::move(other129.__isset); + return *this; +} +void duplicate_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "duplicate_request("; + out << "timetag=" << to_string(timetag); + out << ", " + << "task_code=" << to_string(task_code); + out << ", " + << "raw_message=" << to_string(raw_message); + out << ", " + << "hash=" << to_string(hash); + out << ")"; +} + +duplicate_response::~duplicate_response() throw() {} + +void duplicate_response::__set_error(const int32_t val) { this->error = val; } + +uint32_t duplicate_response::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->error); + this->__isset.error = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t duplicate_response::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("duplicate_response"); + + xfer += oprot->writeFieldBegin("error", ::apache::thrift::protocol::T_I32, 1); + xfer += oprot->writeI32(this->error); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(duplicate_response &a, duplicate_response &b) +{ + using ::std::swap; + swap(a.error, b.error); + swap(a.__isset, b.__isset); +} + +duplicate_response::duplicate_response(const duplicate_response &other130) +{ + error = other130.error; + __isset = other130.__isset; +} +duplicate_response::duplicate_response(duplicate_response &&other131) +{ + error = std::move(other131.error); + __isset = std::move(other131.__isset); +} +duplicate_response &duplicate_response::operator=(const duplicate_response &other132) +{ + error = other132.error; + __isset = other132.__isset; + return *this; +} +duplicate_response &duplicate_response::operator=(duplicate_response &&other133) +{ + error = std::move(other133.error); + __isset = std::move(other133.__isset); + return *this; +} +void duplicate_response::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "duplicate_response("; + out << "error=" << to_string(error); + out << ")"; +} } } // namespace From b18ef59ec5c189137db251dad301f4be7726e9c4 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 26 Sep 2019 17:02:30 +0800 Subject: [PATCH 08/17] fix travis --- src/client_lib/pegasus_client_impl.cpp | 7 +++++++ src/server/test/CMakeLists.txt | 1 + src/server/test/config.ini | 6 ++++++ 3 files changed, 14 insertions(+) diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp index fcf7b2a33d..f866a90eb1 100644 --- a/src/client_lib/pegasus_client_impl.cpp +++ b/src/client_lib/pegasus_client_impl.cpp @@ -1244,6 +1244,13 @@ int pegasus_client_impl::get_unordered_scanners(int max_split_count, return ret; } +void pegasus_client_impl::async_duplicate(dsn::apps::duplicate_rpc rpc, + std::function &&callback, + dsn::task_tracker *tracker) +{ + _client->duplicate(rpc, std::move(callback), tracker); +} + const char *pegasus_client_impl::get_error_string(int error_code) const { auto it = _client_error_to_string.find(error_code); diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index 5dc8e3cc0d..df5f5a73f9 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -6,6 +6,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp" "../pegasus_write_service.cpp" "../pegasus_server_write.cpp" "../capacity_unit_calculator.cpp" + "../pegasus_mutation_duplicator.cpp" ) set(MY_SRC_SEARCH_MODE "GLOB") diff --git a/src/server/test/config.ini b/src/server/test/config.ini index a00fe5c9fa..391319eea2 100644 --- a/src/server/test/config.ini +++ b/src/server/test/config.ini @@ -125,6 +125,7 @@ stateful = true [replication] data_dirs_black_list_file = /home/mi/.pegasus_data_dirs_black_list +cluster_name = onebox deny_client_on_start = false delay_for_fd_timeout_on_start = false @@ -497,5 +498,10 @@ profiler::cancelled = false [meta_server] server_list = 0.0.0.0:34701 +[duplication-group] +onebox = 1 +onebox2 = 2 + [pegasus.clusters] onebox = @LOCAL_IP@:34701,@LOCAL_IP@:34702,@LOCAL_IP@:34703 +onebox2 = @LOCAL_IP@:34701,@LOCAL_IP@:34702,@LOCAL_IP@:34703 From bfbb911be68db561b802a8ee7b964b8e966e2b70 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 30 Sep 2019 14:23:28 +0800 Subject: [PATCH 09/17] fix codereview --- src/base/pegasus_value_schema.h | 1 - src/server/pegasus_mutation_duplicator.cpp | 7 ------- 2 files changed, 8 deletions(-) diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h index d5ac9eeece..3a056b8fc9 100644 --- a/src/base/pegasus_value_schema.h +++ b/src/base/pegasus_value_schema.h @@ -22,7 +22,6 @@ namespace pegasus { #define PEGASUS_DATA_VERSION_MAX 0u /// Generates timetag in host endian. -/// \see comment on pegasus_value_generator::generate_value_v1 inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool delete_tag) { return timestamp << 8 | cluster_id << 1 | delete_tag; diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 04e91b5f9b..29e9dc5164 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -195,13 +195,6 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb // ignore this mutation to prevent infinite duplication loop. continue; } - if (!dsn::replication::is_cluster_id_configured(from_cluster_id)) { - derror_replica( - "illegal duplicate request [from_cluster_id: {}, remote_cluster_id:{}]", - from_cluster_id, - _remote_cluster_id); - continue; - } hash = static_cast(dreq.hash); data = std::move(dreq.raw_message); From 602e4d5d5e992d12ce388a1b458faf5e1e7c7833 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 30 Sep 2019 14:24:56 +0800 Subject: [PATCH 10/17] delete CMakeLists.txt --- CMakeLists.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt deleted file mode 100644 index e69de29bb2..0000000000 From a576cf3d702e0c287aed64c441b17d204168a953 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 30 Dec 2019 16:27:44 +0800 Subject: [PATCH 11/17] add comments --- src/base/rrdb_types.cpp | 72 ++++++++++++++-------- src/idl/dsn.thrift | 4 ++ src/idl/rrdb.thrift | 8 +-- src/include/rrdb/rrdb_types.h | 16 +++-- src/server/pegasus_mutation_duplicator.cpp | 13 ++-- src/server/pegasus_mutation_duplicator.h | 12 ++-- 6 files changed, 81 insertions(+), 44 deletions(-) diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index ba376a8c8b..67d0b979da 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -4257,13 +4257,29 @@ void scan_response::printTo(std::ostream &out) const duplicate_request::~duplicate_request() throw() {} -void duplicate_request::__set_timetag(const int64_t val) { this->timetag = val; } +void duplicate_request::__set_timetag(const int64_t val) +{ + this->timetag = val; + __isset.timetag = true; +} -void duplicate_request::__set_task_code(const ::dsn::task_code &val) { this->task_code = val; } +void duplicate_request::__set_task_code(const ::dsn::task_code &val) +{ + this->task_code = val; + __isset.task_code = true; +} -void duplicate_request::__set_raw_message(const ::dsn::blob &val) { this->raw_message = val; } +void duplicate_request::__set_raw_message(const ::dsn::blob &val) +{ + this->raw_message = val; + __isset.raw_message = true; +} -void duplicate_request::__set_hash(const int64_t val) { this->hash = val; } +void duplicate_request::__set_hash(const int64_t val) +{ + this->hash = val; + __isset.hash = true; +} uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -4334,22 +4350,26 @@ uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("duplicate_request"); - xfer += oprot->writeFieldBegin("timetag", ::apache::thrift::protocol::T_I64, 1); - xfer += oprot->writeI64(this->timetag); - xfer += oprot->writeFieldEnd(); - - xfer += oprot->writeFieldBegin("task_code", ::apache::thrift::protocol::T_STRUCT, 2); - xfer += this->task_code.write(oprot); - xfer += oprot->writeFieldEnd(); - - xfer += oprot->writeFieldBegin("raw_message", ::apache::thrift::protocol::T_STRUCT, 3); - xfer += this->raw_message.write(oprot); - xfer += oprot->writeFieldEnd(); - - xfer += oprot->writeFieldBegin("hash", ::apache::thrift::protocol::T_I64, 4); - xfer += oprot->writeI64(this->hash); - xfer += oprot->writeFieldEnd(); - + if (this->__isset.timetag) { + xfer += oprot->writeFieldBegin("timetag", ::apache::thrift::protocol::T_I64, 1); + xfer += oprot->writeI64(this->timetag); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.task_code) { + xfer += oprot->writeFieldBegin("task_code", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->task_code.write(oprot); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.raw_message) { + xfer += oprot->writeFieldBegin("raw_message", ::apache::thrift::protocol::T_STRUCT, 3); + xfer += this->raw_message.write(oprot); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.hash) { + xfer += oprot->writeFieldBegin("hash", ::apache::thrift::protocol::T_I64, 4); + xfer += oprot->writeI64(this->hash); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4403,13 +4423,17 @@ void duplicate_request::printTo(std::ostream &out) const { using ::apache::thrift::to_string; out << "duplicate_request("; - out << "timetag=" << to_string(timetag); + out << "timetag="; + (__isset.timetag ? (out << to_string(timetag)) : (out << "")); out << ", " - << "task_code=" << to_string(task_code); + << "task_code="; + (__isset.task_code ? (out << to_string(task_code)) : (out << "")); out << ", " - << "raw_message=" << to_string(raw_message); + << "raw_message="; + (__isset.raw_message ? (out << to_string(raw_message)) : (out << "")); out << ", " - << "hash=" << to_string(hash); + << "hash="; + (__isset.hash ? (out << to_string(hash)) : (out << "")); out << ")"; } diff --git a/src/idl/dsn.thrift b/src/idl/dsn.thrift index 626e57bca0..aefba84d86 100644 --- a/src/idl/dsn.thrift +++ b/src/idl/dsn.thrift @@ -4,3 +4,7 @@ namespace cpp dsn struct blob { } + +struct task_code +{ +} diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index e8ea98cad2..5e6dadebc6 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -253,10 +253,10 @@ struct scan_response struct duplicate_request { - 1:i64 timetag; - 2:dsn.task_code task_code; - 3:dsn.blob raw_message; - 4:i64 hash; + 1: optional i64 timetag + 2: optional dsn.task_code task_code + 3: optional dsn.blob raw_message + 4: optional i64 hash } struct duplicate_response diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index e04d524361..55aaa10b54 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -1834,13 +1834,21 @@ class duplicate_request bool operator==(const duplicate_request &rhs) const { - if (!(timetag == rhs.timetag)) + if (__isset.timetag != rhs.__isset.timetag) return false; - if (!(task_code == rhs.task_code)) + else if (__isset.timetag && !(timetag == rhs.timetag)) return false; - if (!(raw_message == rhs.raw_message)) + if (__isset.task_code != rhs.__isset.task_code) return false; - if (!(hash == rhs.hash)) + else if (__isset.task_code && !(task_code == rhs.task_code)) + return false; + if (__isset.raw_message != rhs.__isset.raw_message) + return false; + else if (__isset.raw_message && !(raw_message == rhs.raw_message)) + return false; + if (__isset.hash != rhs.__isset.hash) + return false; + else if (__isset.hash && !(hash == rhs.hash)) return false; return true; } diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 29e9dc5164..d875c38c62 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -60,7 +60,8 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli dsn::string_view app) : mutation_duplicator(r), _remote_cluster(remote_cluster) { - static bool _dummy = pegasus_client_factory::initialize(nullptr); + // initialize pegasus-client when this class is first time used. + static __attribute__((unused)) bool _dummy = pegasus_client_factory::initialize(nullptr); pegasus_client *client = pegasus_client_factory::get_client(remote_cluster.data(), app.data()); _client = static_cast(client); @@ -69,7 +70,7 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli dassert_replica(ret.is_ok(), // never possible, meta server disallows such remote_cluster. "invalid remote cluster: {}, err_ret: {}", remote_cluster, - ret.get_error().description()); + ret.get_error()); _remote_cluster_id = static_cast(ret.get_value()); ddebug_replica("initialize mutation duplicator for local cluster [id:{}], " @@ -77,6 +78,8 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli get_current_cluster_id(), _remote_cluster_id, remote_cluster); + + // never possible to duplicate data to itself dassert_replica(get_current_cluster_id() != _remote_cluster_id, "invalid remote cluster: {} {}", remote_cluster, @@ -178,12 +181,6 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb dsn::blob data = std::get<2>(mut); uint64_t hash; - // a mutation must be a write - dsn::task_spec *task = dsn::task_spec::get(rpc_code); - dassert_replica(task != nullptr && task->rpc_request_is_write_operation, - "invalid rpc type({})", - rpc_code); - // extract the rpc wrapped inside if this is a DUPLICATE rpc if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { dsn::apps::duplicate_request dreq; diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index 0144308136..7ef1a140c5 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -15,7 +15,7 @@ namespace server { using namespace dsn::literals::chrono_literals; -// Duplicates the loaded mutations to the remote pegasus cluster. +// Duplicates the loaded mutations to the remote pegasus cluster using pegasus client. class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator { using mutation_tuple_set = dsn::replication::mutation_tuple_set; @@ -39,13 +39,15 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator private: friend class pegasus_mutation_duplicator_test; - client::pegasus_client_impl *_client; + client::pegasus_client_impl *_client{nullptr}; uint8_t _remote_cluster_id{0}; std::string _remote_cluster; - // hash -> duplicate_rpc - std::map> _inflights; + // The duplicate_rpc are isolated by their hash value from hash key. + // Writes with the same hash are duplicated in mutation order to preserve data consistency, + // otherwise they are duplicated concurrently to improve performance. + std::map> _inflights; // hash -> duplicate_rpc dsn::zlock _lock; size_t _total_shipped_size{0}; @@ -54,6 +56,8 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator dsn::perf_counter_wrapper _failed_shipping_ops; }; +// Decodes the binary `request_data` into write request in thrift struct, and +// calculates the hash value from the write's hash key. extern uint64_t get_hash_from_request(dsn::task_code rpc_code, const dsn::blob &request_data); } // namespace server From c26e198ee690fbdfb945e95842c50827d017a036 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 30 Dec 2019 19:59:59 +0800 Subject: [PATCH 12/17] refine --- src/base/rrdb_types.cpp | 143 ++++++++++++------ src/idl/rrdb.thrift | 16 +- src/include/rrdb/rrdb_types.h | 27 +++- src/server/pegasus_mutation_duplicator.cpp | 70 +++++---- src/server/pegasus_mutation_duplicator.h | 2 +- .../test/pegasus_mutation_duplicator_test.cpp | 28 ++-- 6 files changed, 180 insertions(+), 106 deletions(-) diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index 67d0b979da..b584ac2d73 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -4257,10 +4257,10 @@ void scan_response::printTo(std::ostream &out) const duplicate_request::~duplicate_request() throw() {} -void duplicate_request::__set_timetag(const int64_t val) +void duplicate_request::__set_timestamp(const int64_t val) { - this->timetag = val; - __isset.timetag = true; + this->timestamp = val; + __isset.timestamp = true; } void duplicate_request::__set_task_code(const ::dsn::task_code &val) @@ -4281,6 +4281,12 @@ void duplicate_request::__set_hash(const int64_t val) __isset.hash = true; } +void duplicate_request::__set_from_clusters_set(const std::set &val) +{ + this->from_clusters_set = val; + __isset.from_clusters_set = true; +} + uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -4302,8 +4308,8 @@ uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) switch (fid) { case 1: if (ftype == ::apache::thrift::protocol::T_I64) { - xfer += iprot->readI64(this->timetag); - this->__isset.timetag = true; + xfer += iprot->readI64(this->timestamp); + this->__isset.timestamp = true; } else { xfer += iprot->skip(ftype); } @@ -4332,6 +4338,26 @@ uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; + case 5: + if (ftype == ::apache::thrift::protocol::T_SET) { + { + this->from_clusters_set.clear(); + uint32_t _size126; + ::apache::thrift::protocol::TType _etype129; + xfer += iprot->readSetBegin(_etype129, _size126); + uint32_t _i130; + for (_i130 = 0; _i130 < _size126; ++_i130) { + int8_t _elem131; + xfer += iprot->readByte(_elem131); + this->from_clusters_set.insert(_elem131); + } + xfer += iprot->readSetEnd(); + } + this->__isset.from_clusters_set = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -4350,9 +4376,9 @@ uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("duplicate_request"); - if (this->__isset.timetag) { - xfer += oprot->writeFieldBegin("timetag", ::apache::thrift::protocol::T_I64, 1); - xfer += oprot->writeI64(this->timetag); + if (this->__isset.timestamp) { + xfer += oprot->writeFieldBegin("timestamp", ::apache::thrift::protocol::T_I64, 1); + xfer += oprot->writeI64(this->timestamp); xfer += oprot->writeFieldEnd(); } if (this->__isset.task_code) { @@ -4370,6 +4396,21 @@ uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeI64(this->hash); xfer += oprot->writeFieldEnd(); } + if (this->__isset.from_clusters_set) { + xfer += oprot->writeFieldBegin("from_clusters_set", ::apache::thrift::protocol::T_SET, 5); + { + xfer += oprot->writeSetBegin(::apache::thrift::protocol::T_BYTE, + static_cast(this->from_clusters_set.size())); + std::set::const_iterator _iter132; + for (_iter132 = this->from_clusters_set.begin(); + _iter132 != this->from_clusters_set.end(); + ++_iter132) { + xfer += oprot->writeByte((*_iter132)); + } + xfer += oprot->writeSetEnd(); + } + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4378,53 +4419,58 @@ uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) void swap(duplicate_request &a, duplicate_request &b) { using ::std::swap; - swap(a.timetag, b.timetag); + swap(a.timestamp, b.timestamp); swap(a.task_code, b.task_code); swap(a.raw_message, b.raw_message); swap(a.hash, b.hash); + swap(a.from_clusters_set, b.from_clusters_set); swap(a.__isset, b.__isset); } -duplicate_request::duplicate_request(const duplicate_request &other126) +duplicate_request::duplicate_request(const duplicate_request &other133) { - timetag = other126.timetag; - task_code = other126.task_code; - raw_message = other126.raw_message; - hash = other126.hash; - __isset = other126.__isset; + timestamp = other133.timestamp; + task_code = other133.task_code; + raw_message = other133.raw_message; + hash = other133.hash; + from_clusters_set = other133.from_clusters_set; + __isset = other133.__isset; } -duplicate_request::duplicate_request(duplicate_request &&other127) +duplicate_request::duplicate_request(duplicate_request &&other134) { - timetag = std::move(other127.timetag); - task_code = std::move(other127.task_code); - raw_message = std::move(other127.raw_message); - hash = std::move(other127.hash); - __isset = std::move(other127.__isset); + timestamp = std::move(other134.timestamp); + task_code = std::move(other134.task_code); + raw_message = std::move(other134.raw_message); + hash = std::move(other134.hash); + from_clusters_set = std::move(other134.from_clusters_set); + __isset = std::move(other134.__isset); } -duplicate_request &duplicate_request::operator=(const duplicate_request &other128) +duplicate_request &duplicate_request::operator=(const duplicate_request &other135) { - timetag = other128.timetag; - task_code = other128.task_code; - raw_message = other128.raw_message; - hash = other128.hash; - __isset = other128.__isset; + timestamp = other135.timestamp; + task_code = other135.task_code; + raw_message = other135.raw_message; + hash = other135.hash; + from_clusters_set = other135.from_clusters_set; + __isset = other135.__isset; return *this; } -duplicate_request &duplicate_request::operator=(duplicate_request &&other129) +duplicate_request &duplicate_request::operator=(duplicate_request &&other136) { - timetag = std::move(other129.timetag); - task_code = std::move(other129.task_code); - raw_message = std::move(other129.raw_message); - hash = std::move(other129.hash); - __isset = std::move(other129.__isset); + timestamp = std::move(other136.timestamp); + task_code = std::move(other136.task_code); + raw_message = std::move(other136.raw_message); + hash = std::move(other136.hash); + from_clusters_set = std::move(other136.from_clusters_set); + __isset = std::move(other136.__isset); return *this; } void duplicate_request::printTo(std::ostream &out) const { using ::apache::thrift::to_string; out << "duplicate_request("; - out << "timetag="; - (__isset.timetag ? (out << to_string(timetag)) : (out << "")); + out << "timestamp="; + (__isset.timestamp ? (out << to_string(timestamp)) : (out << "")); out << ", " << "task_code="; (__isset.task_code ? (out << to_string(task_code)) : (out << "")); @@ -4434,6 +4480,9 @@ void duplicate_request::printTo(std::ostream &out) const out << ", " << "hash="; (__isset.hash ? (out << to_string(hash)) : (out << "")); + out << ", " + << "from_clusters_set="; + (__isset.from_clusters_set ? (out << to_string(from_clusters_set)) : (out << "")); out << ")"; } @@ -4502,26 +4551,26 @@ void swap(duplicate_response &a, duplicate_response &b) swap(a.__isset, b.__isset); } -duplicate_response::duplicate_response(const duplicate_response &other130) +duplicate_response::duplicate_response(const duplicate_response &other137) { - error = other130.error; - __isset = other130.__isset; + error = other137.error; + __isset = other137.__isset; } -duplicate_response::duplicate_response(duplicate_response &&other131) +duplicate_response::duplicate_response(duplicate_response &&other138) { - error = std::move(other131.error); - __isset = std::move(other131.__isset); + error = std::move(other138.error); + __isset = std::move(other138.__isset); } -duplicate_response &duplicate_response::operator=(const duplicate_response &other132) +duplicate_response &duplicate_response::operator=(const duplicate_response &other139) { - error = other132.error; - __isset = other132.__isset; + error = other139.error; + __isset = other139.__isset; return *this; } -duplicate_response &duplicate_response::operator=(duplicate_response &&other133) +duplicate_response &duplicate_response::operator=(duplicate_response &&other140) { - error = std::move(other133.error); - __isset = std::move(other133.__isset); + error = std::move(other140.error); + __isset = std::move(other140.__isset); return *this; } void duplicate_response::printTo(std::ostream &out) const diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index 5e6dadebc6..7bb684702f 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -253,10 +253,24 @@ struct scan_response struct duplicate_request { - 1: optional i64 timetag + // The timestamp of this write. + 1: optional i64 timestamp + + // The code to identify this write. 2: optional dsn.task_code task_code + + // The binary form of the write. 3: optional dsn.blob raw_message + + // The hash value calculated from the hash key, which is retrieved from `raw_message`. + // This field is to optimize when duplicating a DUPLICATE, the hash calculation can be skipped. 4: optional i64 hash + + // Where the write is from. If the topology is A<->B<->C, a write to A will be then + // duplicated to B and C. For C, because. A and B is in the `from_clusters_set`, the write + // will not be duplicated to them, so that prevents infinite loop. + // The receiver of the write will + 5: optional set from_clusters_set } struct duplicate_response diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index 55aaa10b54..7b62859012 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -1798,13 +1798,19 @@ inline std::ostream &operator<<(std::ostream &out, const scan_response &obj) typedef struct _duplicate_request__isset { - _duplicate_request__isset() : timetag(false), task_code(false), raw_message(false), hash(false) + _duplicate_request__isset() + : timestamp(false), + task_code(false), + raw_message(false), + hash(false), + from_clusters_set(false) { } - bool timetag : 1; + bool timestamp : 1; bool task_code : 1; bool raw_message : 1; bool hash : 1; + bool from_clusters_set : 1; } _duplicate_request__isset; class duplicate_request @@ -1814,17 +1820,18 @@ class duplicate_request duplicate_request(duplicate_request &&); duplicate_request &operator=(const duplicate_request &); duplicate_request &operator=(duplicate_request &&); - duplicate_request() : timetag(0), hash(0) {} + duplicate_request() : timestamp(0), hash(0) {} virtual ~duplicate_request() throw(); - int64_t timetag; + int64_t timestamp; ::dsn::task_code task_code; ::dsn::blob raw_message; int64_t hash; + std::set from_clusters_set; _duplicate_request__isset __isset; - void __set_timetag(const int64_t val); + void __set_timestamp(const int64_t val); void __set_task_code(const ::dsn::task_code &val); @@ -1832,11 +1839,13 @@ class duplicate_request void __set_hash(const int64_t val); + void __set_from_clusters_set(const std::set &val); + bool operator==(const duplicate_request &rhs) const { - if (__isset.timetag != rhs.__isset.timetag) + if (__isset.timestamp != rhs.__isset.timestamp) return false; - else if (__isset.timetag && !(timetag == rhs.timetag)) + else if (__isset.timestamp && !(timestamp == rhs.timestamp)) return false; if (__isset.task_code != rhs.__isset.task_code) return false; @@ -1850,6 +1859,10 @@ class duplicate_request return false; else if (__isset.hash && !(hash == rhs.hash)) return false; + if (__isset.from_clusters_set != rhs.__isset.from_clusters_set) + return false; + else if (__isset.from_clusters_set && !(from_clusters_set == rhs.from_clusters_set)) + return false; return true; } bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); } diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index d875c38c62..e35faa65df 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -97,15 +97,8 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli "the qps of failed DUPLICATE requests sent from this app"); } -static bool is_delete_operation(dsn::task_code code) -{ - return code == dsn::apps::RPC_RRDB_RRDB_REMOVE || code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE; -} - void pegasus_mutation_duplicator::send(uint64_t hash, callback cb) { - uint64_t start = dsn_now_ns(); - duplicate_rpc rpc; { dsn::zauto_lock _(_lock); @@ -114,15 +107,14 @@ void pegasus_mutation_duplicator::send(uint64_t hash, callback cb) } _client->async_duplicate(rpc, - [cb, rpc, start, this](dsn::error_code err) mutable { - on_duplicate_reply(std::move(cb), std::move(rpc), start, err); + [cb, rpc, this](dsn::error_code err) mutable { + on_duplicate_reply(std::move(cb), std::move(rpc), err); }, _env.__conf.tracker); } void pegasus_mutation_duplicator::on_duplicate_reply(mutation_duplicator::callback cb, duplicate_rpc rpc, - uint64_t start_ns, dsn::error_code err) { int perr = PERR_OK; @@ -134,13 +126,13 @@ void pegasus_mutation_duplicator::on_duplicate_reply(mutation_duplicator::callba if (perr != PERR_OK || err != dsn::ERR_OK) { _failed_shipping_ops->increment(); - // randomly log the 1% of the failed duplicate rpc. + // randomly log the 1% of the failed duplicate rpc, because minor number of + // errors are acceptable. + // TODO(wutao1): print the entire request for future debugging. if (dsn::rand::next_double01() <= 0.01) { - derror_replica("duplicate_rpc failed: {} [code:{}, cluster_id:{}, timestamp:{}]", + derror_replica("duplicate_rpc failed: {} [code:{}, timestamp:{}]", err == dsn::ERR_OK ? _client->get_error_string(perr) : err.to_string(), - rpc.request().task_code, - extract_cluster_id_from_timetag(rpc.request().timetag), - extract_timestamp_from_timetag(rpc.request().timetag)); + rpc.request().timestamp); } } else { _shipped_ops->increment(); @@ -171,42 +163,48 @@ void pegasus_mutation_duplicator::on_duplicate_reply(mutation_duplicator::callba } } +static bool has_unknown_cluster_id(const std::set &clusters_set) +{ + for (int8_t cid : clusters_set) { + if (!dsn::replication::is_cluster_id_configured(cid)) { + return true; + } + } + return false; +} + void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb) { _total_shipped_size = 0; for (auto mut : muts) { - uint64_t timestamp = std::get<0>(mut); + // mut: 0=timestamp, 1=rpc_code, 2=raw_message + dsn::task_code rpc_code = std::get<1>(mut); - dsn::blob data = std::get<2>(mut); - uint64_t hash; + dsn::blob raw_message = std::get<2>(mut); + auto dreq = dsn::make_unique(); // extract the rpc wrapped inside if this is a DUPLICATE rpc if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { - dsn::apps::duplicate_request dreq; - dsn::from_blob_to_thrift(data, dreq); - - auto timetag = static_cast(dreq.timetag); - uint8_t from_cluster_id = extract_cluster_id_from_timetag(timetag); - if (from_cluster_id == _remote_cluster_id) { + dsn::from_blob_to_thrift(raw_message, *dreq); + if (dreq->from_clusters_set.find(_remote_cluster_id) != dreq->from_clusters_set.end()) { // ignore this mutation to prevent infinite duplication loop. continue; } - - hash = static_cast(dreq.hash); - data = std::move(dreq.raw_message); - rpc_code = dreq.task_code; - timestamp = extract_timestamp_from_timetag(timetag); + if (has_unknown_cluster_id(dreq->from_clusters_set)) { + // if this write is duplicated from nowhere + continue; + } } else { - hash = get_hash_from_request(rpc_code, data); + dreq->__set_hash(get_hash_from_request(rpc_code, raw_message)); + dreq->__set_raw_message(raw_message); + dreq->__set_task_code(rpc_code); + dreq->__set_timestamp(std::get<0>(mut)); + dreq->__isset.from_clusters_set = true; } + dreq->from_clusters_set.insert(get_current_cluster_id()); - auto dreq = dsn::make_unique(); - dreq->task_code = rpc_code; - dreq->hash = hash; - dreq->raw_message = std::move(data); - dreq->timetag = - generate_timetag(timestamp, get_current_cluster_id(), is_delete_operation(rpc_code)); + uint64_t hash = dreq->hash; duplicate_rpc rpc(std::move(dreq), dsn::apps::RPC_RRDB_RRDB_DUPLICATE, 10_s, // TODO(wutao1): configurable timeout. diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index 7ef1a140c5..064399b239 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -34,7 +34,7 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator private: void send(uint64_t hash, callback cb); - void on_duplicate_reply(callback, duplicate_rpc, uint64_t start_ns, dsn::error_code err); + void on_duplicate_reply(callback, duplicate_rpc, dsn::error_code err); private: friend class pegasus_mutation_duplicator_test; diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index a829dad4b5..acb0604e43 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -72,7 +72,6 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base ASSERT_EQ(total_shipped_size, final_size); }, rpc, - dsn_now_ns(), dsn::ERR_OK); // schedule next round @@ -115,7 +114,7 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); // failed - duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn_now_ns(), dsn::ERR_TIMEOUT); + duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn::ERR_TIMEOUT); // schedule next round _tracker.wait_outstanding_tasks(); @@ -128,7 +127,7 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base // with other error rpc.response().error = PERR_INVALID_ARGUMENT; - duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn_now_ns(), dsn::ERR_OK); + duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn::ERR_OK); _tracker.wait_outstanding_tasks(); ASSERT_EQ(duplicator_impl->_inflights.size(), 1); ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); @@ -137,8 +136,7 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base // with other error rpc.response().error = PERR_OK; - duplicator_impl->on_duplicate_reply( - [](size_t) {}, rpc, dsn_now_ns(), dsn::ERR_IO_PENDING); + duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn::ERR_IO_PENDING); _tracker.wait_outstanding_tasks(); ASSERT_EQ(duplicator_impl->_inflights.size(), 1); ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); @@ -185,7 +183,7 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base auto rpc_list = std::move(duplicate_rpc::mail_box()); for (const auto &rpc : rpc_list) { rpc.response().error = dsn::ERR_OK; - duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn_now_ns(), dsn::ERR_OK); + duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn::ERR_OK); } _tracker.wait_outstanding_tasks(); ASSERT_EQ(duplicate_rpc::mail_box().size(), 0); @@ -209,13 +207,14 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base pegasus::pegasus_generate_key(duplicated_request.key, std::string("hash") + std::to_string(i), std::string("sort")); - request.timetag = generate_timetag(100, 2, false); // master(onebox2)'s cluster_id = 2 - request.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; - request.hash = pegasus_key_hash(duplicated_request.key); + request.__set_timestamp(100); + request.__set_from_clusters_set({2}); // master(onebox2)'s cluster_id = 2 + request.__set_task_code(dsn::apps::RPC_RRDB_RRDB_PUT); + request.__set_hash(pegasus_key_hash(duplicated_request.key)); dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(duplicated_request, request.task_code); - request.raw_message = dsn::move_message_to_blob(msg.get()); + request.__set_raw_message(dsn::move_message_to_blob(msg.get())); msg = dsn::from_thrift_request_to_received_message(request, code); auto data = dsn::move_message_to_blob(msg.get()); @@ -251,13 +250,14 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base pegasus::pegasus_generate_key(duplicated_request.key, std::string("hash") + std::to_string(i), std::string("sort")); - request.timetag = generate_timetag(100, 130, false); // cluster_id=130(nowhere) - request.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; - request.hash = pegasus_key_hash(duplicated_request.key); + request.__set_timestamp(100); + request.__set_from_clusters_set({int8_t(130)}); // cluster_id=130(nowhere) + request.__set_task_code(dsn::apps::RPC_RRDB_RRDB_PUT); + request.__set_hash(pegasus_key_hash(duplicated_request.key)); dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(duplicated_request, request.task_code); - request.raw_message = dsn::move_message_to_blob(msg.get()); + request.__set_raw_message(dsn::move_message_to_blob(msg.get())); msg = dsn::from_thrift_request_to_received_message(request, code); auto data = dsn::move_message_to_blob(msg.get()); From 2d3d594dbd8a3a2fda4f0e044c10bc08c039de91 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 2 Jan 2020 17:16:32 +0800 Subject: [PATCH 13/17] fix thrift --- src/idl/rrdb.thrift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index 7bb684702f..ed2a1147f2 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -267,9 +267,8 @@ struct duplicate_request 4: optional i64 hash // Where the write is from. If the topology is A<->B<->C, a write to A will be then - // duplicated to B and C. For C, because. A and B is in the `from_clusters_set`, the write + // duplicated to B and C. For C, because A and B is in the `from_clusters_set`, the write // will not be duplicated to them, so that prevents infinite loop. - // The receiver of the write will 5: optional set from_clusters_set } From 3fe56a479a43aa105732a41c8948e3ee99b8e12e Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 2 Jan 2020 22:51:10 +0800 Subject: [PATCH 14/17] ignore duplicating duplicate --- src/base/rrdb_types.cpp | 158 +++++------------- src/idl/rrdb.thrift | 11 +- src/include/rrdb/rrdb_types.h | 31 +--- src/server/pegasus_mutation_duplicator.cpp | 34 +--- src/server/pegasus_mutation_duplicator.h | 2 +- .../test/pegasus_mutation_duplicator_test.cpp | 123 ++------------ 6 files changed, 75 insertions(+), 284 deletions(-) diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index b584ac2d73..a346fb2614 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -4275,18 +4275,6 @@ void duplicate_request::__set_raw_message(const ::dsn::blob &val) __isset.raw_message = true; } -void duplicate_request::__set_hash(const int64_t val) -{ - this->hash = val; - __isset.hash = true; -} - -void duplicate_request::__set_from_clusters_set(const std::set &val) -{ - this->from_clusters_set = val; - __isset.from_clusters_set = true; -} - uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -4330,34 +4318,6 @@ uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; - case 4: - if (ftype == ::apache::thrift::protocol::T_I64) { - xfer += iprot->readI64(this->hash); - this->__isset.hash = true; - } else { - xfer += iprot->skip(ftype); - } - break; - case 5: - if (ftype == ::apache::thrift::protocol::T_SET) { - { - this->from_clusters_set.clear(); - uint32_t _size126; - ::apache::thrift::protocol::TType _etype129; - xfer += iprot->readSetBegin(_etype129, _size126); - uint32_t _i130; - for (_i130 = 0; _i130 < _size126; ++_i130) { - int8_t _elem131; - xfer += iprot->readByte(_elem131); - this->from_clusters_set.insert(_elem131); - } - xfer += iprot->readSetEnd(); - } - this->__isset.from_clusters_set = true; - } else { - xfer += iprot->skip(ftype); - } - break; default: xfer += iprot->skip(ftype); break; @@ -4391,26 +4351,6 @@ uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) xfer += this->raw_message.write(oprot); xfer += oprot->writeFieldEnd(); } - if (this->__isset.hash) { - xfer += oprot->writeFieldBegin("hash", ::apache::thrift::protocol::T_I64, 4); - xfer += oprot->writeI64(this->hash); - xfer += oprot->writeFieldEnd(); - } - if (this->__isset.from_clusters_set) { - xfer += oprot->writeFieldBegin("from_clusters_set", ::apache::thrift::protocol::T_SET, 5); - { - xfer += oprot->writeSetBegin(::apache::thrift::protocol::T_BYTE, - static_cast(this->from_clusters_set.size())); - std::set::const_iterator _iter132; - for (_iter132 = this->from_clusters_set.begin(); - _iter132 != this->from_clusters_set.end(); - ++_iter132) { - xfer += oprot->writeByte((*_iter132)); - } - xfer += oprot->writeSetEnd(); - } - xfer += oprot->writeFieldEnd(); - } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4422,47 +4362,37 @@ void swap(duplicate_request &a, duplicate_request &b) swap(a.timestamp, b.timestamp); swap(a.task_code, b.task_code); swap(a.raw_message, b.raw_message); - swap(a.hash, b.hash); - swap(a.from_clusters_set, b.from_clusters_set); swap(a.__isset, b.__isset); } -duplicate_request::duplicate_request(const duplicate_request &other133) +duplicate_request::duplicate_request(const duplicate_request &other126) { - timestamp = other133.timestamp; - task_code = other133.task_code; - raw_message = other133.raw_message; - hash = other133.hash; - from_clusters_set = other133.from_clusters_set; - __isset = other133.__isset; + timestamp = other126.timestamp; + task_code = other126.task_code; + raw_message = other126.raw_message; + __isset = other126.__isset; } -duplicate_request::duplicate_request(duplicate_request &&other134) +duplicate_request::duplicate_request(duplicate_request &&other127) { - timestamp = std::move(other134.timestamp); - task_code = std::move(other134.task_code); - raw_message = std::move(other134.raw_message); - hash = std::move(other134.hash); - from_clusters_set = std::move(other134.from_clusters_set); - __isset = std::move(other134.__isset); + timestamp = std::move(other127.timestamp); + task_code = std::move(other127.task_code); + raw_message = std::move(other127.raw_message); + __isset = std::move(other127.__isset); } -duplicate_request &duplicate_request::operator=(const duplicate_request &other135) +duplicate_request &duplicate_request::operator=(const duplicate_request &other128) { - timestamp = other135.timestamp; - task_code = other135.task_code; - raw_message = other135.raw_message; - hash = other135.hash; - from_clusters_set = other135.from_clusters_set; - __isset = other135.__isset; + timestamp = other128.timestamp; + task_code = other128.task_code; + raw_message = other128.raw_message; + __isset = other128.__isset; return *this; } -duplicate_request &duplicate_request::operator=(duplicate_request &&other136) +duplicate_request &duplicate_request::operator=(duplicate_request &&other129) { - timestamp = std::move(other136.timestamp); - task_code = std::move(other136.task_code); - raw_message = std::move(other136.raw_message); - hash = std::move(other136.hash); - from_clusters_set = std::move(other136.from_clusters_set); - __isset = std::move(other136.__isset); + timestamp = std::move(other129.timestamp); + task_code = std::move(other129.task_code); + raw_message = std::move(other129.raw_message); + __isset = std::move(other129.__isset); return *this; } void duplicate_request::printTo(std::ostream &out) const @@ -4477,18 +4407,16 @@ void duplicate_request::printTo(std::ostream &out) const out << ", " << "raw_message="; (__isset.raw_message ? (out << to_string(raw_message)) : (out << "")); - out << ", " - << "hash="; - (__isset.hash ? (out << to_string(hash)) : (out << "")); - out << ", " - << "from_clusters_set="; - (__isset.from_clusters_set ? (out << to_string(from_clusters_set)) : (out << "")); out << ")"; } duplicate_response::~duplicate_response() throw() {} -void duplicate_response::__set_error(const int32_t val) { this->error = val; } +void duplicate_response::__set_error(const int32_t val) +{ + this->error = val; + __isset.error = true; +} uint32_t duplicate_response::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -4535,10 +4463,11 @@ uint32_t duplicate_response::write(::apache::thrift::protocol::TProtocol *oprot) apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("duplicate_response"); - xfer += oprot->writeFieldBegin("error", ::apache::thrift::protocol::T_I32, 1); - xfer += oprot->writeI32(this->error); - xfer += oprot->writeFieldEnd(); - + if (this->__isset.error) { + xfer += oprot->writeFieldBegin("error", ::apache::thrift::protocol::T_I32, 1); + xfer += oprot->writeI32(this->error); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4551,33 +4480,34 @@ void swap(duplicate_response &a, duplicate_response &b) swap(a.__isset, b.__isset); } -duplicate_response::duplicate_response(const duplicate_response &other137) +duplicate_response::duplicate_response(const duplicate_response &other130) { - error = other137.error; - __isset = other137.__isset; + error = other130.error; + __isset = other130.__isset; } -duplicate_response::duplicate_response(duplicate_response &&other138) +duplicate_response::duplicate_response(duplicate_response &&other131) { - error = std::move(other138.error); - __isset = std::move(other138.__isset); + error = std::move(other131.error); + __isset = std::move(other131.__isset); } -duplicate_response &duplicate_response::operator=(const duplicate_response &other139) +duplicate_response &duplicate_response::operator=(const duplicate_response &other132) { - error = other139.error; - __isset = other139.__isset; + error = other132.error; + __isset = other132.__isset; return *this; } -duplicate_response &duplicate_response::operator=(duplicate_response &&other140) +duplicate_response &duplicate_response::operator=(duplicate_response &&other133) { - error = std::move(other140.error); - __isset = std::move(other140.__isset); + error = std::move(other133.error); + __isset = std::move(other133.__isset); return *this; } void duplicate_response::printTo(std::ostream &out) const { using ::apache::thrift::to_string; out << "duplicate_response("; - out << "error=" << to_string(error); + out << "error="; + (__isset.error ? (out << to_string(error)) : (out << "")); out << ")"; } } diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index ed2a1147f2..5cb93f8ef8 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -261,20 +261,11 @@ struct duplicate_request // The binary form of the write. 3: optional dsn.blob raw_message - - // The hash value calculated from the hash key, which is retrieved from `raw_message`. - // This field is to optimize when duplicating a DUPLICATE, the hash calculation can be skipped. - 4: optional i64 hash - - // Where the write is from. If the topology is A<->B<->C, a write to A will be then - // duplicated to B and C. For C, because A and B is in the `from_clusters_set`, the write - // will not be duplicated to them, so that prevents infinite loop. - 5: optional set from_clusters_set } struct duplicate_response { - 1:i32 error; + 1: optional i32 error; } service rrdb diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index 7b62859012..9be8707319 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -1798,19 +1798,10 @@ inline std::ostream &operator<<(std::ostream &out, const scan_response &obj) typedef struct _duplicate_request__isset { - _duplicate_request__isset() - : timestamp(false), - task_code(false), - raw_message(false), - hash(false), - from_clusters_set(false) - { - } + _duplicate_request__isset() : timestamp(false), task_code(false), raw_message(false) {} bool timestamp : 1; bool task_code : 1; bool raw_message : 1; - bool hash : 1; - bool from_clusters_set : 1; } _duplicate_request__isset; class duplicate_request @@ -1820,14 +1811,12 @@ class duplicate_request duplicate_request(duplicate_request &&); duplicate_request &operator=(const duplicate_request &); duplicate_request &operator=(duplicate_request &&); - duplicate_request() : timestamp(0), hash(0) {} + duplicate_request() : timestamp(0) {} virtual ~duplicate_request() throw(); int64_t timestamp; ::dsn::task_code task_code; ::dsn::blob raw_message; - int64_t hash; - std::set from_clusters_set; _duplicate_request__isset __isset; @@ -1837,10 +1826,6 @@ class duplicate_request void __set_raw_message(const ::dsn::blob &val); - void __set_hash(const int64_t val); - - void __set_from_clusters_set(const std::set &val); - bool operator==(const duplicate_request &rhs) const { if (__isset.timestamp != rhs.__isset.timestamp) @@ -1855,14 +1840,6 @@ class duplicate_request return false; else if (__isset.raw_message && !(raw_message == rhs.raw_message)) return false; - if (__isset.hash != rhs.__isset.hash) - return false; - else if (__isset.hash && !(hash == rhs.hash)) - return false; - if (__isset.from_clusters_set != rhs.__isset.from_clusters_set) - return false; - else if (__isset.from_clusters_set && !(from_clusters_set == rhs.from_clusters_set)) - return false; return true; } bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); } @@ -1907,7 +1884,9 @@ class duplicate_response bool operator==(const duplicate_response &rhs) const { - if (!(error == rhs.error)) + if (__isset.error != rhs.__isset.error) + return false; + else if (__isset.error && !(error == rhs.error)) return false; return true; } diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index e35faa65df..a179718e2b 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -107,13 +107,14 @@ void pegasus_mutation_duplicator::send(uint64_t hash, callback cb) } _client->async_duplicate(rpc, - [cb, rpc, this](dsn::error_code err) mutable { - on_duplicate_reply(std::move(cb), std::move(rpc), err); + [hash, cb, rpc, this](dsn::error_code err) mutable { + on_duplicate_reply(hash, std::move(cb), std::move(rpc), err); }, _env.__conf.tracker); } -void pegasus_mutation_duplicator::on_duplicate_reply(mutation_duplicator::callback cb, +void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, + mutation_duplicator::callback cb, duplicate_rpc rpc, dsn::error_code err) { @@ -140,7 +141,6 @@ void pegasus_mutation_duplicator::on_duplicate_reply(mutation_duplicator::callba rpc.dsn_request()->header->body_length + rpc.dsn_request()->header->hdr_length; } - auto hash = static_cast(rpc.request().hash); { dsn::zauto_lock _(_lock); if (perr != PERR_OK || err != dsn::ERR_OK) { @@ -163,16 +163,6 @@ void pegasus_mutation_duplicator::on_duplicate_reply(mutation_duplicator::callba } } -static bool has_unknown_cluster_id(const std::set &clusters_set) -{ - for (int8_t cid : clusters_set) { - if (!dsn::replication::is_cluster_id_configured(cid)) { - return true; - } - } - return false; -} - void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb) { _total_shipped_size = 0; @@ -183,28 +173,16 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb dsn::task_code rpc_code = std::get<1>(mut); dsn::blob raw_message = std::get<2>(mut); auto dreq = dsn::make_unique(); + uint64_t hash = get_hash_from_request(rpc_code, raw_message); - // extract the rpc wrapped inside if this is a DUPLICATE rpc if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { - dsn::from_blob_to_thrift(raw_message, *dreq); - if (dreq->from_clusters_set.find(_remote_cluster_id) != dreq->from_clusters_set.end()) { - // ignore this mutation to prevent infinite duplication loop. - continue; - } - if (has_unknown_cluster_id(dreq->from_clusters_set)) { - // if this write is duplicated from nowhere - continue; - } + // ignore if it is a DUPLICATE } else { - dreq->__set_hash(get_hash_from_request(rpc_code, raw_message)); dreq->__set_raw_message(raw_message); dreq->__set_task_code(rpc_code); dreq->__set_timestamp(std::get<0>(mut)); - dreq->__isset.from_clusters_set = true; } - dreq->from_clusters_set.insert(get_current_cluster_id()); - uint64_t hash = dreq->hash; duplicate_rpc rpc(std::move(dreq), dsn::apps::RPC_RRDB_RRDB_DUPLICATE, 10_s, // TODO(wutao1): configurable timeout. diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index 064399b239..681add796e 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -34,7 +34,7 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator private: void send(uint64_t hash, callback cb); - void on_duplicate_reply(callback, duplicate_rpc, dsn::error_code err); + void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err); private: friend class pegasus_mutation_duplicator_test; diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index acb0604e43..f79eea034d 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -67,12 +67,12 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base total_shipped_size += rpc.dsn_request()->body_size() + rpc.dsn_request()->header->hdr_length; - duplicator_impl->on_duplicate_reply( - [total_shipped_size](size_t final_size) { - ASSERT_EQ(total_shipped_size, final_size); - }, - rpc, - dsn::ERR_OK); + duplicator_impl->on_duplicate_reply(get_hash(rpc), + [total_shipped_size](size_t final_size) { + ASSERT_EQ(total_shipped_size, final_size); + }, + rpc, + dsn::ERR_OK); // schedule next round _tracker.wait_outstanding_tasks(); @@ -114,7 +114,8 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9); // failed - duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn::ERR_TIMEOUT); + duplicator_impl->on_duplicate_reply( + get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_TIMEOUT); // schedule next round _tracker.wait_outstanding_tasks(); @@ -127,7 +128,7 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base // with other error rpc.response().error = PERR_INVALID_ARGUMENT; - duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn::ERR_OK); + duplicator_impl->on_duplicate_reply(get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_OK); _tracker.wait_outstanding_tasks(); ASSERT_EQ(duplicator_impl->_inflights.size(), 1); ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); @@ -136,7 +137,8 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base // with other error rpc.response().error = PERR_OK; - duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn::ERR_IO_PENDING); + duplicator_impl->on_duplicate_reply( + get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_IO_PENDING); _tracker.wait_outstanding_tasks(); ASSERT_EQ(duplicator_impl->_inflights.size(), 1); ASSERT_EQ(duplicate_rpc::mail_box().size(), 1); @@ -183,7 +185,7 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base auto rpc_list = std::move(duplicate_rpc::mail_box()); for (const auto &rpc : rpc_list) { rpc.response().error = dsn::ERR_OK; - duplicator_impl->on_duplicate_reply([](size_t) {}, rpc, dsn::ERR_OK); + duplicator_impl->on_duplicate_reply(get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_OK); } _tracker.wait_outstanding_tasks(); ASSERT_EQ(duplicate_rpc::mail_box().size(), 0); @@ -191,91 +193,6 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base } } - void test_all_mutations_are_duplicated_from_master() - { - replica_base replica(dsn::gpid(1, 1), "fake_replica"); - auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); - duplicator->set_task_environment(&_env); - - mutation_tuple_set muts; - for (uint64_t i = 0; i < 3000; i++) { - uint64_t ts = 200 + i; - dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_DUPLICATE; - dsn::apps::duplicate_request request; - - dsn::apps::update_request duplicated_request; - pegasus::pegasus_generate_key(duplicated_request.key, - std::string("hash") + std::to_string(i), - std::string("sort")); - request.__set_timestamp(100); - request.__set_from_clusters_set({2}); // master(onebox2)'s cluster_id = 2 - request.__set_task_code(dsn::apps::RPC_RRDB_RRDB_PUT); - request.__set_hash(pegasus_key_hash(duplicated_request.key)); - - dsn::message_ptr msg = - dsn::from_thrift_request_to_received_message(duplicated_request, request.task_code); - request.__set_raw_message(dsn::move_message_to_blob(msg.get())); - - msg = dsn::from_thrift_request_to_received_message(request, code); - auto data = dsn::move_message_to_blob(msg.get()); - muts.insert(std::make_tuple(ts, code, data)); - } - - auto duplicator_impl = dynamic_cast(duplicator.get()); - RPC_MOCKING(duplicate_rpc) - { - duplicator->duplicate(muts, [](size_t total) { ASSERT_EQ(total, 0); }); - - // all mutation duplicated from master will not be duplicated - // again to prevent infinite loop. - ASSERT_EQ(duplicator_impl->_inflights.size(), 0); - ASSERT_EQ(duplicate_rpc::mail_box().size(), 0); - ASSERT_EQ(duplicator_impl->_total_shipped_size, 0); - } - } - - void test_mutation_is_duplicated_from_nowhere() - { - replica_base replica(dsn::gpid(1, 1), "fake_replica"); - auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); - duplicator->set_task_environment(&_env); - - mutation_tuple_set muts; - for (uint64_t i = 0; i < 3000; i++) { - uint64_t ts = 200 + i; - dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_DUPLICATE; - dsn::apps::duplicate_request request; - - dsn::apps::update_request duplicated_request; - pegasus::pegasus_generate_key(duplicated_request.key, - std::string("hash") + std::to_string(i), - std::string("sort")); - request.__set_timestamp(100); - request.__set_from_clusters_set({int8_t(130)}); // cluster_id=130(nowhere) - request.__set_task_code(dsn::apps::RPC_RRDB_RRDB_PUT); - request.__set_hash(pegasus_key_hash(duplicated_request.key)); - - dsn::message_ptr msg = - dsn::from_thrift_request_to_received_message(duplicated_request, request.task_code); - request.__set_raw_message(dsn::move_message_to_blob(msg.get())); - - msg = dsn::from_thrift_request_to_received_message(request, code); - auto data = dsn::move_message_to_blob(msg.get()); - muts.insert(std::make_tuple(ts, code, data)); - } - - auto duplicator_impl = dynamic_cast(duplicator.get()); - RPC_MOCKING(duplicate_rpc) - { - duplicator->duplicate(muts, [](size_t total) { ASSERT_EQ(total, 0); }); - - // ignore those mutations that are duplicated from nowhere - ASSERT_EQ(duplicator_impl->_inflights.size(), 0); - ASSERT_EQ(duplicate_rpc::mail_box().size(), 0); - ASSERT_EQ(duplicator_impl->_total_shipped_size, 0); - } - } - void test_create_duplicator() { replica_base replica(dsn::gpid(1, 1), "fake_replica"); @@ -286,6 +203,12 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base ASSERT_EQ(duplicator_impl->_remote_cluster, "onebox2"); ASSERT_EQ(get_current_cluster_id(), 1); } + +private: + static uint64_t get_hash(const duplicate_rpc &rpc) + { + return get_hash_from_request(rpc.request().task_code, rpc.request().raw_message); + } }; TEST_F(pegasus_mutation_duplicator_test, get_hash_from_request) @@ -367,17 +290,7 @@ TEST_F(pegasus_mutation_duplicator_test, duplicate_isolated_hashkeys) test_duplicate_isolated_hashkeys(); } -TEST_F(pegasus_mutation_duplicator_test, all_mutations_are_duplicated_from_master) -{ - test_all_mutations_are_duplicated_from_master(); -} - TEST_F(pegasus_mutation_duplicator_test, create_duplicator) { test_create_duplicator(); } -TEST_F(pegasus_mutation_duplicator_test, mutation_is_duplicated_from_nowhere) -{ - test_mutation_is_duplicated_from_nowhere(); -} - } // namespace server } // namespace pegasus From 1859186abe79da1359557cd39cb5fa5d4ef01057 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 3 Jan 2020 11:16:26 +0800 Subject: [PATCH 15/17] add cluster_id in duplicate_request --- src/base/rrdb_types.cpp | 27 ++++++++++++++++++++++ src/idl/rrdb.thrift | 3 +++ src/include/rrdb/rrdb_types.h | 15 ++++++++++-- src/server/pegasus_mutation_duplicator.cpp | 12 ++++++---- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index a346fb2614..ef9e9a08c9 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -4275,6 +4275,12 @@ void duplicate_request::__set_raw_message(const ::dsn::blob &val) __isset.raw_message = true; } +void duplicate_request::__set_cluster_id(const int8_t val) +{ + this->cluster_id = val; + __isset.cluster_id = true; +} + uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -4318,6 +4324,14 @@ uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; + case 4: + if (ftype == ::apache::thrift::protocol::T_BYTE) { + xfer += iprot->readByte(this->cluster_id); + this->__isset.cluster_id = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -4351,6 +4365,11 @@ uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) xfer += this->raw_message.write(oprot); xfer += oprot->writeFieldEnd(); } + if (this->__isset.cluster_id) { + xfer += oprot->writeFieldBegin("cluster_id", ::apache::thrift::protocol::T_BYTE, 4); + xfer += oprot->writeByte(this->cluster_id); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4362,6 +4381,7 @@ void swap(duplicate_request &a, duplicate_request &b) swap(a.timestamp, b.timestamp); swap(a.task_code, b.task_code); swap(a.raw_message, b.raw_message); + swap(a.cluster_id, b.cluster_id); swap(a.__isset, b.__isset); } @@ -4370,6 +4390,7 @@ duplicate_request::duplicate_request(const duplicate_request &other126) timestamp = other126.timestamp; task_code = other126.task_code; raw_message = other126.raw_message; + cluster_id = other126.cluster_id; __isset = other126.__isset; } duplicate_request::duplicate_request(duplicate_request &&other127) @@ -4377,6 +4398,7 @@ duplicate_request::duplicate_request(duplicate_request &&other127) timestamp = std::move(other127.timestamp); task_code = std::move(other127.task_code); raw_message = std::move(other127.raw_message); + cluster_id = std::move(other127.cluster_id); __isset = std::move(other127.__isset); } duplicate_request &duplicate_request::operator=(const duplicate_request &other128) @@ -4384,6 +4406,7 @@ duplicate_request &duplicate_request::operator=(const duplicate_request &other12 timestamp = other128.timestamp; task_code = other128.task_code; raw_message = other128.raw_message; + cluster_id = other128.cluster_id; __isset = other128.__isset; return *this; } @@ -4392,6 +4415,7 @@ duplicate_request &duplicate_request::operator=(duplicate_request &&other129) timestamp = std::move(other129.timestamp); task_code = std::move(other129.task_code); raw_message = std::move(other129.raw_message); + cluster_id = std::move(other129.cluster_id); __isset = std::move(other129.__isset); return *this; } @@ -4407,6 +4431,9 @@ void duplicate_request::printTo(std::ostream &out) const out << ", " << "raw_message="; (__isset.raw_message ? (out << to_string(raw_message)) : (out << "")); + out << ", " + << "cluster_id="; + (__isset.cluster_id ? (out << to_string(cluster_id)) : (out << "")); out << ")"; } diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index 5cb93f8ef8..5718c95f9f 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -261,6 +261,9 @@ struct duplicate_request // The binary form of the write. 3: optional dsn.blob raw_message + + // ID of the cluster where this write comes from. + 4: optional byte cluster_id } struct duplicate_response diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index 9be8707319..b9ed126f5a 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -1798,10 +1798,14 @@ inline std::ostream &operator<<(std::ostream &out, const scan_response &obj) typedef struct _duplicate_request__isset { - _duplicate_request__isset() : timestamp(false), task_code(false), raw_message(false) {} + _duplicate_request__isset() + : timestamp(false), task_code(false), raw_message(false), cluster_id(false) + { + } bool timestamp : 1; bool task_code : 1; bool raw_message : 1; + bool cluster_id : 1; } _duplicate_request__isset; class duplicate_request @@ -1811,12 +1815,13 @@ class duplicate_request duplicate_request(duplicate_request &&); duplicate_request &operator=(const duplicate_request &); duplicate_request &operator=(duplicate_request &&); - duplicate_request() : timestamp(0) {} + duplicate_request() : timestamp(0), cluster_id(0) {} virtual ~duplicate_request() throw(); int64_t timestamp; ::dsn::task_code task_code; ::dsn::blob raw_message; + int8_t cluster_id; _duplicate_request__isset __isset; @@ -1826,6 +1831,8 @@ class duplicate_request void __set_raw_message(const ::dsn::blob &val); + void __set_cluster_id(const int8_t val); + bool operator==(const duplicate_request &rhs) const { if (__isset.timestamp != rhs.__isset.timestamp) @@ -1840,6 +1847,10 @@ class duplicate_request return false; else if (__isset.raw_message && !(raw_message == rhs.raw_message)) return false; + if (__isset.cluster_id != rhs.__isset.cluster_id) + return false; + else if (__isset.cluster_id && !(cluster_id == rhs.cluster_id)) + return false; return true; } bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); } diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index a179718e2b..5a994fe9ad 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -106,11 +106,12 @@ void pegasus_mutation_duplicator::send(uint64_t hash, callback cb) _inflights[hash].pop_front(); } - _client->async_duplicate(rpc, - [hash, cb, rpc, this](dsn::error_code err) mutable { - on_duplicate_reply(hash, std::move(cb), std::move(rpc), err); - }, - _env.__conf.tracker); + _client->async_duplicate( + rpc, + [hash, cb, rpc, this](dsn::error_code err) mutable { + on_duplicate_reply(hash, std::move(cb), std::move(rpc), err); + }, + _env.__conf.tracker); } void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, @@ -181,6 +182,7 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb dreq->__set_raw_message(raw_message); dreq->__set_task_code(rpc_code); dreq->__set_timestamp(std::get<0>(mut)); + dreq->__set_cluster_id(get_current_cluster_id()); } duplicate_rpc rpc(std::move(dreq), From 6e736824fc2d1f2e5204513c0c330ea23a0dad57 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 3 Jan 2020 14:46:32 +0800 Subject: [PATCH 16/17] revert value_schema --- src/base/pegasus_value_schema.h | 18 -------- src/base/test/pegasus_value_schema_test.cpp | 46 --------------------- 2 files changed, 64 deletions(-) delete mode 100644 src/base/test/pegasus_value_schema_test.cpp diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h index 3a056b8fc9..ccacd5ba0c 100644 --- a/src/base/pegasus_value_schema.h +++ b/src/base/pegasus_value_schema.h @@ -21,24 +21,6 @@ namespace pegasus { #define PEGASUS_DATA_VERSION_MAX 0u -/// Generates timetag in host endian. -inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool delete_tag) -{ - return timestamp << 8 | cluster_id << 1 | delete_tag; -} - -inline uint8_t extract_cluster_id_from_timetag(uint64_t timetag) -{ - // 7bit: 0x7F - return static_cast((timetag >> 1) & 0x7F); -} - -inline uint64_t extract_timestamp_from_timetag(uint64_t timetag) -{ - // 56bit: 0xFFFFFFFFFFFFFFL - return static_cast((timetag >> 8) & 0xFFFFFFFFFFFFFFL); -} - /// Extracts expire_ts from rocksdb value with given version. /// The value schema must be in v0. /// \return expire_ts in host endian diff --git a/src/base/test/pegasus_value_schema_test.cpp b/src/base/test/pegasus_value_schema_test.cpp deleted file mode 100644 index 28db232c69..0000000000 --- a/src/base/test/pegasus_value_schema_test.cpp +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. - -#include "base/pegasus_value_schema.h" - -#include - -using namespace pegasus; - -TEST(value_schema, generate_and_extract_timetag) -{ - struct test_case - { - uint64_t timestamp; - bool delete_tag; - - uint64_t wtimestamp; - } tests[] = { - {1000, true, 1000}, - {1000, false, 1000}, - - {std::numeric_limits::max() >> 8, - true, - std::numeric_limits::max() >> 8}, - - {std::numeric_limits::max() >> 8, - false, - std::numeric_limits::max() >> 8}, - - {std::numeric_limits::max(), false, std::numeric_limits::max() >> 8}, - - {std::numeric_limits::max(), false, std::numeric_limits::max() >> 8}, - - // Wed, 12 Dec 2018 09:48:48 GMT - {1544583472297055, false, 1544583472297055}, - }; - - for (auto &t : tests) { - for (uint8_t cluster_id = 1; cluster_id <= 0x7F; cluster_id++) { - uint64_t timetag = generate_timetag(t.timestamp, cluster_id, t.delete_tag); - ASSERT_EQ(cluster_id, extract_cluster_id_from_timetag(timetag)); - ASSERT_EQ(t.wtimestamp, extract_timestamp_from_timetag(timetag)); - } - } -} From b004a1a95d2ee98329b2571bd336a5433edd6580 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 3 Jan 2020 15:00:47 +0800 Subject: [PATCH 17/17] fix format --- src/server/pegasus_mutation_duplicator.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 5a994fe9ad..7b77542387 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -106,12 +106,11 @@ void pegasus_mutation_duplicator::send(uint64_t hash, callback cb) _inflights[hash].pop_front(); } - _client->async_duplicate( - rpc, - [hash, cb, rpc, this](dsn::error_code err) mutable { - on_duplicate_reply(hash, std::move(cb), std::move(rpc), err); - }, - _env.__conf.tracker); + _client->async_duplicate(rpc, + [hash, cb, rpc, this](dsn::error_code err) mutable { + on_duplicate_reply(hash, std::move(cb), std::move(rpc), err); + }, + _env.__conf.tracker); } void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,