Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(dup): protect private log from missing when duplication is enabled #320

Merged
merged 17 commits into from
Dec 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,28 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent)
}
}

/// Returns the lowest decree that has been confirmed by every duplication known to this
/// replica, or `invalid_decree` when no such decree can be determined.
///
/// - On a primary: the minimum `confirmed_decree` across all entries of `_duplications`.
///   `invalid_decree` is returned when there is no duplication at all, or when at least
///   one duplication has not confirmed anything yet (its progress is still
///   `invalid_decree`).
/// - On any other status: the latest confirmed decree learned from the primary
///   (`_primary_confirmed_decree`) if it is positive, otherwise `invalid_decree`.
decree replica_duplicator_manager::min_confirmed_decree() const
{
    zauto_lock l(_lock);

    if (_replica->status() != partition_status::PS_PRIMARY) {
        // if the replica is not primary, use the latest known (from primary)
        // confirmed_decree instead.
        return _primary_confirmed_decree > 0 ? _primary_confirmed_decree : invalid_decree;
    }

    // `invalid_decree` must not double as the "nothing seen yet" marker: a duplication
    // whose own progress is still `invalid_decree` would otherwise be silently skipped or
    // not depending on its position in the map, making the result order-dependent.
    bool seen_any = false;
    decree min_decree = invalid_decree;
    for (const auto &kv : _duplications) {
        const decree confirmed = kv.second->progress().confirmed_decree;
        if (confirmed == invalid_decree) {
            // This duplication has confirmed nothing, so no decree is confirmed by all.
            return invalid_decree;
        }
        min_decree = seen_any ? std::min(min_decree, confirmed) : confirmed;
        seen_any = true;
    }
    return min_decree;
}

// Remove the duplications that are not in the `new_dup_map`.
// NOTE: this function may be blocked when destroying replica_duplicator.
void replica_duplicator_manager::remove_non_existed_duplications(
Expand All @@ -80,5 +102,23 @@ void replica_duplicator_manager::remove_non_existed_duplications(
}
}

/// Records the confirmed decree reported by the primary, but only while this replica is
/// a secondary. Any locally held duplications are removed, the stored value is raised
/// (never lowered) when `confirmed` is non-negative, and the replica is told whether it
/// is duplicating (`confirmed >= 0`).
void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree confirmed)
{
    // this function always runs in the same single thread with config-sync
    const bool is_secondary = _replica->status() == partition_status::PS_SECONDARY;
    if (!is_secondary) {
        return;
    }

    zauto_lock l(_lock);
    remove_all_duplications();

    const bool duplicating = confirmed >= 0;
    // confirmed decree never decreases
    if (duplicating && confirmed > _primary_confirmed_decree) {
        _primary_confirmed_decree = confirmed;
    }
    _replica->update_init_info_duplicating(duplicating);
}

} // namespace replication
} // namespace dsn
13 changes: 13 additions & 0 deletions src/dist/replication/lib/duplication/replica_duplicator_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ class replica_duplicator_manager : public replica_base
/// collect updated duplication confirm points from this replica.
std::vector<duplication_confirm_entry> get_duplication_confirms_to_update() const;

/// mutations <= min_confirmed_decree are assumed to be cleanable.
/// If there's no duplication, invalid_decree is returned, meaning that all logs are cleanable.
/// THREAD_POOL_REPLICATION
/// \see replica::on_checkpoint_timer()
decree min_confirmed_decree() const;

/// Updates the latest known confirmed decree on this replica if it's secondary.
/// THREAD_POOL_REPLICATION
/// \see replica_check.cpp
void update_confirmed_decree_if_secondary(decree confirmed);

private:
void sync_duplication(const duplication_entry &ent);

Expand All @@ -66,6 +77,8 @@ class replica_duplicator_manager : public replica_base

std::map<dupid_t, replica_duplicator_u_ptr> _duplications;

decree _primary_confirmed_decree{invalid_decree};

// avoid thread conflict between replica::on_checkpoint_timer and
// duplication_sync_timer.
mutable zlock _lock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,39 @@ class replica_duplicator_manager_test : public duplication_test_base
ASSERT_EQ(d._duplications.size(), 1);
}

void test_set_confirmed_decree_non_primary()
{
auto r = stub->add_primary_replica(2, 1);
auto &d = r->get_replica_duplicator_manager();

duplication_entry ent;
ent.dupid = 1;
ent.status = duplication_status::DS_PAUSE;
ent.remote = "dsn://slave-cluster";
ent.progress[r->get_gpid().get_partition_index()] = 100;
d.sync_duplication(ent);
ASSERT_EQ(d._duplications.size(), 1);
ASSERT_EQ(d._primary_confirmed_decree, invalid_decree);

// replica failover
r->as_secondary();

d.update_confirmed_decree_if_secondary(99);
ASSERT_EQ(d._duplications.size(), 0);
ASSERT_EQ(d._primary_confirmed_decree, 99);

// receives group check
d.update_confirmed_decree_if_secondary(101);
ASSERT_EQ(d._duplications.size(), 0);
ASSERT_EQ(d._primary_confirmed_decree, 101);

// confirmed decree never decreases
d.update_confirmed_decree_if_secondary(0);
ASSERT_EQ(d._primary_confirmed_decree, 101);
d.update_confirmed_decree_if_secondary(1);
ASSERT_EQ(d._primary_confirmed_decree, 101);
}

void test_get_duplication_confirms()
{
auto r = stub->add_primary_replica(2, 1);
Expand Down Expand Up @@ -68,17 +101,71 @@ class replica_duplicator_manager_test : public duplication_test_base
auto result = r->get_replica_duplicator_manager().get_duplication_confirms_to_update();
ASSERT_EQ(result.size(), update_dup_num);
}

// Table-driven check of replica_duplicator_manager::min_confirmed_decree().
// Each case installs one paused duplication per entry of `confirmed_decree` (dupids
// start at 1), compares the manager's result with `min_confirmed_decree`, and then
// clears the installed duplications so that cases do not affect each other.
void test_min_confirmed_decree()
{
    struct test_case
    {
        std::vector<int64_t> confirmed_decree;
        int64_t min_confirmed_decree;
    };

    auto r = stub->add_non_primary_replica(2, 1);
    // Taken by const reference: the case is only read, so there is no need to copy the
    // vector on every invocation.
    auto assert_test = [r, this](const test_case &tt) {
        // Iterate over the values directly instead of comparing a signed index against
        // the unsigned `size()`.
        int next_dupid = 1;
        for (const int64_t confirmed : tt.confirmed_decree) {
            duplication_entry ent;
            ent.dupid = next_dupid++;
            ent.status = duplication_status::DS_PAUSE;
            ent.progress[r->get_gpid().get_partition_index()] = 0;

            auto dup = make_unique<replica_duplicator>(ent, r);
            dup->update_progress(
                dup->progress().set_last_decree(confirmed).set_confirmed_decree(confirmed));
            add_dup(r, std::move(dup));
        }

        ASSERT_EQ(r->get_replica_duplicator_manager().min_confirmed_decree(),
                  tt.min_confirmed_decree);
        r->get_replica_duplicator_manager()._duplications.clear();
    };

    {
        // non-primary
        test_case tt{{1, 2, 3}, invalid_decree};
        assert_test(tt);
    }

    { // primary
        r->as_primary();
        test_case tt{{1, 2, 3}, 1};
        assert_test(tt);

        tt = {{1000}, 1000};
        assert_test(tt);

        tt = {{}, invalid_decree};
        assert_test(tt);
    }
}
};

// Each TEST_F below only forwards to a member function of the
// replica_duplicator_manager_test fixture. The fixture methods are the ones that read
// manager internals such as `_duplications` and `_primary_confirmed_decree`.
TEST_F(replica_duplicator_manager_test, get_duplication_confirms)
{
test_get_duplication_confirms();
}

// Secondary replicas track the confirmed decree reported by the primary.
TEST_F(replica_duplicator_manager_test, set_confirmed_decree_non_primary)
{
test_set_confirmed_decree_non_primary();
}

TEST_F(replica_duplicator_manager_test, remove_non_existed_duplications)
{
test_remove_non_existed_duplications();
}

// Covers min_confirmed_decree() for both non-primary and primary replicas.
TEST_F(replica_duplicator_manager_test, min_confirmed_decree) { test_min_confirmed_decree(); }

} // namespace replication
} // namespace dsn
123 changes: 123 additions & 0 deletions src/dist/replication/lib/duplication/test/replica_duplicator_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <dsn/utility/filesystem.h>

#include "dist/replication/lib/mutation_log_utils.h"
#include "dist/replication/lib/duplication/load_from_private_log.h"
#include "dist/replication/lib/duplication/duplication_pipeline.h"
#include "duplication_test_base.h"

namespace dsn {
namespace apps {

// for loading PUT mutations from log file.
// Defines the RPC task code RPC_RRDB_RRDB_PUT with TASK_PRIORITY_COMMON on
// THREAD_POOL_DEFAULT so that it is available to this test binary.
// NOTE(review): presumably mirrors the code registered by the real rrdb application;
// confirm the name stays in sync with it.
DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT);

} // namespace apps
} // namespace dsn

namespace dsn {
namespace replication {

class replica_duplicator_test : public duplication_test_base
{
public:
replica_duplicator_test() { _replica->init_private_log(_log_dir); }

void test_new_duplicator()
{
dupid_t dupid = 1;
std::string remote = "remote_address";
duplication_status::type status = duplication_status::DS_PAUSE;
int64_t confirmed_decree = 100;

duplication_entry dup_ent;
dup_ent.dupid = dupid;
dup_ent.remote = remote;
dup_ent.status = status;
dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed_decree;

auto duplicator = make_unique<replica_duplicator>(dup_ent, _replica.get());
ASSERT_EQ(duplicator->id(), dupid);
ASSERT_EQ(duplicator->remote_cluster_name(), remote);
ASSERT_EQ(duplicator->_status, status);
ASSERT_EQ(duplicator->progress().confirmed_decree, confirmed_decree);
ASSERT_EQ(duplicator->progress().last_decree, confirmed_decree);

auto &expected_env = *duplicator;
ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
ASSERT_EQ(duplicator->get_gpid().thread_hash(), expected_env.__conf.thread_hash);
}

void test_pause_start_duplication()
{
mutation_log_ptr mlog = new mutation_log_private(
_replica->dir(), 4, _replica->get_gpid(), _replica.get(), 1024, 512, 10000);
EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);

{
_replica->init_private_log(mlog);
auto duplicator = create_test_duplicator();

duplicator->update_status_if_needed(duplication_status::DS_START);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);
auto expected_env = duplicator->_ship->_mutation_duplicator->_env;
ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
ASSERT_EQ(duplicator->get_gpid().thread_hash(), expected_env.__conf.thread_hash);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);
duplicator->update_status_if_needed(duplication_status::DS_START);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);

duplicator->update_status_if_needed(duplication_status::DS_PAUSE);
ASSERT_TRUE(duplicator->paused());
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);
ASSERT_EQ(duplicator->_load_private.get(), nullptr);
ASSERT_EQ(duplicator->_load.get(), nullptr);
ASSERT_EQ(duplicator->_ship.get(), nullptr);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);

duplicator->wait_all();
}
}
};

// Forwards to the fixture method, which reads replica_duplicator members such as
// `_status` that the test body itself could not access.
TEST_F(replica_duplicator_test, new_duplicator) { test_new_duplicator(); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种TEST里面转调一个函数的方式,是因为无法访问某些private变量吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的


// Forwards to the fixture method, which inspects duplicator internals (`_status`,
// `_ship`, `_load`, `_load_private`).
TEST_F(replica_duplicator_test, pause_start_duplication) { test_pause_start_duplication(); }

// Exercises replica_duplicator::update_progress(): last_decree and confirmed_decree are
// updated independently, confirmed_decree may not decrease, and confirmed_decree may not
// exceed last_decree.
TEST_F(replica_duplicator_test, duplication_progress)
{
auto duplicator = create_test_duplicator();
ASSERT_EQ(duplicator->progress().last_decree, 0); // start duplication from empty plog
ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree);

// Advancing last_decree leaves confirmed_decree untouched.
duplicator->update_progress(duplicator->progress().set_last_decree(10));
ASSERT_EQ(duplicator->progress().last_decree, 10);
ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree);

// confirmed_decree may catch up with last_decree.
duplicator->update_progress(duplicator->progress().set_confirmed_decree(10));
ASSERT_EQ(duplicator->progress().confirmed_decree, 10);
ASSERT_EQ(duplicator->progress().last_decree, 10);

// Lowering confirmed_decree (10 -> 1) is rejected with ERR_INVALID_STATE.
ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(1)),
error_s::make(ERR_INVALID_STATE, "never decrease confirmed_decree: new(1) old(10)"));

// Confirming beyond last_decree (12 > 10) is rejected with ERR_INVALID_STATE.
ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(12)),
error_s::make(ERR_INVALID_STATE,
"last_decree(10) should always larger than confirmed_decree(12)"));
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
bool verbose_commit_log() const;
dsn::task_tracker *tracker() { return &_tracker; }

/// \see replica_duplicate.cpp
/// Returns the raw pointer obtained from `_duplication_mgr.get()`.
/// NOTE(review): ownership presumably stays with `_duplication_mgr` — confirm.
replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); }
/// Reports whether this replica is duplicating.
/// NOTE(review): defined outside this header; confirm the exact source of the flag.
bool is_duplicating() const;
/// Updates the replica's "duplicating" flag.
/// replica_duplicator_manager::update_confirmed_decree_if_secondary() calls this with
/// `confirmed >= 0`.
void update_init_info_duplicating(bool duplicating);

void update_last_checkpoint_generate_time();

Expand Down
22 changes: 15 additions & 7 deletions src/dist/replication/lib/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
#include "mutation.h"
#include "mutation_log.h"
#include "replica_stub.h"

#include "dist/replication/lib/duplication/replica_duplicator_manager.h"

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>

namespace dsn {
Expand Down Expand Up @@ -91,6 +95,7 @@ void replica::broadcast_group_check()
request->node = addr;
_primary_states.get_replica_config(it->second, request->config);
request->last_committed_decree = last_committed_decree();
request->__set_confirmed_decree(_duplication_mgr->min_confirmed_decree());

if (request->config.status == partition_status::PS_POTENTIAL_SECONDARY) {
auto it = _primary_states.learners.find(addr);
Expand Down Expand Up @@ -133,13 +138,13 @@ void replica::on_group_check(const group_check_request &request,
{
_checker.only_one_thread_access();

ddebug("%s: process group check, primary = %s, ballot = %" PRId64
", status = %s, last_committed_decree = %" PRId64,
name(),
request.config.primary.to_string(),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree);
ddebug_replica("process group check, primary = {}, ballot = {}, status = {}, "
"last_committed_decree = {}, confirmed_decree = {}",
request.config.primary.to_string(),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree,
request.__isset.confirmed_decree ? request.confirmed_decree : invalid_decree);

if (request.config.ballot < get_ballot()) {
response.err = ERR_VERSION_OUTDATED;
Expand All @@ -154,6 +159,9 @@ void replica::on_group_check(const group_check_request &request,
} else if (is_same_ballot_status_change_allowed(status(), request.config.status)) {
update_local_configuration(request.config, true);
}
if (request.__isset.confirmed_decree) {
_duplication_mgr->update_confirmed_decree_if_secondary(request.confirmed_decree);
}

switch (status()) {
case partition_status::PS_INACTIVE:
Expand Down
Loading