Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Dec 3, 2024
1 parent f641578 commit c93f286
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 50 deletions.
22 changes: 18 additions & 4 deletions src/common/duplication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,12 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)
};

if (ent.__isset.progress) {
nlohmann::json sub_json;
for (const auto &p : ent.progress) {
sub_json[std::to_string(p.first)] = p.second;
nlohmann::json progress;
for (const auto &[partition_id, state] : ent.progress) {
progress[std::to_string(partition_id)] = state;
}
json["progress"] = sub_json;

json["progress"] = progress;
}

if (ent.__isset.remote_app_name) {
Expand All @@ -198,6 +199,19 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)
json["remote_replica_count"] = ent.remote_replica_count;
}

if (ent.__isset.partition_states) {
nlohmann::json partition_states;
for (const auto &[partition_id, state] : ent.partition_states) {
nlohmann::json partition_state;
partition_state["confirmed_decree"] = state.confirmed_decree;
partition_state["last_committed_decree"] = state.last_committed_decree;

partition_states[std::to_string(partition_id)] = partition_state;
}

json["partition_states"] = partition_states;
}

return json;
}

Expand Down
4 changes: 2 additions & 2 deletions src/meta/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void duplication_info::persist_status()

std::string duplication_info::to_string() const
{
return duplication_entry_to_string(to_duplication_entry(duplication_entry_type::kPartitionLevelList));
return duplication_entry_to_string(to_partition_level_entry_for_list());
}

blob duplication_info::to_json_blob() const
Expand Down Expand Up @@ -299,7 +299,7 @@ void duplication_info::append_if_valid_for_query(
{
zauto_read_lock l(_lock);

entry_list.emplace_back(to_duplication_entry(duplication_entry_type::kDuplicationLevelInfo));
entry_list.emplace_back(to_duplication_level_entry());
duplication_entry &ent = entry_list.back();
// the confirmed decree is not useful for displaying
// the overall state of duplication
Expand Down
62 changes: 31 additions & 31 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ class duplication_info;

using duplication_info_s_ptr = std::shared_ptr<duplication_info>;

enum class duplication_entry_type: int
{
kDuplicationLevelInfo,
kPartitionLevelSync,
kPartitionLevelList,
};

/// This class is thread-safe.
class duplication_info
{
Expand Down Expand Up @@ -160,7 +153,7 @@ class duplication_info
void append_if_valid_for_query(const app_state &app,
/*out*/ std::vector<duplication_entry> &entry_list) const;

duplication_entry to_duplication_entry(duplication_entry_type type) const
duplication_entry to_duplication_level_entry() const
{
duplication_entry entry;
entry.dupid = id;
Expand All @@ -171,21 +164,39 @@ class duplication_info
entry.__set_remote_app_name(remote_app_name);
entry.__set_remote_replica_count(remote_replica_count);

if (type == duplication_entry_type::kDuplicationLevelInfo) {
return entry;
}
return entry;
}

duplication_entry to_partition_level_entry_for_sync() const
{
auto entry = to_duplication_level_entry();

if (type == duplication_entry_type::kPartitionLevelSync) {
entry.__isset.progress = true;
insert_into_entry([](int partition_id, const partition_progress &partition_state, duplication_entry& entry){
entry.progress.emplace(partition_id, partition_state.stored_decree);
}, entry);
for (const auto &[partition_id, state] : _progress) {
if (!state.is_inited) {
continue;
}

entry.progress.emplace(partition_id, state.stored_decree);
}
else if (type == duplication_entry_type::kPartitionLevelList) {
entry.__isset.partition_states= true;
insert_into_entry([](int partition_id, const partition_progress &partition_state, duplication_entry& entry){
entry.partition_states.emplace(partition_id, {partition_state.stored_decree, partition_state.last_committed_decree});
}, entry);

return entry;
}

duplication_entry to_partition_level_entry_for_list() const
{
auto entry = to_duplication_level_entry();

entry.__isset.partition_states = true;
for (const auto &[partition_id, state] : _progress) {
if (!state.is_inited) {
continue;
}

duplication_partition_state partition_state;
partition_state.confirmed_decree = state.stored_decree;
partition_state.last_committed_decree = state.last_committed_decree;
entry.partition_states.emplace(partition_id, partition_state);
}

return entry;
Expand Down Expand Up @@ -227,17 +238,6 @@ class duplication_info
// To json encoded string.
std::string to_string() const;

void insert_into_entry(std::function<int, const partition_progress &, duplication_entry&> inserter, duplication_entry &entry) const
{
for (const auto &[partition_id, partition_state] : _progress) {
if (!partition_state.is_inited) {
continue;
}

inserter(partition_id,partition_state, entry);
}
}

friend class duplication_info_test;
friend class meta_duplication_service_test;

Expand Down
2 changes: 1 addition & 1 deletion src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ void meta_duplication_service::duplication_sync(duplication_sync_rpc rpc)
}
}

response.dup_map[app_id][dup_id] = dup->to_duplication_entry(duplication_entry_type::kPartitionLevelSync);
response.dup_map[app_id][dup_id] = dup->to_partition_level_entry_for_sync();

// report progress periodically for each duplications
dup->report_progress_if_time_up();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/duplication_info_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class duplication_info_test : public testing::Test
ASSERT_EQ(duplication_status::DS_INIT, dup._status);
ASSERT_EQ(duplication_status::DS_INIT, dup._next_status);

auto dup_ent = dup.to_duplication_entry(duplication_entry_type::kPartitionLevelSync);
auto dup_ent = dup.to_partition_level_entry_for_sync();
ASSERT_EQ(0, dup_ent.progress.size());
ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name);
ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/meta_duplication_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class meta_duplication_service_test : public meta_test_base
ASSERT_EQ(duplication_status::DS_INIT, dup->_status);
ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status);

auto ent = dup->to_duplication_entry(duplication_entry_type::kPartitionLevelSync);
auto ent = dup->to_partition_level_entry_for_sync();
for (int j = 0; j < app->partition_count; j++) {
ASSERT_EQ(invalid_decree, ent.progress[j]);
}
Expand Down
20 changes: 10 additions & 10 deletions src/replica/duplication/replica_duplicator_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const

void replica_duplicator_manager::sync_duplication(const duplication_entry &ent)
{
// state is inconsistent with meta-server
auto it = ent.progress.find(get_gpid().get_partition_index());
if (it == ent.progress.end()) {
// Inconsistent with the meta server.
_duplications.erase(ent.dupid);
return;
}
Expand All @@ -124,15 +124,15 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent)
}

return;
}

// update progress
duplication_progress newp = dup->progress().set_confirmed_decree(it->second);
CHECK_EQ_PREFIX(dup->update_progress(newp), error_s::ok());
dup->update_status_if_needed(next_status);
if (ent.__isset.fail_mode) {
dup->update_fail_mode(ent.fail_mode);
}
}

// Update progress.
duplication_progress newp = dup->progress().set_confirmed_decree(it->second);
CHECK_EQ_PREFIX(dup->update_progress(newp), error_s::ok());
dup->update_status_if_needed(next_status);
if (ent.__isset.fail_mode) {
dup->update_fail_mode(ent.fail_mode);
}
}

decree replica_duplicator_manager::min_confirmed_decree() const
Expand Down

0 comments on commit c93f286

Please sign in to comment.