diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index 6b7d0a5840..e448186d28 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -634,7 +634,10 @@ const std::string replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS( const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS( "replica.rocksdb_iteration_threshold_time_ms"); const std::string replica_envs::BUSINESS_INFO("business.info"); + const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info"); +const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10; +const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5; namespace cold_backup { std::string get_policy_path(const std::string &root, const std::string &policy_name) diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 6c844cda4f..b96c621230 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -160,6 +160,8 @@ class bulk_load_constant { public: static const std::string BULK_LOAD_INFO; + static const int32_t BULK_LOAD_REQUEST_INTERVAL; + static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL; // TODO(heyuchen): add more constant in further pr }; diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 6ca2167197..6265db51cf 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -312,8 +312,184 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g void bulk_load_service::on_partition_bulk_load_reply(error_code err, const bulk_load_request &request, const bulk_load_response &response) +{ + const std::string &app_name = request.app_name; + const gpid &pid = request.pid; + const rpc_address &primary_addr = request.primary_addr; + int32_t interval = bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL; + + if (err != ERR_OK) { + derror_f("app({}), partition({}) failed to recevie bulk load response, error = {}", + app_name, + pid, + err.to_string()); + try_rollback_to_downloading(app_name, pid); + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + if (response.err == ERR_OBJECT_NOT_FOUND || response.err == ERR_INVALID_STATE) { + derror_f( + "app({}), partition({}) doesn't exist or has invalid state on node({}), error = {}", + app_name, + pid, + primary_addr.to_string(), + response.err.to_string()); + try_rollback_to_downloading(app_name, pid); + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + if (response.err == ERR_BUSY) { + dwarn_f("node({}) has enough replicas downloading, wait for next round to send bulk load " + "request for app({}), partition({})", + primary_addr.to_string(), + app_name, + pid); + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + if (response.err != ERR_OK) { + derror_f("app({}), partition({}) handle bulk load response failed, error = {}, primary " + "status = {}", + app_name, + pid, + response.err.to_string(), + dsn::enum_to_string(response.primary_bulk_load_status)); + handle_bulk_load_failed(pid.get_app_id()); + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + // response.err = ERR_OK + ballot current_ballot; + { + zauto_read_lock l(app_lock()); + std::shared_ptr app = _state->get_app(pid.get_app_id()); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + dwarn_f("app(name={}, id={}) is not existed, set bulk load failed", + app_name, + pid.get_app_id()); + handle_app_unavailable(pid.get_app_id(), app_name); + return; + } + current_ballot = app->partitions[pid.get_partition_index()].ballot; + } + if (request.ballot < current_ballot) { + dwarn_f("receive out-date response, app({}), partition({}), request ballot = {}, " + "current ballot= {}", + app_name, + pid, + request.ballot, + current_ballot); + try_rollback_to_downloading(app_name, pid); + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + // handle bulk load states reported from primary replica + bulk_load_status::type app_status = get_app_bulk_load_status(response.pid.get_app_id()); + switch (app_status) { + case bulk_load_status::BLS_DOWNLOADING: + handle_app_downloading(response, primary_addr); + break; + case bulk_load_status::BLS_DOWNLOADED: + handle_app_downloaded(response); + // when app status is downloaded or ingesting, send request frequently + interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL; + break; + case bulk_load_status::BLS_INGESTING: + handle_app_ingestion(response, primary_addr); + interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL; + break; + case bulk_load_status::BLS_SUCCEED: + case bulk_load_status::BLS_FAILED: + case bulk_load_status::BLS_CANCELED: + handle_bulk_load_finish(response, primary_addr); + break; + case bulk_load_status::BLS_PAUSING: + handle_app_pausing(response, primary_addr); + break; + case bulk_load_status::BLS_PAUSED: + // paused not send request to replica servers + return; + default: + // do nothing in other status + break; + } + + try_resend_bulk_load_request(app_name, pid, interval); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name, + const gpid &pid, + const int32_t interval) +{ + FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](dsn::string_view) {}); + zauto_read_lock l(_lock); + if (is_app_bulk_loading_unlock(pid.get_app_id())) { + tasking::enqueue(LPC_META_STATE_NORMAL, + _meta_svc->tracker(), + std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid), + 0, + std::chrono::seconds(interval)); + } +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::handle_app_downloading(const bulk_load_response &response, + const rpc_address &primary_addr) +{ + // TODO(heyuchen): TBD + // called by `on_partition_bulk_load_reply` when app status is downloading +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::handle_app_downloaded(const bulk_load_response &response) +{ + // TODO(heyuchen): TBD + // called by `on_partition_bulk_load_reply` when app status is downloaded +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::handle_app_ingestion(const bulk_load_response &response, + const rpc_address &primary_addr) +{ + // TODO(heyuchen): TBD + // called by `on_partition_bulk_load_reply` when app status is ingesting +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response, + const rpc_address &primary_addr) +{ + // TODO(heyuchen): TBD + // called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::handle_app_pausing(const bulk_load_response &response, + const rpc_address &primary_addr) +{ + // TODO(heyuchen): TBD + // called by `on_partition_bulk_load_reply` when app status is pausing +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, const gpid &pid) +{ + // TODO(heyuchen): TBD + // replica meets error during bulk load, rollback to downloading to retry bulk load +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::handle_bulk_load_failed(int32_t app_id) { // TODO(heyuchen): TBD + // replica meets serious error during bulk load, such as file on remote storage is damaged + // should stop bulk load process, set bulk load failed } // ThreadPool: THREAD_POOL_META_STATE diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index 5a65b1e855..1090a126a2 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -111,9 +111,32 @@ class bulk_load_service const bulk_load_request &request, const bulk_load_response &response); + // if app is still in bulk load, resend bulk_load_request to primary after interval seconds + void try_resend_bulk_load_request(const std::string &app_name, + const gpid &pid, + const int32_t interval); + + void handle_app_downloading(const bulk_load_response &response, + const rpc_address &primary_addr); + + void handle_app_downloaded(const bulk_load_response &response); + + void handle_app_ingestion(const bulk_load_response &response, const rpc_address &primary_addr); + + // when app status is `succeed, `failed`, `canceled`, meta and replica should cleanup bulk load + // states + void handle_bulk_load_finish(const bulk_load_response &response, + const rpc_address &primary_addr); + + void handle_app_pausing(const bulk_load_response &response, const rpc_address &primary_addr); + // app not existed or not available during bulk load void handle_app_unavailable(int32_t app_id, const std::string &app_name); + void try_rollback_to_downloading(const std::string &app_name, const gpid &pid); + + void handle_bulk_load_failed(int32_t app_id); + /// /// update bulk load states to remote storage functions /// @@ -202,6 +225,27 @@ class bulk_load_service } } + inline bulk_load_status::type get_app_bulk_load_status(int32_t app_id) + { + zauto_read_lock l(_lock); + return get_app_bulk_load_status_unlock(app_id); + } + + inline bulk_load_status::type get_app_bulk_load_status_unlock(int32_t app_id) const + { + const auto &iter = _app_bulk_load_info.find(app_id); + if (iter != _app_bulk_load_info.end()) { + return iter->second.status; + } else { + return bulk_load_status::BLS_INVALID; + } + } + + inline bool is_app_bulk_loading_unlock(int32_t app_id) const + { + return (_bulk_load_app_id.find(app_id) != _bulk_load_app_id.end()); + } + private: friend class bulk_load_service_test; diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp index e6803176ec..ed31ad0a53 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -102,5 +102,8 @@ TEST_F(bulk_load_service_test, start_bulk_load_succeed) fail::teardown(); } + +// TODO(heyuchen): add unit tests for on_partition_bulk_load_reply + } // namespace replication } // namespace dsn