diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp index e361cc0fb6..f6530be6e7 100644 --- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp +++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp @@ -183,9 +183,33 @@ void replica_bulk_loader::on_group_bulk_load_reply(error_code err, _replica->_primary_states.group_bulk_load_pending_replies.erase(req.target_address); - // TODO(heyuchen): TBD - // if error happened, reset secondary bulk_load_state - // otherwise, set secondary bulk_load_states from resp + if (err != ERR_OK) { + derror_replica("failed to receive group_bulk_load_reply from {}, error = {}", + req.target_address.to_string(), + err.to_string()); + _replica->_primary_states.reset_node_bulk_load_states(req.target_address); + return; + } + + if (resp.err != ERR_OK) { + derror_replica("receive group_bulk_load response from {} failed, error = {}", + req.target_address.to_string(), + resp.err.to_string()); + _replica->_primary_states.reset_node_bulk_load_states(req.target_address); + return; + } + + if (req.config.ballot != get_ballot()) { + derror_replica("recevied wrong group_bulk_load response from {}, request ballot = {}, " + "current ballot = {}", + req.target_address.to_string(), + req.config.ballot, + get_ballot()); + _replica->_primary_states.reset_node_bulk_load_states(req.target_address); + return; + } + + _replica->_primary_states.secondary_bulk_load_states[req.target_address] = resp.bulk_load_state; } // ThreadPool: THREAD_POOL_REPLICATION @@ -484,7 +508,9 @@ void replica_bulk_loader::handle_bulk_load_finish(bulk_load_status::type new_sta } if (status() == partition_status::PS_PRIMARY) { - // TODO(heyuchen): cleanup _primary_states.secondary_bulk_load_states + for (const auto &target_address : _replica->_primary_states.membership.secondaries) { + _replica->_primary_states.reset_node_bulk_load_states(target_address); + } } ddebug_replica("bulk load finished, old_status = {}, new_status = {}", @@ -543,7 +569,7 @@ void replica_bulk_loader::clear_bulk_load_states() _replica->_is_bulk_load_ingestion = false; _replica->_app->set_ingestion_status(ingestion_status::IS_INVALID); - // TODO(heyuchen): clear other states + // TODO(heyuchen): clear other states for perf-counter _status = bulk_load_status::BLS_INVALID; } @@ -812,5 +838,16 @@ void replica_bulk_loader::report_bulk_load_states_to_primary( response.bulk_load_state = bulk_load_state; } +// ThreadPool: THREAD_POOL_REPLICATION +void replica_bulk_loader::clear_bulk_load_states_if_needed(partition_status::type new_status) +{ + partition_status::type old_status = status(); + if ((new_status == partition_status::PS_PRIMARY || + new_status == partition_status::PS_SECONDARY) && + new_status != old_status) { + clear_bulk_load_states(); + } +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h index 0dc88bafc8..1a86d4f04a 100644 --- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h +++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h @@ -88,6 +88,9 @@ class replica_bulk_loader : replica_base void report_bulk_load_states_to_primary(bulk_load_status::type remote_status, /*out*/ group_bulk_load_response &response); + // called by `update_local_configuration` to do possible states cleaning up + void clear_bulk_load_states_if_needed(partition_status::type new_status); + /// /// bulk load path on remote file provider: /// ///{bulk_load_info} diff --git a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp index 1fb7558bb1..0d7f829bce 100644 --- a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp +++ b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp @@ -135,6 +135,17 @@ class replica_bulk_loader_test : public replica_test_base return response.is_group_bulk_load_paused; } + void test_on_group_bulk_load_reply(bulk_load_status::type req_status, + ballot req_ballot, + error_code resp_error = ERR_OK, + error_code rpc_error = ERR_OK) + { + create_group_bulk_load_request(req_status, req_ballot); + group_bulk_load_response resp; + resp.err = resp_error; + _bulk_loader->on_group_bulk_load_reply(rpc_error, _group_req, resp); + } + /// mock structure functions void @@ -322,6 +333,15 @@ class replica_bulk_loader_test : public replica_test_base _replica->set_secondary_bulk_load_state(SECONDARY2, state2); } + void mock_group_ingestion_states(ingestion_status::type s1_status, + ingestion_status::type s2_status, + bool is_empty_prepare_sent = true) + { + mock_replica_bulk_load_varieties( + bulk_load_status::BLS_INGESTING, 100, ingestion_status::IS_SUCCEED); + mock_secondary_ingestion_states(s1_status, s2_status, is_empty_prepare_sent); + } + void mock_group_cleanup_flag(bulk_load_status::type primary_status, bool s1_cleaned_up = true, bool s2_cleaned_up = true) @@ -343,6 +363,19 @@ class replica_bulk_loader_test : public replica_test_base bulk_load_status::type get_bulk_load_status() const { return _bulk_loader->_status; } bool is_cleaned_up() { return _bulk_loader->is_cleaned_up(); } int32_t get_download_progress() { return _bulk_loader->_download_progress.load(); } + bool is_secondary_bulk_load_state_reset() + { + const partition_bulk_load_state &state = _replica->get_secondary_bulk_load_state(SECONDARY); + bool is_download_state_reset = + (state.__isset.download_progress && state.__isset.download_status && + state.download_progress == 0 && state.download_status == ERR_OK); + bool is_ingestion_status_reset = + (state.__isset.ingest_status && state.ingest_status == ingestion_status::IS_INVALID); + bool is_cleanup_flag_reset = (state.__isset.is_cleaned_up && !state.is_cleaned_up); + bool is_paused_flag_reset = (state.__isset.is_paused && !state.is_paused); + return is_download_state_reset && is_ingestion_status_reset && is_cleanup_flag_reset && + is_paused_flag_reset; + } public: std::unique_ptr _replica; @@ -748,5 +781,58 @@ TEST_F(replica_bulk_loader_test, report_group_is_paused_test) } } +// on_group_bulk_load_reply unit tests +TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_downloading_error) +{ + mock_group_progress(bulk_load_status::BLS_DOWNLOADING, 30, 30, 60); + test_on_group_bulk_load_reply(bulk_load_status::BLS_DOWNLOADING, BALLOT, ERR_BUSY); + ASSERT_TRUE(is_secondary_bulk_load_state_reset()); +} + +TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_downloaded_error) +{ + mock_group_progress(bulk_load_status::BLS_DOWNLOADED); + test_on_group_bulk_load_reply(bulk_load_status::BLS_DOWNLOADED, BALLOT, ERR_INVALID_STATE); + ASSERT_TRUE(is_secondary_bulk_load_state_reset()); +} + +TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_ingestion_error) +{ + mock_group_ingestion_states(ingestion_status::IS_RUNNING, ingestion_status::IS_SUCCEED); + test_on_group_bulk_load_reply( + bulk_load_status::BLS_INGESTING, BALLOT - 1, ERR_OK, ERR_INVALID_STATE); + ASSERT_TRUE(is_secondary_bulk_load_state_reset()); +} + +TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_succeed_error) +{ + mock_group_cleanup_flag(bulk_load_status::BLS_SUCCEED); + test_on_group_bulk_load_reply( + bulk_load_status::BLS_SUCCEED, BALLOT - 1, ERR_OK, ERR_INVALID_STATE); + ASSERT_TRUE(is_secondary_bulk_load_state_reset()); +} + +TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_failed_error) +{ + mock_group_ingestion_states(ingestion_status::IS_RUNNING, ingestion_status::IS_SUCCEED); + test_on_group_bulk_load_reply(bulk_load_status::BLS_FAILED, BALLOT, ERR_OK, ERR_TIMEOUT); + ASSERT_TRUE(is_secondary_bulk_load_state_reset()); +} + +TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_pausing_error) +{ + mock_group_progress(bulk_load_status::BLS_PAUSED, 100, 50, 10); + test_on_group_bulk_load_reply( + bulk_load_status::BLS_PAUSING, BALLOT, ERR_OK, ERR_NETWORK_FAILURE); + ASSERT_TRUE(is_secondary_bulk_load_state_reset()); +} + +TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_rpc_error) +{ + mock_group_cleanup_flag(bulk_load_status::BLS_INVALID, true, false); + test_on_group_bulk_load_reply(bulk_load_status::BLS_CANCELED, BALLOT, ERR_OBJECT_NOT_FOUND); + ASSERT_TRUE(is_secondary_bulk_load_state_reset()); +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 94350d878a..86ace011b6 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -38,6 +38,7 @@ #include "mutation.h" #include "mutation_log.h" #include "replica_stub.h" +#include "bulk_load/replica_bulk_loader.h" #include #include #include @@ -749,6 +750,8 @@ bool replica::update_local_configuration(const replica_configuration &config, max_prepared_decree(), last_committed_decree()); + _bulk_loader->clear_bulk_load_states_if_needed(config.status); + // Notice: there has five ways that primary can change its partition_status // 1, primary change partition config, such as add/remove secondary // 2, downgrage to secondary because of load balance diff --git a/src/dist/replication/lib/replica_context.cpp b/src/dist/replication/lib/replica_context.cpp index 833b31425a..e84d1520dc 100644 --- a/src/dist/replication/lib/replica_context.cpp +++ b/src/dist/replication/lib/replica_context.cpp @@ -154,6 +154,15 @@ bool primary_context::check_exist(::dsn::rpc_address node, partition_status::typ } } +void primary_context::reset_node_bulk_load_states(const rpc_address &node) +{ + secondary_bulk_load_states[node].__set_download_progress(0); + secondary_bulk_load_states[node].__set_download_status(ERR_OK); + secondary_bulk_load_states[node].__set_ingest_status(ingestion_status::IS_INVALID); + secondary_bulk_load_states[node].__set_is_cleaned_up(false); + secondary_bulk_load_states[node].__set_is_paused(false); +} + void primary_context::cleanup_bulk_load_states() { secondary_bulk_load_states.erase(secondary_bulk_load_states.begin(), diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index ae214ee500..d88f4a966b 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -97,6 +97,9 @@ class primary_context void do_cleanup_pending_mutations(bool clean_pending_mutations = true); + // reset bulk load states in secondary_bulk_load_states by node address + void reset_node_bulk_load_states(const rpc_address &node); + void cleanup_bulk_load_states(); public: diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index 2dc415596f..3d7f119a84 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -147,6 +147,10 @@ class mock_replica : public replica { _primary_states.membership = pconfig; } + partition_bulk_load_state get_secondary_bulk_load_state(const rpc_address &node) + { + return _primary_states.secondary_bulk_load_states[node]; + } void set_secondary_bulk_load_state(const rpc_address &node, const partition_bulk_load_state &state) {