diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp index c7bf61fa0f..e361cc0fb6 100644 --- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp +++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp @@ -233,10 +233,13 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name, case bulk_load_status::BLS_PAUSING: pause_bulk_load(); break; - case bulk_load_status::BLS_CANCELED: { + case bulk_load_status::BLS_CANCELED: handle_bulk_load_finish(bulk_load_status::BLS_CANCELED); - } break; - // TODO(heyuchen): add other bulk load status + break; + case bulk_load_status::BLS_FAILED: + handle_bulk_load_finish(bulk_load_status::BLS_FAILED); + // TODO(heyuchen): add perf-counter here + break; default: break; } @@ -604,12 +607,12 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type break; case bulk_load_status::BLS_SUCCEED: case bulk_load_status::BLS_CANCELED: + case bulk_load_status::BLS_FAILED: report_group_cleaned_up(response); break; case bulk_load_status::BLS_PAUSING: report_group_is_paused(response); break; - // TODO(heyuchen): add other status default: break; } @@ -795,12 +798,12 @@ void replica_bulk_loader::report_bulk_load_states_to_primary( break; case bulk_load_status::BLS_SUCCEED: case bulk_load_status::BLS_CANCELED: + case bulk_load_status::BLS_FAILED: bulk_load_state.__set_is_cleaned_up(is_cleaned_up()); break; case bulk_load_status::BLS_PAUSING: bulk_load_state.__set_is_paused(local_status == bulk_load_status::BLS_PAUSED); break; - // TODO(heyuchen): add other status default: break; } diff --git a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp index 9692ddbe67..1fb7558bb1 100644 --- a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp +++ b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp @@ -519,10 +519,13 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test) // Test cases // - bulk load succeed // - double bulk load finish - // TODO(heyuchen): add following cases - // istatus, is_ingestion, create_dir will be used in further tests + // - cancel during downloaded + // - cancel during ingestion + // - cancel during succeed // - failed during downloading // - failed during ingestion + // Tip: bulk load dir will be removed if bulk load finished, so we should create dir before some + // cases struct test_struct { bulk_load_status::type local_status; @@ -560,6 +563,18 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test) ingestion_status::IS_INVALID, false, bulk_load_status::BLS_CANCELED, + true}, + {bulk_load_status::BLS_DOWNLOADING, + 10, + ingestion_status::IS_INVALID, + false, + bulk_load_status::BLS_FAILED, + true}, + {bulk_load_status::BLS_INGESTING, + 100, + ingestion_status::type::IS_FAILED, + false, + bulk_load_status::BLS_FAILED, true}}; for (auto test : tests) { diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 2b04987ff4..5a5c4bb95b 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -712,15 +712,21 @@ void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, // ThreadPool: THREAD_POOL_META_STATE void bulk_load_service::handle_bulk_load_failed(int32_t app_id) { - // TODO(heyuchen): TBD - // replica meets serious error during bulk load, such as file on remote storage is damaged - // should stop bulk load process, set bulk load failed + zauto_write_lock l(_lock); + if (!_apps_cleaning_up[app_id]) { + _apps_cleaning_up[app_id] = true; + update_app_status_on_remote_storage_unlocked(app_id, bulk_load_status::BLS_FAILED); + } } // ThreadPool: THREAD_POOL_META_STATE void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string &app_name) { - // TODO(heyuchen): TBD + zauto_write_lock l(_lock); + if (is_app_bulk_loading_unlocked(app_id) && !_apps_cleaning_up[app_id]) { + _apps_cleaning_up[app_id] = true; + remove_bulk_load_dir_on_remote_storage(app_id, app_name); + } } // ThreadPool: THREAD_POOL_META_STATE @@ -799,7 +805,6 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply( dsn::enum_to_string(old_status), dsn::enum_to_string(new_status)); - // TODO(heyuchen): add other status switch (new_status) { case bulk_load_status::BLS_DOWNLOADED: case bulk_load_status::BLS_INGESTING: @@ -822,6 +827,7 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply( } break; default: + // do nothing in other status break; } } @@ -912,10 +918,10 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk } } - // TODO(heyuchen): add other status if (new_status == bulk_load_status::BLS_PAUSING || new_status == bulk_load_status::BLS_DOWNLOADING || - new_status == bulk_load_status::BLS_CANCELED) { + new_status == bulk_load_status::BLS_CANCELED || + new_status == bulk_load_status::BLS_FAILED) { for (int i = 0; i < ainfo.partition_count; ++i) { update_partition_status_on_remote_storage( ainfo.app_name, gpid(app_id, i), new_status, should_send_request); @@ -1088,7 +1094,7 @@ void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std:: erase_map_elem_by_id(app_id, _partition_bulk_load_info); erase_map_elem_by_id(app_id, _partitions_total_download_progress); erase_map_elem_by_id(app_id, _partitions_cleaned_up); - // TODO(heyuchen): add other varieties + _apps_cleaning_up.erase(app_id); _bulk_load_app_id.erase(app_id); ddebug_f("reset local app({}) bulk load context", app_name); } diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index ef3e4fc1cd..55b3c17a4c 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -334,6 +334,8 @@ class bulk_load_service _partitions_bulk_load_state; std::unordered_map _partitions_cleaned_up; + // Used for bulk load failed and app unavailable to avoid duplicated clean up + std::unordered_map _apps_cleaning_up; }; } // namespace replication diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp index 17b9876fe3..9f38a896f6 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -350,9 +350,12 @@ class bulk_load_process_test : public bulk_load_service_test /// on_partition_bulk_load_reply unit tests -// TODO(heyuchen): -// add `downloading_fs_error` unit tests after implement function `handle_bulk_load_failed` -// add `downloading_corrupt` unit tests after implement function `handle_bulk_load_failed` +TEST_F(bulk_load_process_test, downloading_fs_error) +{ + test_on_partition_bulk_load_reply( + _partition_count, bulk_load_status::BLS_DOWNLOADING, ERR_FS_INTERNAL); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); +} TEST_F(bulk_load_process_test, downloading_busy) { @@ -361,6 +364,13 @@ TEST_F(bulk_load_process_test, downloading_busy) ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING); } +TEST_F(bulk_load_process_test, downloading_corrupt) +{ + mock_response_progress(ERR_CORRUPTION, false); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_DOWNLOADING); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); +} + TEST_F(bulk_load_process_test, downloading_report_metadata) { mock_response_bulk_load_metadata(); @@ -399,7 +409,12 @@ TEST_F(bulk_load_process_test, ingestion_running) ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING); } -// TODO(heyuchen): add ingestion_error unit tests after implement function `handle_app_failed` +TEST_F(bulk_load_process_test, ingestion_error) +{ + mock_response_ingestion_status(ingestion_status::IS_FAILED); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_INGESTING); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); +} TEST_F(bulk_load_process_test, normal_succeed) { @@ -436,7 +451,19 @@ TEST_F(bulk_load_process_test, cancel_all_finished) ASSERT_FALSE(app_is_bulk_loading(APP_NAME)); } -// TODO(heyuchen): add half cleanup test while failed +TEST_F(bulk_load_process_test, failed_not_all_finished) +{ + mock_response_cleaned_up_flag(false, bulk_load_status::BLS_FAILED); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_FAILED); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); +} + +TEST_F(bulk_load_process_test, failed_all_finished) +{ + mock_response_cleaned_up_flag(true, bulk_load_status::BLS_FAILED); + test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_FAILED); + ASSERT_FALSE(app_is_bulk_loading(APP_NAME)); +} TEST_F(bulk_load_process_test, pausing) { @@ -457,7 +484,6 @@ TEST_F(bulk_load_process_test, pause_succeed) /// on_partition_ingestion_reply unit tests // TODO(heyuchen): // add ingest_rpc_error unit tests after implement function `rollback_downloading` -// add ingest_wrong unit tests after implement function `handle_app_failed` TEST_F(bulk_load_process_test, ingest_empty_write_error) { @@ -467,6 +493,14 @@ TEST_F(bulk_load_process_test, ingest_empty_write_error) ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING); } +TEST_F(bulk_load_process_test, ingest_wrong) +{ + mock_ingestion_context(ERR_OK, 1, _partition_count); + test_on_partition_ingestion_reply(_ingestion_resp, gpid(_app_id, _pidx)); + wait_all(); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); +} + TEST_F(bulk_load_process_test, ingest_succeed) { mock_ingestion_context(ERR_OK, 0, 1);