diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 2aabb27d19..9fe8077a72 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -288,9 +288,9 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g req->primary_addr = primary_addr; req->remote_provider_name = ainfo.file_provider_type; req->cluster_name = ainfo.cluster_name; - req->meta_bulk_load_status = get_partition_bulk_load_status_unlock(pid); + req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid); req->ballot = b; - req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlock(pid); + req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlocked(pid); ddebug_f("send bulk load request to node({}), app({}), partition({}), partition " "status = {}, remote provider = {}, cluster_name = {}", @@ -428,7 +428,7 @@ void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name { FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](dsn::string_view) {}); zauto_read_lock l(_lock); - if (is_app_bulk_loading_unlock(pid.get_app_id())) { + if (is_app_bulk_loading_unlocked(pid.get_app_id())) { tasking::enqueue(LPC_META_STATE_NORMAL, _meta_svc->tracker(), std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid), @@ -441,8 +441,72 @@ void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name void bulk_load_service::handle_app_downloading(const bulk_load_response &response, const rpc_address &primary_addr) { - // TODO(heyuchen): TBD - // called by `on_partition_bulk_load_reply` when app status is downloading + const std::string &app_name = response.app_name; + const gpid &pid = response.pid; + + if (!response.__isset.total_download_progress) { + dwarn_f( + "recevie bulk load response from node({}) app({}), partition({}), 
primary_status({}), " + "but total_download_progress is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status)); + return; + } + + for (const auto &kv : response.group_bulk_load_state) { + const auto &bulk_load_states = kv.second; + if (!bulk_load_states.__isset.download_progress || + !bulk_load_states.__isset.download_status) { + dwarn_f("recevie bulk load response from node({}) app({}), partition({}), " + "primary_status({}), but node({}) progress or status is not set", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status), + kv.first.to_string()); + return; + } + // check partition download status + if (bulk_load_states.download_status != ERR_OK) { + derror_f("app({}) partition({}) on node({}) meet unrecoverable error during " + "downloading files, error = {}", + app_name, + pid, + kv.first.to_string(), + bulk_load_states.download_status); + handle_bulk_load_failed(pid.get_app_id()); + return; + } + } + + // if replica report metadata, update metadata on remote storage + if (response.__isset.metadata && is_partition_metadata_not_updated(pid)) { + update_partition_metadata_on_remote_stroage(app_name, pid, response.metadata); + } + + // update download progress + int32_t total_progress = response.total_download_progress; + ddebug_f("recevie bulk load response from node({}) app({}) partition({}), primary_status({}), " + "total_download_progress = {}", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(response.primary_bulk_load_status), + total_progress); + { + zauto_write_lock l(_lock); + _partitions_total_download_progress[pid] = total_progress; + _partitions_bulk_load_state[pid] = response.group_bulk_load_state; + } + + // update partition status to `downloaded` if all replica downloaded + if (total_progress >= bulk_load_constant::PROGRESS_FINISHED) { + ddebug_f( + "app({}) partirion({}) download all files from remote provider succeed", 
app_name, pid); + update_partition_status_on_remote_storage(app_name, pid, bulk_load_status::BLS_DOWNLOADED); + } } // ThreadPool: THREAD_POOL_META_STATE @@ -497,6 +561,166 @@ void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string // TODO(heyuchen): TBD } +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::update_partition_metadata_on_remote_stroage( + const std::string &app_name, const gpid &pid, const bulk_load_metadata &metadata) +{ + zauto_write_lock l(_lock); // exclusive: operator[] below may insert into the map + partition_bulk_load_info pinfo = _partition_bulk_load_info[pid]; + pinfo.metadata = metadata; + blob value = json::json_forwarder::encode(pinfo); + + _meta_svc->get_meta_storage()->set_data( + get_partition_bulk_load_path(pid), std::move(value), [this, app_name, pid, pinfo]() { + zauto_write_lock l(_lock); + _partition_bulk_load_info[pid] = pinfo; + ddebug_f( + "app({}) update partition({}) bulk load metadata, file count = {}, file size = {}", + app_name, + pid, + pinfo.metadata.files.size(), + pinfo.metadata.file_total_size); + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::update_partition_status_on_remote_storage(const std::string &app_name, + const gpid &pid, + bulk_load_status::type new_status, + bool should_send_request) +{ + zauto_write_lock l(_lock); // exclusive: _partitions_pending_sync_flag is written below + partition_bulk_load_info pinfo = _partition_bulk_load_info[pid]; + + if (pinfo.status == new_status) { + return; + } + + if (_partitions_pending_sync_flag[pid]) { + ddebug_f("app({}) partition({}) has already sync bulk load status, wait for next round", + app_name, + pid); + return; + } + + _partitions_pending_sync_flag[pid] = true; + pinfo.status = new_status; + blob value = json::json_forwarder::encode(pinfo); + + _meta_svc->get_meta_storage()->set_data( + get_partition_bulk_load_path(pid), + std::move(value), + std::bind(&bulk_load_service::update_partition_status_on_remote_storage_reply, + this, + app_name, + pid, + new_status, + should_send_request)); +} + +// ThreadPool: 
THREAD_POOL_META_STATE +void bulk_load_service::update_partition_status_on_remote_storage_reply( + const std::string &app_name, + const gpid &pid, + bulk_load_status::type new_status, + bool should_send_request) +{ + { + zauto_write_lock l(_lock); + auto old_status = _partition_bulk_load_info[pid].status; + _partition_bulk_load_info[pid].status = new_status; + _partitions_pending_sync_flag[pid] = false; + + ddebug_f("app({}) update partition({}) status from {} to {}", + app_name, + pid, + dsn::enum_to_string(old_status), + dsn::enum_to_string(new_status)); + + if (new_status == bulk_load_status::BLS_DOWNLOADED && old_status != new_status) { + if (--_apps_in_progress_count[pid.get_app_id()] == 0) { + update_app_status_on_remote_storage_unlocked(pid.get_app_id(), new_status); + } + } + // TODO(heyuchen): add other status + } + if (should_send_request) { + partition_bulk_load(app_name, pid); + } +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::update_app_status_on_remote_storage_unlocked( + int32_t app_id, bulk_load_status::type new_status, bool should_send_request) +{ + FAIL_POINT_INJECT_F("meta_update_app_status_on_remote_storage_unlocked", + [](dsn::string_view) {}); + + app_bulk_load_info ainfo = _app_bulk_load_info[app_id]; + auto old_status = ainfo.status; + + if (old_status == new_status && new_status != bulk_load_status::BLS_DOWNLOADING) { + dwarn_f("app({}) old status:{} VS new status:{}, ignore it", + ainfo.app_name, + dsn::enum_to_string(old_status), + dsn::enum_to_string(new_status)); + return; + } + + if (_apps_pending_sync_flag[app_id]) { + ddebug_f("app({}) has already sync bulk load status, wait and retry, current status = {}, " + "new status = {}", + ainfo.app_name, + dsn::enum_to_string(old_status), + dsn::enum_to_string(new_status)); + tasking::enqueue(LPC_META_STATE_NORMAL, + _meta_svc->tracker(), + [this, app_id, new_status, should_send_request]() { + zauto_write_lock l(_lock); // the retry runs as its own task: lock again + update_app_status_on_remote_storage_unlocked( + app_id, new_status, should_send_request); + }, 
+ 0, + std::chrono::seconds(1)); + return; + } + + _apps_pending_sync_flag[app_id] = true; + ainfo.status = new_status; + blob value = dsn::json::json_forwarder::encode(ainfo); + + _meta_svc->get_meta_storage()->set_data( + get_app_bulk_load_path(app_id), + std::move(value), + std::bind(&bulk_load_service::update_app_status_on_remote_storage_reply, + this, + ainfo, + old_status, + new_status, + should_send_request)); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk_load_info &ainfo, + bulk_load_status::type old_status, + bulk_load_status::type new_status, + bool should_send_request) +{ + int32_t app_id = ainfo.app_id; + { + zauto_write_lock l(_lock); + _app_bulk_load_info[app_id] = ainfo; + _apps_pending_sync_flag[app_id] = false; + _apps_in_progress_count[app_id] = ainfo.partition_count; + ddebug_f("update app({}) status from {} to {}", + ainfo.app_name, + dsn::enum_to_string(old_status), + dsn::enum_to_string(new_status)); + } + + // TODO(heyuchen): add other status +} + // ThreadPool: THREAD_POOL_META_STATE void bulk_load_service::create_bulk_load_root_dir(error_code &err, task_tracker &tracker) { diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index 1090a126a2..b9259cce3c 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -151,6 +151,36 @@ class bulk_load_service int32_t partition_count, start_bulk_load_rpc rpc); + // Called by `handle_app_downloading` + // update partition bulk load metadata reported by replica server on remote storage + void update_partition_metadata_on_remote_stroage(const std::string &app_name, + const gpid &pid, + const bulk_load_metadata &metadata); + + // update partition bulk load status on remote storage + // if should_send_request = true, will send bulk load request after update local 
partition + // status, this parameter will be true when restarting bulk load, status will turn from paused + // to downloading + void update_partition_status_on_remote_storage(const std::string &app_name, + const gpid &pid, + bulk_load_status::type new_status, + bool should_send_request = false); + + void update_partition_status_on_remote_storage_reply(const std::string &app_name, + const gpid &pid, + bulk_load_status::type new_status, + bool should_send_request); + + // update app bulk load status on remote storage + void update_app_status_on_remote_storage_unlocked(int32_t app_id, + bulk_load_status::type new_status, + bool should_send_request = false); + + void update_app_status_on_remote_storage_reply(const app_bulk_load_info &ainfo, + bulk_load_status::type old_status, + bulk_load_status::type new_status, + bool should_send_request); + /// /// sync bulk load states from remote storage /// called when service initialized or meta server leader switch @@ -205,7 +235,13 @@ class bulk_load_service return oss.str(); } - inline bool is_partition_metadata_not_updated_unlock(gpid pid) const + inline bool is_partition_metadata_not_updated(gpid pid) + { + zauto_read_lock l(_lock); + return is_partition_metadata_not_updated_unlocked(pid); + } + + inline bool is_partition_metadata_not_updated_unlocked(gpid pid) const { const auto &iter = _partition_bulk_load_info.find(pid); if (iter == _partition_bulk_load_info.end()) { @@ -215,7 +251,7 @@ class bulk_load_service return (metadata.files.size() == 0 && metadata.file_total_size == 0); } - inline bulk_load_status::type get_partition_bulk_load_status_unlock(gpid pid) const + inline bulk_load_status::type get_partition_bulk_load_status_unlocked(gpid pid) const { const auto &iter = _partition_bulk_load_info.find(pid); if (iter != _partition_bulk_load_info.end()) { @@ -228,10 +264,10 @@ class bulk_load_service inline bulk_load_status::type get_app_bulk_load_status(int32_t app_id) { zauto_read_lock l(_lock); - return 
get_app_bulk_load_status_unlock(app_id); + return get_app_bulk_load_status_unlocked(app_id); } - inline bulk_load_status::type get_app_bulk_load_status_unlock(int32_t app_id) const + inline bulk_load_status::type get_app_bulk_load_status_unlocked(int32_t app_id) const { const auto &iter = _app_bulk_load_info.find(app_id); if (iter != _app_bulk_load_info.end()) { @@ -241,7 +277,7 @@ class bulk_load_service } } - inline bool is_app_bulk_loading_unlock(int32_t app_id) const + inline bool is_app_bulk_loading_unlocked(int32_t app_id) const { return (_bulk_load_app_id.find(app_id) != _bulk_load_app_id.end()); } @@ -266,6 +302,12 @@ class bulk_load_service std::unordered_map _partition_bulk_load_info; std::unordered_map _partitions_pending_sync_flag; + + // partition_index -> group total download progress + std::unordered_map _partitions_total_download_progress; + // partition_index -> group bulk load states(node address -> state) + std::unordered_map> + _partitions_bulk_load_state; }; } // namespace replication diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp index ed31ad0a53..c72283c48c 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -61,11 +61,41 @@ class bulk_load_service_test : public meta_test_base APP_NAME, CLUSTER, provider, app_id, partition_count, hint_msg); } + void mock_meta_bulk_load_context(int32_t app_id, + int32_t in_progress_partition_count, + bulk_load_status::type status) + { + bulk_svc()._bulk_load_app_id.insert(app_id); + bulk_svc()._apps_in_progress_count[app_id] = in_progress_partition_count; + bulk_svc()._app_bulk_load_info[app_id].status = status; + for (int i = 0; i < in_progress_partition_count; ++i) { + gpid pid = gpid(app_id, i); + bulk_svc()._partition_bulk_load_info[pid].status = status; + } + 
} + + void on_partition_bulk_load_reply(error_code err, + const bulk_load_request &request, + const bulk_load_response &response) + { + bulk_svc().on_partition_bulk_load_reply(err, request, response); + } + bool app_is_bulk_loading(const std::string &app_name) { return find_app(app_name)->is_bulk_loading; } + bool need_update_metadata(gpid pid) + { + return bulk_svc().is_partition_metadata_not_updated(pid); + } + + bulk_load_status::type get_app_bulk_load_status(int32_t app_id) + { + return bulk_svc().get_app_bulk_load_status(app_id); + } + public: int32_t APP_ID = 1; std::string APP_NAME = "bulk_load_test"; @@ -103,7 +133,147 @@ TEST_F(bulk_load_service_test, start_bulk_load_succeed) fail::teardown(); } -// TODO(heyuchen): add unit tests for on_partition_bulk_load_reply +/// bulk load process unit tests +class bulk_load_process_test : public bulk_load_service_test +{ +public: + void SetUp() override + { + bulk_load_service_test::SetUp(); + create_app(APP_NAME); + + fail::setup(); + fail::cfg("meta_check_bulk_load_request_params", "return()"); + fail::cfg("meta_bulk_load_partition_bulk_load", "return()"); + fail::cfg("meta_bulk_load_resend_request", "return()"); + + auto resp = start_bulk_load(APP_NAME); + ASSERT_EQ(resp.err, ERR_OK); + std::shared_ptr app = find_app(APP_NAME); + _app_id = app->app_id; + _partition_count = app->partition_count; + ASSERT_EQ(app->is_bulk_loading, true); + } + + void TearDown() override + { + fail::teardown(); + bulk_load_service_test::TearDown(); + } + + void create_request(bulk_load_status::type status) + { + _req.app_name = APP_NAME; + _req.ballot = BALLOT; + _req.cluster_name = CLUSTER; + _req.pid = gpid(_app_id, _pidx); + _req.primary_addr = PRIMARY; + _req.meta_bulk_load_status = status; + } + + void create_basic_response(error_code err, bulk_load_status::type status) + { + _resp.app_name = APP_NAME; + _resp.pid = gpid(_app_id, _pidx); + _resp.err = err; + _resp.primary_bulk_load_status = status; + } + + void 
mock_response_progress(error_code progress_err, bool finish_download) + { + create_basic_response(ERR_OK, bulk_load_status::BLS_DOWNLOADING); + + partition_bulk_load_state state, state2; + int32_t secondary2_progress = finish_download ? 100 : 0; + int32_t total_progress = finish_download ? 100 : 66; + state.__set_download_status(ERR_OK); + state.__set_download_progress(100); + state2.__set_download_status(progress_err); + state2.__set_download_progress(secondary2_progress); + + _resp.group_bulk_load_state[PRIMARY] = state; + _resp.group_bulk_load_state[SECONDARY1] = state; + _resp.group_bulk_load_state[SECONDARY2] = state2; + _resp.__set_total_download_progress(total_progress); + } + + void mock_response_bulk_load_metadata() + { + mock_response_progress(ERR_OK, false); + + file_meta f_meta; + f_meta.name = "mock_remote_file"; + f_meta.size = 100; + f_meta.md5 = "mock_md5"; + + bulk_load_metadata metadata; + metadata.files.emplace_back(f_meta); + metadata.file_total_size = 100; + + _resp.__set_metadata(metadata); + } + + void test_on_partition_bulk_load_reply(int32_t in_progress_count, + bulk_load_status::type status, + error_code resp_err = ERR_OK) + { + mock_meta_bulk_load_context(_app_id, in_progress_count, status); + create_request(status); + auto response = _resp; + response.err = resp_err; + on_partition_bulk_load_reply(ERR_OK, _req, response); + wait_all(); + } + +public: + const int32_t _pidx = 0; + const rpc_address PRIMARY = rpc_address("127.0.0.1", 10086); + const rpc_address SECONDARY1 = rpc_address("127.0.0.1", 10085); + const rpc_address SECONDARY2 = rpc_address("127.0.0.1", 10087); + + int32_t _app_id; + int32_t _partition_count; + bulk_load_request _req; + bulk_load_response _resp; +}; + +/// on_partition_bulk_load_reply unit tests + +// TODO(heyuchen): +// add `downloading_fs_error` unit tests after implement function `handle_bulk_load_failed` +// add `downloading_corrupt` unit tests after implement function `handle_bulk_load_failed` + 
+TEST_F(bulk_load_process_test, downloading_busy) +{ + test_on_partition_bulk_load_reply( + _partition_count, bulk_load_status::BLS_DOWNLOADING, ERR_BUSY); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING); +} + +TEST_F(bulk_load_process_test, downloading_report_metadata) +{ + mock_response_bulk_load_metadata(); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_DOWNLOADING); + + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING); + ASSERT_FALSE(need_update_metadata(gpid(_app_id, _pidx))); +} + +TEST_F(bulk_load_process_test, normal_downloading) +{ + mock_response_progress(ERR_OK, false); + test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_DOWNLOADING); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING); +} + +TEST_F(bulk_load_process_test, downloaded_succeed) +{ + mock_response_progress(ERR_OK, true); + test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_DOWNLOADING); + ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADED); +} + +// TODO(heyuchen): add other unit tests for `on_partition_bulk_load_reply` } // namespace replication } // namespace dsn