diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index cc49165f98..c2f549a841 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -181,11 +181,12 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent) }; if (ent.__isset.progress) { - nlohmann::json sub_json; - for (const auto &p : ent.progress) { - sub_json[std::to_string(p.first)] = p.second; + nlohmann::json progress; + for (const auto &[partition_id, state] : ent.progress) { + progress[std::to_string(partition_id)] = state; } - json["progress"] = sub_json; + + json["progress"] = progress; } if (ent.__isset.remote_app_name) { @@ -198,6 +199,19 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent) json["remote_replica_count"] = ent.remote_replica_count; } + if (ent.__isset.partition_states) { + nlohmann::json partition_states; + for (const auto &[partition_id, state] : ent.partition_states) { + nlohmann::json partition_state; + partition_state["confirmed_decree"] = state.confirmed_decree; + partition_state["last_committed_decree"] = state.last_committed_decree; + + partition_states[std::to_string(partition_id)] = partition_state; + } + + json["partition_states"] = partition_states; + } + return json; } diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 9c032d55f6..83f967e10e 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -221,7 +221,7 @@ void duplication_info::persist_status() std::string duplication_info::to_string() const { - return duplication_entry_to_string(to_duplication_entry(duplication_entry_type::kPartitionLevelList)); + return duplication_entry_to_string(to_partition_level_entry_for_list()); } blob duplication_info::to_json_blob() const @@ -299,7 +299,7 @@ void duplication_info::append_if_valid_for_query( { zauto_read_lock l(_lock); - entry_list.emplace_back(to_duplication_entry(duplication_entry_type::kDuplicationLevelInfo)); + entry_list.emplace_back(to_duplication_level_entry()); duplication_entry &ent = entry_list.back(); // the confirmed decree is not useful for displaying // the overall state of duplication diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 25ca7157ef..35d69eacdb 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -48,13 +48,6 @@ class duplication_info; using duplication_info_s_ptr = std::shared_ptr<duplication_info>; - enum class duplication_entry_type: int - { - kDuplicationLevelInfo, - kPartitionLevelSync, - kPartitionLevelList, - }; - /// This class is thread-safe. class duplication_info { @@ -160,7 +153,7 @@ class duplication_info void append_if_valid_for_query(const app_state &app, /*out*/ std::vector<duplication_entry> &entry_list) const; - duplication_entry to_duplication_entry(duplication_entry_type type) const + duplication_entry to_duplication_level_entry() const { duplication_entry entry; entry.dupid = id; @@ -171,21 +164,39 @@ class duplication_info entry.__set_remote_app_name(remote_app_name); entry.__set_remote_replica_count(remote_replica_count); - if (type == duplication_entry_type::kDuplicationLevelInfo) { - return entry; - } + return entry; + } + + duplication_entry to_partition_level_entry_for_sync() const + { + auto entry = to_duplication_level_entry(); - if (type == duplication_entry_type::kPartitionLevelSync) { entry.__isset.progress = true; - insert_into_entry([](int partition_id, const partition_progress &partition_state, duplication_entry& entry){ - entry.progress.emplace(partition_id, partition_state.stored_decree); - }, entry); + for (const auto &[partition_id, state] : _progress) { + if (!state.is_inited) { + continue; + } + + entry.progress.emplace(partition_id, state.stored_decree); } - else if (type == duplication_entry_type::kPartitionLevelList) { - entry.__isset.partition_states= true; - insert_into_entry([](int partition_id, const partition_progress &partition_state, duplication_entry& entry){ - entry.partition_states.emplace(partition_id, {partition_state.stored_decree, partition_state.last_committed_decree}); - }, entry); + + return entry; + } + + duplication_entry to_partition_level_entry_for_list() const + { + auto entry = to_duplication_level_entry(); + + entry.__isset.partition_states = true; + for (const auto &[partition_id, state] : _progress) { + if (!state.is_inited) { + continue; + } + + duplication_partition_state partition_state; + partition_state.confirmed_decree = state.stored_decree; + partition_state.last_committed_decree = state.last_committed_decree; + entry.partition_states.emplace(partition_id, partition_state); } return entry; @@ -227,17 +238,6 @@ class duplication_info // To json encoded string. std::string to_string() const; - void insert_into_entry(std::function<int, const partition_progress &, duplication_entry&> inserter, duplication_entry &entry) const - { - for (const auto &[partition_id, partition_state] : _progress) { - if (!partition_state.is_inited) { - continue; - } - - inserter(partition_id,partition_state, entry); - } - } - friend class duplication_info_test; friend class meta_duplication_service_test; diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index 93e283218d..f0a620959b 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -425,7 +425,7 @@ void meta_duplication_service::duplication_sync(duplication_sync_rpc rpc) } } - response.dup_map[app_id][dup_id] = dup->to_duplication_entry(duplication_entry_type::kPartitionLevelSync); + response.dup_map[app_id][dup_id] = dup->to_partition_level_entry_for_sync(); // report progress periodically for each duplications dup->report_progress_if_time_up(); diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 34702bf51a..cc7c6e46ee 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -181,7 +181,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(duplication_status::DS_INIT, dup._status); ASSERT_EQ(duplication_status::DS_INIT, dup._next_status); - auto dup_ent = dup.to_duplication_entry(duplication_entry_type::kPartitionLevelSync); + auto dup_ent = dup.to_partition_level_entry_for_sync(); ASSERT_EQ(0, dup_ent.progress.size()); ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name); ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count); diff --git a/src/meta/test/meta_duplication_service_test.cpp b/src/meta/test/meta_duplication_service_test.cpp index 02a5132f6d..ef398b08fa 100644 --- a/src/meta/test/meta_duplication_service_test.cpp +++ b/src/meta/test/meta_duplication_service_test.cpp @@ -233,7 +233,7 @@ class meta_duplication_service_test : public meta_test_base ASSERT_EQ(duplication_status::DS_INIT, dup->_status); ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status); - auto ent = dup->to_duplication_entry(duplication_entry_type::kPartitionLevelSync); + auto ent = dup->to_partition_level_entry_for_sync(); for (int j = 0; j < app->partition_count; j++) { ASSERT_EQ(invalid_decree, ent.progress[j]); } diff --git a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index 1461e35104..0dcd0f9965 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -102,9 +102,9 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const void replica_duplicator_manager::sync_duplication(const duplication_entry &ent) { - // state is inconsistent with meta-server auto it = ent.progress.find(get_gpid().get_partition_index()); if (it == ent.progress.end()) { + // Inconsistent with the meta server. _duplications.erase(ent.dupid); return; } @@ -124,15 +124,15 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent) } return; - } - - // update progress - duplication_progress newp = dup->progress().set_confirmed_decree(it->second); - CHECK_EQ_PREFIX(dup->update_progress(newp), error_s::ok()); - dup->update_status_if_needed(next_status); - if (ent.__isset.fail_mode) { - dup->update_fail_mode(ent.fail_mode); - } + } + + // Update progress. + duplication_progress newp = dup->progress().set_confirmed_decree(it->second); + CHECK_EQ_PREFIX(dup->update_progress(newp), error_s::ok()); + dup->update_status_if_needed(next_status); + if (ent.__isset.fail_mode) { + dup->update_fail_mode(ent.fail_mode); + } } decree replica_duplicator_manager::min_confirmed_decree() const