Skip to content

Commit

Permalink
feat(duplication): derror and add perf-counter if mutation loss inste…
Browse files Browse the repository at this point in the history
…ad of dassert for duplication (#862)
  • Loading branch information
foreverneverer authored Jul 26, 2021
1 parent df46be2 commit 9cd9991
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 5 deletions.
61 changes: 59 additions & 2 deletions src/replica/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,70 @@

#include "replica_duplicator.h"
#include "mutation_batch.h"
#include "replica/prepare_list.h"

namespace dsn {
namespace replication {

/*static*/ constexpr int64_t mutation_batch::PREPARE_LIST_NUM_ENTRIES;

mutation_buffer::mutation_buffer(replica_base *r,
decree init_decree,
int max_count,
mutation_committer committer)
: prepare_list(r, init_decree, max_count, committer)
{
auto counter_str = fmt::format("dup_recent_mutation_loss_count@{}", r->get_gpid());
_counter_dulication_mutation_loss_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());
}

void mutation_buffer::commit(decree d, commit_type ct)
{
if (d <= last_committed_decree())
return;

if (ct != COMMIT_TO_DECREE_HARD) {
dassert_replica(false, "invalid commit type {}", (int)ct);
}

ballot last_bt = 0;
for (decree d0 = last_committed_decree() + 1; d0 <= d; d0++) {
mutation_ptr next_committed_mutation = get_mutation_by_decree(d0);
// The unexpected case as follow: next_committed_decree is out of prepare_list[start~end]
//
// last_committed_decree - next_committed_decree
// | |
// n n+1
//
// [min_decree------max_decree]
// | |
// n+m(m>1) n+k(k>=m)
//
// just derror but not dassert if mutation loss or other problem, it's different from base
// class implement. And from the error and perf-counter, we can choose restart duplication
// or ignore the loss.
if (next_committed_mutation == nullptr || !next_committed_mutation->is_logged()) {
derror_replica("mutation[{}] is lost in prepare_list: "
"prepare_last_committed_decree={}, prepare_min_decree={}, "
"prepare_max_decree={}",
d0,
last_committed_decree(),
min_decree(),
max_decree());
_counter_dulication_mutation_loss_count->set(min_decree() - last_committed_decree());
// if next_commit_mutation loss, let last_commit_decree catch up with min_decree, and
// the next loop will commit from min_decree
_last_committed_decree = min_decree() - 1;
return;
}

dcheck_ge_replica(next_committed_mutation->data.header.ballot, last_bt);
_last_committed_decree++;
last_bt = next_committed_mutation->data.header.ballot;
_committer(next_committed_mutation);
}
}

error_s mutation_batch::add(mutation_ptr mu)
{
if (mu->get_decree() <= _mutation_buffer->last_committed_decree()) {
Expand Down Expand Up @@ -76,7 +133,7 @@ mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r)
replica_base base(
r->get_gpid(), std::string("mutation_batch@") + r->replica_name(), r->app_name());
_mutation_buffer =
make_unique<prepare_list>(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) {
make_unique<mutation_buffer>(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) {
// committer
add_mutation_if_valid(mu, _loaded_mutations, _start_decree);
});
Expand Down
19 changes: 17 additions & 2 deletions src/replica/duplication/mutation_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,29 @@

#pragma once

#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/replication/mutation_duplicator.h>

#include "replica/mutation.h"

#include "replica/prepare_list.h"
namespace dsn {
namespace replication {

class replica_duplicator;
class prepare_list;

class mutation_buffer : public prepare_list
{
public:
mutation_buffer(replica_base *r,
decree init_decree,
int max_count,
mutation_committer committer);

void commit(decree d, commit_type ct);

private:
perf_counter_wrapper _counter_dulication_mutation_loss_count;
};

// A sorted array of committed mutations that are ready for duplication.
// Not thread-safe.
Expand All @@ -51,6 +65,7 @@ class mutation_batch : replica_base

private:
friend class replica_duplicator_test;
friend class mutation_batch_test;

std::unique_ptr<prepare_list> _mutation_buffer;
mutation_tuple_set _loaded_mutations;
Expand Down
23 changes: 23 additions & 0 deletions src/replica/duplication/test/mutation_batch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ namespace replication {
class mutation_batch_test : public duplication_test_base
{
public:
void
reset_buffer(const mutation_batch &batcher, const decree last_commit, decree start, decree end)
{
batcher._mutation_buffer->reset(last_commit);
batcher._mutation_buffer->_start_decree = start;
batcher._mutation_buffer->_end_decree = end;
}

void commit_buffer(const mutation_batch &batcher, const decree current_decree)
{
batcher._mutation_buffer->commit(current_decree, COMMIT_TO_DECREE_HARD);
}
};

TEST_F(mutation_batch_test, add_mutation_if_valid)
Expand Down Expand Up @@ -62,5 +74,16 @@ TEST_F(mutation_batch_test, ignore_non_idempotent_write)
ASSERT_EQ(result.size(), 0);
}

TEST_F(mutation_batch_test, mutation_buffer_commit)
{
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());
// mock mutation_buffer[last=10, start=15, end=20], last + 1(next commit decree) is out of
// [start~end]
reset_buffer(batcher, 10, 15, 20);
commit_buffer(batcher, 15);
ASSERT_EQ(batcher.last_decree(), 14);
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/mutation_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class mutation_cache
int capacity() const { return _max_count; }

private:
friend class mutation_batch_test;

std::vector<mutation_ptr> _array;
int _max_count;

Expand Down
5 changes: 4 additions & 1 deletion src/replica/prepare_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ class prepare_list : public mutation_cache, private replica_base
error_code prepare(mutation_ptr &mu,
partition_status::type status,
bool pop_all_committed_mutations = false); // unordered prepare
void commit(decree decree, commit_type ct); // ordered commit
virtual void commit(decree decree, commit_type ct); // ordered commit

virtual ~prepare_list() = default;

private:
friend class mutation_buffer;
decree _last_committed_decree;
mutation_committer _committer;
};
Expand Down

0 comments on commit 9cd9991

Please sign in to comment.