From d0fc506dc2d4733bc75e058b43a3f52b2083c12f Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Fri, 29 Nov 2024 15:26:39 +0800 Subject: [PATCH] add test for duplication info --- src/meta/duplication/duplication_info.cpp | 18 +++- src/meta/duplication/duplication_info.h | 7 +- .../duplication/meta_duplication_service.cpp | 5 +- src/meta/test/duplication_info_test.cpp | 91 ++++++++++++++++--- 4 files changed, 97 insertions(+), 24 deletions(-) diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index 377d9326bb..565be6992f 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -125,8 +125,13 @@ void duplication_info::init_progress(int partition_index, decree d) zauto_write_lock l(_lock); auto &p = _progress[partition_index]; + + p.last_committed_decree = invalid_decree; p.volatile_decree = p.stored_decree = d; + p.is_altering = false; + p.last_progress_update_ms = 0; p.is_inited = true; + p.checkpoint_prepared = false; } bool duplication_info::alter_progress(int partition_index, @@ -135,17 +140,22 @@ bool duplication_info::alter_progress(int partition_index, zauto_write_lock l(_lock); partition_progress &p = _progress[partition_index]; + + // last_committed_decree could be update at any time no matter whether progress is + // initialized or busy updating, since it is not persisted to remote meta storage. + // It is just collected from the primary replica of each partition. + if (confirm_entry.__isset.last_committed_decree) { + p.last_committed_decree = confirm_entry.last_committed_decree; + } + if (!p.is_inited) { return false; } + if (p.is_altering) { return false; } - if (confirm_entry.__isset.last_committed_decree) { - p.last_committed_decree = confirm_entry.last_committed_decree; - } - p.checkpoint_prepared = confirm_entry.checkpoint_prepared; if (p.volatile_decree < confirm_entry.confirmed_decree) { p.volatile_decree = confirm_entry.confirmed_decree; diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 962d9f54f0..d1503a8682 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -218,12 +218,13 @@ class duplication_info struct partition_progress { + // last committed decree collected from the primary replica of each partition. + // Not persisted to remote meta storage. + int64_t last_committed_decree{invalid_decree}; + int64_t volatile_decree{invalid_decree}; int64_t stored_decree{invalid_decree}; - // - int64_t last_committed_decree{invalid_decree}; - bool is_altering{false}; uint64_t last_progress_update_ms{0}; bool is_inited{false}; diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index f723ece5f1..f63d8af359 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -913,7 +913,7 @@ void meta_duplication_service::do_restore_duplication_progress( std::move(partition_path), [dup, partition_idx](const blob &value) { // value is confirmed_decree encoded in string. - if (value.size() == 0) { + if (value.empty()) { // not found dup->init_progress(partition_idx, invalid_decree); return; @@ -958,10 +958,11 @@ void meta_duplication_service::do_restore_duplication(dupid_t dup_id, app->max_replica_count, store_path, json); - if (nullptr == dup) { + if (!dup) { LOG_ERROR("failed to decode json \"{}\" on path {}", json, store_path); return; // fail fast } + if (!dup->is_invalid_status()) { app->duplications[dup->id] = dup; refresh_duplicating_no_lock(app); diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 4e12bcb8c3..25b17212ff 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -30,6 +30,7 @@ #include "gtest/gtest.h" #include "runtime/app_model.h" +#include "test_util/test_util.h" DSN_DECLARE_uint64(dup_progress_min_update_period_ms); @@ -49,9 +50,22 @@ class duplication_info_test : public testing::Test dup._status = status; } - static void test_alter_progress() + static void test_init_progress(duplication_info &dup, int partition_idx, decree expected_decree) { + dup.init_progress(partition_idx, expected_decree); + + const auto &progress = dup._progress[partition_idx]; + ASSERT_EQ(invalid_decree, progress.last_committed_decree); + ASSERT_EQ(expected_decree, progress.volatile_decree); + ASSERT_EQ(expected_decree, progress.stored_decree); + ASSERT_FALSE(progress.is_altering); + ASSERT_EQ(0, progress.last_progress_update_ms); + ASSERT_TRUE(progress.is_inited); + ASSERT_FALSE(progress.checkpoint_prepared); + } + static void test_alter_progress() + { duplication_info dup(1, 1, kTestAppName, @@ -62,45 +76,91 @@ class duplication_info_test : public testing::Test kTestRemoteAppName, std::vector(), kTestMetaStorePath); - duplication_confirm_entry entry; - ASSERT_FALSE(dup.alter_progress(0, entry)); - dup.init_progress(0, invalid_decree); + // Failed to alter progres for partition 0 since it has not been initialized. + ASSERT_FALSE(dup.alter_progress(0, duplication_confirm_entry())); + + // Initialize progress for partition 0. + test_init_progress(dup, 0, invalid_decree); + + // Alter progress with specified decrees for partition 0. + duplication_confirm_entry entry; + entry.__set_last_committed_decree(8); entry.confirmed_decree = 5; entry.checkpoint_prepared = true; ASSERT_TRUE(dup.alter_progress(0, entry)); - ASSERT_EQ(dup._progress[0].volatile_decree, 5); + + ASSERT_EQ(8, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); - // busy updating + // Busy updating. + entry.__set_last_committed_decree(15); entry.confirmed_decree = 10; entry.checkpoint_prepared = false; ASSERT_FALSE(dup.alter_progress(0, entry)); - ASSERT_EQ(dup._progress[0].volatile_decree, 5); + + // last_committed_decree could be updated at any time. + ASSERT_EQ(15, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree); ASSERT_TRUE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); + // Persist progress for partition 0. dup.persist_progress(0); - ASSERT_EQ(dup._progress[0].stored_decree, 5); + + ASSERT_EQ(15, dup._progress[0].last_committed_decree); + ASSERT_EQ(5, dup._progress[0].volatile_decree); + ASSERT_EQ(5, dup._progress[0].stored_decree); ASSERT_FALSE(dup._progress[0].is_altering); ASSERT_TRUE(dup._progress[0].checkpoint_prepared); - // too frequent to update - dup.init_progress(1, invalid_decree); + // Initialize progress for partition 1. + test_init_progress(dup, 1, 5); + + // Alter progress for partition 1. ASSERT_TRUE(dup.alter_progress(1, entry)); + + ASSERT_EQ(15, dup._progress[1].last_committed_decree); + ASSERT_EQ(10, dup._progress[1].volatile_decree); + ASSERT_EQ(5, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); + ASSERT_FALSE(dup._progress[1].checkpoint_prepared); + + // Persist progress for partition 1. dup.persist_progress(1); + // It is too frequent to alter progress. + PRESERVE_FLAG(dup_progress_min_update_period_ms); + FLAGS_dup_progress_min_update_period_ms = 10000; + entry.__set_last_committed_decree(25); + entry.confirmed_decree = 15; + entry.checkpoint_prepared = true; ASSERT_FALSE(dup.alter_progress(1, entry)); + ASSERT_EQ(25, dup._progress[1].last_committed_decree); + // volatile_decree would be updated successfully even if it is too frequent. + ASSERT_EQ(15, dup._progress[1].volatile_decree); + ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_FALSE(dup._progress[1].is_altering); + // checkpoint_prepared would be updated successfully even if it is too frequent. + ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + // Reduce last update timestamp to make it infrequent. dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100; + entry.__set_last_committed_decree(26); + entry.confirmed_decree = 25; - entry.confirmed_decree = 15; - entry.checkpoint_prepared = true; ASSERT_TRUE(dup.alter_progress(1, entry)); + ASSERT_EQ(26, dup._progress[1].last_committed_decree); + ASSERT_EQ(25, dup._progress[1].volatile_decree); + ASSERT_EQ(10, dup._progress[1].stored_decree); ASSERT_TRUE(dup._progress[1].is_altering); + ASSERT_TRUE(dup._progress[1].checkpoint_prepared); + + // Checkpoint are ready for both partition 0 and 1. ASSERT_TRUE(dup.all_checkpoint_has_prepared()); } @@ -128,8 +188,9 @@ class duplication_info_test : public testing::Test for (int i = 0; i < 4; i++) { dup.init_progress(i, invalid_decree); } + for (auto kv : dup_ent.progress) { - ASSERT_EQ(kv.second, invalid_decree); + ASSERT_EQ(invalid_decree, kv.second); } dup.start(); @@ -153,8 +214,8 @@ class duplication_info_test : public testing::Test dup.start(); dup.persist_status(); - ASSERT_EQ(dup._status, duplication_status::DS_PREPARE); - ASSERT_EQ(dup._next_status, duplication_status::DS_INIT); + ASSERT_EQ(duplication_status::DS_PREPARE, dup._status); + ASSERT_EQ(duplication_status::DS_INIT, dup._next_status); ASSERT_FALSE(dup.is_altering()); }