diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp index 44a547886c..c7bf61fa0f 100644 --- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp +++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp @@ -233,6 +233,9 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name, case bulk_load_status::BLS_PAUSING: pause_bulk_load(); break; + case bulk_load_status::BLS_CANCELED: { + handle_bulk_load_finish(bulk_load_status::BLS_CANCELED); + } break; // TODO(heyuchen): add other bulk load status default: break; @@ -600,6 +603,7 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type report_group_ingestion_status(response); break; case bulk_load_status::BLS_SUCCEED: + case bulk_load_status::BLS_CANCELED: report_group_cleaned_up(response); break; case bulk_load_status::BLS_PAUSING: @@ -790,6 +794,7 @@ void replica_bulk_loader::report_bulk_load_states_to_primary( bulk_load_state.__set_ingest_status(_replica->_app->get_ingestion_status()); break; case bulk_load_status::BLS_SUCCEED: + case bulk_load_status::BLS_CANCELED: bulk_load_state.__set_is_cleaned_up(is_cleaned_up()); break; case bulk_load_status::BLS_PAUSING: diff --git a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp index 4168c54cb9..9692ddbe67 100644 --- a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp +++ b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp @@ -523,9 +523,6 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test) // istatus, is_ingestion, create_dir will be used in further tests // - failed during downloading // - failed during ingestion - // - cancel during downloaded - // - cancel during ingestion - // - cancel during succeed struct test_struct { bulk_load_status::type local_status; @@ -545,7 +542,25 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test) ingestion_status::IS_INVALID, false, bulk_load_status::BLS_SUCCEED, - false}}; + false}, + {bulk_load_status::BLS_DOWNLOADED, + 100, + ingestion_status::IS_INVALID, + false, + bulk_load_status::BLS_CANCELED, + true}, + {bulk_load_status::BLS_INGESTING, + 100, + ingestion_status::type::IS_RUNNING, + true, + bulk_load_status::BLS_CANCELED, + true}, + {bulk_load_status::BLS_SUCCEED, + 100, + ingestion_status::IS_INVALID, + false, + bulk_load_status::BLS_CANCELED, + true}}; for (auto test : tests) { if (test.create_dir) { diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 33f7520c7c..2b04987ff4 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -575,24 +575,27 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon const std::string &app_name = response.app_name; const gpid &pid = response.pid; - dassert_f( - response.__isset.is_group_bulk_load_context_cleaned_up, - "receive bulk load response from node({}) app({}), partition({}), primary_status({}), " - "but is_group_bulk_load_context_cleaned_up is not set", - primary_addr.to_string(), - app_name, - pid, - dsn::enum_to_string(response.primary_bulk_load_status)); + if (!response.__isset.is_group_bulk_load_context_cleaned_up) { + dwarn_f("receive bulk load response from node({}) app({}) partition({}), " + "primary_status({}), but is_group_bulk_load_context_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status)); + return; + } for (const auto &kv : response.group_bulk_load_state) { - dassert_f(kv.second.__isset.is_cleaned_up, - "receive bulk load response from node({}) app({}), partition({}), " - "primary_status({}), but node({}) is_cleaned_up is not set", - primary_addr.to_string(), - app_name, - pid, - dsn::enum_to_string(response.primary_bulk_load_status), - kv.first.to_string()); + if (!kv.second.__isset.is_cleaned_up) { + dwarn_f("receive bulk load response from node({}) app({}), partition({}), " + "primary_status({}), but node({}) is_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status), + kv.first.to_string()); + return; + } } { @@ -911,7 +914,8 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk // TODO(heyuchen): add other status if (new_status == bulk_load_status::BLS_PAUSING || - new_status == bulk_load_status::BLS_DOWNLOADING) { + new_status == bulk_load_status::BLS_DOWNLOADING || + new_status == bulk_load_status::BLS_CANCELED) { for (int i = 0; i < ainfo.partition_count; ++i) { update_partition_status_on_remote_storage( ainfo.app_name, gpid(app_id, i), new_status, should_send_request); @@ -1175,9 +1179,28 @@ void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc) bulk_load_status::BLS_DOWNLOADING, true)); case bulk_load_control_type::BLC_CANCEL: - // TODO(heyuchen): TBD + if (app_status != bulk_load_status::BLS_DOWNLOADING && + app_status != bulk_load_status::BLS_PAUSED) { + auto hint_msg = fmt::format("can not cancel bulk load for app({}) with status({})", + app_name, + dsn::enum_to_string(app_status)); + derror_f("{}", hint_msg); + response.err = ERR_INVALID_STATE; + response.__set_hint_msg(hint_msg); + return; + } case bulk_load_control_type::BLC_FORCE_CANCEL: - // TODO(heyuchen): TBD + ddebug_f("app({}) start to {} cancel bulk load, original status = {}", + app_name, + control_type == bulk_load_control_type::BLC_FORCE_CANCEL ? "force" : "", + dsn::enum_to_string(app_status)); + tasking::enqueue(LPC_META_STATE_NORMAL, + _meta_svc->tracker(), + std::bind(&bulk_load_service::update_app_status_on_remote_storage_unlocked, + this, + app_id, + bulk_load_status::BLS_CANCELED, + app_status == bulk_load_status::BLS_PAUSED)); default: break; } diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp index b057cb60cb..17b9876fe3 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -172,7 +172,6 @@ TEST_F(bulk_load_service_test, control_bulk_load_test) fail::setup(); fail::cfg("meta_update_app_status_on_remote_storage_unlocked", "return()"); - // TODO(heyuchen): add restart/cancel/force_cancel test cases struct control_test { bulk_load_control_type::type type; @@ -182,7 +181,11 @@ TEST_F(bulk_load_service_test, control_bulk_load_test) {bulk_load_control_type::BLC_PAUSE, bulk_load_status::BLS_DOWNLOADING, ERR_OK}, {bulk_load_control_type::BLC_PAUSE, bulk_load_status::BLS_DOWNLOADED, ERR_INVALID_STATE}, {bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSED, ERR_OK}, - {bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSING, ERR_INVALID_STATE}}; + {bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSING, ERR_INVALID_STATE}, + {bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_DOWNLOADING, ERR_OK}, + {bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_PAUSED, ERR_OK}, + {bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_INGESTING, ERR_INVALID_STATE}, + {bulk_load_control_type::BLC_FORCE_CANCEL, bulk_load_status::BLS_SUCCEED, ERR_OK}}; for (auto test : tests) { ASSERT_EQ(control_bulk_load(app->app_id, test.type, test.app_status), test.expected_err); @@ -419,6 +422,20 @@ TEST_F(bulk_load_process_test, succeed_all_finished) ASSERT_FALSE(app_is_bulk_loading(APP_NAME)); } +TEST_F(bulk_load_process_test, cancel_not_all_finished) +{ + mock_response_cleaned_up_flag(false, bulk_load_status::BLS_CANCELED); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_CANCELED); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_CANCELED); +} + +TEST_F(bulk_load_process_test, cancel_all_finished) +{ + mock_response_cleaned_up_flag(true, bulk_load_status::BLS_CANCELED); + test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_CANCELED); + ASSERT_FALSE(app_is_bulk_loading(APP_NAME)); +} + // TODO(heyuchen): add half cleanup test while failed TEST_F(bulk_load_process_test, pausing)