diff --git a/include/dsn/cpp/json_helper.h b/include/dsn/cpp/json_helper.h index 4ba6434638..e9e2deb195 100644 --- a/include/dsn/cpp/json_helper.h +++ b/include/dsn/cpp/json_helper.h @@ -249,6 +249,18 @@ inline bool json_decode(const JsonObject &in, std::string &str) return true; } +inline void json_encode(JsonWriter &out, const error_code &err) +{ + const char *str = err.to_string(); + out.String(str, strlen(str), true); +} +inline bool json_decode(const JsonObject &in, error_code &err) +{ + dverify(in.IsString()); + err = error_code(in.GetString()); + return true; +} + // json serialization for bool types. // for compatibility, we treat bool as integers, which is not this case in json standard inline void json_encode(JsonWriter &out, bool t) { out.Int(t ? 1 : 0); } diff --git a/include/dsn/utility/error_code.h b/include/dsn/utility/error_code.h index f4ed63c6e7..a2ad2a928a 100644 --- a/include/dsn/utility/error_code.h +++ b/include/dsn/utility/error_code.h @@ -130,4 +130,5 @@ DEFINE_ERR_CODE(ERR_SPLITTING) DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED) DEFINE_ERR_CODE(ERR_CHILD_NOT_READY) DEFINE_ERR_CODE(ERR_DISK_INSUFFICIENT) +DEFINE_ERR_CODE(ERR_RETRY_EXHAUSTED) } // namespace dsn diff --git a/src/common/bulk_load.thrift b/src/common/bulk_load.thrift index d03f1cd8fa..afe0c91df7 100644 --- a/src/common/bulk_load.thrift +++ b/src/common/bulk_load.thrift @@ -195,9 +195,14 @@ struct query_bulk_load_request struct query_bulk_load_response { // Possible error: + // - ERR_OK // - ERR_APP_NOT_EXIST: app not exist // - ERR_APP_DROPPED: app has been dropped - // - ERR_INVALID_STATE: app is not executing bulk load + // - ERR_FILE_OPERATION_FAILED: local file system error + // - ERR_FS_INTERNAL: remote file system error + // - ERR_CORRUPTION: file not exist or damaged + // - ERR_INGESTION_FAILED: ingest failed + // - ERR_RETRY_EXHAUSTED: retry too many times 1:dsn.error_code err; 2:string app_name; 3:bulk_load_status app_status; @@ -206,4 +211,5 @@ struct query_bulk_load_response // detailed bulk load state for each replica 6:list> bulk_load_states; 7:optional string hint_msg; + 8:bool is_bulk_loading; } diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index bc5c6c6016..fdbbf387a2 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -66,18 +66,27 @@ void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc) auto &response = rpc.response(); response.err = ERR_OK; + if (!_meta_svc->try_lock_meta_op_status(meta_op_status::BULKLOAD)) { + response.hint_msg = "meta server is busy now, please wait"; + derror_f("{}", response.hint_msg); + response.err = ERR_BUSY; + return; + } + std::shared_ptr app = get_app(request.app_name); if (app == nullptr || app->status != app_status::AS_AVAILABLE) { response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; response.hint_msg = fmt::format( "app {} is ", response.err == ERR_APP_NOT_EXIST ? "not existed" : "not available"); derror_f("{}", response.hint_msg); + _meta_svc->unlock_meta_op_status(); return; } if (app->is_bulk_loading) { response.err = ERR_BUSY; response.hint_msg = fmt::format("app({}) is already executing bulk load", app->app_name); derror_f("{}", response.hint_msg); + _meta_svc->unlock_meta_op_status(); return; } @@ -92,6 +101,7 @@ void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc) if (e != ERR_OK) { response.err = e; response.hint_msg = hint_msg; + _meta_svc->unlock_meta_op_status(); return; } @@ -101,6 +111,8 @@ void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc) request.file_provider_type, request.remote_root_path); + // clear old bulk load result + reset_local_bulk_load_states(app->app_id, app->app_name, true); // avoid possible load balancing _meta_svc->set_function_level(meta_function_level::fl_steady); @@ -234,21 +246,31 @@ void bulk_load_service::create_app_bulk_load_dir(const std::string &app_name, ainfo.cluster_name = req.cluster_name; ainfo.file_provider_type = req.file_provider_type; ainfo.remote_root_path = req.remote_root_path; - blob value = dsn::json::json_forwarder::encode(ainfo); + ainfo.is_ever_ingesting = false; + ainfo.bulk_load_err = ERR_OK; - _meta_svc->get_meta_storage()->create_node( - get_app_bulk_load_path(app_id), std::move(value), [rpc, ainfo, this]() { - dinfo_f("create app({}) bulk load dir", ainfo.app_name); - { - zauto_write_lock l(_lock); - _app_bulk_load_info[ainfo.app_id] = ainfo; - _apps_pending_sync_flag[ainfo.app_id] = false; - _apps_rollback_count[ainfo.app_id] = 0; - } - for (int32_t i = 0; i < ainfo.partition_count; ++i) { - create_partition_bulk_load_dir( - ainfo.app_name, gpid(ainfo.app_id, i), ainfo.partition_count, std::move(rpc)); - } + _meta_svc->get_meta_storage()->delete_node_recursively( + get_app_bulk_load_path(app_id), [this, rpc, ainfo]() { + std::string bulk_load_path = get_app_bulk_load_path(ainfo.app_id); + ddebug_f("remove app({}) bulk load dir {} succeed", ainfo.app_name, bulk_load_path); + + blob value = dsn::json::json_forwarder::encode(ainfo); + _meta_svc->get_meta_storage()->create_node( + std::move(bulk_load_path), std::move(value), [this, rpc, ainfo]() { + dinfo_f("create app({}) bulk load dir", ainfo.app_name); + { + zauto_write_lock l(_lock); + _app_bulk_load_info[ainfo.app_id] = ainfo; + _apps_pending_sync_flag[ainfo.app_id] = false; + _apps_rollback_count[ainfo.app_id] = 0; + } + for (int32_t i = 0; i < ainfo.partition_count; ++i) { + create_partition_bulk_load_dir(ainfo.app_name, + gpid(ainfo.app_id, i), + ainfo.partition_count, + std::move(rpc)); + } + }); }); } @@ -437,7 +459,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err, primary_addr.to_string(), response.err.to_string(), dsn::enum_to_string(response.primary_bulk_load_status)); - handle_bulk_load_failed(pid.get_app_id()); + handle_bulk_load_failed(pid.get_app_id(), response.err); try_resend_bulk_load_request(app_name, pid); return; } @@ -550,7 +572,17 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons pid, kv.first.to_string(), bulk_load_states.download_status); - handle_bulk_load_failed(pid.get_app_id()); + + error_code err = ERR_UNKNOWN; + // ERR_FILE_OPERATION_FAILED: local file system error + // ERR_FS_INTERNAL: remote file system error + // ERR_CORRUPTION: file not exist or damaged + if (ERR_FILE_OPERATION_FAILED == bulk_load_states.download_status || + ERR_FS_INTERNAL == bulk_load_states.download_status || + ERR_CORRUPTION == bulk_load_states.download_status) { + err = bulk_load_states.download_status; + } + handle_bulk_load_failed(pid.get_app_id(), err); return; } } @@ -619,7 +651,7 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response, pid, kv.first.to_string()); decrease_app_ingestion_count(pid); - handle_bulk_load_failed(pid.get_app_id()); + handle_bulk_load_failed(pid.get_app_id(), ERR_INGESTION_FAILED); return; } } @@ -717,8 +749,9 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name); return; } - ddebug_f("app({}) all partitions cleanup bulk load context", app_name); - remove_bulk_load_dir_on_remote_storage(std::move(app), true); + ddebug_f("app({}) update app to not bulk loading", app_name); + update_app_not_bulk_loading_on_remote_storage(std::move(app)); + reset_local_bulk_load_states(pid.get_app_id(), app_name, false); } } } @@ -797,8 +830,12 @@ void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, "app({}) has been rollback to downloading for {} times, make bulk load process failed", app_name, _apps_rollback_count[pid.get_app_id()]); - update_app_status_on_remote_storage_unlocked(pid.get_app_id(), - bulk_load_status::type::BLS_FAILED); + + update_app_status_on_remote_storage_unlocked( + pid.get_app_id(), + bulk_load_status::BLS_FAILED, + _app_bulk_load_info[pid.get_app_id()].is_ever_ingesting ? ERR_INGESTION_FAILED + : ERR_RETRY_EXHAUSTED); return; } ddebug_f("app({}) will rolling back from {} to {}, current rollback_count = {}", @@ -813,12 +850,12 @@ void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::handle_bulk_load_failed(int32_t app_id) +void bulk_load_service::handle_bulk_load_failed(int32_t app_id, error_code err) { zauto_write_lock l(_lock); if (!_apps_cleaning_up[app_id]) { _apps_cleaning_up[app_id] = true; - update_app_status_on_remote_storage_unlocked(app_id, bulk_load_status::BLS_FAILED); + update_app_status_on_remote_storage_unlocked(app_id, bulk_load_status::BLS_FAILED, err); } } @@ -828,7 +865,7 @@ void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string zauto_write_lock l(_lock); if (is_app_bulk_loading_unlocked(app_id) && !_apps_cleaning_up[app_id]) { _apps_cleaning_up[app_id] = true; - remove_bulk_load_dir_on_remote_storage(app_id, app_name); + reset_local_bulk_load_states_unlocked(app_id, app_name, false); } } @@ -970,7 +1007,7 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply( // ThreadPool: THREAD_POOL_META_STATE void bulk_load_service::update_app_status_on_remote_storage_unlocked( - int32_t app_id, bulk_load_status::type new_status, bool should_send_request) + int32_t app_id, bulk_load_status::type new_status, error_code err, bool should_send_request) { FAIL_POINT_INJECT_F("meta_update_app_status_on_remote_storage_unlocked", [](dsn::string_view) {}); @@ -998,6 +1035,7 @@ void bulk_load_service::update_app_status_on_remote_storage_unlocked( this, app_id, new_status, + err, should_send_request), 0, std::chrono::seconds(1)); @@ -1005,7 +1043,12 @@ void bulk_load_service::update_app_status_on_remote_storage_unlocked( } _apps_pending_sync_flag[app_id] = true; + + if (bulk_load_status::BLS_INGESTING == new_status) { + ainfo.is_ever_ingesting = true; + } ainfo.status = new_status; + ainfo.bulk_load_err = err; blob value = dsn::json::json_forwarder::encode(ainfo); _meta_svc->get_meta_storage()->set_data( @@ -1113,7 +1156,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g derror_f("app({}) partition({}) doesn't have bulk load metadata, set bulk load failed", app_name, pid); - handle_bulk_load_failed(pid.get_app_id()); + handle_bulk_load_failed(pid.get_app_id(), ERR_CORRUPTION); return; } @@ -1221,10 +1264,13 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err, primary_addr.to_string(), resp.err, resp.rocksdb_error); - tasking::enqueue( - LPC_META_STATE_NORMAL, - _meta_svc->tracker(), - std::bind(&bulk_load_service::handle_bulk_load_failed, this, pid.get_app_id())); + + tasking::enqueue(LPC_META_STATE_NORMAL, + _meta_svc->tracker(), + std::bind(&bulk_load_service::handle_bulk_load_failed, + this, + pid.get_app_id(), + ERR_INGESTION_FAILED)); return; } @@ -1242,7 +1288,7 @@ void bulk_load_service::remove_bulk_load_dir_on_remote_storage(int32_t app_id, _meta_svc->get_meta_storage()->delete_node_recursively( std::move(bulk_load_path), [this, app_id, app_name, bulk_load_path]() { ddebug_f("remove app({}) bulk load dir {} succeed", app_name, bulk_load_path); - reset_local_bulk_load_states(app_id, app_name); + reset_local_bulk_load_states(app_id, app_name, true); }); } @@ -1254,7 +1300,7 @@ void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptrget_meta_storage()->delete_node_recursively( std::move(bulk_load_path), [this, app, set_app_not_bulk_loading, bulk_load_path]() { ddebug_f("remove app({}) bulk load dir {} succeed", app->app_name, bulk_load_path); - reset_local_bulk_load_states(app->app_id, app->app_name); + reset_local_bulk_load_states(app->app_id, app->app_name, true); if (set_app_not_bulk_loading) { update_app_not_bulk_loading_on_remote_storage(std::move(app)); } @@ -1275,23 +1321,38 @@ inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map &my } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::string &app_name) +void bulk_load_service::reset_local_bulk_load_states_unlocked(int32_t app_id, + const std::string &app_name, + bool is_reset_result) { - zauto_write_lock l(_lock); - _app_bulk_load_info.erase(app_id); _apps_in_progress_count.erase(app_id); _apps_pending_sync_flag.erase(app_id); erase_map_elem_by_id(app_id, _partitions_pending_sync_flag); - erase_map_elem_by_id(app_id, _partitions_bulk_load_state); - erase_map_elem_by_id(app_id, _partition_bulk_load_info); erase_map_elem_by_id(app_id, _partitions_total_download_progress); - erase_map_elem_by_id(app_id, _partitions_cleaned_up); _apps_rolling_back.erase(app_id); _apps_rollback_count.erase(app_id); _apps_ingesting_count.erase(app_id); - _apps_cleaning_up.erase(app_id); _bulk_load_app_id.erase(app_id); - ddebug_f("reset local app({}) bulk load context", app_name); + + if (is_reset_result) { + _app_bulk_load_info.erase(app_id); + erase_map_elem_by_id(app_id, _partitions_bulk_load_state); + erase_map_elem_by_id(app_id, _partition_bulk_load_info); + erase_map_elem_by_id(app_id, _partitions_cleaned_up); + _apps_cleaning_up.erase(app_id); + } + + ddebug_f( + "reset local app({}) bulk load context, is_reset_result({})", app_name, is_reset_result); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, + const std::string &app_name, + bool is_reset_result) +{ + zauto_write_lock l(_lock); + reset_local_bulk_load_states_unlocked(app_id, app_name, is_reset_result); } // ThreadPool: THREAD_POOL_META_STATE @@ -1307,6 +1368,7 @@ void bulk_load_service::update_app_not_bulk_loading_on_remote_storage( zauto_write_lock l(app_lock()); app->is_bulk_loading = false; ddebug_f("app({}) update app is_bulk_loading to false", app->app_name); + _meta_svc->unlock_meta_op_status(); }); } @@ -1362,7 +1424,7 @@ void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc) } ddebug_f("app({}) restart bulk load", app_name); update_app_status_on_remote_storage_unlocked( - app_id, bulk_load_status::BLS_DOWNLOADING, true); + app_id, bulk_load_status::BLS_DOWNLOADING, ERR_OK, true); } break; case bulk_load_control_type::BLC_CANCEL: if (app_status != bulk_load_status::BLS_DOWNLOADING && @@ -1380,8 +1442,10 @@ void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc) app_name, control_type == bulk_load_control_type::BLC_FORCE_CANCEL ? "force" : "", dsn::enum_to_string(app_status)); - update_app_status_on_remote_storage_unlocked( - app_id, bulk_load_status::BLS_CANCELED, app_status == bulk_load_status::BLS_PAUSED); + update_app_status_on_remote_storage_unlocked(app_id, + bulk_load_status::BLS_CANCELED, + ERR_OK, + app_status == bulk_load_status::BLS_PAUSED); } break; default: break; @@ -1402,17 +1466,16 @@ void bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) if (app == nullptr || app->status != app_status::AS_AVAILABLE) { auto hint_msg = fmt::format("app({}) is not existed or not available", app_name); derror_f("{}", hint_msg); - response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; + response.err = (app == nullptr) ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; response.__set_hint_msg(hint_msg); return; } if (!app->is_bulk_loading) { - auto hint_msg = fmt::format("app({}) is not during bulk load", app_name); - derror_f("{}", hint_msg); - response.err = ERR_INVALID_STATE; + auto hint_msg = + fmt::format("app({}) is not during bulk load, return last time result", app_name); + dwarn_f("{}", hint_msg); response.__set_hint_msg(hint_msg); - return; } int32_t app_id = app->app_id; @@ -1436,6 +1499,12 @@ void bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) } } + response.is_bulk_loading = app->is_bulk_loading; + + if (!app->is_bulk_loading && bulk_load_status::BLS_FAILED == response.app_status) { + response.err = get_app_bulk_load_err_unlocked(app_id); + } + ddebug_f("query app({}) bulk_load_status({}) succeed", app_name, dsn::enum_to_string(response.app_status)); @@ -1710,6 +1779,10 @@ void bulk_load_service::do_continue_app_bulk_load( const int32_t same_count = pinfo_map.size() - different_count; const int32_t invalid_count = partition_count - pinfo_map.size(); + if (!_meta_svc->try_lock_meta_op_status(meta_op_status::BULKLOAD)) { + derror_f("fatal, the op status of meta server must be meta_op_status::FREE"); + return; + } ddebug_f( "app({}) continue bulk load, app_id = {}, partition_count = {}, status = {}, there are {} " "partitions have bulk_load_info, {} partitions have same status with app, {} " @@ -1858,16 +1931,6 @@ void bulk_load_service::check_app_bulk_load_states(std::shared_ptr ap return; } - if (err == ERR_OK && !is_app_bulk_loading) { - derror_f("app({}): bulk load dir({}) exist, but is_bulk_loading = {}, remove " - "useless bulk load dir", - app->app_name, - app_path, - is_app_bulk_loading); - remove_bulk_load_dir_on_remote_storage(std::move(app), false); - return; - } - // Normal cases: // err = ERR_OBJECT_NOT_FOUND, is_app_bulk_load = false: app is not executing bulk load // err = ERR_OK, is_app_bulk_load = true: app used to be executing bulk load diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h index 1a7f70101d..23e533e5e4 100644 --- a/src/meta/meta_bulk_load_service.h +++ b/src/meta/meta_bulk_load_service.h @@ -40,13 +40,17 @@ struct app_bulk_load_info std::string file_provider_type; bulk_load_status::type status; std::string remote_root_path; + bool is_ever_ingesting; + error_code bulk_load_err; DEFINE_JSON_SERIALIZATION(app_id, partition_count, app_name, cluster_name, file_provider_type, status, - remote_root_path) + remote_root_path, + is_ever_ingesting, + bulk_load_err) }; struct partition_bulk_load_info @@ -176,7 +180,7 @@ class bulk_load_service void try_rollback_to_downloading(const std::string &app_name, const gpid &pid); - void handle_bulk_load_failed(int32_t app_id); + void handle_bulk_load_failed(int32_t app_id, error_code err); // Called when app bulk load status update to ingesting // create ingestion_request and send it to primary @@ -192,7 +196,14 @@ class bulk_load_service const gpid &pid, const rpc_address &primary_addr); - void reset_local_bulk_load_states(int32_t app_id, const std::string &app_name); + // is_reset_all + // - true : reset all states in memory + // - false : keep the bulk load results in memory, reset others + void reset_local_bulk_load_states_unlocked(int32_t app_id, + const std::string &app_name, + bool is_reset_all); + void + reset_local_bulk_load_states(int32_t app_id, const std::string &app_name, bool is_reset_all); /// /// update bulk load states to remote storage functions @@ -231,6 +242,7 @@ class bulk_load_service // update app bulk load status on remote storage void update_app_status_on_remote_storage_unlocked(int32_t app_id, bulk_load_status::type new_status, + error_code err = ERR_OK, bool should_send_request = false); void update_app_status_on_remote_storage_reply(const app_bulk_load_info &ainfo, @@ -392,6 +404,16 @@ class bulk_load_service } } + inline error_code get_app_bulk_load_err_unlocked(int32_t app_id) const + { + const auto &iter = _app_bulk_load_info.find(app_id); + if (iter != _app_bulk_load_info.end()) { + return iter->second.bulk_load_err; + } else { + return ERR_OK; + } + } + inline bool is_app_bulk_loading_unlocked(int32_t app_id) const { return (_bulk_load_app_id.find(app_id) != _bulk_load_app_id.end()); diff --git a/src/meta/test/meta_bulk_load_service_test.cpp b/src/meta/test/meta_bulk_load_service_test.cpp index e8bce1d102..fd660af532 100644 --- a/src/meta/test/meta_bulk_load_service_test.cpp +++ b/src/meta/test/meta_bulk_load_service_test.cpp @@ -168,6 +168,11 @@ class bulk_load_service_test : public meta_test_base return bulk_svc().get_app_bulk_load_status_unlocked(app_id); } + error_code get_app_bulk_load_err(int32_t app_id) + { + return bulk_svc().get_app_bulk_load_err_unlocked(app_id); + } + void test_on_partition_ingestion_reply(ingestion_response &resp, const gpid &pid, error_code rpc_err = ERR_OK) @@ -178,7 +183,7 @@ class bulk_load_service_test : public meta_test_base void reset_local_bulk_load_states(int32_t app_id, const std::string &app_name) { - bulk_svc().reset_local_bulk_load_states(app_id, app_name); + bulk_svc().reset_local_bulk_load_states(app_id, app_name, true); } int32_t get_app_in_process_count(int32_t app_id) @@ -365,6 +370,9 @@ class bulk_load_service_test : public meta_test_base return bulk_svc()._bulk_load_app_id.find(app_id) == bulk_svc()._bulk_load_app_id.end(); } + meta_op_status get_op_status() { return _ms->get_op_status(); } + + void unlock_meta_op_status() { return _ms->unlock_meta_op_status(); } public: int32_t APP_ID = 1; std::string APP_NAME = "bulk_load_test"; @@ -381,6 +389,8 @@ TEST_F(bulk_load_service_test, start_bulk_load_with_not_existed_app) { auto resp = start_bulk_load("table_not_exist"); ASSERT_EQ(resp.err, ERR_APP_NOT_EXIST); + meta_op_status st = get_op_status(); + ASSERT_EQ(st, meta_op_status::FREE); } TEST_F(bulk_load_service_test, start_bulk_load_with_wrong_provider) @@ -388,6 +398,8 @@ TEST_F(bulk_load_service_test, start_bulk_load_with_wrong_provider) create_app(APP_NAME); error_code err = check_start_bulk_load_request_params("wrong_provider", 1, PARTITION_COUNT); ASSERT_EQ(err, ERR_INVALID_PARAMETERS); + meta_op_status st = get_op_status(); + ASSERT_EQ(st, meta_op_status::FREE); } TEST_F(bulk_load_service_test, start_bulk_load_succeed) @@ -400,7 +412,9 @@ TEST_F(bulk_load_service_test, start_bulk_load_succeed) auto resp = start_bulk_load(APP_NAME); ASSERT_EQ(resp.err, ERR_OK); ASSERT_TRUE(app_is_bulk_loading(APP_NAME)); - + meta_op_status st = get_op_status(); + ASSERT_EQ(st, meta_op_status::BULKLOAD); + unlock_meta_op_status(); fail::teardown(); } @@ -419,6 +433,9 @@ TEST_F(bulk_load_service_test, check_partition_status_app_wrong_test) app->status = app_status::AS_DROPPED; ASSERT_FALSE(check_partition_status(table_name, false, false, gpid(app->app_id, 0), false)); ASSERT_TRUE(is_app_bulk_load_states_reset(app->app_id)); + meta_op_status st = get_op_status(); + ASSERT_EQ(st, meta_op_status::BULKLOAD); + unlock_meta_op_status(); } TEST_F(bulk_load_service_test, check_partition_status_test) @@ -497,7 +514,7 @@ TEST_F(bulk_load_service_test, control_bulk_load_test) TEST_F(bulk_load_service_test, query_bulk_load_status_with_wrong_state) { create_app(APP_NAME); - ASSERT_EQ(query_bulk_load(APP_NAME), ERR_INVALID_STATE); + ASSERT_EQ(query_bulk_load(APP_NAME), ERR_OK); } TEST_F(bulk_load_service_test, query_bulk_load_status_success) @@ -528,10 +545,12 @@ class bulk_load_process_test : public bulk_load_service_test _app_id = app->app_id; _partition_count = app->partition_count; ASSERT_EQ(app->is_bulk_loading, true); + ASSERT_EQ(get_op_status(), meta_op_status::BULKLOAD); } void TearDown() { + unlock_meta_op_status(); fail::teardown(); bulk_load_service_test::TearDown(); } @@ -678,6 +697,7 @@ TEST_F(bulk_load_process_test, downloading_fs_error) test_on_partition_bulk_load_reply( _partition_count, bulk_load_status::BLS_DOWNLOADING, ERR_FS_INTERNAL); ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_FS_INTERNAL); } TEST_F(bulk_load_process_test, downloading_busy) @@ -692,6 +712,7 @@ TEST_F(bulk_load_process_test, downloading_corrupt) mock_response_progress(ERR_CORRUPTION, false); test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_DOWNLOADING); ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_CORRUPTION); } TEST_F(bulk_load_process_test, downloading_report_metadata) @@ -751,6 +772,7 @@ TEST_F(bulk_load_process_test, ingestion_error) test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_INGESTING); ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); ASSERT_EQ(get_app_ingesting_count(_app_id), 2); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_INGESTION_FAILED); } TEST_F(bulk_load_process_test, normal_succeed) @@ -759,6 +781,7 @@ TEST_F(bulk_load_process_test, normal_succeed) test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_INGESTING); ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED); ASSERT_EQ(get_app_ingesting_count(_app_id), 0); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_OK); } TEST_F(bulk_load_process_test, succeed_not_all_finished) @@ -766,6 +789,7 @@ TEST_F(bulk_load_process_test, succeed_not_all_finished) mock_response_cleaned_up_flag(false, bulk_load_status::BLS_SUCCEED); test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_SUCCEED); ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_OK); } TEST_F(bulk_load_process_test, succeed_all_finished) @@ -773,6 +797,7 @@ TEST_F(bulk_load_process_test, succeed_all_finished) mock_response_cleaned_up_flag(true, bulk_load_status::BLS_SUCCEED); test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_SUCCEED); ASSERT_FALSE(app_is_bulk_loading(APP_NAME)); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_OK); } TEST_F(bulk_load_process_test, cancel_not_all_finished) @@ -794,6 +819,7 @@ TEST_F(bulk_load_process_test, failed_not_all_finished) mock_response_cleaned_up_flag(false, bulk_load_status::BLS_FAILED); test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_FAILED); ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_OK); } TEST_F(bulk_load_process_test, failed_all_finished) @@ -801,6 +827,7 @@ TEST_F(bulk_load_process_test, failed_all_finished) mock_response_cleaned_up_flag(true, bulk_load_status::BLS_FAILED); test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_FAILED); ASSERT_FALSE(app_is_bulk_loading(APP_NAME)); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_OK); } TEST_F(bulk_load_process_test, pausing) @@ -849,6 +876,7 @@ TEST_F(bulk_load_process_test, rollback_count_exceed) _partition_count, bulk_load_status::BLS_DOWNLOADING, ERR_INVALID_STATE, true); ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); ASSERT_EQ(get_app_in_process_count(_app_id), _partition_count); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_RETRY_EXHAUSTED); } TEST_F(bulk_load_process_test, response_ingestion_error) @@ -903,6 +931,7 @@ TEST_F(bulk_load_process_test, ingest_wrong) wait_all(); ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED); ASSERT_EQ(get_app_ingesting_count(_app_id), 3); + ASSERT_EQ(get_app_bulk_load_err(_app_id), ERR_INGESTION_FAILED); } TEST_F(bulk_load_process_test, ingest_succeed) @@ -973,6 +1002,8 @@ class bulk_load_failover_test : public bulk_load_service_test ainfo.remote_root_path = ROOT_PATH; ainfo.partition_count = partition_count; ainfo.status = status; + ainfo.is_ever_ingesting = false; + ainfo.bulk_load_err = ERR_OK; _app_bulk_load_info_map[app_id] = ainfo; } diff --git a/src/meta/test/meta_http_service_test.cpp b/src/meta/test/meta_http_service_test.cpp index 5dee7444f3..1dba8e8356 100644 --- a/src/meta/test/meta_http_service_test.cpp +++ b/src/meta/test/meta_http_service_test.cpp @@ -250,7 +250,7 @@ class meta_bulk_load_http_test : public meta_test_base void reset_local_bulk_load_states() { auto app = find_app(APP_NAME); - bulk_svc().reset_local_bulk_load_states(app->app_id, APP_NAME); + bulk_svc().reset_local_bulk_load_states(app->app_id, APP_NAME, true); app->is_bulk_loading = false; } @@ -307,7 +307,7 @@ TEST_F(meta_bulk_load_http_test, query_bulk_load_request) {APP_NAME, R"({"error":"ERR_OK","app_status":"replication::bulk_load_status::BLS_DOWNLOADING"})"}, {NOT_BULK_LOAD, - R"({"error":"ERR_INVALID_STATE","app_status":"replication::bulk_load_status::BLS_INVALID"})"}, + R"({"error":"ERR_OK","app_status":"replication::bulk_load_status::BLS_INVALID"})"}, {NOT_FOUND, R"({"error":"ERR_APP_NOT_EXIST","app_status":"replication::bulk_load_status::BLS_INVALID"})"}}; for (const auto &test : tests) {