diff --git a/idl/duplication.thrift b/idl/duplication.thrift index 589cc5d085..604edd1d3b 100644 --- a/idl/duplication.thrift +++ b/idl/duplication.thrift @@ -22,6 +22,15 @@ namespace cpp dsn.replication namespace go admin namespace java org.apache.pegasus.replication +// Indicate which data of a table needs to be duplicated: +// * FULL: all of the data of the table needs to be duplicated. +// * INCREMENTAL: only incremental data of the table would be duplicated. +enum duplication_mode +{ + FULL = 0, + INCREMENTAL, +} + // - INIT -> PREPARE // - PREPARE -> APP // - APP -> LOG @@ -129,6 +138,16 @@ struct duplication_modify_response 2:i32 appid; } +// The states tracked for each partition of a duplication. +struct duplication_partition_state +{ + // The max decree of this partition that has been confirmed to be received by follower. + 1:i64 confirmed_decree; + + // The max decree that has been committed by this partition. + 2:i64 last_committed_decree; +} + struct duplication_entry { 1:i32 dupid; @@ -136,7 +155,8 @@ struct duplication_entry 3:string remote; 4:i64 create_ts; - // partition_index => confirmed decree + // Used for syncing duplications with partition-level progress (replica server -> meta server). + // partition index => confirmed decree. 5:optional map<i32, i64> progress; 7:optional duplication_fail_mode fail_mode; @@ -150,6 +170,13 @@ struct duplication_entry // For versions >= v2.6.0, this could be specified by client. // For versions < v2.6.0, this must be the same with source replica_count. 9:optional i32 remote_replica_count; + + // TODO(wangdan): duplication mode is not supported yet; it would be supported later. + 10:optional duplication_mode mode; + + // Used for listing duplications with partition-level details (client -> meta server). + // partition index => partition states. + 11:optional map<i32, duplication_partition_state> partition_states; } // This request is sent from client to meta. 
diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index cc49165f98..a0eee52d38 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -181,11 +182,12 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent) }; if (ent.__isset.progress) { - nlohmann::json sub_json; - for (const auto &p : ent.progress) { - sub_json[std::to_string(p.first)] = p.second; + nlohmann::json progress; + for (const auto &[partition_index, confirmed_decree] : ent.progress) { + progress[std::to_string(partition_index)] = confirmed_decree; } - json["progress"] = sub_json; + + json["progress"] = progress; } if (ent.__isset.remote_app_name) { @@ -198,6 +200,19 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent) json["remote_replica_count"] = ent.remote_replica_count; } + if (ent.__isset.partition_states) { + nlohmann::json partition_states; + for (const auto &[partition_index, state] : ent.partition_states) { + nlohmann::json partition_state; + partition_state["confirmed_decree"] = state.confirmed_decree; + partition_state["last_committed_decree"] = state.last_committed_decree; + + partition_states[std::to_string(partition_index)] = partition_state; + } + + json["partition_states"] = partition_states; + } + return json; } diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 8f609c7b20..5aaa21a9c8 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -18,7 +18,6 @@ #include "duplication_info.h" #include "common/duplication_common.h" -#include "meta/meta_data.h" #include "runtime/api_layer1.h" #include "utils/flags.h" #include "utils/fmt_logging.h" @@ -221,7 +220,7 @@ void duplication_info::persist_status() std::string duplication_info::to_string() const { - return duplication_entry_to_string(to_duplication_entry()); + return 
duplication_entry_to_string(to_partition_level_entry_for_list()); } blob duplication_info::to_json_blob() const @@ -293,17 +292,11 @@ duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id, return dup; } -void duplication_info::append_if_valid_for_query( - const app_state &app, - /*out*/ std::vector<duplication_entry> &entry_list) const +void duplication_info::append_as_entry(std::vector<duplication_entry> &entry_list) const { zauto_read_lock l(_lock); - entry_list.emplace_back(to_duplication_entry()); - duplication_entry &ent = entry_list.back(); - // the confirmed decree is not useful for displaying - // the overall state of duplication - ent.__isset.progress = false; + entry_list.emplace_back(to_duplication_level_entry()); } } // namespace dsn::replication diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 7563d3d411..2b7e9dd03e 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -38,10 +39,7 @@ #include "utils/fmt_utils.h" #include "utils/zlocks.h" -namespace dsn { -namespace replication { - -class app_state; +namespace dsn::replication { class duplication_info; @@ -149,10 +147,10 @@ class duplication_info // duplication_query_rpc is handled in THREAD_POOL_META_SERVER, // which is not thread safe for read. - void append_if_valid_for_query(const app_state &app, - /*out*/ std::vector<duplication_entry> &entry_list) const; + void append_as_entry(std::vector<duplication_entry> &entry_list) const; - duplication_entry to_duplication_entry() const + // Build an entry including only duplication-level info. 
+ duplication_entry to_duplication_level_entry() const { duplication_entry entry; entry.dupid = id; @@ -162,13 +160,47 @@ class duplication_info entry.__set_fail_mode(_fail_mode); entry.__set_remote_app_name(remote_app_name); entry.__set_remote_replica_count(remote_replica_count); + + return entry; + } + + // Build an entry that carries, besides duplication-level info, the partition-level progress + // (confirmed decree) used for sync; partitions not yet inited are skipped. + duplication_entry to_partition_level_entry_for_sync() const + { + auto entry = to_duplication_level_entry(); + entry.__isset.progress = true; - for (const auto &kv : _progress) { - if (!kv.second.is_inited) { + for (const auto &[partition_index, state] : _progress) { + if (!state.is_inited) { + continue; + } + + entry.progress.emplace(partition_index, state.stored_decree); + } + + return entry; + } + + // Build an entry that carries, besides duplication-level info, the partition-level detailed + // states used for list; partitions not yet inited are skipped. + duplication_entry to_partition_level_entry_for_list() const + { + auto entry = to_duplication_level_entry(); + + entry.__isset.partition_states = true; + for (const auto &[partition_index, state] : _progress) { + if (!state.is_inited) { continue; } - entry.progress[kv.first] = kv.second.stored_decree; + + duplication_partition_state partition_state; + partition_state.confirmed_decree = state.stored_decree; + partition_state.last_committed_decree = state.last_committed_decree; + + entry.partition_states.emplace(partition_index, partition_state); } + return entry; } @@ -231,7 +263,7 @@ class duplication_info bool checkpoint_prepared{false}; }; - // partition_idx => progress + // partition_index => progress std::map _progress; uint64_t _last_progress_report_ms{0}; @@ -281,7 +313,6 @@ extern void json_encode(dsn::json::JsonWriter &out, const duplication_fail_mode: extern bool json_decode(const dsn::json::JsonObject &in, duplication_fail_mode::type &s); -} // namespace replication -} // namespace dsn +} // namespace dsn::replication 
USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::duplication_info); diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index f63d8af359..28650dc895 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -84,7 +84,7 @@ void meta_duplication_service::query_duplication_info(const duplication_query_re response.appid = app->app_id; for (const auto &[_, dup] : app->duplications) { - dup->append_if_valid_for_query(*app, response.entry_list); + dup->append_as_entry(response.entry_list); } } } @@ -425,7 +425,7 @@ void meta_duplication_service::duplication_sync(duplication_sync_rpc rpc) } } - response.dup_map[app_id][dup_id] = dup->to_duplication_entry(); + response.dup_map[app_id][dup_id] = dup->to_partition_level_entry_for_sync(); // report progress periodically for each duplications dup->report_progress_if_time_up(); diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 72bbcdbb15..a690d450d2 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -29,6 +29,7 @@ #include #include "gtest/gtest.h" +#include "gutil/map_util.h" #include "runtime/app_model.h" #include "test_util/test_util.h" #include "utils/flags.h" @@ -51,11 +52,46 @@ class duplication_info_test : public testing::Test dup._status = status; } - static void test_init_progress(duplication_info &dup, int partition_idx, decree expected_decree) + static void test_duplication_entry_for_sync(const duplication_info &dup, + int partition_index, + decree expected_confirmed_decree) { - dup.init_progress(partition_idx, expected_decree); + const auto &entry = dup.to_partition_level_entry_for_sync(); + ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index)); + ASSERT_EQ(expected_confirmed_decree, gutil::FindOrDie(entry.progress, partition_index)); + } + + static void 
test_duplication_entry_for_list(const duplication_info &dup, + int partition_index, + decree expected_confirmed_decree, + decree expected_last_committed_decree) + { + const auto &entry = dup.to_partition_level_entry_for_list(); + ASSERT_TRUE(gutil::ContainsKey(entry.partition_states, partition_index)); + + const auto &state = gutil::FindOrDie(entry.partition_states, partition_index); + ASSERT_EQ(expected_confirmed_decree, state.confirmed_decree); + ASSERT_EQ(expected_last_committed_decree, state.last_committed_decree); + } + + static void test_duplication_entry(const duplication_info &dup, + int partition_index, + decree expected_confirmed_decree, + decree expected_last_committed_decree) + { + test_duplication_entry_for_sync(dup, partition_index, expected_confirmed_decree); + test_duplication_entry_for_list( + dup, partition_index, expected_confirmed_decree, expected_last_committed_decree); + } + + static void + test_init_progress(duplication_info &dup, int partition_index, decree expected_decree) + { + dup.init_progress(partition_index, expected_decree); + + ASSERT_TRUE(gutil::ContainsKey(dup._progress, partition_index)); - const auto &progress = dup._progress[partition_idx]; + const auto &progress = gutil::FindOrDie(dup._progress, partition_index); ASSERT_EQ(invalid_decree, progress.last_committed_decree); ASSERT_EQ(expected_decree, progress.volatile_decree); ASSERT_EQ(expected_decree, progress.stored_decree); @@ -63,6 +99,8 @@ class duplication_info_test : public testing::Test ASSERT_EQ(0, progress.last_progress_update_ms); ASSERT_TRUE(progress.is_inited); ASSERT_FALSE(progress.checkpoint_prepared); + + test_duplication_entry(dup, partition_index, expected_decree, invalid_decree); } static void test_alter_progress() @@ -96,6 +134,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + test_duplication_entry(dup, 0, 
invalid_decree, 8); // Busy updating. entry.__set_last_committed_decree(15); @@ -109,6 +148,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + test_duplication_entry(dup, 0, invalid_decree, 15); // Persist progress for partition 0. dup.persist_progress(0); @@ -118,6 +158,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(5, dup._progress[0].stored_decree); ASSERT_FALSE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + test_duplication_entry(dup, 0, 5, 15); // Initialize progress for partition 1. test_init_progress(dup, 1, 5); @@ -130,6 +171,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(5, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); ASSERT_FALSE(dup._progress[1].checkpoint_prepared); + test_duplication_entry(dup, 1, 5, 15); // Persist progress for partition 1. dup.persist_progress(1); @@ -148,6 +190,7 @@ class duplication_info_test : public testing::Test ASSERT_FALSE(dup._progress[1].is_altering); // checkpoint_prepared would be updated successfully even if it is too frequent. ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + test_duplication_entry(dup, 1, 10, 25); // Reduce last update timestamp to make it infrequent. dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100; @@ -160,6 +203,7 @@ class duplication_info_test : public testing::Test ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + test_duplication_entry(dup, 1, 10, 26); // Checkpoint are ready for both partition 0 and 1. 
ASSERT_TRUE(dup.all_checkpoint_has_prepared()); @@ -181,17 +225,24 @@ class duplication_info_test : public testing::Test ASSERT_EQ(duplication_status::DS_INIT, dup._status); ASSERT_EQ(duplication_status::DS_INIT, dup._next_status); - auto dup_ent = dup.to_duplication_entry(); - ASSERT_EQ(0, dup_ent.progress.size()); - ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name); - ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count); + { + const auto &entry = dup.to_partition_level_entry_for_sync(); + ASSERT_TRUE(entry.progress.empty()); + ASSERT_EQ(kTestRemoteAppName, entry.remote_app_name); + ASSERT_EQ(kTestRemoteReplicaCount, entry.remote_replica_count); + } - for (int i = 0; i < 4; i++) { + for (int i = 0; i < 4; ++i) { dup.init_progress(i, invalid_decree); } - for (auto kv : dup_ent.progress) { - ASSERT_EQ(invalid_decree, kv.second); + { + const auto &entry = dup.to_partition_level_entry_for_sync(); + ASSERT_EQ(4, entry.progress.size()); + for (int partition_index = 0; partition_index < 4; ++partition_index) { + ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index)); + ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress, partition_index)); + } } dup.start(); diff --git a/src/meta/test/meta_duplication_service_test.cpp b/src/meta/test/meta_duplication_service_test.cpp index ec65a41fa0..5e0ebe6b19 100644 --- a/src/meta/test/meta_duplication_service_test.cpp +++ b/src/meta/test/meta_duplication_service_test.cpp @@ -46,6 +46,7 @@ #include "dsn.layer2_types.h" #include "duplication_types.h" #include "gtest/gtest.h" +#include "gutil/map_util.h" #include "http/http_server.h" #include "http/http_status_code.h" #include "meta/duplication/duplication_info.h" @@ -233,9 +234,12 @@ class meta_duplication_service_test : public meta_test_base ASSERT_EQ(duplication_status::DS_INIT, dup->_status); ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status); - auto ent = dup->to_duplication_entry(); - for (int j = 0; j < app->partition_count; j++) { - 
ASSERT_EQ(invalid_decree, ent.progress[j]); + const auto &entry = dup->to_partition_level_entry_for_sync(); + ASSERT_EQ(app->partition_count, entry.progress.size()); + for (int partition_index = 0; partition_index < app->partition_count; + ++partition_index) { + ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index)); + ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress, partition_index)); } if (last_dup != 0) { diff --git a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index 724f164d1a..5569554d76 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -102,9 +102,9 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const void replica_duplicator_manager::sync_duplication(const duplication_entry &ent) { - // state is inconsistent with meta-server auto it = ent.progress.find(get_gpid().get_partition_index()); if (it == ent.progress.end()) { + // Inconsistent with the meta server: the entry has no progress for this partition. _duplications.erase(ent.dupid); return; } @@ -114,22 +114,26 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent) dupid_t dupid = ent.dupid; duplication_status::type next_status = ent.status; - replica_duplicator_u_ptr &dup = _duplications[dupid]; - if (dup == nullptr) { + auto &dup = _duplications[dupid]; + if (!dup) { if (!is_duplication_status_invalid(next_status)) { dup = std::make_unique<replica_duplicator>(ent, _replica); } else { LOG_ERROR_PREFIX("illegal duplication status: {}", duplication_status_to_string(next_status)); + // Do not keep a null entry in the map (operator[] inserts one once dupid is absent). + _duplications.erase(dupid); } - } else { - // update progress - duplication_progress newp = dup->progress().set_confirmed_decree(it->second); - CHECK_EQ_PREFIX(dup->update_progress(newp), error_s::ok()); - dup->update_status_if_needed(next_status); - if (ent.__isset.fail_mode) { - dup->update_fail_mode(ent.fail_mode); - } + + return; + } + + // Update the confirmed decree with the one carried by the synced entry. 
+ CHECK_EQ_PREFIX(dup->update_progress(dup->progress().set_confirmed_decree(it->second)), + error_s::ok()); + dup->update_status_if_needed(next_status); + if (ent.__isset.fail_mode) { + dup->update_fail_mode(ent.fail_mode); } }