Skip to content

Commit

Permalink
add test for duplication info
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Nov 29, 2024
1 parent f2f7292 commit d0fc506
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 24 deletions.
18 changes: 14 additions & 4 deletions src/meta/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,13 @@ void duplication_info::init_progress(int partition_index, decree d)
zauto_write_lock l(_lock);

auto &p = _progress[partition_index];

p.last_committed_decree = invalid_decree;
p.volatile_decree = p.stored_decree = d;
p.is_altering = false;
p.last_progress_update_ms = 0;
p.is_inited = true;
p.checkpoint_prepared = false;
}

bool duplication_info::alter_progress(int partition_index,
Expand All @@ -135,17 +140,22 @@ bool duplication_info::alter_progress(int partition_index,
zauto_write_lock l(_lock);

partition_progress &p = _progress[partition_index];

// last_committed_decree could be update at any time no matter whether progress is
// initialized or busy updating, since it is not persisted to remote meta storage.
// It is just collected from the primary replica of each partition.
if (confirm_entry.__isset.last_committed_decree) {
p.last_committed_decree = confirm_entry.last_committed_decree;
}

if (!p.is_inited) {
return false;
}

if (p.is_altering) {
return false;
}

if (confirm_entry.__isset.last_committed_decree) {
p.last_committed_decree = confirm_entry.last_committed_decree;
}

p.checkpoint_prepared = confirm_entry.checkpoint_prepared;
if (p.volatile_decree < confirm_entry.confirmed_decree) {
p.volatile_decree = confirm_entry.confirmed_decree;
Expand Down
7 changes: 4 additions & 3 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,13 @@ class duplication_info

struct partition_progress
{
// last committed decree collected from the primary replica of each partition.
// Not persisted to remote meta storage.
int64_t last_committed_decree{invalid_decree};

int64_t volatile_decree{invalid_decree};
int64_t stored_decree{invalid_decree};

//
int64_t last_committed_decree{invalid_decree};

bool is_altering{false};
uint64_t last_progress_update_ms{0};
bool is_inited{false};
Expand Down
5 changes: 3 additions & 2 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ void meta_duplication_service::do_restore_duplication_progress(
std::move(partition_path), [dup, partition_idx](const blob &value) {
// value is confirmed_decree encoded in string.

if (value.size() == 0) {
if (value.empty()) {
// not found
dup->init_progress(partition_idx, invalid_decree);
return;
Expand Down Expand Up @@ -958,10 +958,11 @@ void meta_duplication_service::do_restore_duplication(dupid_t dup_id,
app->max_replica_count,
store_path,
json);
if (nullptr == dup) {
if (!dup) {
LOG_ERROR("failed to decode json \"{}\" on path {}", json, store_path);
return; // fail fast
}

if (!dup->is_invalid_status()) {
app->duplications[dup->id] = dup;
refresh_duplicating_no_lock(app);
Expand Down
91 changes: 76 additions & 15 deletions src/meta/test/duplication_info_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "gtest/gtest.h"
#include "runtime/app_model.h"
#include "test_util/test_util.h"

DSN_DECLARE_uint64(dup_progress_min_update_period_ms);

Expand All @@ -49,9 +50,22 @@ class duplication_info_test : public testing::Test
dup._status = status;
}

static void test_alter_progress()
static void test_init_progress(duplication_info &dup, int partition_idx, decree expected_decree)
{
dup.init_progress(partition_idx, expected_decree);

const auto &progress = dup._progress[partition_idx];
ASSERT_EQ(invalid_decree, progress.last_committed_decree);
ASSERT_EQ(expected_decree, progress.volatile_decree);
ASSERT_EQ(expected_decree, progress.stored_decree);
ASSERT_FALSE(progress.is_altering);
ASSERT_EQ(0, progress.last_progress_update_ms);
ASSERT_TRUE(progress.is_inited);
ASSERT_FALSE(progress.checkpoint_prepared);
}

static void test_alter_progress()
{
duplication_info dup(1,
1,
kTestAppName,
Expand All @@ -62,45 +76,91 @@ class duplication_info_test : public testing::Test
kTestRemoteAppName,
std::vector<host_port>(),
kTestMetaStorePath);
duplication_confirm_entry entry;
ASSERT_FALSE(dup.alter_progress(0, entry));

dup.init_progress(0, invalid_decree);
// Failed to alter progres for partition 0 since it has not been initialized.
ASSERT_FALSE(dup.alter_progress(0, duplication_confirm_entry()));

// Initialize progress for partition 0.
test_init_progress(dup, 0, invalid_decree);

// Alter progress with specified decrees for partition 0.
duplication_confirm_entry entry;
entry.__set_last_committed_decree(8);
entry.confirmed_decree = 5;
entry.checkpoint_prepared = true;
ASSERT_TRUE(dup.alter_progress(0, entry));
ASSERT_EQ(dup._progress[0].volatile_decree, 5);

ASSERT_EQ(8, dup._progress[0].last_committed_decree);
ASSERT_EQ(5, dup._progress[0].volatile_decree);
ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
ASSERT_TRUE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);

// busy updating
// Busy updating.
entry.__set_last_committed_decree(15);
entry.confirmed_decree = 10;
entry.checkpoint_prepared = false;
ASSERT_FALSE(dup.alter_progress(0, entry));
ASSERT_EQ(dup._progress[0].volatile_decree, 5);

// last_committed_decree could be updated at any time.
ASSERT_EQ(15, dup._progress[0].last_committed_decree);
ASSERT_EQ(5, dup._progress[0].volatile_decree);
ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
ASSERT_TRUE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);

// Persist progress for partition 0.
dup.persist_progress(0);
ASSERT_EQ(dup._progress[0].stored_decree, 5);

ASSERT_EQ(15, dup._progress[0].last_committed_decree);
ASSERT_EQ(5, dup._progress[0].volatile_decree);
ASSERT_EQ(5, dup._progress[0].stored_decree);
ASSERT_FALSE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);

// too frequent to update
dup.init_progress(1, invalid_decree);
// Initialize progress for partition 1.
test_init_progress(dup, 1, 5);

// Alter progress for partition 1.
ASSERT_TRUE(dup.alter_progress(1, entry));

ASSERT_EQ(15, dup._progress[1].last_committed_decree);
ASSERT_EQ(10, dup._progress[1].volatile_decree);
ASSERT_EQ(5, dup._progress[1].stored_decree);
ASSERT_TRUE(dup._progress[1].is_altering);
ASSERT_FALSE(dup._progress[1].checkpoint_prepared);

// Persist progress for partition 1.
dup.persist_progress(1);

// It is too frequent to alter progress.
PRESERVE_FLAG(dup_progress_min_update_period_ms);
FLAGS_dup_progress_min_update_period_ms = 10000;
entry.__set_last_committed_decree(25);
entry.confirmed_decree = 15;
entry.checkpoint_prepared = true;
ASSERT_FALSE(dup.alter_progress(1, entry));
ASSERT_EQ(25, dup._progress[1].last_committed_decree);
// volatile_decree would be updated successfully even if it is too frequent.
ASSERT_EQ(15, dup._progress[1].volatile_decree);
ASSERT_EQ(10, dup._progress[1].stored_decree);
ASSERT_FALSE(dup._progress[1].is_altering);
// checkpoint_prepared would be updated successfully even if it is too frequent.
ASSERT_TRUE(dup._progress[1].checkpoint_prepared);

// Reduce last update timestamp to make it infrequent.
dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100;
entry.__set_last_committed_decree(26);
entry.confirmed_decree = 25;

entry.confirmed_decree = 15;
entry.checkpoint_prepared = true;
ASSERT_TRUE(dup.alter_progress(1, entry));
ASSERT_EQ(26, dup._progress[1].last_committed_decree);
ASSERT_EQ(25, dup._progress[1].volatile_decree);
ASSERT_EQ(10, dup._progress[1].stored_decree);
ASSERT_TRUE(dup._progress[1].is_altering);
ASSERT_TRUE(dup._progress[1].checkpoint_prepared);

// Checkpoint are ready for both partition 0 and 1.
ASSERT_TRUE(dup.all_checkpoint_has_prepared());
}

Expand Down Expand Up @@ -128,8 +188,9 @@ class duplication_info_test : public testing::Test
for (int i = 0; i < 4; i++) {
dup.init_progress(i, invalid_decree);
}

for (auto kv : dup_ent.progress) {
ASSERT_EQ(kv.second, invalid_decree);
ASSERT_EQ(invalid_decree, kv.second);
}

dup.start();
Expand All @@ -153,8 +214,8 @@ class duplication_info_test : public testing::Test
dup.start();

dup.persist_status();
ASSERT_EQ(dup._status, duplication_status::DS_PREPARE);
ASSERT_EQ(dup._next_status, duplication_status::DS_INIT);
ASSERT_EQ(duplication_status::DS_PREPARE, dup._status);
ASSERT_EQ(duplication_status::DS_INIT, dup._next_status);
ASSERT_FALSE(dup.is_altering());
}

Expand Down

0 comments on commit d0fc506

Please sign in to comment.