diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index d9e4a0aa9c..ef9011f35f 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -594,28 +594,24 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon const std::string &app_name = response.app_name; const gpid &pid = response.pid; - if (!response.__isset.is_group_bulk_load_context_cleaned_up) { - dwarn_f( - "receive bulk load response from node({}) app({}), partition({}), primary_status({}), " - "but is_group_bulk_load_context_cleaned is not set", - primary_addr.to_string(), - app_name, - pid, - dsn::enum_to_string(response.primary_bulk_load_status)); - return; - } + dassert_f( + response.__isset.is_group_bulk_load_context_cleaned_up, + "receive bulk load response from node({}) app({}), partition({}), primary_status({}), " + "but is_group_bulk_load_context_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status)); for (const auto &kv : response.group_bulk_load_state) { - if (!kv.second.__isset.is_cleaned_up) { - dwarn_f("receive bulk load response from node({}) app({}), partition({}), " - "primary_status({}), but node({}) is_cleaned_up is not set", - primary_addr.to_string(), - app_name, - pid, - dsn::enum_to_string(response.primary_bulk_load_status), - kv.first.to_string()); - return; - } + dassert_f(kv.second.__isset.is_cleaned_up, + "receive bulk load response from node({}) app({}), partition({}), " + "primary_status({}), but node({}) is_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status), + kv.first.to_string()); } { @@ -623,7 +619,7 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon if (_partitions_cleaned_up[pid]) { dwarn_f( "receive bulk load response from node({}) app({}) partition({}), current partition " - "has already be cleaned up", + "has already been cleaned up", primary_addr.to_string(), app_name, pid); @@ -631,22 +627,23 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon } } - bool all_cleaned_up = response.is_group_bulk_load_context_cleaned_up; + // The replicas have cleaned up their bulk load states and removed temporary sst files + bool group_cleaned_up = response.is_group_bulk_load_context_cleaned_up; ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary status = {}, " - "is_group_bulk_load_context_cleaned = {}", + "is_group_bulk_load_context_cleaned_up = {}", primary_addr.to_string(), app_name, pid, dsn::enum_to_string(response.primary_bulk_load_status), - all_cleaned_up); + group_cleaned_up); { zauto_write_lock l(_lock); - _partitions_cleaned_up[pid] = all_cleaned_up; + _partitions_cleaned_up[pid] = group_cleaned_up; _partitions_bulk_load_state[pid] = response.group_bulk_load_state; } - if (all_cleaned_up) { - int32_t count; + if (group_cleaned_up) { + int32_t count = 0; { zauto_write_lock l(_lock); count = --_apps_in_progress_count[pid.get_app_id()]; @@ -657,15 +654,16 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon zauto_read_lock l(app_lock()); app = _state->get_app(pid.get_app_id()); if (app == nullptr || app->status != app_status::AS_AVAILABLE) { - dwarn_f("app(name={}, id={}) is not existed, set bulk load failed", + dwarn_f("app(name={}, id={}) is not existed, remove bulk load dir on remote " + "storage", app_name, pid.get_app_id()); - remove_bulk_load_dir(pid.get_app_id(), app_name); + remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name); return; } } ddebug_f("app({}) all partitions cleanup bulk load context", app_name); - remove_bulk_load_dir(std::move(app), true); + remove_bulk_load_dir_on_remote_storage(std::move(app), true); } } } @@ -757,7 +755,7 @@ void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string zauto_write_lock l(_lock); if (is_app_bulk_loading_unlocked(app_id) && !_apps_cleaning_up[app_id]) { _apps_cleaning_up[app_id] = true; - remove_bulk_load_dir(app_id, app_name); + remove_bulk_load_dir_on_remote_storage(app_id, app_name); } } @@ -1052,7 +1050,8 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err, } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::remove_bulk_load_dir(int32_t app_id, const std::string &app_name) +void bulk_load_service::remove_bulk_load_dir_on_remote_storage(int32_t app_id, + const std::string &app_name) { std::string bulk_load_path = get_app_bulk_load_path(app_id); _meta_svc->get_meta_storage()->delete_node_recursively( @@ -1063,35 +1062,30 @@ void bulk_load_service::remove_bulk_load_dir(int32_t app_id, const std::string & } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::remove_bulk_load_dir(std::shared_ptr app, bool need_set_app_flag) +void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptr app, + bool set_app_not_bulk_loading) { std::string bulk_load_path = get_app_bulk_load_path(app->app_id); _meta_svc->get_meta_storage()->delete_node_recursively( - std::move(bulk_load_path), [this, app, need_set_app_flag, bulk_load_path]() { - ddebug_f("remove app({}) bulk load dir {}", app->app_name, bulk_load_path); + std::move(bulk_load_path), [this, app, set_app_not_bulk_loading, bulk_load_path]() { + ddebug_f("remove app({}) bulk load dir {} succeed", app->app_name, bulk_load_path); reset_local_bulk_load_states(app->app_id, app->app_name); - if (need_set_app_flag) { - update_app_is_bulk_loading(std::move(app), false); + if (set_app_not_bulk_loading) { + update_app_not_bulk_loading_on_remote_storage(std::move(app)); } }); } -// ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::update_app_is_bulk_loading(std::shared_ptr app, - bool is_bulk_loading) +template +inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map &mymap) { - app_info info = *app; - info.__set_is_bulk_loading(is_bulk_loading); - - blob value = dsn::json::json_forwarder::encode(info); - _meta_svc->get_meta_storage()->set_data( - _state->get_app_path(*app), std::move(value), [app, is_bulk_loading, this]() { - { - zauto_write_lock l(app_lock()); - app->is_bulk_loading = is_bulk_loading; - } - ddebug_f("app({}) update app is_bulk_loading to {}", app->app_name, is_bulk_loading); - }); + for (auto iter = mymap.begin(); iter != mymap.end();) { + if (iter->first.get_app_id() == app_id) { + mymap.erase(iter++); + } else { + iter++; + } + } } // ThreadPool: THREAD_POOL_META_STATE @@ -1111,6 +1105,22 @@ void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std:: ddebug_f("reset local app({}) bulk load context", app_name); } +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::update_app_not_bulk_loading_on_remote_storage( + std::shared_ptr app) +{ + app_info info = *app; + info.__set_is_bulk_loading(false); + + blob value = dsn::json::json_forwarder::encode(info); + _meta_svc->get_meta_storage()->set_data( + _state->get_app_path(*app), std::move(value), [app, this]() { + zauto_write_lock l(app_lock()); + app->is_bulk_loading = false; + ddebug_f("app({}) update app is_bulk_loading to false", app->app_name); + }); +} + // ThreadPool: THREAD_POOL_META_STATE void bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) { @@ -1294,9 +1304,9 @@ void bulk_load_service::continue_bulk_load( derror_f( "app(name={},app_id={}) is not existed or not available", ainfo.app_name, ainfo.app_id); if (app == nullptr) { - remove_bulk_load_dir(ainfo.app_id, ainfo.app_name); + remove_bulk_load_dir_on_remote_storage(ainfo.app_id, ainfo.app_name); } else { - remove_bulk_load_dir(std::move(app), true); + remove_bulk_load_dir_on_remote_storage(std::move(app), true); } return; } @@ -1308,7 +1318,7 @@ void bulk_load_service::continue_bulk_load( ainfo, partition_bulk_load_info_map, different_status_pidx_set)) { - remove_bulk_load_dir(std::move(app), true); + remove_bulk_load_dir_on_remote_storage(std::move(app), true); return; } @@ -1623,7 +1633,7 @@ void bulk_load_service::check_app_bulk_load_consistency(std::shared_ptrapp_name, app_path, is_app_bulk_loading); - update_app_is_bulk_loading(std::move(app), false); + update_app_not_bulk_loading_on_remote_storage(std::move(app)); return; } if (err == ERR_OK && !is_app_bulk_loading) { @@ -1632,7 +1642,7 @@ void bulk_load_service::check_app_bulk_load_consistency(std::shared_ptrapp_name, app_path, is_app_bulk_loading); - remove_bulk_load_dir(std::move(app), false); + remove_bulk_load_dir_on_remote_storage(std::move(app), false); return; } } diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index fb54ce147b..f8258ea4ce 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -43,16 +43,6 @@ struct bulk_load_info DEFINE_JSON_SERIALIZATION(app_id, app_name, partition_count) }; -template -inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map &mymap) -{ - for (auto iter = mymap.begin(); iter != mymap.end();) { - if (iter->first.get_app_id() == app_id) { - mymap.erase(iter++); - } - } -} - class bulk_load_service { public: @@ -215,14 +205,18 @@ class bulk_load_service bulk_load_status::type new_status, bool should_send_request); - // `need_set_app_flag` = true: update app's is_bulk_loading to false on remote_storage - void remove_bulk_load_dir(std::shared_ptr app, bool need_set_app_flag); - - void remove_bulk_load_dir(int32_t app_id, const std::string &app_name); + // called when app is not available or dropped during bulk load, remove bulk load directory on + // remote storage + void remove_bulk_load_dir_on_remote_storage(int32_t app_id, const std::string &app_name); - // update app's is_bulk_loading to on remote_storage - void update_app_is_bulk_loading(std::shared_ptr app, bool is_bulk_loading); + // called when app is available, remove bulk load directory on remote storage + // if `set_app_not_bulk_loading` = true: call function + // `update_app_not_bulk_loading_on_remote_storage` to set app not bulk_loading after removing + void remove_bulk_load_dir_on_remote_storage(std::shared_ptr app, + bool set_app_not_bulk_loading); + // update app's is_bulk_loading to false on remote_storage + void update_app_not_bulk_loading_on_remote_storage(std::shared_ptr app); /// /// sync bulk load states from remote storage /// called when service initialized or meta server leader switch diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp index 07b6601054..eb97788e03 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -496,17 +496,18 @@ class bulk_load_process_test : public bulk_load_service_test _resp.__set_is_group_ingestion_finished(secondary_istatus == ingestion_status::IS_SUCCEED); } - void mock_response_cleanup_flag(bool finish_cleanup, bulk_load_status::type status) + void mock_response_cleaned_up_flag(bool all_cleaned_up, bulk_load_status::type status) { create_basic_response(ERR_OK, status); - partition_bulk_load_state state; + partition_bulk_load_state state, state2; state.__set_is_cleaned_up(true); _resp.group_bulk_load_state[PRIMARY] = state; _resp.group_bulk_load_state[SECONDARY1] = state; - state.__set_is_cleaned_up(finish_cleanup); - _resp.group_bulk_load_state[SECONDARY2] = state; - _resp.__set_is_group_bulk_load_context_cleaned_up(finish_cleanup); + + state2.__set_is_cleaned_up(all_cleaned_up); + _resp.group_bulk_load_state[SECONDARY2] = state2; + _resp.__set_is_group_bulk_load_context_cleaned_up(all_cleaned_up); } void mock_response_paused(bool is_group_paused) @@ -629,21 +630,27 @@ TEST_F(bulk_load_process_test, normal_succeed) ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED); } -TEST_F(bulk_load_process_test, half_cleanup) +TEST_F(bulk_load_process_test, succeed_not_all_finished) { - mock_response_cleanup_flag(false, bulk_load_status::BLS_FAILED); - test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_FAILED); - ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); + mock_response_cleaned_up_flag(false, bulk_load_status::BLS_SUCCEED); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_SUCCEED); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED); } -TEST_F(bulk_load_process_test, cleanup_succeed) +TEST_F(bulk_load_process_test, succeed_all_finished) { - mock_response_cleanup_flag(true, bulk_load_status::BLS_SUCCEED); + mock_response_cleaned_up_flag(true, bulk_load_status::BLS_SUCCEED); test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_SUCCEED); - ASSERT_FALSE(app_is_bulk_loading(APP_NAME)); } +TEST_F(bulk_load_process_test, half_cleanup) +{ + mock_response_cleaned_up_flag(false, bulk_load_status::BLS_FAILED); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_FAILED); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); +} + TEST_F(bulk_load_process_test, pausing) { mock_response_paused(false);