
Commit

fix(dup): don't GC by valid_start_offset during duplication & add app_name to replica_base (#448)
Wu Tao authored Apr 28, 2020
1 parent e2d33eb commit 9a1a997
Showing 10 changed files with 114 additions and 68 deletions.
13 changes: 11 additions & 2 deletions include/dsn/dist/replication/replica_base.h
@@ -35,17 +35,26 @@ namespace replication {
/// Base class for types that are one-instance-per-replica.
struct replica_base
{
replica_base(gpid id, string_view name) : _gpid(id), _name(name) {}
replica_base(gpid id, string_view name, string_view app_name)
: _gpid(id), _name(name), _app_name(app_name)
{
}

explicit replica_base(replica_base *rhs) : replica_base(rhs->get_gpid(), rhs->replica_name()) {}
explicit replica_base(replica_base *rhs)
: replica_base(rhs->get_gpid(), rhs->replica_name(), rhs->_app_name)
{
}

gpid get_gpid() const { return _gpid; }

const char *replica_name() const { return _name.c_str(); }

const char *app_name() const { return _app_name.c_str(); }

private:
const gpid _gpid;
const std::string _name;
const std::string _app_name;
};

} // namespace replication
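To make the new `app_name` field concrete, here is a minimal, self-contained sketch of the pattern using simplified stand-in types rather than the real rDSN headers (the `_t`-suffixed names and the example values "2.1@10.0.0.1:34801" / "temp" are illustrative assumptions): every component derived from an existing replica copies the table name together with the gpid and replica name, so its diagnostics can report which table it serves.

```cpp
#include <iostream>
#include <string>
#include <utility>

// Simplified stand-ins for dsn::gpid and replica_base, for illustration only.
struct gpid_t
{
    int app_id;
    int partition_index;
};

struct replica_base_t
{
    replica_base_t(gpid_t id, std::string name, std::string app_name)
        : _gpid(id), _name(std::move(name)), _app_name(std::move(app_name))
    {
    }

    // A component built on top of an existing replica inherits all three fields.
    explicit replica_base_t(const replica_base_t *rhs)
        : replica_base_t(rhs->_gpid, rhs->_name, rhs->_app_name)
    {
    }

    gpid_t _gpid;
    std::string _name;     // e.g. "2.1@10.0.0.1:34801"
    std::string _app_name; // e.g. "temp" -- the field added by this commit
};

int main()
{
    replica_base_t replica({2, 1}, "2.1@10.0.0.1:34801", "temp");
    replica_base_t duplicator(&replica); // e.g. a replica_duplicator sharing the identity
    std::cout << duplicator._name << " serves app " << duplicator._app_name << "\n";
    return 0;
}
```

Keeping all three fields in the common base means callers such as replica.cpp below only pass `app.app_name` once, instead of threading it through every per-replica component separately.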
3 changes: 2 additions & 1 deletion src/dist/replication/lib/duplication/mutation_batch.cpp
@@ -80,7 +80,8 @@ mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r)
// Prepend a special tag identifying this is a mutation_batch,
// so `dxxx_replica` logging in prepare_list will print along with its real caller.
// This helps for debugging.
replica_base base(r->get_gpid(), std::string("mutation_batch@") + r->replica_name());
replica_base base(
r->get_gpid(), std::string("mutation_batch@") + r->replica_name(), r->app_name());
_mutation_buffer =
make_unique<prepare_list>(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) {
// committer
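A short illustrative sketch (all values made up) of the naming convention described in the comment above: the temporary base handed to `prepare_list` prefixes the replica name with a `mutation_batch@` tag, so a `dxxx_replica` log line still identifies its real owner and, after this commit, also carries the app name.

```cpp
#include <iostream>
#include <string>

int main()
{
    // Illustrative values only.
    std::string replica_name = "2.1@10.0.0.1:34801";
    std::string app_name = "temp";

    // The tag marks the prepare_list embedded in a mutation_batch,
    // while the suffix still names the replica that owns it.
    std::string batch_name = "mutation_batch@" + replica_name;

    std::cout << "[" << batch_name << "] [" << app_name
              << "] committing mutation from prepare_list\n";
    return 0;
}
```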
14 changes: 7 additions & 7 deletions src/dist/replication/lib/duplication/replica_duplicator.cpp
@@ -154,13 +154,13 @@ void replica_duplicator::verify_start_decree(decree start_decree)
decree confirmed_decree = progress().confirmed_decree;
decree last_decree = progress().last_decree;
decree max_gced_decree = get_max_gced_decree();
dassert(max_gced_decree < start_decree,
"the logs haven't yet duplicated were accidentally truncated "
"[max_gced_decree: {}, start_decree: {}, confirmed_decree: {}, last_decree: {}]",
max_gced_decree,
start_decree,
confirmed_decree,
last_decree);
dassert_f(max_gced_decree < start_decree,
"the logs haven't yet duplicated were accidentally truncated "
"[max_gced_decree: {}, start_decree: {}, confirmed_decree: {}, last_decree: {}]",
max_gced_decree,
start_decree,
confirmed_decree,
last_decree);
}

decree replica_duplicator::get_max_gced_decree() const
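The assertion above guards the invariant duplication relies on: every mutation at or beyond `start_decree` must still be present in the private log, i.e. `max_gced_decree < start_decree`. A minimal sketch of that check, with simplified types and illustrative decree values:

```cpp
#include <cassert>
#include <cstdint>

using decree = int64_t;

// Safe to start duplicating from `start_decree` only if nothing at or beyond
// start_decree has already been garbage-collected from the private log.
bool can_start_duplication(decree max_gced_decree, decree start_decree)
{
    return max_gced_decree < start_decree;
}

int main()
{
    assert(can_start_duplication(/*max_gced_decree=*/9, /*start_decree=*/10));   // logs intact
    assert(!can_start_duplication(/*max_gced_decree=*/50, /*start_decree=*/10)); // logs truncated
    return 0;
}
```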
43 changes: 19 additions & 24 deletions src/dist/replication/lib/mutation_log.cpp
@@ -1201,7 +1201,7 @@ static bool should_reserve_file(log_file_ptr log,
}

int mutation_log::garbage_collection(gpid gpid,
decree durable_decree,
decree cleanable_decree,
int64_t valid_start_offset,
int64_t reserve_max_size,
int64_t reserve_max_time)
@@ -1252,27 +1252,25 @@ int mutation_log::garbage_collection(gpid gpid,

// log is invalid, ok to delete
else if (valid_start_offset >= log->end_offset()) {
dinfo("gc_private @ %d.%d: max_offset for %s is %" PRId64 " vs %" PRId64
" as app.valid_start_offset.private,"
" safe to delete this and all older logs",
_private_gpid.get_app_id(),
_private_gpid.get_partition_index(),
mark_it->second->path().c_str(),
mark_it->second->end_offset(),
valid_start_offset);
ddebug_f("gc_private @ {}: will remove files {} ~ log.{} because "
"valid_start_offset={} outdates log_end_offset={}",
_private_gpid,
files.begin()->second->path(),
log->index(),
valid_start_offset,
log->end_offset());
break;
}

// all decrees are durable, ok to delete
else if (durable_decree >= max_decree) {
dinfo("gc_private @ %d.%d: max_decree for %s is %" PRId64 " vs %" PRId64
" as app.durable decree,"
" safe to delete this and all older logs",
_private_gpid.get_app_id(),
_private_gpid.get_partition_index(),
mark_it->second->path().c_str(),
max_decree,
durable_decree);
// all mutations are cleanable, ok to delete
else if (cleanable_decree >= max_decree) {
ddebug_f("gc_private @ {}: will remove files {} ~ log.{} because "
"cleanable_decree={} outdates max_decree={}",
_private_gpid,
files.begin()->second->path(),
log->index(),
cleanable_decree,
max_decree);
break;
}

@@ -1296,7 +1294,7 @@ int mutation_log::garbage_collection(gpid gpid,
for (auto it = files.begin(); it != files.end() && it->second->index() <= largest_to_delete;
++it) {
log_file_ptr log = it->second;
dassert(it->first == log->index(), "%d VS %d", it->first, log->index());
dcheck_eq(it->first, log->index());

// close first
log->close();
@@ -1312,10 +1310,7 @@ int mutation_log::garbage_collection(gpid gpid,
}

// delete succeed
ddebug("gc_private @ %d.%d: log file %s is removed",
_private_gpid.get_app_id(),
_private_gpid.get_partition_index(),
fpath.c_str());
ddebug_f("gc_private @ {}: log file {} is removed", _private_gpid, fpath);
deleted++;

// erase from _log_files
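A hedged sketch of the deletion rule the rewritten loop expresses, with simplified types (the real `garbage_collection` additionally honors `reserve_max_size` and `reserve_max_time`): scanning log files from newest to oldest, the first file that is either entirely below `valid_start_offset` or whose largest decree is already cleanable marks the point from which it and every older file may be removed.

```cpp
#include <cstdint>
#include <map>

using decree = int64_t;

struct log_file_info
{
    int64_t end_offset; // byte offset at which this log file ends
    decree max_decree;  // largest decree recorded for the replica in this file
};

// Returns the largest file index whose file (and everything older) may be deleted,
// or -1 if nothing can be deleted. `files` is keyed by increasing file index.
int find_largest_deletable_index(const std::map<int, log_file_info> &files,
                                 int64_t valid_start_offset,
                                 decree cleanable_decree)
{
    for (auto it = files.rbegin(); it != files.rend(); ++it) {
        const log_file_info &f = it->second;
        if (valid_start_offset >= f.end_offset) {
            return it->first; // log data is invalid for this replica: safe to delete
        }
        if (cleanable_decree >= f.max_decree) {
            return it->first; // every mutation in this file is already cleanable
        }
    }
    return -1;
}
```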
3 changes: 2 additions & 1 deletion src/dist/replication/lib/mutation_log.h
@@ -300,7 +300,8 @@ class mutation_log : public ref_counter
// Decree of the maximum garbage-collected mutation.
// For example, given mutations [20, 100], if [20, 50] is garbage-collected,
// the max_gced_decree=50.
// In production the mutations may not be ordered with the file-id. Given 3 log files:
// Under the real-world cases, the mutations may not be ordered with the file-id.
// Given 3 log files:
// #1:[20, 30], #2:[30, 50], #3:[10, 50]
// The third file is learned from primary of new epoch. Since it contains mutations smaller
// than the others, the max_gced_decree = 9.
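The example in the comment can be checked with a small sketch (a simplified computation, not the real implementation): with surviving decree ranges [20, 30], [30, 50] and [10, 50], the maximum decree known to be fully garbage-collected is one below the smallest surviving start decree, i.e. 9.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

using decree = int64_t;

struct decree_range
{
    decree first; // smallest decree covered by one log file
    decree last;  // largest decree covered by one log file
};

// Everything below the smallest surviving start decree must already be GCed.
decree max_gced_decree(const std::vector<decree_range> &remaining_files)
{
    decree smallest_start = remaining_files.front().first;
    for (const auto &r : remaining_files) {
        smallest_start = std::min(smallest_start, r.first);
    }
    return smallest_start - 1;
}

int main()
{
    // Files #1:[20, 30], #2:[30, 50], #3:[10, 50] -- the third was learned from
    // the primary of a new epoch, so max_gced_decree = 9.
    assert(max_gced_decree({{20, 30}, {30, 50}, {10, 50}}) == 9);
    return 0;
}
```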
2 changes: 1 addition & 1 deletion src/dist/replication/lib/replica.cpp
@@ -45,7 +45,7 @@ namespace replication {
replica::replica(
replica_stub *stub, gpid gpid, const app_info &app, const char *dir, bool need_restore)
: serverlet<replica>("replica"),
replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str)),
replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str), app.app_name),
_app_info(app),
_primary_states(
gpid, stub->options().staleness_for_commit, stub->options().batch_write_disabled),
24 changes: 12 additions & 12 deletions src/dist/replication/lib/replica_chkpt.cpp
@@ -73,7 +73,12 @@ void replica::on_checkpoint_timer()
decree last_durable_decree = _app->last_durable_decree();
decree min_confirmed_decree = _duplication_mgr->min_confirmed_decree();
decree cleanable_decree = last_durable_decree;
int64_t valid_start_offset = _app->init_info().init_offset_in_private_log;

if (min_confirmed_decree >= 0) {
// Do not rely on valid_start_offset for GC during duplication.
// cleanable_decree is the only GC trigger.
valid_start_offset = 0;
if (min_confirmed_decree < last_durable_decree) {
ddebug_replica("gc_private {}: delay gc for duplication: min_confirmed_decree({}) "
"last_durable_decree({})",
@@ -87,20 +92,15 @@ void replica::on_checkpoint_timer()
min_confirmed_decree,
last_durable_decree);
}
} else {
// protect the logs from being truncated
// if this app is in duplication
if (is_duplicating()) {
// unsure if the logs can be dropped, because min_confirmed_decree
// is currently unavailable
ddebug_replica(
"gc_private {}: skip gc because confirmed duplication progress is unknown",
enum_to_string(status()));
return;
}
} else if (is_duplicating()) {
// unsure if the logs can be dropped, because min_confirmed_decree
// is currently unavailable
ddebug_replica(
"gc_private {}: skip gc because confirmed duplication progress is unknown",
enum_to_string(status()));
return;
}

int64_t valid_start_offset = _app->init_info().init_offset_in_private_log;
tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this, plog, cleanable_decree, valid_start_offset] {
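A hedged sketch of the GC-trigger decision this hunk implements. The types are simplified, and the branch that lowers `cleanable_decree` to `min_confirmed_decree` is assumed from the collapsed context rather than shown verbatim in the diff. The visible change is the key point: once duplication progress is known, `valid_start_offset` is zeroed so the cleanable decree becomes the only GC trigger, and when progress is unknown GC is skipped entirely.

```cpp
#include <cstdint>
#include <optional>

using decree = int64_t;

struct gc_request
{
    decree cleanable_decree;
    int64_t valid_start_offset;
};

// Decide whether and how to garbage-collect one replica's private log.
// Returns std::nullopt when GC must be skipped.
std::optional<gc_request> plan_private_log_gc(decree last_durable_decree,
                                              decree min_confirmed_decree, // -1 if unknown
                                              bool duplicating,
                                              int64_t init_offset_in_private_log)
{
    gc_request req{last_durable_decree, init_offset_in_private_log};

    if (min_confirmed_decree >= 0) {
        // Duplication progress is known: never GC by valid_start_offset;
        // cleanable_decree is the only trigger.
        req.valid_start_offset = 0;
        if (min_confirmed_decree < last_durable_decree) {
            // Assumed behavior of the collapsed lines: hold GC back until
            // duplication has confirmed these mutations.
            req.cleanable_decree = min_confirmed_decree;
        }
        return req;
    }
    if (duplicating) {
        // Confirmed duplication progress is unknown: protect the logs, skip GC.
        return std::nullopt;
    }
    return req;
}
```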
5 changes: 0 additions & 5 deletions src/dist/replication/lib/replica_stub.cpp
@@ -242,11 +242,6 @@ void replica_stub::install_perf_counters()
"dup.pending_mutations_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"number of mutations pending for duplication");
_counter_dup_time_lag.init_app_counter(
"eon.replica_stub",
"dup.time_lag(ms)",
COUNTER_TYPE_NUMBER_PERCENTILES,
"time (in ms) lag between master and slave in the duplication");

// <- Cold Backup Metrics ->

1 change: 0 additions & 1 deletion src/dist/replication/lib/replica_stub.h
@@ -398,7 +398,6 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// if we need to duplicate to multiple clusters someday.
perf_counter_wrapper _counter_dup_confirmed_rate;
perf_counter_wrapper _counter_dup_pending_mutations_count;
perf_counter_wrapper _counter_dup_time_lag;

perf_counter_wrapper _counter_cold_backup_running_count;
perf_counter_wrapper _counter_cold_backup_recent_start_count;
@@ -307,15 +307,55 @@ class mutation_log_test : public replica_test_base
ASSERT_EQ(std::string(lhs.data(), lhs.length()), std::string(rhs.data(), rhs.length()));
}

// return number of entries written
int generate_multiple_log_files(uint files_num = 3)
{
// decrees range over [1, files_num*10]
for (int f = 0; f < files_num; f++) {
// each round mlog will replay the former logs, and create new file
mutation_log_ptr mlog = create_private_log();
for (int i = 1; i <= 10; i++) {
std::string msg = "hello!";
mutation_ptr mu = create_test_mutation(msg, 10 * f + i);
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
}
mlog->tracker()->wait_outstanding_tasks();
mlog->close();
}
return static_cast<int>(files_num * 10);
}

mutation_log_ptr create_private_log() { return create_private_log(1); }

mutation_log_ptr create_private_log(int private_log_size_mb, decree replay_start_decree = 0)
{
gpid id = get_gpid();
std::map<gpid, decree> replay_condition;
replay_condition[id] = replay_start_decree;
mutation_log::replay_callback cb = [](int, mutation_ptr &) { return true; };
mutation_log_ptr mlog;

int try_cnt = 0;
while (try_cnt < 5) {
try_cnt++;
mlog = new mutation_log_private(
_replica->dir(), private_log_size_mb, id, _replica.get(), 1024, 512, 10000);
error_code err = mlog->open(cb, nullptr, replay_condition);
if (err == ERR_OK) {
break;
}
derror_f("mlog open failed, encountered error: {}", err);
}
EXPECT_NE(mlog, nullptr);
return mlog;
}

void test_replay_single_file(int num_entries)
{
std::vector<mutation_ptr> mutations;

{ // writing logs
mutation_log_ptr mlog = new mutation_log_private(
_log_dir, 1024, get_gpid(), _replica.get(), 1024, 512, 10000);

EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);
mutation_log_ptr mlog = create_private_log();

for (int i = 0; i < num_entries; i++) {
mutation_ptr mu = create_test_mutation("hello!", 2 + i);
@@ -354,10 +394,7 @@ class mutation_log_test : public replica_test_base
std::vector<mutation_ptr> mutations;

{ // writing logs
mutation_log_ptr mlog = new mutation_log_private(
_log_dir, private_log_file_size_mb, get_gpid(), _replica.get(), 1024, 512, 10000);
EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);

mutation_log_ptr mlog = create_private_log(private_log_file_size_mb);
for (int i = 0; i < num_entries; i++) {
mutation_ptr mu = create_test_mutation("hello!", 2 + i);
mutations.push_back(mu);
@@ -366,8 +403,7 @@ mutation_log_ptr mlog = create_private_log(private_log_file_size_mb);
}

{ // reading logs
mutation_log_ptr mlog =
new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000);
mutation_log_ptr mlog = create_private_log(private_log_file_size_mb);

std::vector<std::string> log_files;
ASSERT_TRUE(utils::filesystem::get_subfiles(mlog->dir(), log_files, false));
@@ -411,10 +447,7 @@ TEST_F(mutation_log_test, open)
std::vector<mutation_ptr> mutations;

{ // writing logs
mutation_log_ptr mlog =
new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000);

EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);
mutation_log_ptr mlog = create_private_log(4);

for (int i = 0; i < 1000; i++) {
mutation_ptr mu = create_test_mutation("hello!", 2 + i);
@@ -449,5 +482,18 @@ TEST_F(mutation_log_test, replay_multiple_files_20000_1mb) { test_replay_multipl

TEST_F(mutation_log_test, replay_multiple_files_50000_1mb) { test_replay_multiple_files(50000, 1); }

TEST_F(mutation_log_test, replay_start_decree)
{
// decrees range over [1, 30]
generate_multiple_log_files(3);

decree replay_start_decree = 11; // start replay from second file, the first file is ignored.
mutation_log_ptr mlog = create_private_log(1, replay_start_decree);

// ensure the first file is not stripped out.
ASSERT_EQ(mlog->max_gced_decree(get_gpid()), 0);
ASSERT_EQ(mlog->get_log_file_map().size(), 3);
}

} // namespace replication
} // namespace dsn
