From 58cf8ec8bd6ff69492c04e579999b26586e69097 Mon Sep 17 00:00:00 2001
From: heyuchen
Date: Fri, 15 May 2020 14:20:58 +0800
Subject: [PATCH 1/4] add meta on_partition_bulk_load_reply

---
 .../replication/common/replication_common.cpp |  10 ++
 .../replication/common/replication_common.h   |   2 +
 .../meta_server/meta_bulk_load_service.cpp    | 151 ++++++++++++++++++
 .../meta_server/meta_bulk_load_service.h      |  39 +++++
 .../unit_test/meta_bulk_load_service_test.cpp |   3 +
 5 files changed, 205 insertions(+)

diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp
index 6b7d0a5840..d31503edb2 100644
--- a/src/dist/replication/common/replication_common.cpp
+++ b/src/dist/replication/common/replication_common.cpp
@@ -110,6 +110,8 @@ replication_options::replication_options()
     max_concurrent_uploading_file_count = 10;
     cold_backup_checkpoint_reserve_minutes = 10;
+
+    partition_bulk_load_interval_ms = 10000;
 }
 replication_options::~replication_options() {}
@@ -519,6 +521,12 @@ void replication_options::initialize()
         "bulk_load_provider_root",
         "bulk load root on remote file provider");
+    partition_bulk_load_interval_ms = (int32_t)dsn_config_get_value_uint64(
+        "replication",
+        "partition_bulk_load_interval_ms",
+        partition_bulk_load_interval_ms,
+        "every this period(ms) meta server send bulk load request to replica server");
+
     replica_helper::load_meta_servers(meta_servers);
     sanity_check();
@@ -634,7 +642,9 @@ const std::string replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS(
 const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS(
     "replica.rocksdb_iteration_threshold_time_ms");
 const std::string replica_envs::BUSINESS_INFO("business.info");
+
 const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info");
+const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL_MS = 5000;
 namespace cold_backup {
 std::string get_policy_path(const std::string &root, const std::string &policy_name)
diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h
index 6c844cda4f..0875248eec 100644
--- a/src/dist/replication/common/replication_common.h
+++ b/src/dist/replication/common/replication_common.h
@@ -118,6 +118,7 @@ class replication_options
     int32_t cold_backup_checkpoint_reserve_minutes;
     std::string bulk_load_provider_root;
+    int32_t partition_bulk_load_interval_ms;
 public:
     replication_options();
@@ -160,6 +161,7 @@ class bulk_load_constant
 {
 public:
     static const std::string BULK_LOAD_INFO;
+    static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL_MS;
     // TODO(heyuchen): add more constant in further pr
 };
diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
index 6ca2167197..21ddc55929 100644
--- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
@@ -312,8 +312,159 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
 void bulk_load_service::on_partition_bulk_load_reply(error_code err,
                                                      const bulk_load_request &request,
                                                      const bulk_load_response &response)
+{
+    const std::string &app_name = request.app_name;
+    const gpid &pid = request.pid;
+    const rpc_address &primary_addr = request.primary_addr;
+    int32_t interval_ms = _meta_svc->get_options().partition_bulk_load_interval_ms;
+
+    if (err != ERR_OK) {
+        derror_f("app({}), partition({}) failed to receive bulk load response, error = {}",
+                 pid.get_app_id(),
+                 pid,
+                 err.to_string());
+        try_rollback_to_downloading(pid.get_app_id(), app_name);
+    } else if (response.err == ERR_OBJECT_NOT_FOUND || response.err == ERR_INVALID_STATE) {
+        derror_f(
+            "app({}), partition({}) doesn't exist or has invalid state on node({}), error = {}",
+            app_name,
+            pid,
+            primary_addr.to_string(),
+            response.err.to_string());
+        try_rollback_to_downloading(pid.get_app_id(), app_name);
+    } else if (response.err == ERR_BUSY) {
+        dwarn_f("node({}) has enough replicas downloading, wait to next round to send bulk load "
+                "request for app({}), partition({})",
+                primary_addr.to_string(),
+                app_name,
+                pid);
+    } else if (response.err != ERR_OK) {
+        derror_f("app({}), partition({}) handle bulk load response failed, error = {}, primary "
+                 "status = {}",
+                 app_name,
+                 pid,
+                 response.err.to_string(),
+                 dsn::enum_to_string(response.primary_bulk_load_status));
+        handle_bulk_load_failed(pid.get_app_id());
+    } else { // response.err = ERR_OK
+        ballot current_ballot;
+        {
+            zauto_read_lock l(app_lock());
+            std::shared_ptr app = _state->get_app(pid.get_app_id());
+            if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
+                dwarn_f("app(name={}, id={}) is not existed, set bulk load failed",
+                        app_name,
+                        pid.get_app_id());
+                handle_app_unavailable(pid.get_app_id(), app_name);
+                return;
+            }
+            current_ballot = app->partitions[pid.get_partition_index()].ballot;
+        }
+        if (request.ballot < current_ballot) {
+            dwarn_f("receive out-date response, app({}), partition({}), request ballot = {}, "
+                    "current ballot= {}",
+                    app_name,
+                    pid,
+                    request.ballot,
+                    current_ballot);
+            try_rollback_to_downloading(pid.get_app_id(), app_name);
+        }
+
+        // handle bulk load states reported from primary replica
+        bulk_load_status::type app_status = get_app_bulk_load_status(response.pid.get_app_id());
+        switch (app_status) {
+        case bulk_load_status::BLS_DOWNLOADING:
+            handle_app_downloading(response, primary_addr);
+            break;
+        case bulk_load_status::BLS_DOWNLOADED:
+            handle_app_downloaded(response);
+            // when app status or partition status is ingesting, send request frequently
+            interval_ms = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL_MS;
+            break;
+        case bulk_load_status::BLS_INGESTING:
+            handle_app_ingestion(response, primary_addr);
+            interval_ms = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL_MS;
+            break;
+        case bulk_load_status::BLS_SUCCEED:
+        case bulk_load_status::BLS_FAILED:
+        case bulk_load_status::BLS_CANCELED:
+            handle_bulk_load_finish(response, primary_addr);
+            break;
+        case bulk_load_status::BLS_PAUSING:
+            handle_app_pausing(response, primary_addr);
+            break;
+        case bulk_load_status::BLS_PAUSED:
+            // paused not send request to replica servers
+            return;
+        default:
+            // do nothing in other status
+            break;
+        }
+    }
+
+    // resend bulk_load_request to primary after interval_ms if app is still in bulk load
+    zauto_read_lock l(_lock);
+    if (is_app_bulk_loading_unlock(pid.get_app_id())) {
+        tasking::enqueue(LPC_META_STATE_NORMAL,
+                         _meta_svc->tracker(),
+                         std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid),
+                         0,
+                         std::chrono::milliseconds(interval_ms));
+    }
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::handle_app_downloading(const bulk_load_response &response,
+                                               const rpc_address &primary_addr)
+{
+    // TODO(heyuchen): TBD
+    // called by `on_partition_bulk_load_reply` when app status is downloading
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::handle_app_downloaded(const bulk_load_response &response)
+{
+    // TODO(heyuchen): TBD
+    // called by `on_partition_bulk_load_reply` when app status is downloaded
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
+                                             const rpc_address &primary_addr)
+{
+    // TODO(heyuchen): TBD
+    // called by `on_partition_bulk_load_reply` when app status is ingesting
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response,
+                                                const rpc_address &primary_addr)
+{
+    // TODO(heyuchen): TBD
+    // called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::handle_app_pausing(const bulk_load_response &response,
+                                           const rpc_address &primary_addr)
+{
+    // TODO(heyuchen): TBD
+    // called by `on_partition_bulk_load_reply` when app status is pausing
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::try_rollback_to_downloading(int32_t app_id, const std::string &app_name)
+{
+    // TODO(heyuchen): TBD
+    // replica meets error during bulk load, rollback to downloading to retry bulk load
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::handle_bulk_load_failed(int32_t app_id)
 {
     // TODO(heyuchen): TBD
+    // replica meets serious error during bulk load, such as a damaged file on remote storage
+    // should stop the bulk load process and set bulk load failed
 }
 // ThreadPool: THREAD_POOL_META_STATE
diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h
index 5a65b1e855..aa81fad9ce 100644
--- a/src/dist/replication/meta_server/meta_bulk_load_service.h
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.h
@@ -111,9 +111,27 @@ class bulk_load_service
                                      const bulk_load_request &request,
                                      const bulk_load_response &response);
+    void handle_app_downloading(const bulk_load_response &response,
+                                const rpc_address &primary_addr);
+
+    void handle_app_downloaded(const bulk_load_response &response);
+
+    void handle_app_ingestion(const bulk_load_response &response, const rpc_address &primary_addr);
+
+    // when app status is `succeed`, `failed`, `canceled`, meta and replica should clean up bulk load
+    // states
+    void handle_bulk_load_finish(const bulk_load_response &response,
+                                 const rpc_address &primary_addr);
+
+    void handle_app_pausing(const bulk_load_response &response, const rpc_address &primary_addr);
+
     // app not existed or not available during bulk load
     void handle_app_unavailable(int32_t app_id, const std::string &app_name);
+    void try_rollback_to_downloading(int32_t app_id, const std::string &app_name);
+
+    void handle_bulk_load_failed(int32_t app_id);
+
     ///
     /// update bulk load states to remote storage functions
     ///
@@ -202,6 +220,27 @@ class bulk_load_service
         }
     }
+    inline bulk_load_status::type get_app_bulk_load_status(int32_t app_id)
+    {
+        zauto_read_lock l(_lock);
+        return get_app_bulk_load_status_unlock(app_id);
+    }
+
+    inline bulk_load_status::type get_app_bulk_load_status_unlock(int32_t app_id) const
+    {
+        const auto &iter = _app_bulk_load_info.find(app_id);
+        if (iter != _app_bulk_load_info.end()) {
+            return iter->second.status;
+        } else {
+            return bulk_load_status::BLS_INVALID;
+        }
+    }
+
+    inline bool is_app_bulk_loading_unlock(int32_t app_id) const
+    {
+        return (_bulk_load_app_id.find(app_id) != _bulk_load_app_id.end());
+    }
+
 private:
     friend class bulk_load_service_test;
diff --git
a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp index e6803176ec..ed31ad0a53 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -102,5 +102,8 @@ TEST_F(bulk_load_service_test, start_bulk_load_succeed) fail::teardown(); } + +// TODO(heyuchen): add unit tests for on_partition_bulk_load_reply + } // namespace replication } // namespace dsn From a8b06b4c68b033e46d0c7a8a2fd8482a714c4ffb Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 15 May 2020 16:23:51 +0800 Subject: [PATCH 2/4] refactor code --- .../replication/common/replication_common.cpp | 11 +- .../replication/common/replication_common.h | 4 +- .../meta_server/meta_bulk_load_service.cpp | 145 ++++++++++-------- .../meta_server/meta_bulk_load_service.h | 7 +- 4 files changed, 95 insertions(+), 72 deletions(-) diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index d31503edb2..e448186d28 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -110,8 +110,6 @@ replication_options::replication_options() max_concurrent_uploading_file_count = 10; cold_backup_checkpoint_reserve_minutes = 10; - - partition_bulk_load_interval_ms = 10000; } replication_options::~replication_options() {} @@ -521,12 +519,6 @@ void replication_options::initialize() "bulk_load_provider_root", "bulk load root on remote file provider"); - partition_bulk_load_interval_ms = (int32_t)dsn_config_get_value_uint64( - "replication", - "partition_bulk_load_interval_ms", - partition_bulk_load_interval_ms, - "every this period(ms) meta server send bulk load request to replica server"); - replica_helper::load_meta_servers(meta_servers); sanity_check(); @@ -644,7 +636,8 @@ const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS( const std::string replica_envs::BUSINESS_INFO("business.info"); const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info"); -const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL_MS = 5000; +const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10; +const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5; namespace cold_backup { std::string get_policy_path(const std::string &root, const std::string &policy_name) diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 0875248eec..b96c621230 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -118,7 +118,6 @@ class replication_options int32_t cold_backup_checkpoint_reserve_minutes; std::string bulk_load_provider_root; - int32_t partition_bulk_load_interval_ms; public: replication_options(); @@ -161,7 +160,8 @@ class bulk_load_constant { public: static const std::string BULK_LOAD_INFO; - static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL_MS; + static const int32_t BULK_LOAD_REQUEST_INTERVAL; + static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL; // TODO(heyuchen): add more constant in further pr }; diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 21ddc55929..12b2a95cbb 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ 
b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -316,29 +316,41 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err, const std::string &app_name = request.app_name; const gpid &pid = request.pid; const rpc_address &primary_addr = request.primary_addr; - int32_t interval_ms = _meta_svc->get_options().partition_bulk_load_interval_ms; + int32_t interval = bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL; if (err != ERR_OK) { derror_f("app({}), partition({}) failed to recevie bulk load response, error = {}", pid.get_app_id(), pid, err.to_string()); - try_rollback_to_downloading(pid.get_app_id(), app_name); - } else if (response.err == ERR_OBJECT_NOT_FOUND || response.err == ERR_INVALID_STATE) { + try_rollback_to_downloading(app_name, pid); + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + if (response.err == ERR_OBJECT_NOT_FOUND || response.err == ERR_INVALID_STATE) { derror_f( "app({}), partition({}) doesn't exist or has invalid state on node({}), error = {}", app_name, pid, primary_addr.to_string(), response.err.to_string()); - try_rollback_to_downloading(pid.get_app_id(), app_name); - } else if (response.err == ERR_BUSY) { + try_rollback_to_downloading(app_name, pid); + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + if (response.err == ERR_BUSY) { dwarn_f("node({}) has enough replicas downloading, wait to next round to send bulk load " "request for app({}), partition({})", primary_addr.to_string(), app_name, pid); - } else if (response.err != ERR_OK) { + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + if (response.err != ERR_OK) { derror_f("app({}), partition({}) handle bulk load response failed, error = {}, primary " "status = {}", app_name, @@ -346,70 +358,83 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err, response.err.to_string(), dsn::enum_to_string(response.primary_bulk_load_status)); handle_bulk_load_failed(pid.get_app_id()); - } else { // response.err = ERR_OK - ballot current_ballot; - { - zauto_read_lock l(app_lock()); - std::shared_ptr app = _state->get_app(pid.get_app_id()); - if (app == nullptr || app->status != app_status::AS_AVAILABLE) { - dwarn_f("app(name={}, id={}) is not existed, set bulk load failed", - app_name, - pid.get_app_id()); - handle_app_unavailable(pid.get_app_id(), app_name); - return; - } - current_ballot = app->partitions[pid.get_partition_index()].ballot; - } - if (request.ballot < current_ballot) { - dwarn_f("receive out-date response, app({}), partition({}), request ballot = {}, " - "current ballot= {}", - app_name, - pid, - request.ballot, - current_ballot); - try_rollback_to_downloading(pid.get_app_id(), app_name); - } + try_resend_bulk_load_request(app_name, pid, interval); + return; + } - // handle bulk load states reported from primary replica - bulk_load_status::type app_status = get_app_bulk_load_status(response.pid.get_app_id()); - switch (app_status) { - case bulk_load_status::BLS_DOWNLOADING: - handle_app_downloading(response, primary_addr); - break; - case bulk_load_status::BLS_DOWNLOADED: - handle_app_downloaded(response); - // when app status or partition status is ingesting, send request frequently - interval_ms = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL_MS; - break; - case bulk_load_status::BLS_INGESTING: - handle_app_ingestion(response, primary_addr); - interval_ms = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL_MS; - break; - case bulk_load_status::BLS_SUCCEED: - case 
bulk_load_status::BLS_FAILED: - case bulk_load_status::BLS_CANCELED: - handle_bulk_load_finish(response, primary_addr); - break; - case bulk_load_status::BLS_PAUSING: - handle_app_pausing(response, primary_addr); - break; - case bulk_load_status::BLS_PAUSED: - // paused not send request to replica servers + // response.err = ERR_OK + ballot current_ballot; + { + zauto_read_lock l(app_lock()); + std::shared_ptr app = _state->get_app(pid.get_app_id()); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + dwarn_f("app(name={}, id={}) is not existed, set bulk load failed", + app_name, + pid.get_app_id()); + handle_app_unavailable(pid.get_app_id(), app_name); return; - default: - // do nothing in other status - break; } + current_ballot = app->partitions[pid.get_partition_index()].ballot; + } + if (request.ballot < current_ballot) { + dwarn_f("receive out-date response, app({}), partition({}), request ballot = {}, " + "current ballot= {}", + app_name, + pid, + request.ballot, + current_ballot); + try_rollback_to_downloading(app_name, pid); + try_resend_bulk_load_request(app_name, pid, interval); + return; + } + + // handle bulk load states reported from primary replica + bulk_load_status::type app_status = get_app_bulk_load_status(response.pid.get_app_id()); + switch (app_status) { + case bulk_load_status::BLS_DOWNLOADING: + handle_app_downloading(response, primary_addr); + break; + case bulk_load_status::BLS_DOWNLOADED: + handle_app_downloaded(response); + // when app status is downloaded or ingesting, send request frequently + interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL; + break; + case bulk_load_status::BLS_INGESTING: + handle_app_ingestion(response, primary_addr); + interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL; + break; + case bulk_load_status::BLS_SUCCEED: + case bulk_load_status::BLS_FAILED: + case bulk_load_status::BLS_CANCELED: + handle_bulk_load_finish(response, primary_addr); + break; + case bulk_load_status::BLS_PAUSING: + handle_app_pausing(response, primary_addr); + break; + case bulk_load_status::BLS_PAUSED: + // paused not send request to replica servers + return; + default: + // do nothing in other status + break; } - // resend bulk_load_request to primary after interval_ms if app is still in bulk load + try_resend_bulk_load_request(app_name, pid, interval); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name, + const gpid &pid, + const int32_t interval) +{ + FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](dsn::string_view) {}); zauto_read_lock l(_lock); if (is_app_bulk_loading_unlock(pid.get_app_id())) { tasking::enqueue(LPC_META_STATE_NORMAL, _meta_svc->tracker(), std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid), 0, - std::chrono::milliseconds(interval_ms)); + std::chrono::seconds(interval)); } } @@ -453,7 +478,7 @@ void bulk_load_service::handle_app_pausing(const bulk_load_response &response, } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::try_rollback_to_downloading(int32_t app_id, const std::string &app_name) +void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, const gpid &pid) { // TODO(heyuchen): TBD // replica meets error during bulk load, rollback to downloading to retry bulk load diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index aa81fad9ce..1090a126a2 100644 --- 
a/src/dist/replication/meta_server/meta_bulk_load_service.h
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.h
@@ -111,6 +111,11 @@ class bulk_load_service
                                      const bulk_load_request &request,
                                      const bulk_load_response &response);
+    // if app is still in bulk load, resend bulk_load_request to primary after interval seconds
+    void try_resend_bulk_load_request(const std::string &app_name,
+                                      const gpid &pid,
+                                      const int32_t interval);
+
     void handle_app_downloading(const bulk_load_response &response,
                                 const rpc_address &primary_addr);
@@ -128,7 +133,7 @@ class bulk_load_service
     // app not existed or not available during bulk load
     void handle_app_unavailable(int32_t app_id, const std::string &app_name);
-    void try_rollback_to_downloading(int32_t app_id, const std::string &app_name);
+    void try_rollback_to_downloading(const std::string &app_name, const gpid &pid);
     void handle_bulk_load_failed(int32_t app_id);

From 03cb7a75ccf04ac8a82fb255bab1c33b4c366590 Mon Sep 17 00:00:00 2001
From: heyuchen
Date: Mon, 18 May 2020 11:13:30 +0800
Subject: [PATCH 3/4] small fix

---
 src/dist/replication/meta_server/meta_bulk_load_service.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
index 12b2a95cbb..557de61232 100644
--- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
@@ -320,7 +320,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
     if (err != ERR_OK) {
         derror_f("app({}), partition({}) failed to receive bulk load response, error = {}",
-                 pid.get_app_id(),
+                 app_name,
                  pid,
                  err.to_string());
         try_rollback_to_downloading(app_name, pid);

From 98151a405460b0558c9345359ff5ca2a8d7bf5d0 Mon Sep 17 00:00:00 2001
From: heyuchen
Date: Mon, 18 May 2020 15:43:57 +0800
Subject: [PATCH 4/4] fix log

---
 src/dist/replication/meta_server/meta_bulk_load_service.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
index 557de61232..6265db51cf 100644
--- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp
+++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp
@@ -341,7 +341,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
     }
     if (response.err == ERR_BUSY) {
-        dwarn_f("node({}) has enough replicas downloading, wait to next round to send bulk load "
+        dwarn_f("node({}) has enough replicas downloading, wait for next round to send bulk load "
                 "request for app({}), partition({})",
                 primary_addr.to_string(),
                 app_name,