From 4de3b62f49406f7282cbf647a66efd6b9b94efda Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 18 Jun 2020 13:29:06 +0800 Subject: [PATCH 1/7] meta handle bulk load succeed --- .../meta_server/meta_bulk_load_service.cpp | 139 +++++++++++++++++- .../meta_server/meta_bulk_load_service.h | 27 ++++ .../unit_test/meta_bulk_load_service_test.cpp | 30 ++++ 3 files changed, 194 insertions(+), 2 deletions(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 3498693071..eaa0218be7 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -572,8 +572,84 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response, void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response, const rpc_address &primary_addr) { - // TODO(heyuchen): TBD - // called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled + const std::string &app_name = response.app_name; + const gpid &pid = response.pid; + + if (!response.__isset.is_group_bulk_load_context_cleaned_up) { + dwarn_f( + "receive bulk load response from node({}) app({}), partition({}), primary_status({}), " + "but is_group_bulk_load_context_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status)); + return; + } + + for (const auto &kv : response.group_bulk_load_state) { + if (!kv.second.__isset.is_cleaned_up) { + dwarn_f("receive bulk load response from node({}) app({}), partition({}), " + "primary_status({}), but node({}) is_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status), + kv.first.to_string()); + return; + } + } + + { + zauto_read_lock l(_lock); + if (_partitions_cleaned_up[pid]) { + dwarn_f( + "receive bulk load response from node({}) app({}) partition({}), current partition " + "has already be cleaned up", + primary_addr.to_string(), + app_name, + pid); + return; + } + } + + bool group_cleaned_up = response.is_group_bulk_load_context_cleaned_up; + ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary status = {}, " + "is_group_bulk_load_context_cleaned_up = {}", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status), + group_cleaned_up); + { + zauto_write_lock l(_lock); + _partitions_cleaned_up[pid] = group_cleaned_up; + _partitions_bulk_load_state[pid] = response.group_bulk_load_state; + } + + if (group_cleaned_up) { + int32_t count; + { + zauto_write_lock l(_lock); + count = --_apps_in_progress_count[pid.get_app_id()]; + } + if (count == 0) { + std::shared_ptr app; + { + zauto_read_lock l(app_lock()); + app = _state->get_app(pid.get_app_id()); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + dwarn_f("app(name={}, id={}) is not existed, remove bulk load dir on remote " + "storage", + app_name, + pid.get_app_id()); + remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name); + return; + } + } + ddebug_f("app({}) all partitions cleanup bulk load context", app_name); + remove_bulk_load_dir_on_remote_storage(std::move(app), true); + } + } } // ThreadPool: THREAD_POOL_META_STATE @@ -898,6 +974,65 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err, } // ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::remove_bulk_load_dir_on_remote_storage(int32_t app_id, + const std::string &app_name) +{ + std::string bulk_load_path = get_app_bulk_load_path(app_id); + _meta_svc->get_meta_storage()->delete_node_recursively( + std::move(bulk_load_path), [this, app_id, app_name, bulk_load_path]() { + ddebug_f("remove app({}) bulk load dir {} succeed", app_name, bulk_load_path); + reset_local_bulk_load_states(app_id, app_name); + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptr app, + bool set_app_not_bulk_loading) +{ + std::string bulk_load_path = get_app_bulk_load_path(app->app_id); + _meta_svc->get_meta_storage()->delete_node_recursively( + std::move(bulk_load_path), [this, app, set_app_not_bulk_loading, bulk_load_path]() { + ddebug_f("remove app({}) bulk load dir {} succeed", app->app_name, bulk_load_path); + reset_local_bulk_load_states(app->app_id, app->app_name); + if (set_app_not_bulk_loading) { + update_app_not_bulk_loading_on_remote_storage(std::move(app)); + } + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::string &app_name) +{ + zauto_write_lock l(_lock); + _app_bulk_load_info.erase(app_id); + _apps_in_progress_count.erase(app_id); + _apps_pending_sync_flag.erase(app_id); + erase_map_elem_by_id(app_id, _partitions_pending_sync_flag); + erase_map_elem_by_id(app_id, _partitions_bulk_load_state); + erase_map_elem_by_id(app_id, _partition_bulk_load_info); + erase_map_elem_by_id(app_id, _partitions_total_download_progress); + erase_map_elem_by_id(app_id, _partitions_cleaned_up); + // TODO(heyuchen): add other varieties + _bulk_load_app_id.erase(app_id); + ddebug_f("reset local app({}) bulk load context", app_name); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::update_app_not_bulk_loading_on_remote_storage( + std::shared_ptr app) +{ + app_info info = *app; + info.__set_is_bulk_loading(false); + + blob value = dsn::json::json_forwarder::encode(info); + _meta_svc->get_meta_storage()->set_data( + _state->get_app_path(*app), std::move(value), [app, this]() { + zauto_write_lock l(app_lock()); + app->is_bulk_loading = false; + ddebug_f("app({}) update app is_bulk_loading to false", app->app_name); + }); +} + void bulk_load_service::create_bulk_load_root_dir(error_code &err, task_tracker &tracker) { blob value = blob(); diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index 02faae1220..dcc35111f7 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -43,6 +43,16 @@ struct bulk_load_info DEFINE_JSON_SERIALIZATION(app_id, app_name, partition_count) }; +template +inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map &mymap) +{ + for (auto iter = mymap.begin(); iter != mymap.end();) { + if (iter->first.get_app_id() == app_id) { + mymap.erase(iter++); + } + } +} + /// /// Bulk load process: /// when client sent `start_bulk_load_rpc` to meta server to start bulk load, @@ -144,6 +154,8 @@ class bulk_load_service const std::string &app_name, const gpid &pid); + void reset_local_bulk_load_states(int32_t app_id, const std::string &app_name); + /// /// update bulk load states to remote storage functions /// @@ -188,6 +200,19 @@ class bulk_load_service bulk_load_status::type new_status, bool should_send_request); + // callled when app is not available or dropped during bulk load, remove bulk load directory on + // remote storage + void remove_bulk_load_dir_on_remote_storage(int32_t app_id, const std::string &app_name); + + // called when app is available, remove bulk load directory on remote storage + // if `set_app_not_bulk_loading` = true: call function + // `update_app_not_bulk_loading_on_remote_storage` to set app not bulk_loading after removing + void remove_bulk_load_dir_on_remote_storage(std::shared_ptr app, + bool set_app_not_bulk_loading); + + // update app's is_bulk_loading to false on remote_storage + void update_app_not_bulk_loading_on_remote_storage(std::shared_ptr app); + /// /// sync bulk load states from remote storage /// called when service initialized or meta server leader switch @@ -315,6 +340,8 @@ class bulk_load_service // partition_index -> group bulk load states(node address -> state) std::unordered_map> _partitions_bulk_load_state; + + std::unordered_map _partitions_cleaned_up; }; } // namespace replication diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp index 5c31a13703..b2defa1b07 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -235,6 +235,20 @@ class bulk_load_process_test : public bulk_load_service_test _resp.__set_is_group_ingestion_finished(secondary_istatus == ingestion_status::IS_SUCCEED); } + void mock_response_cleaned_up_flag(bool all_cleaned_up, bulk_load_status::type status) + { + create_basic_response(ERR_OK, status); + + partition_bulk_load_state state, state2; + state.__set_is_cleaned_up(true); + _resp.group_bulk_load_state[PRIMARY] = state; + _resp.group_bulk_load_state[SECONDARY1] = state; + + state2.__set_is_cleaned_up(all_cleaned_up); + _resp.group_bulk_load_state[SECONDARY2] = state2; + _resp.__set_is_group_bulk_load_context_cleaned_up(all_cleaned_up); + } + void test_on_partition_bulk_load_reply(int32_t in_progress_count, bulk_load_status::type status, error_code resp_err = ERR_OK) @@ -327,6 +341,22 @@ TEST_F(bulk_load_process_test, normal_succeed) ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED); } +TEST_F(bulk_load_process_test, succeed_not_all_finished) +{ + mock_response_cleaned_up_flag(false, bulk_load_status::BLS_SUCCEED); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_SUCCEED); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED); +} + +TEST_F(bulk_load_process_test, succeed_all_finished) +{ + mock_response_cleaned_up_flag(true, bulk_load_status::BLS_SUCCEED); + test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_SUCCEED); + ASSERT_FALSE(app_is_bulk_loading(APP_NAME)); +} + +// TODO(heyuchen): add half cleanup test while failed + // TODO(heyuchen): add other unit tests for `on_partition_bulk_load_reply` /// on_partition_ingestion_reply unit tests From e14261e3e3673c82a6b3da580c2534771426298b Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 18 Jun 2020 15:17:22 +0800 Subject: [PATCH 2/7] small fix --- src/dist/replication/meta_server/meta_bulk_load_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index eaa0218be7..e13f1c72b0 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -627,7 +627,7 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon } if (group_cleaned_up) { - int32_t count; + int32_t count = 0; { zauto_write_lock l(_lock); count = --_apps_in_progress_count[pid.get_app_id()]; From 5962f622b62de176b85c2d60ada456fc21207dcf Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 22 Jun 2020 13:44:10 +0800 Subject: [PATCH 3/7] small fix --- src/dist/replication/meta_server/meta_bulk_load_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index e13f1c72b0..4e3c17515e 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -604,7 +604,7 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon if (_partitions_cleaned_up[pid]) { dwarn_f( "receive bulk load response from node({}) app({}) partition({}), current partition " - "has already be cleaned up", + "has already been cleaned up", primary_addr.to_string(), app_name, pid); From a2a1d9394f33f73519c90243a285ca9ad16d65f9 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 22 Jun 2020 17:51:46 +0800 Subject: [PATCH 4/7] fix by review --- .../meta_server/meta_bulk_load_service.cpp | 11 +++++++++++ .../replication/meta_server/meta_bulk_load_service.h | 12 +----------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 4e3c17515e..03c7a5ab07 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -612,6 +612,7 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon } } + // The replicas have cleaned up their bulk load states and removed temporary sst files bool group_cleaned_up = response.is_group_bulk_load_context_cleaned_up; ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary status = {}, " "is_group_bulk_load_context_cleaned_up = {}", @@ -1000,6 +1001,16 @@ void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptr +inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map &mymap) +{ + for (auto iter = mymap.begin(); iter != mymap.end();) { + if (iter->first.get_app_id() == app_id) { + mymap.erase(iter++); + } + } +} + // ThreadPool: THREAD_POOL_META_STATE void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::string &app_name) { diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index dcc35111f7..060e3d01b9 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -43,16 +43,6 @@ struct bulk_load_info DEFINE_JSON_SERIALIZATION(app_id, app_name, partition_count) }; -template -inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map &mymap) -{ - for (auto iter = mymap.begin(); iter != mymap.end();) { - if (iter->first.get_app_id() == app_id) { - mymap.erase(iter++); - } - } -} - /// /// Bulk load process: /// when client sent `start_bulk_load_rpc` to meta server to start bulk load, @@ -200,7 +190,7 @@ class bulk_load_service bulk_load_status::type new_status, bool should_send_request); - // callled when app is not available or dropped during bulk load, remove bulk load directory on + // called when app is not available or dropped during bulk load, remove bulk load directory on // remote storage void remove_bulk_load_dir_on_remote_storage(int32_t app_id, const std::string &app_name); From 48de65c93be92186d1f01314ea0900a268029308 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 23 Jun 2020 09:15:58 +0800 Subject: [PATCH 5/7] fix by review --- .../meta_server/meta_bulk_load_service.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 03c7a5ab07..299f25b1f0 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -576,7 +576,8 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon const gpid &pid = response.pid; if (!response.__isset.is_group_bulk_load_context_cleaned_up) { - dwarn_f( + dassert_f( + false, "receive bulk load response from node({}) app({}), partition({}), primary_status({}), " "but is_group_bulk_load_context_cleaned_up is not set", primary_addr.to_string(), @@ -588,13 +589,14 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon for (const auto &kv : response.group_bulk_load_state) { if (!kv.second.__isset.is_cleaned_up) { - dwarn_f("receive bulk load response from node({}) app({}), partition({}), " - "primary_status({}), but node({}) is_cleaned_up is not set", - primary_addr.to_string(), - app_name, - pid, - dsn::enum_to_string(response.primary_bulk_load_status), - kv.first.to_string()); + dassert_f(false, + "receive bulk load response from node({}) app({}), partition({}), " + "primary_status({}), but node({}) is_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status), + kv.first.to_string()); return; } } From 36cf83f22c92d5613375529895ae2e9afabf3884 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 23 Jun 2020 16:46:25 +0800 Subject: [PATCH 6/7] small fix --- .../meta_server/meta_bulk_load_service.cpp | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 299f25b1f0..9e1abbff23 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -575,30 +575,24 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon const std::string &app_name = response.app_name; const gpid &pid = response.pid; - if (!response.__isset.is_group_bulk_load_context_cleaned_up) { - dassert_f( - false, - "receive bulk load response from node({}) app({}), partition({}), primary_status({}), " - "but is_group_bulk_load_context_cleaned_up is not set", - primary_addr.to_string(), - app_name, - pid, - dsn::enum_to_string(response.primary_bulk_load_status)); - return; - } + dassert_f( + response.__isset.is_group_bulk_load_context_cleaned_up, + "receive bulk load response from node({}) app({}), partition({}), primary_status({}), " + "but is_group_bulk_load_context_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status)); for (const auto &kv : response.group_bulk_load_state) { - if (!kv.second.__isset.is_cleaned_up) { - dassert_f(false, - "receive bulk load response from node({}) app({}), partition({}), " - "primary_status({}), but node({}) is_cleaned_up is not set", - primary_addr.to_string(), - app_name, - pid, - dsn::enum_to_string(response.primary_bulk_load_status), - kv.first.to_string()); - return; - } + dassert_f(kv.second.__isset.is_cleaned_up, + "receive bulk load response from node({}) app({}), partition({}), " + "primary_status({}), but node({}) is_cleaned_up is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status), + kv.first.to_string()); } { From 37ba11839a761f202836fbef06a646aa22b4f175 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 24 Jun 2020 10:31:26 +0800 Subject: [PATCH 7/7] bug fix in erase_map_elem_by_id --- src/dist/replication/meta_server/meta_bulk_load_service.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 9e1abbff23..a2d829f23e 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -1003,6 +1003,8 @@ inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map &my for (auto iter = mymap.begin(); iter != mymap.end();) { if (iter->first.get_app_id() == app_id) { mymap.erase(iter++); + } else { + iter++; } } }