Skip to content

Commit

Permalink
feat(duplication): add options to support cluster name only used for …
Browse files Browse the repository at this point in the history
…duplication and allow any other cluster id except myself to be ignored (#2000)

The purpose of this PR is to optimize configurations for duplications.

Firstly, many Pegasus clusters are configured with the same `cluster_name`
(namely `[replication]cluster_name`). However, once we decide to duplicate
tables between them, their `cluster_name` values have to be changed so that the
clusters can be distinguished from each other -- this might lead to side effects.

Secondly, consider a scenario where many clusters are duplicated to a single target
cluster. This means we have to add many cluster ids to the `*.ini` file of the target
cluster, and as a result the target cluster might have to be restarted very frequently.

Thus the following options are added to solve both problems:

```diff
[replication]
+ dup_cluster_name =
+ dup_ignore_other_cluster_ids = false
```

`[replication]dup_cluster_name` is a cluster name used only for duplication, so that
`cluster_name` itself does not have to be changed, while
`[replication]dup_ignore_other_cluster_ids` is added so that only the id of the target
cluster itself has to be configured and there is no need to add any other cluster id.
  • Loading branch information
empiredan authored May 13, 2024
1 parent 10ac661 commit 0ee404e
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 62 deletions.
23 changes: 22 additions & 1 deletion src/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@
#include "utils/fmt_logging.h"
#include "utils/strings.h"

DSN_DEFINE_string(replication, cluster_name, "", "name of this cluster");
DSN_DEFINE_string(replication, cluster_name, "", "The name of this cluster");

// Many Pegasus clusters are configured with the same `cluster_name` (namely
// `[replication]cluster_name`). However, once we decide to duplicate tables
// between them, their `cluster_name` values have to be changed so that the
// clusters can be distinguished from each other -- this might lead to side
// effects. Thus `dup_cluster_name` is used only for duplication, so that
// `cluster_name` itself does not have to be changed.
//
// The default value is empty, in which case get_current_dup_cluster_name()
// falls back to `cluster_name`.
DSN_DEFINE_string(replication,
dup_cluster_name,
"",
"The name of this cluster only used for duplication");

namespace dsn {

Expand All @@ -31,5 +41,16 @@ namespace dsn {
return FLAGS_cluster_name;
}

/*extern*/ const char *get_current_dup_cluster_name()
{
    // `dup_cluster_name` is optional: while it is left unconfigured (empty), the
    // ordinary cluster name is used for duplication as well.
    if (utils::is_empty(FLAGS_dup_cluster_name)) {
        return get_current_cluster_name();
    }

    // Otherwise the name dedicated to duplication takes precedence.
    return FLAGS_dup_cluster_name;
}

const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters");

} // namespace dsn
14 changes: 12 additions & 2 deletions src/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@
#include <string>

namespace dsn {
/// Returns the cluster name (i.e, "onebox") if it's configured under
/// "replication" section:
/// Returns the cluster name ("onebox" in the following example) if it's
/// configured under "replication" section:
/// [replication]
/// cluster_name = "onebox"
extern const char *get_current_cluster_name();

/// Returns the cluster name ("onebox" in the following example) which is
/// only used for duplication (see the definition of the `dup_cluster_name`
/// flag for details) if it's configured under the "replication" section:
/// [replication]
/// dup_cluster_name = "onebox"
///
/// However, if `[replication]dup_cluster_name` is not configured,
/// `[replication]cluster_name` is returned instead.
extern const char *get_current_dup_cluster_name();

extern const std::string PEGASUS_CLUSTER_SECTION_NAME;
} // namespace dsn
44 changes: 39 additions & 5 deletions src/common/duplication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

#include <nlohmann/json.hpp>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

#include "common/common.h"
#include "duplication_types.h"
#include "nlohmann/detail/json_ref.hpp"
#include "nlohmann/json_fwd.hpp"
Expand All @@ -37,6 +39,17 @@ DSN_DEFINE_uint32(replication,
"send mutation log batch bytes size per rpc");
DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);

// When many clusters are duplicated to a single target cluster, we would have to add
// many cluster ids to the `*.ini` file of the target cluster, and the target cluster
// might be restarted very frequently.
//
// This option is added so that only the cluster id of the target cluster itself has to
// be configured, and there is no need to add any other cluster id.
DSN_DEFINE_bool(replication,
dup_ignore_other_cluster_ids,
false,
"Allow any other cluster id except myself to be ignored for duplication");

namespace dsn {
namespace replication {

Expand Down Expand Up @@ -89,7 +102,6 @@ class duplication_group_registry : public utils::singleton<duplication_group_reg
return it->second;
}

const std::map<std::string, uint8_t> &get_duplication_group() { return _group; }
const std::set<uint8_t> &get_distinct_cluster_id_set() { return _distinct_cids; }

private:
Expand Down Expand Up @@ -132,6 +144,22 @@ class duplication_group_registry : public utils::singleton<duplication_group_reg
return internal::duplication_group_registry::instance().get_cluster_id(cluster_name);
}

/*extern*/ uint8_t get_current_dup_cluster_id_or_default()
{
    // The id is resolved only once, on the first call, and cached afterwards; later
    // changes to the cluster name flags therefore do not affect the returned value.
    static const uint8_t cluster_id = []() -> uint8_t {
        const auto lookup = get_duplication_cluster_id(get_current_dup_cluster_name());
        if (!lookup.is_ok()) {
            // The cluster id is not configured: use 0 as default, which means it would
            // accept writes from any cluster as long as the timestamp is larger.
            return 0;
        }
        return lookup.get_value();
    }();
    return cluster_id;
}

/*extern*/ uint8_t get_current_dup_cluster_id()
{
    // Resolved once on the first call and cached afterwards. Unlike
    // get_current_dup_cluster_id_or_default(), the lookup result is used without
    // checking is_ok() first.
    // NOTE(review): presumably get_value() aborts when the current cluster name has no
    // configured id -- confirm against error_with.
    static const uint8_t cluster_id = []() -> uint8_t {
        return get_duplication_cluster_id(get_current_dup_cluster_name()).get_value();
    }();
    return cluster_id;
}

// TODO(wutao1): implement our C++ version of `TSimpleJSONProtocol` if there're
// more cases for converting thrift to JSON
static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)
Expand Down Expand Up @@ -184,14 +212,20 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)
return json.dump();
}

/*extern*/ const std::map<std::string, uint8_t> &get_duplication_group()
/*extern*/ const std::set<uint8_t> &get_distinct_cluster_id_set()
{
return internal::duplication_group_registry::instance().get_duplication_group();
return internal::duplication_group_registry::instance().get_distinct_cluster_id_set();
}

/*extern*/ const std::set<uint8_t> &get_distinct_cluster_id_set()
/*extern*/ bool is_dup_cluster_id_configured(uint8_t cluster_id)
{
return internal::duplication_group_registry::instance().get_distinct_cluster_id_set();
if (cluster_id != get_current_dup_cluster_id()) {
if (FLAGS_dup_ignore_other_cluster_ids) {
return true;
}
}

return get_distinct_cluster_id_set().find(cluster_id) != get_distinct_cluster_id_set().end();
}

} // namespace replication
Expand Down
13 changes: 5 additions & 8 deletions src/common/duplication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#pragma once

#include <stdint.h>
#include <map>
#include <set>
#include <string>

Expand Down Expand Up @@ -63,21 +62,19 @@ inline bool is_duplication_status_invalid(duplication_status::type status)
/// The returned cluster id of get_duplication_cluster_id("wuhan-mi-srv-ad") is 3.
extern error_with<uint8_t> get_duplication_cluster_id(const std::string &cluster_name);

extern uint8_t get_current_dup_cluster_id_or_default();

extern uint8_t get_current_dup_cluster_id();

/// Returns a json string.
extern std::string duplication_entry_to_string(const duplication_entry &dup);

/// Returns a json string.
extern std::string duplication_query_response_to_string(const duplication_query_response &);

/// Returns a mapping from cluster_name to cluster_id.
extern const std::map<std::string, uint8_t> &get_duplication_group();

extern const std::set<uint8_t> &get_distinct_cluster_id_set();

inline bool is_cluster_id_configured(uint8_t cid)
{
return get_distinct_cluster_id_set().find(cid) != get_distinct_cluster_id_set().end();
}
extern bool is_dup_cluster_id_configured(uint8_t cluster_id);

struct duplication_constants
{
Expand Down
17 changes: 16 additions & 1 deletion src/common/test/common_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,25 @@
#include "common/common.h"

#include "gtest/gtest.h"
#include "test_util/test_util.h"
#include "utils/flags.h"

DSN_DECLARE_string(dup_cluster_name);

namespace dsn {

TEST(duplication_common, get_current_cluster_name)
{
ASSERT_STREQ(get_current_cluster_name(), "master-cluster");
ASSERT_STREQ("master-cluster", get_current_cluster_name());
}

// Checks that get_current_dup_cluster_name() returns the ordinary cluster name until
// `dup_cluster_name` is set, and the duplication-only name afterwards.
TEST(duplication_common, get_current_dup_cluster_name)
{
// presumably the test config leaves `dup_cluster_name` empty, so that the value of
// `cluster_name` is returned here -- TODO confirm against the test *.ini.
ASSERT_STREQ("master-cluster", get_current_dup_cluster_name());

// NOTE(review): PRESERVE_FLAG presumably restores the flag's original value at the end
// of this scope; it must stay ahead of the assignment below.
PRESERVE_FLAG(dup_cluster_name);
FLAGS_dup_cluster_name = "slave-cluster";
ASSERT_STREQ("slave-cluster", get_current_dup_cluster_name());
}

} // namespace dsn
44 changes: 39 additions & 5 deletions src/common/test/duplication_common_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,57 @@
#include <cstdint>

#include "gtest/gtest.h"
#include "test_util/test_util.h"
#include "utils/error_code.h"
#include "utils/flags.h"

DSN_DECLARE_string(dup_cluster_name);
DSN_DECLARE_bool(dup_ignore_other_cluster_ids);

namespace dsn {
namespace replication {

TEST(duplication_common, get_duplication_cluster_id)
{
ASSERT_EQ(get_duplication_cluster_id("master-cluster").get_value(), 1);
ASSERT_EQ(get_duplication_cluster_id("slave-cluster").get_value(), 2);
ASSERT_EQ(1, get_duplication_cluster_id("master-cluster").get_value());
ASSERT_EQ(2, get_duplication_cluster_id("slave-cluster").get_value());

ASSERT_EQ(ERR_INVALID_PARAMETERS, get_duplication_cluster_id("").get_error().code());
ASSERT_EQ(ERR_OBJECT_NOT_FOUND, get_duplication_cluster_id("unknown").get_error().code());
}

TEST(duplication_common, get_current_dup_cluster_id)
{
ASSERT_EQ(1, get_current_dup_cluster_id());

ASSERT_EQ(get_duplication_cluster_id("").get_error().code(), ERR_INVALID_PARAMETERS);
ASSERT_EQ(get_duplication_cluster_id("unknown").get_error().code(), ERR_OBJECT_NOT_FOUND);
// Current cluster id is static, thus updating dup cluster name should never change
// current cluster id.
PRESERVE_FLAG(dup_cluster_name);
FLAGS_dup_cluster_name = "slave-cluster";
ASSERT_EQ(1, get_current_dup_cluster_id());
}

TEST(duplication_common, get_distinct_cluster_id_set)
{
ASSERT_EQ(get_distinct_cluster_id_set(), std::set<uint8_t>({1, 2}));
ASSERT_EQ(std::set<uint8_t>({1, 2}), get_distinct_cluster_id_set());
}

// Checks is_dup_cluster_id_configured() without changing `dup_ignore_other_cluster_ids`
// (which defaults to false): only ids 1 and 2 are expected to be accepted, while
// ids 0 and 3 are rejected.
TEST(duplication_common, is_dup_cluster_id_configured)
{
ASSERT_FALSE(is_dup_cluster_id_configured(0));
ASSERT_TRUE(is_dup_cluster_id_configured(1));
ASSERT_TRUE(is_dup_cluster_id_configured(2));
ASSERT_FALSE(is_dup_cluster_id_configured(3));
}

// Checks that once `dup_ignore_other_cluster_ids` is enabled, every cluster id in
// [0, 4) is reported as configured.
TEST(duplication_common, dup_ignore_other_cluster_ids)
{
// NOTE(review): PRESERVE_FLAG presumably restores the flag's original value at the end
// of this scope; it must stay ahead of the assignment below.
PRESERVE_FLAG(dup_ignore_other_cluster_ids);
FLAGS_dup_ignore_other_cluster_ids = true;

for (uint8_t id = 0; id < 4; ++id) {
ASSERT_TRUE(is_dup_cluster_id_configured(id));
}
}

} // namespace replication
Expand Down
19 changes: 12 additions & 7 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "utils/string_conv.h"
#include "utils/zlocks.h"

DSN_DECLARE_bool(dup_ignore_other_cluster_ids);

namespace dsn {
namespace replication {

Expand Down Expand Up @@ -209,13 +212,15 @@ void meta_duplication_service::add_duplication(duplication_add_rpc rpc)
ERR_INVALID_PARAMETERS,
"illegal operation: adding duplication to itself");

auto remote_cluster_id = get_duplication_cluster_id(request.remote_cluster_name);
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(remote_cluster_id.is_ok(),
response,
ERR_INVALID_PARAMETERS,
"get_duplication_cluster_id({}) failed, error: {}",
request.remote_cluster_name,
remote_cluster_id.get_error());
if (!FLAGS_dup_ignore_other_cluster_ids) {
auto remote_cluster_id = get_duplication_cluster_id(request.remote_cluster_name);
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(remote_cluster_id.is_ok(),
response,
ERR_INVALID_PARAMETERS,
"get_duplication_cluster_id({}) failed, error: {}",
request.remote_cluster_name,
remote_cluster_id.get_error());
}

std::vector<host_port> meta_list;
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
Expand Down
27 changes: 22 additions & 5 deletions src/server/pegasus_mutation_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <vector>

#include "client_lib/pegasus_client_impl.h"
#include "common/common.h"
#include "common/duplication_common.h"
#include "duplication_internal_types.h"
#include "pegasus/client.h"
Expand All @@ -40,7 +41,6 @@
#include "rrdb/rrdb_types.h"
#include "runtime/message_utils.h"
#include "runtime/rpc/rpc_message.h"
#include "server/pegasus_write_service.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/chrono_literals.h"
Expand All @@ -50,6 +50,8 @@
#include "utils/fmt_logging.h"
#include "utils/rand.h"

DSN_DECLARE_bool(dup_ignore_other_cluster_ids);

METRIC_DEFINE_counter(replica,
dup_shipped_successful_requests,
dsn::metric_unit::kRequests,
Expand Down Expand Up @@ -127,6 +129,19 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli
pegasus_client *client = pegasus_client_factory::get_client(remote_cluster.data(), app.data());
_client = static_cast<client::pegasus_client_impl *>(client);

CHECK_STRNE_PREFIX_MSG(dsn::get_current_dup_cluster_name(),
remote_cluster.data(),
"remote cluster should not be myself: {}",
remote_cluster);

if (FLAGS_dup_ignore_other_cluster_ids) {
LOG_INFO_PREFIX("initialize mutation duplicator for local cluster [id:{}], "
"remote cluster [id:ignored, addr:{}]",
dsn::replication::get_current_dup_cluster_id(),
remote_cluster);
return;
}

auto ret = dsn::replication::get_duplication_cluster_id(remote_cluster.data());
CHECK_PREFIX_MSG(ret.is_ok(), // never possible, meta server disallows such remote_cluster.
"invalid remote cluster: {}, err_ret: {}",
Expand All @@ -136,13 +151,15 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli

LOG_INFO_PREFIX("initialize mutation duplicator for local cluster [id:{}], "
"remote cluster [id:{}, addr:{}]",
get_current_cluster_id(),
dsn::replication::get_current_dup_cluster_id(),
_remote_cluster_id,
remote_cluster);

// never possible to duplicate data to itself
CHECK_NE_PREFIX_MSG(
get_current_cluster_id(), _remote_cluster_id, "invalid remote cluster: {}", remote_cluster);
CHECK_NE_PREFIX_MSG(dsn::replication::get_current_dup_cluster_id(),
_remote_cluster_id,
"invalid remote cluster: {}",
remote_cluster);
}

void pegasus_mutation_duplicator::send(uint64_t hash, callback cb)
Expand Down Expand Up @@ -237,7 +254,7 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
entry.__set_raw_message(raw_message);
entry.__set_task_code(rpc_code);
entry.__set_timestamp(std::get<0>(mut));
entry.__set_cluster_id(get_current_cluster_id());
entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
batch_request->entries.emplace_back(std::move(entry));
batch_bytes += raw_message.length();
}
Expand Down
4 changes: 2 additions & 2 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,12 @@ int pegasus_write_service::duplicate(int64_t decree,
{
// Verifies the cluster_id.
for (const auto &request : requests.entries) {
if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) {
if (!dsn::replication::is_dup_cluster_id_configured(request.cluster_id)) {
resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint("request cluster id is unconfigured");
return empty_put(decree);
}
if (request.cluster_id == get_current_cluster_id()) {
if (request.cluster_id == dsn::replication::get_current_dup_cluster_id()) {
resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint("self-duplicating");
return empty_put(decree);
Expand Down
Loading

0 comments on commit 0ee404e

Please sign in to comment.