Skip to content

Commit

Permalink
feat(duplication): collect last committed decrees from primary replic…
Browse files Browse the repository at this point in the history
…as to meta server of the master cluster for duplication (apache#2159)

While a table is being migrated to another cluster by duplication, we want to check
if the migration is finished by decree. Since we already have confirmed decree for
the log entires duplicated to the follower table, we need to collect last committed
decrees from the primary replicas to the meta server of the master cluster. We would
compare both kinds of decrees to check whether the migration is finished.
 
Following configurations are newly added:
```diff
[replication]
+ dup_progress_min_update_period_ms = 5000
+ dup_progress_min_report_period_ms = 300000
```
  • Loading branch information
empiredan authored Dec 2, 2024
1 parent 32a982f commit d9f2600
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 78 deletions.
4 changes: 4 additions & 0 deletions idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ struct duplication_confirm_entry
1:i32 dupid;
2:i64 confirmed_decree;
3:optional bool checkpoint_prepared = false;

// Last committed decree from the primary replica of each partition, collected by
// meta server and used to be compared with duplicating progress of follower table.
4:optional i64 last_committed_decree;
}

// This is an internal RPC sent from replica server to meta.
Expand Down
80 changes: 61 additions & 19 deletions src/meta/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,20 @@
#include "common/duplication_common.h"
#include "meta/meta_data.h"
#include "runtime/api_layer1.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"

namespace dsn {
namespace replication {
DSN_DEFINE_uint64(replication,
dup_progress_min_update_period_ms,
5000,
"The minimum period in milliseconds that progress of duplication is updated");

DSN_DEFINE_uint64(replication,
dup_progress_min_report_period_ms,
5ULL * 60 * 1000,
"The minimum period in milliseconds that progress of duplication is reported");

namespace dsn::replication {

/*extern*/ void json_encode(dsn::json::JsonWriter &out, const duplication_status::type &s)
{
Expand Down Expand Up @@ -116,8 +126,13 @@ void duplication_info::init_progress(int partition_index, decree d)
zauto_write_lock l(_lock);

auto &p = _progress[partition_index];

p.last_committed_decree = invalid_decree;
p.volatile_decree = p.stored_decree = d;
p.is_altering = false;
p.last_progress_update_ms = 0;
p.is_inited = true;
p.checkpoint_prepared = false;
}

bool duplication_info::alter_progress(int partition_index,
Expand All @@ -126,9 +141,18 @@ bool duplication_info::alter_progress(int partition_index,
zauto_write_lock l(_lock);

partition_progress &p = _progress[partition_index];

// last_committed_decree could be update at any time no matter whether progress is
// initialized or busy updating, since it is not persisted to remote meta storage.
// It is just collected from the primary replica of each partition.
if (confirm_entry.__isset.last_committed_decree) {
p.last_committed_decree = confirm_entry.last_committed_decree;
}

if (!p.is_inited) {
return false;
}

if (p.is_altering) {
return false;
}
Expand All @@ -137,15 +161,19 @@ bool duplication_info::alter_progress(int partition_index,
if (p.volatile_decree < confirm_entry.confirmed_decree) {
p.volatile_decree = confirm_entry.confirmed_decree;
}
if (p.volatile_decree != p.stored_decree) {
// progress update is not supposed to be too frequent.
if (dsn_now_ms() > p.last_progress_update_ms + PROGRESS_UPDATE_PERIOD_MS) {
p.is_altering = true;
p.last_progress_update_ms = dsn_now_ms();
return true;
}

if (p.volatile_decree == p.stored_decree) {
return false;
}

// Progress update is not supposed to be too frequent.
if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) {
return false;
}
return false;

p.is_altering = true;
p.last_progress_update_ms = dsn_now_ms();
return true;
}

void duplication_info::persist_progress(int partition_index)
Expand All @@ -163,13 +191,26 @@ void duplication_info::persist_status()
zauto_write_lock l(_lock);

if (!_is_altering) {
LOG_ERROR_PREFIX("callers never write a duplication that is not altering to meta store");
LOG_ERROR_PREFIX("the status of this duplication is not being altered: status={}, "
"next_status={}, master_app_id={}, master_app_name={}, "
"follower_cluster_name={}, follower_app_name={}",
duplication_status_to_string(_status),
duplication_status_to_string(_next_status),
app_id,
app_name,
remote_cluster_name,
remote_app_name);
return;
}
LOG_INFO_PREFIX("change duplication status from {} to {} successfully [app_id: {}]",

LOG_INFO_PREFIX("change duplication status from {} to {} successfully: master_app_id={}, "
"master_app_name={}, follower_cluster_name={}, follower_app_name={}",
duplication_status_to_string(_status),
duplication_status_to_string(_next_status),
app_id);
app_id,
app_name,
remote_cluster_name,
remote_app_name);

_is_altering = false;
_status = _next_status;
Expand Down Expand Up @@ -197,11 +238,13 @@ blob duplication_info::to_json_blob() const

void duplication_info::report_progress_if_time_up()
{
// progress report is not supposed to be too frequent.
if (dsn_now_ms() > _last_progress_report_ms + PROGRESS_REPORT_PERIOD_MS) {
_last_progress_report_ms = dsn_now_ms();
LOG_INFO("duplication report: {}", to_string());
// Progress report is not supposed to be too frequent.
if (dsn_now_ms() < _last_progress_report_ms + FLAGS_dup_progress_min_report_period_ms) {
return;
}

_last_progress_report_ms = dsn_now_ms();
LOG_INFO("duplication report: {}", to_string());
}

duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
Expand Down Expand Up @@ -263,5 +306,4 @@ void duplication_info::append_if_valid_for_query(
ent.__isset.progress = false;
}

} // namespace replication
} // namespace dsn
} // namespace dsn::replication
8 changes: 5 additions & 3 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,15 @@ class duplication_info

mutable zrwlock_nr _lock;

static constexpr int PROGRESS_UPDATE_PERIOD_MS = 5000; // 5s
static constexpr int PROGRESS_REPORT_PERIOD_MS = 1000 * 60 * 5; // 5min

struct partition_progress
{
// Last committed decree collected from the primary replica of each partition.
// Not persisted to remote meta storage.
int64_t last_committed_decree{invalid_decree};

int64_t volatile_decree{invalid_decree};
int64_t stored_decree{invalid_decree};

bool is_altering{false};
uint64_t last_progress_update_ms{0};
bool is_inited{false};
Expand Down
38 changes: 22 additions & 16 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,31 +791,36 @@ void meta_duplication_service::do_update_partition_confirmed(
int32_t partition_idx,
const duplication_confirm_entry &confirm_entry)
{
if (dup->alter_progress(partition_idx, confirm_entry)) {
std::string path = get_partition_path(dup, std::to_string(partition_idx));
blob value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));
if (!dup->alter_progress(partition_idx, confirm_entry)) {
return;
}

const auto &path = get_partition_path(dup, std::to_string(partition_idx));

_meta_svc->get_meta_storage()->get_data(
path, [dup, rpc, partition_idx, confirm_entry, path, this](const blob &data) mutable {
auto value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));

_meta_svc->get_meta_storage()->get_data(std::string(path), [=](const blob &data) mutable {
if (data.length() == 0) {
if (data.empty()) {
_meta_svc->get_meta_storage()->create_node(
std::string(path), std::move(value), [=]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});
} else {
_meta_svc->get_meta_storage()->set_data(
std::string(path), std::move(value), [=]() mutable {
path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});
return;
}

_meta_svc->get_meta_storage()->set_data(
path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable {
dup->persist_progress(partition_idx);
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
confirm_entry.confirmed_decree;
});

// duplication_sync_rpc will finally be replied when confirmed points
// of all partitions are stored.
});
}
}

std::shared_ptr<duplication_info>
Expand Down Expand Up @@ -908,7 +913,7 @@ void meta_duplication_service::do_restore_duplication_progress(
std::move(partition_path), [dup, partition_idx](const blob &value) {
// value is confirmed_decree encoded in string.

if (value.size() == 0) {
if (value.empty()) {
// not found
dup->init_progress(partition_idx, invalid_decree);
return;
Expand Down Expand Up @@ -953,10 +958,11 @@ void meta_duplication_service::do_restore_duplication(dupid_t dup_id,
app->max_replica_count,
store_path,
json);
if (nullptr == dup) {
if (!dup) {
LOG_ERROR("failed to decode json \"{}\" on path {}", json, store_path);
return; // fail fast
}

if (!dup->is_invalid_status()) {
app->duplications[dup->id] = dup;
refresh_duplicating_no_lock(app);
Expand Down
20 changes: 19 additions & 1 deletion src/meta/meta_state_service_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include "utils/blob.h"

namespace dsn {
class blob;
class task_tracker;

namespace dist {
class meta_state_service;
} // namespace dist
Expand Down Expand Up @@ -57,16 +60,31 @@ struct meta_storage

void create_node(std::string &&node, blob &&value, std::function<void()> &&cb);

void create_node(const std::string &node, blob &&value, std::function<void()> &&cb)
{
create_node(std::string(node), std::move(value), std::move(cb));
}

void delete_node_recursively(std::string &&node, std::function<void()> &&cb);

void delete_node(std::string &&node, std::function<void()> &&cb);

/// Will fatal if node doesn't exists.
void set_data(std::string &&node, blob &&value, std::function<void()> &&cb);

void set_data(const std::string &node, blob &&value, std::function<void()> &&cb)
{
set_data(std::string(node), std::move(value), std::move(cb));
}

/// If node does not exist, cb will receive an empty blob.
void get_data(std::string &&node, std::function<void(const blob &)> &&cb);

void get_data(const std::string &node, std::function<void(const blob &)> &&cb)
{
get_data(std::string(node), std::move(cb));
}

/// \param cb: void (bool node_exists, const std::vector<std::string> &children)
/// `children` contains the name (not full path) of children nodes.
/// `node_exists` indicates whether this node exists.
Expand Down
9 changes: 5 additions & 4 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,9 +1128,10 @@ void server_state::create_app(dsn::message_ex *msg)
request.options.partition_count,
request.options.replica_count,
duplicating
? fmt::format("{}.{}",
request.options.envs[duplication_constants::kEnvMasterClusterKey],
request.app_name)
? fmt::format("master_cluster_name={}, master_app_name={}",
master_cluster->second,
gutil::FindWithDefault(request.options.envs,
duplication_constants::kEnvMasterAppNameKey))
: "false");

auto option_match_check = [](const create_app_options &opt, const app_state &exist_app) {
Expand Down Expand Up @@ -1162,7 +1163,7 @@ void server_state::create_app(dsn::message_ex *msg)
zauto_write_lock l(_lock);

auto app = get_app(request.app_name);
if (nullptr != app) {
if (app) {
configuration_create_app_response response;

switch (app->status) {
Expand Down
Loading

0 comments on commit d9f2600

Please sign in to comment.