diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
index 3498693071..a2d829f23e 100644
--- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
@@ -572,8 +572,89 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
 void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response,
                                                 const rpc_address &primary_addr)
 {
-    // TODO(heyuchen): TBD
-    // called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled
+    // Called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled.
+    // Records whether this partition's replica group has cleaned up its bulk load context;
+    // once the app's in-progress count drops to zero, removes the app's bulk load directory on
+    // remote storage.
+    const std::string &app_name = response.app_name;
+    const gpid &pid = response.pid;
+
+    dassert_f(
+        response.__isset.is_group_bulk_load_context_cleaned_up,
+        "receive bulk load response from node({}) app({}), partition({}), primary_status({}), "
+        "but is_group_bulk_load_context_cleaned_up is not set",
+        primary_addr.to_string(),
+        app_name,
+        pid,
+        dsn::enum_to_string(response.primary_bulk_load_status));
+
+    for (const auto &kv : response.group_bulk_load_state) {
+        dassert_f(kv.second.__isset.is_cleaned_up,
+                  "receive bulk load response from node({}) app({}), partition({}), "
+                  "primary_status({}), but node({}) is_cleaned_up is not set",
+                  primary_addr.to_string(),
+                  app_name,
+                  pid,
+                  dsn::enum_to_string(response.primary_bulk_load_status),
+                  kv.first.to_string());
+    }
+
+    {
+        zauto_read_lock l(_lock);
+        // Look the flag up with `find`: `operator[]` would insert a default entry for a
+        // partition that has not been recorded yet, i.e. modify the map while only the read
+        // lock is held.
+        const auto iter = _partitions_cleaned_up.find(pid);
+        if (iter != _partitions_cleaned_up.end() && iter->second) {
+            dwarn_f(
+                "receive bulk load response from node({}) app({}) partition({}), current partition "
+                "has already been cleaned up",
+                primary_addr.to_string(),
+                app_name,
+                pid);
+            return;
+        }
+    }
+
+    // The replicas have cleaned up their bulk load states and removed temporary sst files
+    bool group_cleaned_up = response.is_group_bulk_load_context_cleaned_up;
+    ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary status = {}, "
+             "is_group_bulk_load_context_cleaned_up = {}",
+             primary_addr.to_string(),
+             app_name,
+             pid,
+             dsn::enum_to_string(response.primary_bulk_load_status),
+             group_cleaned_up);
+    {
+        zauto_write_lock l(_lock);
+        _partitions_cleaned_up[pid] = group_cleaned_up;
+        _partitions_bulk_load_state[pid] = response.group_bulk_load_state;
+    }
+
+    if (group_cleaned_up) {
+        int32_t count = 0;
+        {
+            zauto_write_lock l(_lock);
+            count = --_apps_in_progress_count[pid.get_app_id()];
+        }
+        if (count == 0) {
+            std::shared_ptr<app_state> app;
+            {
+                zauto_read_lock l(app_lock());
+                app = _state->get_app(pid.get_app_id());
+                if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
+                    dwarn_f("app(name={}, id={}) is not existed, remove bulk load dir on remote "
+                            "storage",
+                            app_name,
+                            pid.get_app_id());
+                    remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name);
+                    return;
+                }
+            }
+            ddebug_f("app({}) all partitions cleanup bulk load context", app_name);
+            remove_bulk_load_dir_on_remote_storage(std::move(app), true);
+        }
+    }
 }
 
 // ThreadPool: THREAD_POOL_META_STATE
@@ -898,6 +979,93 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err,
 }
 
 // ThreadPool: THREAD_POOL_META_STATE
+// Removes the bulk load directory of app(app_id) on remote storage; the callback handed to
+// `delete_node_recursively` then resets the local bulk load states of that app.
+void bulk_load_service::remove_bulk_load_dir_on_remote_storage(int32_t app_id,
+                                                               const std::string &app_name)
+{
+    std::string bulk_load_path = get_app_bulk_load_path(app_id);
+    // Build the callback first so that its copy of `bulk_load_path` is taken before the path
+    // is moved into the call below; the capture then cannot observe a moved-from string,
+    // regardless of how `delete_node_recursively` takes its arguments.
+    auto on_removed = [this, app_id, app_name, bulk_load_path]() {
+        ddebug_f("remove app({}) bulk load dir {} succeed", app_name, bulk_load_path);
+        reset_local_bulk_load_states(app_id, app_name);
+    };
+    _meta_svc->get_meta_storage()->delete_node_recursively(std::move(bulk_load_path),
+                                                           std::move(on_removed));
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+// Same removal for an app that is still available; if `set_app_not_bulk_loading` is true the
+// callback additionally calls `update_app_not_bulk_loading_on_remote_storage`.
+void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
+                                                               bool set_app_not_bulk_loading)
+{
+    std::string bulk_load_path = get_app_bulk_load_path(app->app_id);
+    // As above: capture a copy of the path before it is moved into the call.
+    auto on_removed = [this, app, set_app_not_bulk_loading, bulk_load_path]() {
+        ddebug_f("remove app({}) bulk load dir {} succeed", app->app_name, bulk_load_path);
+        reset_local_bulk_load_states(app->app_id, app->app_name);
+        if (set_app_not_bulk_loading) {
+            // `app` is const inside this non-mutable lambda, so `std::move(app)` could only
+            // ever copy; pass it plainly.
+            update_app_not_bulk_loading_on_remote_storage(app);
+        }
+    };
+    _meta_svc->get_meta_storage()->delete_node_recursively(std::move(bulk_load_path),
+                                                           std::move(on_removed));
+}
+
+// Erases every element of `mymap` whose key belongs to app(app_id).
+template <typename T>
+inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map<gpid, T> &mymap)
+{
+    for (auto iter = mymap.begin(); iter != mymap.end();) {
+        if (iter->first.get_app_id() == app_id) {
+            iter = mymap.erase(iter);
+        } else {
+            ++iter;
+        }
+    }
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+// Erases the in-memory bulk load records kept for app(app_id), holding the write lock.
+void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::string &app_name)
+{
+    zauto_write_lock l(_lock);
+    _app_bulk_load_info.erase(app_id);
+    _apps_in_progress_count.erase(app_id);
+    _apps_pending_sync_flag.erase(app_id);
+    erase_map_elem_by_id(app_id, _partitions_pending_sync_flag);
+    erase_map_elem_by_id(app_id, _partitions_bulk_load_state);
+    erase_map_elem_by_id(app_id, _partition_bulk_load_info);
+    erase_map_elem_by_id(app_id, _partitions_total_download_progress);
+    erase_map_elem_by_id(app_id, _partitions_cleaned_up);
+    // TODO(heyuchen): add other varieties
+    _bulk_load_app_id.erase(app_id);
+    ddebug_f("reset local app({}) bulk load context", app_name);
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+// Stores a copy of the app info with is_bulk_loading = false on remote storage; the in-memory
+// flag of `app` is cleared in the callback handed to `set_data`.
+void bulk_load_service::update_app_not_bulk_loading_on_remote_storage(
+    std::shared_ptr<app_state> app)
+{
+    app_info info = *app;
+    info.__set_is_bulk_loading(false);
+
+    blob value = dsn::json::json_forwarder<app_info>::encode(info);
+    _meta_svc->get_meta_storage()->set_data(
+        _state->get_app_path(*app), std::move(value), [app, this]() {
+            zauto_write_lock l(app_lock());
+            app->is_bulk_loading = false;
+            ddebug_f("app({}) update app is_bulk_loading to false", app->app_name);
+        });
+}
+
 void bulk_load_service::create_bulk_load_root_dir(error_code &err, task_tracker &tracker)
 {
     blob value = blob();
diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h
index 02faae1220..060e3d01b9 100644
--- a/src/dist/replication/meta_server/meta_bulk_load_service.h
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.h
@@ -144,6 +144,8 @@ class bulk_load_service
                                     const std::string &app_name,
                                     const gpid &pid);
 
+    void reset_local_bulk_load_states(int32_t app_id, const std::string &app_name);
+
     ///
     /// update bulk load states to remote storage functions
     ///
@@ -188,6 +190,19 @@ class bulk_load_service
                                   bulk_load_status::type new_status,
                                   bool should_send_request);
 
+    // called when app is not available or dropped during bulk load, remove bulk load directory on
+    // remote storage
+    void remove_bulk_load_dir_on_remote_storage(int32_t app_id, const std::string &app_name);
+
+    // called when app is available, remove bulk load directory on remote storage
+    // if `set_app_not_bulk_loading` = true: call function
+    // `update_app_not_bulk_loading_on_remote_storage` to set app not bulk_loading after removing
+    void remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
+                                                bool set_app_not_bulk_loading);
+
+    // update app's is_bulk_loading to false on remote_storage
+    void update_app_not_bulk_loading_on_remote_storage(std::shared_ptr<app_state> app);
+
     ///
     /// sync bulk load states from remote storage
     /// called when service initialized or meta server leader switch
@@ -315,6 +330,8 @@ class bulk_load_service
     // partition_index -> group bulk load states(node address -> state)
     std::unordered_map<gpid, std::map<rpc_address, partition_bulk_load_state>>
         _partitions_bulk_load_state;
+
+    std::unordered_map<gpid, bool> _partitions_cleaned_up;
 };
 
 } // namespace replication
diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp
index 5c31a13703..b2defa1b07 100644
--- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp
+++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp
@@ -235,6 +235,20 @@ class bulk_load_process_test : public bulk_load_service_test
         _resp.__set_is_group_ingestion_finished(secondary_istatus == ingestion_status::IS_SUCCEED);
     }
 
+    void mock_response_cleaned_up_flag(bool all_cleaned_up, bulk_load_status::type status)
+    {
+        create_basic_response(ERR_OK, status);
+
+        partition_bulk_load_state state, state2;
+        state.__set_is_cleaned_up(true);
+        _resp.group_bulk_load_state[PRIMARY] = state;
+        _resp.group_bulk_load_state[SECONDARY1] = state;
+
+        state2.__set_is_cleaned_up(all_cleaned_up);
+        _resp.group_bulk_load_state[SECONDARY2] = state2;
+        _resp.__set_is_group_bulk_load_context_cleaned_up(all_cleaned_up);
+    }
+
     void test_on_partition_bulk_load_reply(int32_t in_progress_count,
                                            bulk_load_status::type status,
                                            error_code resp_err = ERR_OK)
@@ -327,6 +341,22 @@ TEST_F(bulk_load_process_test, normal_succeed)
     ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
 }
 
+TEST_F(bulk_load_process_test, succeed_not_all_finished)
+{
+    mock_response_cleaned_up_flag(false, bulk_load_status::BLS_SUCCEED);
+    test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_SUCCEED);
+    ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
+}
+
+TEST_F(bulk_load_process_test, succeed_all_finished)
+{
+    mock_response_cleaned_up_flag(true, bulk_load_status::BLS_SUCCEED);
+    test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_SUCCEED);
+    ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
+}
+
+// TODO(heyuchen): add half cleanup test while failed
+
 // TODO(heyuchen): add other unit tests for `on_partition_bulk_load_reply`
 
 /// on_partition_ingestion_reply unit tests