From 0ee404ebf3549a784e28ea5e0affd6c039c4e709 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Mon, 13 May 2024 11:26:41 +0800 Subject: [PATCH] feat(duplication): add options to support cluster name only used for duplication and allow any other cluster id except myself to be ignored (#2000) The purpose of this PR is to optimize configurations for duplications. Firstly, many Pegasus clusters are configured with the same `cluster_name` (namely `[replication]cluster_name`). However, once we decide to duplicate tables between them, their `cluster_name` values have to be changed so that the clusters can be distinguished from each other -- this might lead to side effects. Secondly, consider a scenario where many clusters are duplicated to a target cluster. This means we have to add many cluster ids to the `*.ini` file of the target cluster, and the target cluster might have to be restarted very frequently. Thus the following options are added to solve both problems: ```diff [replication] + dup_cluster_name = + dup_ignore_other_cluster_ids = false ``` `[replication]dup_cluster_name` is added only for duplication in case `cluster_name` has to be changed, while `[replication]dup_ignore_other_cluster_ids` is added so that only the target cluster id needs to be configured and there is no need to add any other cluster id. 
--- src/common/common.cpp | 23 +++++++++- src/common/common.h | 14 +++++- src/common/duplication_common.cpp | 44 ++++++++++++++++--- src/common/duplication_common.h | 13 +++--- src/common/test/common_test.cpp | 17 ++++++- src/common/test/duplication_common_test.cpp | 44 ++++++++++++++++--- .../duplication/meta_duplication_service.cpp | 19 +++++--- src/server/pegasus_mutation_duplicator.cpp | 27 +++++++++--- src/server/pegasus_write_service.cpp | 4 +- src/server/pegasus_write_service.h | 10 ----- src/server/pegasus_write_service_impl.h | 10 ----- src/server/rocksdb_wrapper.cpp | 4 +- .../test/pegasus_mutation_duplicator_test.cpp | 7 ++- src/test_util/test_util.h | 2 +- src/utils/fmt_logging.h | 24 ++++++++++ 15 files changed, 200 insertions(+), 62 deletions(-) diff --git a/src/common/common.cpp b/src/common/common.cpp index 9ea1f558d0..4cc6fb7eea 100644 --- a/src/common/common.cpp +++ b/src/common/common.cpp @@ -21,7 +21,17 @@ #include "utils/fmt_logging.h" #include "utils/strings.h" -DSN_DEFINE_string(replication, cluster_name, "", "name of this cluster"); +DSN_DEFINE_string(replication, cluster_name, "", "The name of this cluster"); + +// Many Pegasus clusters are configured with the same `cluster_name`(namely +// `[replication]cluster_name`). However, once we decide to duplicate tables +// between them, their `cluster_name` have to be changed to be distinguished +// from each other -- this might lead to side effects. Thus use `dup_cluster_name` +// only for duplication in case `cluster_name` has to be changed. +DSN_DEFINE_string(replication, + dup_cluster_name, + "", + "The name of this cluster only used for duplication"); namespace dsn { @@ -31,5 +41,16 @@ namespace dsn { return FLAGS_cluster_name; } +/*extern*/ const char *get_current_dup_cluster_name() +{ + if (!utils::is_empty(FLAGS_dup_cluster_name)) { + return FLAGS_dup_cluster_name; + } + + // Once `dup_cluster_name` is not configured, use cluster_name instead. 
+ return get_current_cluster_name(); +} + const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters"); + } // namespace dsn diff --git a/src/common/common.h b/src/common/common.h index 6ef992c6f1..7965e6279c 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -22,11 +22,21 @@ #include namespace dsn { -/// Returns the cluster name (i.e, "onebox") if it's configured under -/// "replication" section: +/// Returns the cluster name ("onebox" in the following example) if it's +/// configured under "replication" section: /// [replication] /// cluster_name = "onebox" extern const char *get_current_cluster_name(); +/// Returns the cluster name ("onebox" in the following example) which is +/// only used for duplication (see the definition for `dup_cluster_name` +/// flag for details) if it's configured under "replication" section: +/// [replication] +/// dup_cluster_name = "onebox" +/// +/// However, once `[replication]dup_cluster_name` is not configured, +/// `[replication]cluster_name` would be returned. +extern const char *get_current_dup_cluster_name(); + extern const std::string PEGASUS_CLUSTER_SECTION_NAME; } // namespace dsn diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index 6c6e1ad034..0aea933473 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -19,9 +19,11 @@ #include #include +#include #include #include +#include "common/common.h" #include "duplication_types.h" #include "nlohmann/detail/json_ref.hpp" #include "nlohmann/json_fwd.hpp" @@ -37,6 +39,17 @@ DSN_DEFINE_uint32(replication, "send mutation log batch bytes size per rpc"); DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE); +// While many clusters are duplicated to a target cluster, we have to add many cluster +// ids to the `*.ini` file of the target cluster, and the target cluster might be restarted +// very frequently. 
+// +// This option is added so that only the target cluster id should be configured while +// there is no need to add any other cluster id. +DSN_DEFINE_bool(replication, + dup_ignore_other_cluster_ids, + false, + "Allow any other cluster id except myself to be ignored for duplication"); + namespace dsn { namespace replication { @@ -89,7 +102,6 @@ class duplication_group_registry : public utils::singletonsecond; } - const std::map &get_duplication_group() { return _group; } const std::set &get_distinct_cluster_id_set() { return _distinct_cids; } private: @@ -132,6 +144,22 @@ class duplication_group_registry : public utils::singleton &get_duplication_group() +/*extern*/ const std::set &get_distinct_cluster_id_set() { - return internal::duplication_group_registry::instance().get_duplication_group(); + return internal::duplication_group_registry::instance().get_distinct_cluster_id_set(); } -/*extern*/ const std::set &get_distinct_cluster_id_set() +/*extern*/ bool is_dup_cluster_id_configured(uint8_t cluster_id) { - return internal::duplication_group_registry::instance().get_distinct_cluster_id_set(); + if (cluster_id != get_current_dup_cluster_id()) { + if (FLAGS_dup_ignore_other_cluster_ids) { + return true; + } + } + + return get_distinct_cluster_id_set().find(cluster_id) != get_distinct_cluster_id_set().end(); } } // namespace replication diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h index 0918d978b5..fe0ec067b4 100644 --- a/src/common/duplication_common.h +++ b/src/common/duplication_common.h @@ -20,7 +20,6 @@ #pragma once #include -#include #include #include @@ -63,21 +62,19 @@ inline bool is_duplication_status_invalid(duplication_status::type status) /// The returned cluster id of get_duplication_cluster_id("wuhan-mi-srv-ad") is 3. 
extern error_with get_duplication_cluster_id(const std::string &cluster_name); +extern uint8_t get_current_dup_cluster_id_or_default(); + +extern uint8_t get_current_dup_cluster_id(); + /// Returns a json string. extern std::string duplication_entry_to_string(const duplication_entry &dup); /// Returns a json string. extern std::string duplication_query_response_to_string(const duplication_query_response &); -/// Returns a mapping from cluster_name to cluster_id. -extern const std::map &get_duplication_group(); - extern const std::set &get_distinct_cluster_id_set(); -inline bool is_cluster_id_configured(uint8_t cid) -{ - return get_distinct_cluster_id_set().find(cid) != get_distinct_cluster_id_set().end(); -} +extern bool is_dup_cluster_id_configured(uint8_t cluster_id); struct duplication_constants { diff --git a/src/common/test/common_test.cpp b/src/common/test/common_test.cpp index f645bbe705..2b2a621247 100644 --- a/src/common/test/common_test.cpp +++ b/src/common/test/common_test.cpp @@ -20,10 +20,25 @@ #include "common/common.h" #include "gtest/gtest.h" +#include "test_util/test_util.h" +#include "utils/flags.h" + +DSN_DECLARE_string(dup_cluster_name); namespace dsn { + TEST(duplication_common, get_current_cluster_name) { - ASSERT_STREQ(get_current_cluster_name(), "master-cluster"); + ASSERT_STREQ("master-cluster", get_current_cluster_name()); } + +TEST(duplication_common, get_current_dup_cluster_name) +{ + ASSERT_STREQ("master-cluster", get_current_dup_cluster_name()); + + PRESERVE_FLAG(dup_cluster_name); + FLAGS_dup_cluster_name = "slave-cluster"; + ASSERT_STREQ("slave-cluster", get_current_dup_cluster_name()); +} + } // namespace dsn diff --git a/src/common/test/duplication_common_test.cpp b/src/common/test/duplication_common_test.cpp index 8d5fa63541..11690f4057 100644 --- a/src/common/test/duplication_common_test.cpp +++ b/src/common/test/duplication_common_test.cpp @@ -29,23 +29,57 @@ #include #include "gtest/gtest.h" +#include "test_util/test_util.h" 
#include "utils/error_code.h" +#include "utils/flags.h" + +DSN_DECLARE_string(dup_cluster_name); +DSN_DECLARE_bool(dup_ignore_other_cluster_ids); namespace dsn { namespace replication { TEST(duplication_common, get_duplication_cluster_id) { - ASSERT_EQ(get_duplication_cluster_id("master-cluster").get_value(), 1); - ASSERT_EQ(get_duplication_cluster_id("slave-cluster").get_value(), 2); + ASSERT_EQ(1, get_duplication_cluster_id("master-cluster").get_value()); + ASSERT_EQ(2, get_duplication_cluster_id("slave-cluster").get_value()); + + ASSERT_EQ(ERR_INVALID_PARAMETERS, get_duplication_cluster_id("").get_error().code()); + ASSERT_EQ(ERR_OBJECT_NOT_FOUND, get_duplication_cluster_id("unknown").get_error().code()); +} + +TEST(duplication_common, get_current_dup_cluster_id) +{ + ASSERT_EQ(1, get_current_dup_cluster_id()); - ASSERT_EQ(get_duplication_cluster_id("").get_error().code(), ERR_INVALID_PARAMETERS); - ASSERT_EQ(get_duplication_cluster_id("unknown").get_error().code(), ERR_OBJECT_NOT_FOUND); + // Current cluster id is static, thus updating dup cluster name should never change + // current cluster id. 
+ PRESERVE_FLAG(dup_cluster_name); + FLAGS_dup_cluster_name = "slave-cluster"; + ASSERT_EQ(1, get_current_dup_cluster_id()); } TEST(duplication_common, get_distinct_cluster_id_set) { - ASSERT_EQ(get_distinct_cluster_id_set(), std::set({1, 2})); + ASSERT_EQ(std::set({1, 2}), get_distinct_cluster_id_set()); +} + +TEST(duplication_common, is_dup_cluster_id_configured) +{ + ASSERT_FALSE(is_dup_cluster_id_configured(0)); + ASSERT_TRUE(is_dup_cluster_id_configured(1)); + ASSERT_TRUE(is_dup_cluster_id_configured(2)); + ASSERT_FALSE(is_dup_cluster_id_configured(3)); +} + +TEST(duplication_common, dup_ignore_other_cluster_ids) +{ + PRESERVE_FLAG(dup_ignore_other_cluster_ids); + FLAGS_dup_ignore_other_cluster_ids = true; + + for (uint8_t id = 0; id < 4; ++id) { + ASSERT_TRUE(is_dup_cluster_id_configured(id)); + } } } // namespace replication diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index 2673f5055b..c8a0d888ed 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -49,11 +49,14 @@ #include "utils/error_code.h" #include "utils/errors.h" #include "utils/fail_point.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/ports.h" #include "utils/string_conv.h" #include "utils/zlocks.h" +DSN_DECLARE_bool(dup_ignore_other_cluster_ids); + namespace dsn { namespace replication { @@ -209,13 +212,15 @@ void meta_duplication_service::add_duplication(duplication_add_rpc rpc) ERR_INVALID_PARAMETERS, "illegal operation: adding duplication to itself"); - auto remote_cluster_id = get_duplication_cluster_id(request.remote_cluster_name); - LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(remote_cluster_id.is_ok(), - response, - ERR_INVALID_PARAMETERS, - "get_duplication_cluster_id({}) failed, error: {}", - request.remote_cluster_name, - remote_cluster_id.get_error()); + if (!FLAGS_dup_ignore_other_cluster_ids) { + auto remote_cluster_id = 
get_duplication_cluster_id(request.remote_cluster_name); + LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(remote_cluster_id.is_ok(), + response, + ERR_INVALID_PARAMETERS, + "get_duplication_cluster_id({}) failed, error: {}", + request.remote_cluster_name, + remote_cluster_id.get_error()); + } std::vector meta_list; LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT( diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index de13e8ee9a..849fa56b3c 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -32,6 +32,7 @@ #include #include "client_lib/pegasus_client_impl.h" +#include "common/common.h" #include "common/duplication_common.h" #include "duplication_internal_types.h" #include "pegasus/client.h" @@ -40,7 +41,6 @@ #include "rrdb/rrdb_types.h" #include "runtime/message_utils.h" #include "runtime/rpc/rpc_message.h" -#include "server/pegasus_write_service.h" #include "utils/autoref_ptr.h" #include "utils/blob.h" #include "utils/chrono_literals.h" @@ -50,6 +50,8 @@ #include "utils/fmt_logging.h" #include "utils/rand.h" +DSN_DECLARE_bool(dup_ignore_other_cluster_ids); + METRIC_DEFINE_counter(replica, dup_shipped_successful_requests, dsn::metric_unit::kRequests, @@ -127,6 +129,19 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli pegasus_client *client = pegasus_client_factory::get_client(remote_cluster.data(), app.data()); _client = static_cast(client); + CHECK_STRNE_PREFIX_MSG(dsn::get_current_dup_cluster_name(), + remote_cluster.data(), + "remote cluster should not be myself: {}", + remote_cluster); + + if (FLAGS_dup_ignore_other_cluster_ids) { + LOG_INFO_PREFIX("initialize mutation duplicator for local cluster [id:{}], " + "remote cluster [id:ignored, addr:{}]", + dsn::replication::get_current_dup_cluster_id(), + remote_cluster); + return; + } + auto ret = dsn::replication::get_duplication_cluster_id(remote_cluster.data()); 
CHECK_PREFIX_MSG(ret.is_ok(), // never possible, meta server disallows such remote_cluster. "invalid remote cluster: {}, err_ret: {}", @@ -136,13 +151,15 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli LOG_INFO_PREFIX("initialize mutation duplicator for local cluster [id:{}], " "remote cluster [id:{}, addr:{}]", - get_current_cluster_id(), + dsn::replication::get_current_dup_cluster_id(), _remote_cluster_id, remote_cluster); // never possible to duplicate data to itself - CHECK_NE_PREFIX_MSG( - get_current_cluster_id(), _remote_cluster_id, "invalid remote cluster: {}", remote_cluster); + CHECK_NE_PREFIX_MSG(dsn::replication::get_current_dup_cluster_id(), + _remote_cluster_id, + "invalid remote cluster: {}", + remote_cluster); } void pegasus_mutation_duplicator::send(uint64_t hash, callback cb) @@ -237,7 +254,7 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb entry.__set_raw_message(raw_message); entry.__set_task_code(rpc_code); entry.__set_timestamp(std::get<0>(mut)); - entry.__set_cluster_id(get_current_cluster_id()); + entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id()); batch_request->entries.emplace_back(std::move(entry)); batch_bytes += raw_message.length(); } diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index a7e9d93420..d688ab6e45 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -349,12 +349,12 @@ int pegasus_write_service::duplicate(int64_t decree, { // Verifies the cluster_id. 
for (const auto &request : requests.entries) { - if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) { + if (!dsn::replication::is_dup_cluster_id_configured(request.cluster_id)) { resp.__set_error(rocksdb::Status::kInvalidArgument); resp.__set_error_hint("request cluster id is unconfigured"); return empty_put(decree); } - if (request.cluster_id == get_current_cluster_id()) { + if (request.cluster_id == dsn::replication::get_current_dup_cluster_id()) { resp.__set_error(rocksdb::Status::kInvalidArgument); resp.__set_error_hint("self-duplicating"); return empty_put(decree); diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index eed596e355..f430b48b0c 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -22,10 +22,7 @@ #include #include -#include "common//duplication_common.h" -#include "common/common.h" #include "replica/replica_base.h" -#include "utils/errors.h" #include "utils/metrics.h" namespace dsn { @@ -55,13 +52,6 @@ class ingestion_response; namespace pegasus { namespace server { -inline uint8_t get_current_cluster_id() -{ - static const uint8_t cluster_id = - dsn::replication::get_duplication_cluster_id(dsn::get_current_cluster_name()).get_value(); - return cluster_id; -} - // The context of an mutation to the database. struct db_write_context { diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index f551b1b550..6eec4b7601 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -59,16 +59,6 @@ struct db_get_context bool expired{false}; }; -inline int get_cluster_id_if_exists() -{ - // cluster_id is 0 if not configured, which means it will accept writes - // from any cluster as long as the timestamp is larger. - static auto cluster_id_res = - dsn::replication::get_duplication_cluster_id(dsn::get_current_cluster_name()); - static uint64_t cluster_id = cluster_id_res.is_ok() ? 
cluster_id_res.get_value() : 0; - return cluster_id; -} - inline dsn::error_code get_external_files_path(const std::string &bulk_load_dir, const bool verify_before_ingest, const dsn::replication::bulk_load_metadata &metadata, diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index e4d6e3b21f..59203836be 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -26,6 +26,7 @@ #include "base/meta_store.h" #include "base/pegasus_value_schema.h" +#include "common/duplication_common.h" #include "pegasus_key_schema.h" #include "pegasus_utils.h" #include "pegasus_write_service_impl.h" @@ -121,7 +122,8 @@ int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx, uint64_t new_timetag = ctx.remote_timetag; if (!ctx.is_duplicated_write()) { // local write - new_timetag = generate_timetag(ctx.timestamp, get_cluster_id_if_exists(), false); + new_timetag = generate_timetag( + ctx.timestamp, dsn::replication::get_current_dup_cluster_id_or_default(), false); } if (ctx.verify_timetag && // needs read-before-write diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index fac7c449b5..aaf91a0657 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -44,7 +44,6 @@ #include "runtime/message_utils.h" #include "runtime/rpc/rpc_holder.h" #include "runtime/rpc/rpc_message.h" -#include "server/pegasus_write_service.h" #include "utils/blob.h" #include "utils/error_code.h" @@ -275,9 +274,9 @@ class pegasus_mutation_duplicator_test : public pegasus_server_test_base auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp"); duplicator->set_task_environment(&_env); auto duplicator_impl = dynamic_cast(duplicator.get()); - ASSERT_EQ(duplicator_impl->_remote_cluster_id, 2); - ASSERT_EQ(duplicator_impl->_remote_cluster, "onebox2"); - ASSERT_EQ(get_current_cluster_id(), 1); + 
ASSERT_EQ(2, duplicator_impl->_remote_cluster_id); + ASSERT_STREQ("onebox2", duplicator_impl->_remote_cluster.c_str()); + ASSERT_EQ(1, get_current_dup_cluster_id()); } private: diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h index 46be616c90..2e2b34bb0d 100644 --- a/src/test_util/test_util.h +++ b/src/test_util/test_util.h @@ -46,7 +46,7 @@ class file_meta; // Save the current value of a flag and restore it at the end of the function. #define PRESERVE_FLAG(name) \ - auto PRESERVED_FLAGS_##name = FLAGS_##name; \ + const auto PRESERVED_FLAGS_##name = FLAGS_##name; \ auto PRESERVED_FLAGS_##name##_cleanup = \ dsn::defer([PRESERVED_FLAGS_##name]() { FLAGS_##name = PRESERVED_FLAGS_##name; }) diff --git a/src/utils/fmt_logging.h b/src/utils/fmt_logging.h index 989e6eb916..97e751a48c 100644 --- a/src/utils/fmt_logging.h +++ b/src/utils/fmt_logging.h @@ -207,6 +207,30 @@ inline const char *null_str_printer(const char *s) { return s == nullptr ? "(nul #define CHECK_NOTNULL_PREFIX_MSG(p, ...) CHECK_PREFIX_MSG(p != nullptr, __VA_ARGS__) #define CHECK_NOTNULL_PREFIX(p) CHECK_NOTNULL_PREFIX_MSG(p, "") +#define CHECK_STREQ_PREFIX_MSG(var1, var2, ...) \ + do { \ + const auto &_v1 = (var1); \ + const auto &_v2 = (var2); \ + CHECK_EXPRESSION_PREFIX_MSG(var1 == var2, \ + dsn::utils::equals(_v1, _v2), \ + "{} vs {} {}", \ + null_str_printer(_v1), \ + null_str_printer(_v2), \ + fmt::format(__VA_ARGS__)); \ + } while (false) + +#define CHECK_STRNE_PREFIX_MSG(var1, var2, ...) \ + do { \ + const auto &_v1 = (var1); \ + const auto &_v2 = (var2); \ + CHECK_EXPRESSION_PREFIX_MSG(var1 != var2, \ + !dsn::utils::equals(_v1, _v2), \ + "{} vs {} {}", \ + null_str_printer(_v1), \ + null_str_printer(_v2), \ + fmt::format(__VA_ARGS__)); \ + } while (false) + #define CHECK_NE_PREFIX_MSG(var1, var2, ...) \ do { \ const auto &_v1 = (var1); \