From 5b76e3828c9c252e43218d37854ca69d16ce9812 Mon Sep 17 00:00:00 2001
From: Denis Rystsov
Date: Mon, 11 Jan 2021 13:41:25 -0800
Subject: [PATCH] r/consensus: adds conditional replicate

For the idempotent producer feature we need to be sure that when a
broker (a leader) replicates a message it has seen all the previous
messages. Every time a leader is elected it receives a unique term. If
we use a condition on the current leader's term, it guarantees that the
replicate request doesn't pass when a re-election has happened since
the caller last caught up with the log.
---
 src/v/raft/consensus.cc                 | 31 ++++++++---
 src/v/raft/consensus.h                  | 34 +++++++++++-
 src/v/raft/replicate_batcher.cc         | 70 ++++++++++++++++---------
 src/v/raft/replicate_batcher.h          | 12 +++--
 src/v/raft/tests/append_entries_test.cc | 48 +++++++++++++++++
 src/v/raft/tests/raft_group_fixture.h   | 22 ++++++++
 6 files changed, 179 insertions(+), 38 deletions(-)

diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index c63526674215..b545f74f7c65 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -411,6 +411,20 @@ void consensus::dispatch_recovery(follower_index_metadata& idx) {
 
 ss::future<result<replicate_result>>
 consensus::replicate(model::record_batch_reader&& rdr, replicate_options opts) {
+    return do_replicate({}, std::move(rdr), opts);
+}
+
+ss::future<result<replicate_result>> consensus::replicate(
+  model::term_id expected_term,
+  model::record_batch_reader&& rdr,
+  replicate_options opts) {
+    return do_replicate(expected_term, std::move(rdr), opts);
+}
+
+ss::future<result<replicate_result>> consensus::do_replicate(
+  std::optional<model::term_id> expected_term,
+  model::record_batch_reader&& rdr,
+  replicate_options opts) {
     if (!is_leader() || unlikely(_transferring_leadership)) {
         return seastar::make_ready_future<result<replicate_result>>(
           errc::not_leader);
@@ -418,11 +432,11 @@ consensus::replicate(model::record_batch_reader&& rdr, replicate_options opts) {
 
     if (opts.consistency == consistency_level::quorum_ack) {
         _probe.replicate_requests_ack_all();
-        return ss::with_gate(_bg, [this, rdr = std::move(rdr)]() mutable {
-            return _batcher.replicate(std::move(rdr)).finally([this] {
-                _probe.replicate_done();
-            });
-        });
+        return ss::with_gate(
+          _bg, [this, expected_term, rdr = std::move(rdr)]() mutable {
+              return _batcher.replicate(expected_term, std::move(rdr))
+                .finally([this] { _probe.replicate_done(); });
+          });
     }
 
     if (opts.consistency == consistency_level::leader_ack) {
@@ -433,12 +447,17 @@ consensus::replicate(model::record_batch_reader&& rdr, replicate_options opts) {
     // For relaxed consistency, append data to leader disk without flush
    // asynchronous replication is provided by Raft protocol recovery mechanism.
     return _op_lock
-      .with([this, rdr = std::move(rdr)]() mutable {
+      .with([this, expected_term, rdr = std::move(rdr)]() mutable {
          if (!is_leader()) {
              return seastar::make_ready_future<result<replicate_result>>(
                errc::not_leader);
          }
 
+          if (expected_term.has_value() && expected_term.value() != _term) {
+              return seastar::make_ready_future<result<replicate_result>>(
+                errc::not_leader);
+          }
+
          return disk_append(model::make_record_batch_reader<
                             details::term_assigning_reader>(
            std::move(rdr), model::term_id(_term)))
diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h
index 3aec2090ed44..5edfbe36d0c8 100644
--- a/src/v/raft/consensus.h
+++ b/src/v/raft/consensus.h
@@ -162,6 +162,34 @@ class consensus {
     ss::future<result<replicate_result>>
     replicate(model::record_batch_reader&&, replicate_options);
 
+    /**
+     * Replication happens only when expected_term matches the current _term;
+     * otherwise consensus returns not_leader.
+     * This feature is needed to keep an ingestion-time state machine in
+     * sync with the log. The conventional state machines running on top of
+     * the log are optimistic: to execute a command a user adds the command
+     * to the log (replicate) and then keeps reading commands from the log,
+     * executing them one after another. When the commands are conditional,
+     * the conventional approach is wasteful: even when a condition resolves
+     * to false we still pay the replication costs. An alternative approach
+     * is to check the conditions before replication, but then there is a
+     * risk of divergence between the log and the state (e.g. leadership
+     * moves to another broker, it adds messages, then leadership moves
+     * back). The expected_term prevents this situation. The expected use
+     * case is:
+     *   1. when a cached term matches consensus.term(), call replicate
+     *      using the cached term as expected_term
+     *   2. otherwise:
+     *      a. abort all incoming requests
+     *      b. call consensus meta() to get the latest offset and term
+     *      c. wait until the state catches up with the latest offset
+     *      d. cache the term
+     *      e. continue with step #1
+     */
+    ss::future<result<replicate_result>> replicate(
+      model::term_id expected_term,
+      model::record_batch_reader&&,
+      replicate_options);
+
     ss::future<model::record_batch_reader> make_reader(
       storage::log_reader_config,
       std::optional<clock_type::time_point> = std::nullopt);
 
@@ -281,8 +309,10 @@ class consensus {
     append_entries_reply
     make_append_entries_reply(vnode, storage::append_result);
 
-    ss::future<result<replicate_result>>
-    do_replicate(model::record_batch_reader&&);
+    ss::future<result<replicate_result>> do_replicate(
+      std::optional<model::term_id>,
+      model::record_batch_reader&&,
+      replicate_options);
 
     ss::future<storage::append_result> disk_append(model::record_batch_reader&&);
 
diff --git a/src/v/raft/replicate_batcher.cc b/src/v/raft/replicate_batcher.cc
index 45fc5311f9df..0860bbed26b3 100644
--- a/src/v/raft/replicate_batcher.cc
+++ b/src/v/raft/replicate_batcher.cc
@@ -32,9 +32,9 @@ replicate_batcher::replicate_batcher(consensus* ptr, size_t cache_size)
   : _ptr(ptr)
   , _max_batch_size_sem(cache_size) {}
 
-ss::future<result<replicate_result>>
-replicate_batcher::replicate(model::record_batch_reader&& r) {
-    return do_cache(std::move(r)).then([this](item_ptr i) {
+ss::future<result<replicate_result>> replicate_batcher::replicate(
+  std::optional<model::term_id> expected_term, model::record_batch_reader&& r) {
+    return do_cache(expected_term, std::move(r)).then([this](item_ptr i) {
         return _lock.with([this] { return flush(); }).then([i] {
             return i->_promise.get_future();
         });
@@ -51,13 +51,14 @@ ss::future<> replicate_batcher::stop() {
              "finish replicating pending entries"));
        }
        _item_cache.clear();
-        _data_cache.clear();
    });
 }
-ss::future<replicate_batcher::item_ptr>
-replicate_batcher::do_cache(model::record_batch_reader&& r) {
+
+ss::future<replicate_batcher::item_ptr> replicate_batcher::do_cache(
+  std::optional<model::term_id> expected_term, model::record_batch_reader&& r) {
     return model::consume_reader_to_memory(std::move(r), model::no_timeout)
-      .then([this](ss::circular_buffer<model::record_batch> batches) {
+      .then([this,
+             expected_term](ss::circular_buffer<model::record_batch> batches) {
          ss::circular_buffer<model::record_batch> data;
          size_t bytes = std::accumulate(
            batches.cbegin(),
@@ -66,27 +67,31 @@ replicate_batcher::do_cache(model::record_batch_reader&& r) {
            [](size_t sum, const model::record_batch& b) {
                return sum + b.size_bytes();
            });
-          return do_cache_with_backpressure(std::move(batches), bytes);
+          return do_cache_with_backpressure(
+            expected_term, std::move(batches), bytes);
      });
 }
 
 ss::future<replicate_batcher::item_ptr>
 replicate_batcher::do_cache_with_backpressure(
-  ss::circular_buffer<model::record_batch> batches, size_t bytes) {
+  std::optional<model::term_id> expected_term,
+  ss::circular_buffer<model::record_batch> batches,
+  size_t bytes) {
     return ss::get_units(_max_batch_size_sem, bytes)
-      .then([this, batches = std::move(batches)](
+      .then([this, expected_term, batches = std::move(batches)](
              ss::semaphore_units<> u) mutable {
          size_t record_count = 0;
+          auto i = ss::make_lw_shared<item>();
          for (auto& b : batches) {
              record_count += b.record_count();
              if (b.header().ctx.owner_shard == ss::this_shard_id()) {
-                  _data_cache.push_back(std::move(b));
+                  i->data.push_back(std::move(b));
              } else {
-                  _data_cache.push_back(b.copy());
+                  i->data.push_back(b.copy());
              }
          }
-          auto i = ss::make_lw_shared<item>();
+          i->expected_term = expected_term;
          i->record_count = record_count;
          i->units = std::move(u);
          _item_cache.emplace_back(i);
@@ -95,20 +100,14 @@
 }
 
 ss::future<> replicate_batcher::flush() {
-    auto notifications = std::exchange(_item_cache, {});
-    auto data = std::exchange(_data_cache, {});
-    if (notifications.empty()) {
+    auto item_cache = std::exchange(_item_cache, {});
+    if (item_cache.empty()) {
         return ss::now();
     }
     return ss::with_gate(
-      _ptr->_bg,
-      [this,
-       data = std::move(data),
-       notifications = std::move(notifications)]() mutable {
+      _ptr->_bg, [this, item_cache = std::move(item_cache)]() mutable {
          return _ptr->_op_lock.get_units().then(
-            [this,
-             data = std::move(data),
-             notifications = std::move(notifications)](
+            [this, item_cache = std::move(item_cache)](
              ss::semaphore_units<> u) mutable {
                // we have to check if we are the leader
                // it is critical as term could have been updated already by
                // other raft group
                // this problem caused truncation failure.
                if (!_ptr->is_leader()) {
-                    for (auto& n : notifications) {
+                    for (auto& n : item_cache) {
                        n->_promise.set_value(errc::not_leader);
                    }
                    return ss::make_ready_future<>();
                }
                auto meta = _ptr->meta();
                auto const term = model::term_id(meta.term);
-                for (auto& b : data) {
-                    b.set_term(term);
+                ss::circular_buffer<model::record_batch> data;
+                std::vector<item_ptr> notifications;
+
+                for (auto& n : item_cache) {
+                    if (
+                      !n->expected_term.has_value()
+                      || n->expected_term.value() == term) {
+                        for (auto& b : n->data) {
+                            b.set_term(term);
+                            data.push_back(std::move(b));
+                        }
+                        notifications.push_back(std::move(n));
+                    } else {
+                        n->_promise.set_value(errc::not_leader);
+                    }
+                }
+
+                if (notifications.empty()) {
+                    return ss::now();
                }
+
                auto seqs = _ptr->next_followers_request_seq();
                append_entries_request req(
                  _ptr->_self,
@@ -141,6 +158,7 @@ ss::future<> replicate_batcher::flush() {
        });
    });
 }
+
 static void propagate_result(
   result<replicate_result> r,
   std::vector<replicate_batcher::item_ptr>& notifications) {
diff --git a/src/v/raft/replicate_batcher.h b/src/v/raft/replicate_batcher.h
index e7571990b5ec..539eb8f0405e 100644
--- a/src/v/raft/replicate_batcher.h
+++ b/src/v/raft/replicate_batcher.h
@@ -29,6 +29,8 @@ class replicate_batcher {
         ss::promise<result<replicate_result>> _promise;
         replicate_result ret;
         size_t record_count;
+        std::vector<model::record_batch> data;
+        std::optional<model::term_id> expected_term;
         /**
          * Item keeps semaphore units until replicate batcher is done with
          * processing the request.
@@ -45,7 +47,7 @@ class replicate_batcher {
     ~replicate_batcher() noexcept = default;
 
     ss::future<result<replicate_result>>
-    replicate(model::record_batch_reader&&);
+    replicate(std::optional<model::term_id>, model::record_batch_reader&&);
     ss::future<> flush();
 
     ss::future<> stop();
@@ -58,15 +60,17 @@
       absl::flat_hash_map<vnode, follower_req_seq>);
 
 private:
-    ss::future<item_ptr> do_cache(model::record_batch_reader&&);
+    ss::future<item_ptr>
+    do_cache(std::optional<model::term_id>, model::record_batch_reader&&);
     ss::future<item_ptr> do_cache_with_backpressure(
-      ss::circular_buffer<model::record_batch>, size_t);
+      std::optional<model::term_id>,
+      ss::circular_buffer<model::record_batch>,
+      size_t);
 
     consensus* _ptr;
     ss::semaphore _max_batch_size_sem;
     std::vector<item_ptr> _item_cache;
-    ss::circular_buffer<model::record_batch> _data_cache;
     mutex _lock;
 };
 
diff --git a/src/v/raft/tests/append_entries_test.cc b/src/v/raft/tests/append_entries_test.cc
index 30767b940644..015f9dfd70d1 100644
--- a/src/v/raft/tests/append_entries_test.cc
+++ b/src/v/raft/tests/append_entries_test.cc
@@ -66,6 +66,54 @@ FIXTURE_TEST(test_replicate_multiple_entries, raft_test_fixture) {
       "State is consistent");
 };
 
+FIXTURE_TEST(test_replicate_with_expected_term_leader, raft_test_fixture) {
+    raft_group gr = raft_group(raft::group_id(0), 3);
+    gr.enable_all();
+    auto leader_id = wait_for_group_leader(gr);
+    auto leader_raft = gr.get_member(leader_id).consensus;
+    auto term = leader_raft->term();
+    bool success = replicate_random_batches(
+                     term, gr, 5, raft::consistency_level::leader_ack)
+                     .get0();
+    BOOST_REQUIRE(success);
+};
+
+FIXTURE_TEST(test_replicate_with_expected_term_quorum, raft_test_fixture) {
+    raft_group gr = raft_group(raft::group_id(0), 3);
+    gr.enable_all();
+    auto leader_id = wait_for_group_leader(gr);
+    auto leader_raft = gr.get_member(leader_id).consensus;
+    auto term = leader_raft->term();
+    bool success = replicate_random_batches(
+                     term, gr, 5, raft::consistency_level::quorum_ack)
+                     .get0();
+    BOOST_REQUIRE(success);
+};
+
+FIXTURE_TEST(test_replicate_violating_expected_term_leader, raft_test_fixture) {
+    raft_group gr = raft_group(raft::group_id(0), 3);
+    gr.enable_all();
+    auto leader_id = wait_for_group_leader(gr);
+    auto leader_raft = gr.get_member(leader_id).consensus;
+    auto term = leader_raft->term() + model::term_id(100);
+    bool success = replicate_random_batches(
+                     term, gr, 5, raft::consistency_level::leader_ack)
+                     .get0();
+    BOOST_REQUIRE(!success);
+};
+
+FIXTURE_TEST(test_replicate_violating_expected_term_quorum, raft_test_fixture) {
+    raft_group gr = raft_group(raft::group_id(0), 3);
+    gr.enable_all();
+    auto leader_id = wait_for_group_leader(gr);
+    auto leader_raft = gr.get_member(leader_id).consensus;
+    auto term = leader_raft->term() + model::term_id(100);
+    bool success = replicate_random_batches(
+                     term, gr, 5, raft::consistency_level::quorum_ack)
+                     .get0();
+    BOOST_REQUIRE(!success);
+};
+
 FIXTURE_TEST(test_single_node_recovery, raft_test_fixture) {
     raft_group gr = raft_group(raft::group_id(0), 3);
     gr.enable_all();
diff --git a/src/v/raft/tests/raft_group_fixture.h b/src/v/raft/tests/raft_group_fixture.h
index f004a708376c..c23da3164ff8 100644
--- a/src/v/raft/tests/raft_group_fixture.h
+++ b/src/v/raft/tests/raft_group_fixture.h
@@ -690,6 +690,28 @@ static ss::future<bool> replicate_random_batches(
       });
 }
 
+static ss::future<bool> replicate_random_batches(
+  model::term_id expected_term,
+  raft_group& gr,
+  int count,
+  raft::consistency_level c_lvl = raft::consistency_level::quorum_ack,
+  model::timeout_clock::duration tout = 1s) {
+    return retry_with_leader(
+      gr, 5, tout, [count, expected_term, c_lvl](raft_node& leader_node) {
+          auto rdr = random_batches_reader(count);
+          raft::replicate_options opts(c_lvl);
+
+          return leader_node.consensus
+            ->replicate(expected_term, std::move(rdr), opts)
+            .then([](result<raft::replicate_result> res) {
+                if (!res) {
+                    return false;
+                }
+                return true;
+            });
+      });
+}
+
 /**
  * Makes compactible batches, having one record per batch
  */
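
The caller-side protocol documented in the consensus.h comment above can be sketched roughly as follows. This is an illustrative sketch, not part of the patch: consensus::term(), consensus::meta(), replicate_options, consistency_level, errc::not_leader and the new replicate(expected_term, ...) overload come from the code in the diff, while ingestion_stm, catch_up() and wait_for_apply() are hypothetical placeholders for the caller's own state-machine logic.

// Illustrative sketch (not part of the patch): a state machine layered on
// top of raft::consensus using the conditional replicate added above.
// ingestion_stm, catch_up() and wait_for_apply() are hypothetical helpers.
#include "raft/consensus.h"

class ingestion_stm {
public:
    explicit ingestion_stm(raft::consensus* c)
      : _raft(c) {}

    ss::future<result<raft::replicate_result>>
    conditional_replicate(model::record_batch_reader rdr) {
        if (_cached_term != _raft->term()) {
            // Step 2: a re-election may have happened; re-sync the local
            // state, refresh the cached term and retry.
            return catch_up().then([this, rdr = std::move(rdr)]() mutable {
                return conditional_replicate(std::move(rdr));
            });
        }
        // Step 1: the cached term is current, so conditions checked against
        // the local state are safe to use; if leadership changes in between,
        // consensus answers with errc::not_leader instead of appending on
        // top of entries this broker has not seen.
        return _raft->replicate(
          _cached_term,
          std::move(rdr),
          raft::replicate_options(raft::consistency_level::quorum_ack));
    }

private:
    ss::future<> catch_up() {
        // Steps 2b-2d: read the latest offset and term via consensus::meta(),
        // wait until the local state has applied the log up to that offset
        // (wait_for_apply() is a hypothetical helper), then cache the term.
        return wait_for_apply().then([this] { _cached_term = _raft->term(); });
    }

    // Hypothetical helper: resolves once the local state machine has caught
    // up with the latest offset reported by consensus::meta().
    ss::future<> wait_for_apply();

    raft::consensus* _raft;
    model::term_id _cached_term{};
};

The key point of the sketch is that the cached term is only passed as expected_term while it still equals consensus::term(); any mismatch surfaces as errc::not_leader, after which the caller re-syncs (steps 2a-2e) instead of risking divergence between the log and its state.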