diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp index b0b93d6b68..ebab0b5975 100644 --- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp +++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp @@ -426,14 +426,94 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type bool report_metadata, /*out*/ bulk_load_response &response) { - // TODO(heyuchen): TBD + if (status() != partition_status::PS_PRIMARY) { + response.err = ERR_INVALID_STATE; + return; + } + + if (report_metadata && !_metadata.files.empty()) { + response.__set_metadata(_metadata); + } + + switch (remote_status) { + case bulk_load_status::BLS_DOWNLOADING: + case bulk_load_status::BLS_DOWNLOADED: + report_group_download_progress(response); + break; + // TODO(heyuchen): add other status + default: + break; + } + + response.primary_bulk_load_status = _status; } +// ThreadPool: THREAD_POOL_REPLICATION +void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_response &response) +{ + if (status() != partition_status::PS_PRIMARY) { + dwarn_replica("replica status={}, should be {}", + enum_to_string(status()), + enum_to_string(partition_status::PS_PRIMARY)); + response.err = ERR_INVALID_STATE; + return; + } + + partition_bulk_load_state primary_state; + primary_state.__set_download_progress(_download_progress.load()); + primary_state.__set_download_status(_download_status.load()); + response.group_bulk_load_state[_replica->_primary_states.membership.primary] = primary_state; + ddebug_replica("primary = {}, download progress = {}%, status = {}", + _replica->_primary_states.membership.primary.to_string(), + primary_state.download_progress, + primary_state.download_status); + + int32_t total_progress = primary_state.download_progress; + for (const auto &target_address : _replica->_primary_states.membership.secondaries) { + const auto &secondary_state = + _replica->_primary_states.secondary_bulk_load_states[target_address]; + int32_t s_progress = + secondary_state.__isset.download_progress ? secondary_state.download_progress : 0; + error_code s_status = + secondary_state.__isset.download_status ? secondary_state.download_status : ERR_OK; + ddebug_replica("secondary = {}, download progress = {}%, status={}", + target_address.to_string(), + s_progress, + s_status); + response.group_bulk_load_state[target_address] = secondary_state; + total_progress += s_progress; + } + + total_progress /= _replica->_primary_states.membership.max_replica_count; + ddebug_replica("total download progress = {}%", total_progress); + response.__set_total_download_progress(total_progress); +} + +// ThreadPool: THREAD_POOL_REPLICATION void replica_bulk_loader::report_bulk_load_states_to_primary( bulk_load_status::type remote_status, /*out*/ group_bulk_load_response &response) { - // TODO(heyuchen): TBD + if (status() != partition_status::PS_SECONDARY) { + response.err = ERR_INVALID_STATE; + return; + } + + partition_bulk_load_state bulk_load_state; + auto local_status = _status; + switch (remote_status) { + case bulk_load_status::BLS_DOWNLOADING: + case bulk_load_status::BLS_DOWNLOADED: + bulk_load_state.__set_download_progress(_download_progress.load()); + bulk_load_state.__set_download_status(_download_status.load()); + break; + // TODO(heyuchen): add other status + default: + break; + } + + response.status = local_status; + response.bulk_load_state = bulk_load_state; } } // namespace replication diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h index 1bab20a55a..e62b649f83 100644 --- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h +++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h @@ -69,6 +69,8 @@ class replica_bulk_loader : replica_base void report_bulk_load_states_to_meta(bulk_load_status::type remote_status, bool report_metadata, /*out*/ bulk_load_response &response); + void report_group_download_progress(/*out*/ bulk_load_response &response); + void report_bulk_load_states_to_primary(bulk_load_status::type remote_status, /*out*/ group_bulk_load_response &response); diff --git a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp index 0b77fb8793..acb92f0c40 100644 --- a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp +++ b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp @@ -61,6 +61,17 @@ class replica_bulk_loader_test : public replica_test_base return _bulk_loader->verify_file(f_meta, LOCAL_DIR); } + int32_t test_report_group_download_progress(bulk_load_status::type status, + int32_t p_progress, + int32_t s1_progress, + int32_t s2_progress) + { + mock_group_progress(status, p_progress, s1_progress, s2_progress); + bulk_load_response response; + _bulk_loader->report_group_download_progress(response); + return response.total_download_progress; + } + /// mock structure functions void @@ -105,14 +116,14 @@ class replica_bulk_loader_test : public replica_test_base void mock_primary_states() { mock_replica_config(partition_status::PS_PRIMARY); - primary_context p_context = _replica->get_primary_context(); - partition_configuration &config = p_context.membership; + partition_configuration config; config.max_replica_count = 3; config.pid = PID; config.ballot = BALLOT; config.primary = PRIMARY; config.secondaries.emplace_back(SECONDARY); config.secondaries.emplace_back(SECONDARY2); + _replica->set_primary_partition_configuration(config); } void create_local_file(const std::string &file_name) @@ -178,6 +189,51 @@ class replica_bulk_loader_test : public replica_test_base return true; } + void mock_replica_bulk_load_varieties(bulk_load_status::type status, + int32_t download_progress, + ingestion_status::type istatus, + bool is_ingestion = false) + { + _bulk_loader->_status = status; + _bulk_loader->_download_progress = download_progress; + // TODO(heyuchen): add ingestion status + } + + void mock_secondary_progress(int32_t secondary_progress1, int32_t secondary_progress2) + { + mock_primary_states(); + partition_bulk_load_state state1, state2; + state1.__set_download_status(ERR_OK); + state1.__set_download_progress(secondary_progress1); + state2.__set_download_status(ERR_OK); + state2.__set_download_progress(secondary_progress2); + _replica->set_secondary_bulk_load_state(SECONDARY, state1); + _replica->set_secondary_bulk_load_state(SECONDARY2, state2); + } + + void mock_group_progress(bulk_load_status::type p_status, + int32_t p_progress, + int32_t s1_progress, + int32_t s2_progress) + { + if (p_status == bulk_load_status::BLS_INVALID) { + p_progress = 0; + } else if (p_status == bulk_load_status::BLS_DOWNLOADED) { + p_progress = 100; + } + mock_replica_bulk_load_varieties(p_status, p_progress, ingestion_status::IS_INVALID); + mock_secondary_progress(s1_progress, s2_progress); + } + + void mock_group_progress(bulk_load_status::type p_status) + { + if (p_status == bulk_load_status::BLS_INVALID) { + mock_group_progress(p_status, 0, 0, 0); + } else if (p_status == bulk_load_status::BLS_DOWNLOADED) { + mock_group_progress(p_status, 100, 100, 100); + } + } + // helper functions bulk_load_status::type get_bulk_load_status() const { return _bulk_loader->_status; } @@ -328,5 +384,30 @@ TEST_F(replica_bulk_loader_test, verify_file_succeed) utils::filesystem::remove_path(LOCAL_DIR); } +// report_group_download_progress unit tests +TEST_F(replica_bulk_loader_test, report_group_download_progress_test) +{ + struct test_struct + { + bulk_load_status::type primary_status; + int32_t primary_progress; + int32_t secondary1_progress; + int32_t secondary2_progress; + int32_t total_progress; + } tests[]{ + {bulk_load_status::BLS_DOWNLOADING, 10, 10, 10, 10}, + {bulk_load_status::BLS_DOWNLOADED, 100, 0, 0, 33}, + {bulk_load_status::BLS_DOWNLOADED, 100, 100, 100, 100}, + }; + + for (auto test : tests) { + ASSERT_EQ(test_report_group_download_progress(test.primary_status, + test.primary_progress, + test.secondary1_progress, + test.secondary2_progress), + test.total_progress); + } +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/replica_context.cpp b/src/dist/replication/lib/replica_context.cpp index 45fe38d882..4c5f73fa4c 100644 --- a/src/dist/replication/lib/replica_context.cpp +++ b/src/dist/replication/lib/replica_context.cpp @@ -159,6 +159,8 @@ void primary_context::cleanup_bulk_load_states() // TODO(heyuchen): TBD // primary will save bulk load states reported from secondaries, this function is to cleanup // those states + secondary_bulk_load_states.erase(secondary_bulk_load_states.begin(), + secondary_bulk_load_states.end()); } bool secondary_context::cleanup(bool force) diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index 222dc00b81..dfc134f79f 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -152,6 +152,8 @@ class primary_context // Used for bulk load // group bulk_load response tasks of RPC_GROUP_BULK_LOAD for each secondary replica node_tasks group_bulk_load_pending_replies; + // bulk_load_state of secondary replicas + std::unordered_map secondary_bulk_load_states; }; class secondary_context diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index c8e15ad9bc..443b5e4603 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -141,8 +141,15 @@ class mock_replica : public replica void prepare_list_commit_hard(decree d) { _prepare_list->commit(d, COMMIT_TO_DECREE_HARD); } decree get_app_last_committed_decree() { return _app->last_committed_decree(); } void set_app_last_committed_decree(decree d) { _app->_last_committed_decree = d; } - primary_context get_primary_context() { return _primary_states; } - void set_primary_context(primary_context context) { _primary_states = context; } + void set_primary_partition_configuration(partition_configuration &pconfig) + { + _primary_states.membership = pconfig; + } + void set_secondary_bulk_load_state(const rpc_address &node, + const partition_bulk_load_state &state) + { + _primary_states.secondary_bulk_load_states[node] = state; + } private: decree _max_gced_decree{invalid_decree - 1};