From 9cd9991d9fab048978913b8ba94988a8b94a8ca5 Mon Sep 17 00:00:00 2001 From: Jiashuo Date: Mon, 26 Jul 2021 15:14:27 +0800 Subject: [PATCH] feat(duplication): derror and add perf-counter if mutation loss instead of dassert for duplication (#862) --- src/replica/duplication/mutation_batch.cpp | 61 ++++++++++++++++++- src/replica/duplication/mutation_batch.h | 19 +++++- .../duplication/test/mutation_batch_test.cpp | 23 +++++++ src/replica/mutation_cache.h | 2 + src/replica/prepare_list.h | 5 +- 5 files changed, 105 insertions(+), 5 deletions(-) diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index 43d3ec9e5f..6d1dd7c287 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -20,13 +20,70 @@ #include "replica_duplicator.h" #include "mutation_batch.h" -#include "replica/prepare_list.h" namespace dsn { namespace replication { /*static*/ constexpr int64_t mutation_batch::PREPARE_LIST_NUM_ENTRIES; +mutation_buffer::mutation_buffer(replica_base *r, + decree init_decree, + int max_count, + mutation_committer committer) + : prepare_list(r, init_decree, max_count, committer) +{ + auto counter_str = fmt::format("dup_recent_mutation_loss_count@{}", r->get_gpid()); + _counter_dulication_mutation_loss_count.init_app_counter( + "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); +} + +void mutation_buffer::commit(decree d, commit_type ct) +{ + if (d <= last_committed_decree()) + return; + + if (ct != COMMIT_TO_DECREE_HARD) { + dassert_replica(false, "invalid commit type {}", (int)ct); + } + + ballot last_bt = 0; + for (decree d0 = last_committed_decree() + 1; d0 <= d; d0++) { + mutation_ptr next_committed_mutation = get_mutation_by_decree(d0); + // The unexpected case is as follows: next_committed_decree is out of prepare_list[start~end] + // + // last_committed_decree - next_committed_decree + // | | + // n n+1 + // + // 
[min_decree------max_decree] + // | | + // n+m(m>1) n+k(k>=m) + // + // Unlike the base class implementation, just derror instead of dassert if the mutation is + // lost or not logged. From the error log and the perf-counter, we can choose to restart + // duplication or ignore the loss. + if (next_committed_mutation == nullptr || !next_committed_mutation->is_logged()) { + derror_replica("mutation[{}] is lost in prepare_list: " + "prepare_last_committed_decree={}, prepare_min_decree={}, " + "prepare_max_decree={}", + d0, + last_committed_decree(), + min_decree(), + max_decree()); + _counter_dulication_mutation_loss_count->set(min_decree() - last_committed_decree()); + // if the next mutation to commit is lost, let _last_committed_decree catch up with + // min_decree so that the next commit starts from min_decree + _last_committed_decree = min_decree() - 1; + return; + } + + dcheck_ge_replica(next_committed_mutation->data.header.ballot, last_bt); + _last_committed_decree++; + last_bt = next_committed_mutation->data.header.ballot; + _committer(next_committed_mutation); + } +} + error_s mutation_batch::add(mutation_ptr mu) { if (mu->get_decree() <= _mutation_buffer->last_committed_decree()) { @@ -76,7 +133,7 @@ mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r) replica_base base( r->get_gpid(), std::string("mutation_batch@") + r->replica_name(), r->app_name()); _mutation_buffer = - make_unique(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) { + make_unique(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) { // committer add_mutation_if_valid(mu, _loaded_mutations, _start_decree); }); diff --git a/src/replica/duplication/mutation_batch.h b/src/replica/duplication/mutation_batch.h index 588aa2bb0e..dd47880af7 100644 --- a/src/replica/duplication/mutation_batch.h +++ b/src/replica/duplication/mutation_batch.h @@ -17,15 +17,29 @@ #pragma once +#include #include #include "replica/mutation.h" - +#include "replica/prepare_list.h" namespace dsn { namespace 
replication { class replica_duplicator; -class prepare_list; + +class mutation_buffer : public prepare_list +{ +public: + mutation_buffer(replica_base *r, + decree init_decree, + int max_count, + mutation_committer committer); + + void commit(decree d, commit_type ct); + +private: + perf_counter_wrapper _counter_dulication_mutation_loss_count; +}; // A sorted array of committed mutations that are ready for duplication. // Not thread-safe. @@ -51,6 +65,7 @@ class mutation_batch : replica_base private: friend class replica_duplicator_test; + friend class mutation_batch_test; std::unique_ptr _mutation_buffer; mutation_tuple_set _loaded_mutations; diff --git a/src/replica/duplication/test/mutation_batch_test.cpp b/src/replica/duplication/test/mutation_batch_test.cpp index 26cf080604..cb8431fe89 100644 --- a/src/replica/duplication/test/mutation_batch_test.cpp +++ b/src/replica/duplication/test/mutation_batch_test.cpp @@ -24,6 +24,18 @@ namespace replication { class mutation_batch_test : public duplication_test_base { public: + void + reset_buffer(const mutation_batch &batcher, const decree last_commit, decree start, decree end) + { + batcher._mutation_buffer->reset(last_commit); + batcher._mutation_buffer->_start_decree = start; + batcher._mutation_buffer->_end_decree = end; + } + + void commit_buffer(const mutation_batch &batcher, const decree current_decree) + { + batcher._mutation_buffer->commit(current_decree, COMMIT_TO_DECREE_HARD); + } }; TEST_F(mutation_batch_test, add_mutation_if_valid) @@ -62,5 +74,16 @@ TEST_F(mutation_batch_test, ignore_non_idempotent_write) ASSERT_EQ(result.size(), 0); } +TEST_F(mutation_batch_test, mutation_buffer_commit) +{ + auto duplicator = create_test_duplicator(0); + mutation_batch batcher(duplicator.get()); + // mock mutation_buffer[last=10, start=15, end=20]: last + 1 (the next decree to commit) is + // outside [start~end], so last_decree() is expected to become start - 1 = 14 + reset_buffer(batcher, 10, 15, 20); + commit_buffer(batcher, 15); + ASSERT_EQ(batcher.last_decree(), 14); +} + } // 
namespace replication } // namespace dsn diff --git a/src/replica/mutation_cache.h b/src/replica/mutation_cache.h index 802d5b8c4c..de5461c9b6 100644 --- a/src/replica/mutation_cache.h +++ b/src/replica/mutation_cache.h @@ -57,6 +57,8 @@ class mutation_cache int capacity() const { return _max_count; } private: + friend class mutation_batch_test; + std::vector _array; int _max_count; diff --git a/src/replica/prepare_list.h b/src/replica/prepare_list.h index 82edaed4f2..ee84af1879 100644 --- a/src/replica/prepare_list.h +++ b/src/replica/prepare_list.h @@ -68,9 +68,12 @@ class prepare_list : public mutation_cache, private replica_base error_code prepare(mutation_ptr &mu, partition_status::type status, bool pop_all_committed_mutations = false); // unordered prepare - void commit(decree decree, commit_type ct); // ordered commit + virtual void commit(decree decree, commit_type ct); // ordered commit, overridable + + virtual ~prepare_list() = default; private: + friend class mutation_buffer; decree _last_committed_decree; mutation_committer _committer; };