From a583cf2969839bca26f43fb5315941ffb89f7da7 Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Wed, 16 Dec 2020 16:38:47 +0800 Subject: [PATCH] feat(bulk-load): support user-define remote storage root path (#686) --- .../dist/replication/replication_ddl_client.h | 3 +- .../dsn/dist/replication/replication_types.h | 36 ++++++++-- src/client/replication_ddl_client.cpp | 4 +- src/common/replication_common.cpp | 5 -- src/common/replication_common.h | 1 - src/common/replication_types.cpp | 72 +++++++++++++++++++ src/meta/meta_bulk_load_service.cpp | 12 +++- src/meta/meta_bulk_load_service.h | 20 ++++-- src/meta/test/meta_bulk_load_service_test.cpp | 5 +- src/replica/bulk_load/replica_bulk_loader.cpp | 41 ++++++----- src/replica/bulk_load/replica_bulk_loader.h | 21 +++--- .../test/replica_bulk_loader_test.cpp | 6 +- src/replication.thrift | 9 ++- 13 files changed, 176 insertions(+), 59 deletions(-) diff --git a/include/dsn/dist/replication/replication_ddl_client.h b/include/dsn/dist/replication/replication_ddl_client.h index 5c0f8718aa..e51878ee53 100644 --- a/include/dsn/dist/replication/replication_ddl_client.h +++ b/include/dsn/dist/replication/replication_ddl_client.h @@ -180,7 +180,8 @@ class replication_ddl_client error_with start_bulk_load(const std::string &app_name, const std::string &cluster_name, - const std::string &file_provider_type); + const std::string &file_provider_type, + const std::string &remote_root_path); error_with control_bulk_load(const std::string &app_name, const bulk_load_control_type::type control_type); diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 8ab649d274..ebd96b0e08 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -6384,12 +6384,13 @@ inline std::ostream &operator<<(std::ostream &out, const bulk_load_metadata &obj typedef struct _start_bulk_load_request__isset { _start_bulk_load_request__isset() - : app_name(false), cluster_name(false), file_provider_type(false) + : app_name(false), cluster_name(false), file_provider_type(false), remote_root_path(false) { } bool app_name : 1; bool cluster_name : 1; bool file_provider_type : 1; + bool remote_root_path : 1; } _start_bulk_load_request__isset; class start_bulk_load_request @@ -6399,12 +6400,15 @@ class start_bulk_load_request start_bulk_load_request(start_bulk_load_request &&); start_bulk_load_request &operator=(const start_bulk_load_request &); start_bulk_load_request &operator=(start_bulk_load_request &&); - start_bulk_load_request() : app_name(), cluster_name(), file_provider_type() {} + start_bulk_load_request() : app_name(), cluster_name(), file_provider_type(), remote_root_path() + { + } virtual ~start_bulk_load_request() throw(); std::string app_name; std::string cluster_name; std::string file_provider_type; + std::string remote_root_path; _start_bulk_load_request__isset __isset; @@ -6414,6 +6418,8 @@ class start_bulk_load_request void __set_file_provider_type(const std::string &val); + void __set_remote_root_path(const std::string &val); + bool operator==(const start_bulk_load_request &rhs) const { if (!(app_name == rhs.app_name)) @@ -6422,6 +6428,8 @@ class start_bulk_load_request return false; if (!(file_provider_type == rhs.file_provider_type)) return false; + if (!(remote_root_path == rhs.remote_root_path)) + return false; return true; } bool operator!=(const start_bulk_load_request &rhs) const { return !(*this == rhs); } @@ -6598,7 +6606,8 @@ typedef struct _bulk_load_request__isset cluster_name(false), ballot(false), meta_bulk_load_status(false), - query_bulk_load_metadata(false) + query_bulk_load_metadata(false), + remote_root_path(false) { } bool pid : 1; @@ -6609,6 +6618,7 @@ typedef struct _bulk_load_request__isset bool ballot : 1; bool meta_bulk_load_status : 1; bool query_bulk_load_metadata : 1; + bool remote_root_path : 1; } _bulk_load_request__isset; class bulk_load_request @@ -6624,7 +6634,8 @@ class bulk_load_request cluster_name(), ballot(0), meta_bulk_load_status((bulk_load_status::type)0), - query_bulk_load_metadata(0) + query_bulk_load_metadata(0), + remote_root_path() { } @@ -6637,6 +6648,7 @@ class bulk_load_request int64_t ballot; bulk_load_status::type meta_bulk_load_status; bool query_bulk_load_metadata; + std::string remote_root_path; _bulk_load_request__isset __isset; @@ -6656,6 +6668,8 @@ class bulk_load_request void __set_query_bulk_load_metadata(const bool val); + void __set_remote_root_path(const std::string &val); + bool operator==(const bulk_load_request &rhs) const { if (!(pid == rhs.pid)) @@ -6674,6 +6688,8 @@ class bulk_load_request return false; if (!(query_bulk_load_metadata == rhs.query_bulk_load_metadata)) return false; + if (!(remote_root_path == rhs.remote_root_path)) + return false; return true; } bool operator!=(const bulk_load_request &rhs) const { return !(*this == rhs); } @@ -6839,7 +6855,8 @@ typedef struct _group_bulk_load_request__isset config(false), provider_name(false), cluster_name(false), - meta_bulk_load_status(false) + meta_bulk_load_status(false), + remote_root_path(false) { } bool app_name : 1; @@ -6848,6 +6865,7 @@ typedef struct _group_bulk_load_request__isset bool provider_name : 1; bool cluster_name : 1; bool meta_bulk_load_status : 1; + bool remote_root_path : 1; } _group_bulk_load_request__isset; class group_bulk_load_request @@ -6861,7 +6879,8 @@ class group_bulk_load_request : app_name(), provider_name(), cluster_name(), - meta_bulk_load_status((bulk_load_status::type)0) + meta_bulk_load_status((bulk_load_status::type)0), + remote_root_path() { } @@ -6872,6 +6891,7 @@ class group_bulk_load_request std::string provider_name; std::string cluster_name; bulk_load_status::type meta_bulk_load_status; + std::string remote_root_path; _group_bulk_load_request__isset __isset; @@ -6887,6 +6907,8 @@ class group_bulk_load_request void __set_meta_bulk_load_status(const bulk_load_status::type val); + void __set_remote_root_path(const std::string &val); + bool operator==(const group_bulk_load_request &rhs) const { if (!(app_name == rhs.app_name)) @@ -6901,6 +6923,8 @@ class group_bulk_load_request return false; if (!(meta_bulk_load_status == rhs.meta_bulk_load_status)) return false; + if (!(remote_root_path == rhs.remote_root_path)) + return false; return true; } bool operator!=(const group_bulk_load_request &rhs) const { return !(*this == rhs); } diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index 487e8fba62..0de14b8a40 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -1552,12 +1552,14 @@ void replication_ddl_client::query_disk_info( error_with replication_ddl_client::start_bulk_load(const std::string &app_name, const std::string &cluster_name, - const std::string &file_provider_type) + const std::string &file_provider_type, + const std::string &remote_root_path) { auto req = make_unique(); req->app_name = app_name; req->cluster_name = cluster_name; req->file_provider_type = file_provider_type; + req->remote_root_path = remote_root_path; return call_rpc_sync(start_bulk_load_rpc(std::move(req), RPC_CM_START_BULK_LOAD)); } diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp index 1e9a12cb4d..fb2f0c1a26 100644 --- a/src/common/replication_common.cpp +++ b/src/common/replication_common.cpp @@ -514,11 +514,6 @@ void replication_options::initialize() cold_backup_checkpoint_reserve_minutes, "reserve minutes of cold backup checkpoint"); - bulk_load_provider_root = dsn_config_get_value_string("replication", - "bulk_load_provider_root", - "bulk_load_provider_root", - "bulk load root on remote file provider"); - max_concurrent_bulk_load_downloading_count = FLAGS_max_concurrent_bulk_load_downloading_count; replica_helper::load_meta_servers(meta_servers); diff --git a/src/common/replication_common.h b/src/common/replication_common.h index 3eda1b3d24..e42d5c7edb 100644 --- a/src/common/replication_common.h +++ b/src/common/replication_common.h @@ -118,7 +118,6 @@ class replication_options std::string cold_backup_root; int32_t cold_backup_checkpoint_reserve_minutes; - std::string bulk_load_provider_root; int32_t max_concurrent_bulk_load_downloading_count; public: diff --git a/src/common/replication_types.cpp b/src/common/replication_types.cpp index b319b88131..fc93b3df99 100644 --- a/src/common/replication_types.cpp +++ b/src/common/replication_types.cpp @@ -14943,6 +14943,11 @@ void start_bulk_load_request::__set_file_provider_type(const std::string &val) this->file_provider_type = val; } +void start_bulk_load_request::__set_remote_root_path(const std::string &val) +{ + this->remote_root_path = val; +} + uint32_t start_bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -14986,6 +14991,14 @@ uint32_t start_bulk_load_request::read(::apache::thrift::protocol::TProtocol *ip xfer += iprot->skip(ftype); } break; + case 4: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->remote_root_path); + this->__isset.remote_root_path = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -15016,6 +15029,10 @@ uint32_t start_bulk_load_request::write(::apache::thrift::protocol::TProtocol *o xfer += oprot->writeString(this->file_provider_type); xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("remote_root_path", ::apache::thrift::protocol::T_STRING, 4); + xfer += oprot->writeString(this->remote_root_path); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -15027,6 +15044,7 @@ void swap(start_bulk_load_request &a, start_bulk_load_request &b) swap(a.app_name, b.app_name); swap(a.cluster_name, b.cluster_name); swap(a.file_provider_type, b.file_provider_type); + swap(a.remote_root_path, b.remote_root_path); swap(a.__isset, b.__isset); } @@ -15035,6 +15053,7 @@ start_bulk_load_request::start_bulk_load_request(const start_bulk_load_request & app_name = other648.app_name; cluster_name = other648.cluster_name; file_provider_type = other648.file_provider_type; + remote_root_path = other648.remote_root_path; __isset = other648.__isset; } start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other649) @@ -15042,6 +15061,7 @@ start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other app_name = std::move(other649.app_name); cluster_name = std::move(other649.cluster_name); file_provider_type = std::move(other649.file_provider_type); + remote_root_path = std::move(other649.remote_root_path); __isset = std::move(other649.__isset); } start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_load_request &other650) @@ -15049,6 +15069,7 @@ start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_loa app_name = other650.app_name; cluster_name = other650.cluster_name; file_provider_type = other650.file_provider_type; + remote_root_path = other650.remote_root_path; __isset = other650.__isset; return *this; } @@ -15057,6 +15078,7 @@ start_bulk_load_request &start_bulk_load_request::operator=(start_bulk_load_requ app_name = std::move(other651.app_name); cluster_name = std::move(other651.cluster_name); file_provider_type = std::move(other651.file_provider_type); + remote_root_path = std::move(other651.remote_root_path); __isset = std::move(other651.__isset); return *this; } @@ -15069,6 +15091,8 @@ void start_bulk_load_request::printTo(std::ostream &out) const << "cluster_name=" << to_string(cluster_name); out << ", " << "file_provider_type=" << to_string(file_provider_type); + out << ", " + << "remote_root_path=" << to_string(remote_root_path); out << ")"; } @@ -15432,6 +15456,11 @@ void bulk_load_request::__set_query_bulk_load_metadata(const bool val) this->query_bulk_load_metadata = val; } +void bulk_load_request::__set_remote_root_path(const std::string &val) +{ + this->remote_root_path = val; +} + uint32_t bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -15517,6 +15546,14 @@ uint32_t bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; + case 9: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->remote_root_path); + this->__isset.remote_root_path = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -15568,6 +15605,10 @@ uint32_t bulk_load_request::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeBool(this->query_bulk_load_metadata); xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("remote_root_path", ::apache::thrift::protocol::T_STRING, 9); + xfer += oprot->writeString(this->remote_root_path); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -15584,6 +15625,7 @@ void swap(bulk_load_request &a, bulk_load_request &b) swap(a.ballot, b.ballot); swap(a.meta_bulk_load_status, b.meta_bulk_load_status); swap(a.query_bulk_load_metadata, b.query_bulk_load_metadata); + swap(a.remote_root_path, b.remote_root_path); swap(a.__isset, b.__isset); } @@ -15597,6 +15639,7 @@ bulk_load_request::bulk_load_request(const bulk_load_request &other662) ballot = other662.ballot; meta_bulk_load_status = other662.meta_bulk_load_status; query_bulk_load_metadata = other662.query_bulk_load_metadata; + remote_root_path = other662.remote_root_path; __isset = other662.__isset; } bulk_load_request::bulk_load_request(bulk_load_request &&other663) @@ -15609,6 +15652,7 @@ bulk_load_request::bulk_load_request(bulk_load_request &&other663) ballot = std::move(other663.ballot); meta_bulk_load_status = std::move(other663.meta_bulk_load_status); query_bulk_load_metadata = std::move(other663.query_bulk_load_metadata); + remote_root_path = std::move(other663.remote_root_path); __isset = std::move(other663.__isset); } bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other664) @@ -15621,6 +15665,7 @@ bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other66 ballot = other664.ballot; meta_bulk_load_status = other664.meta_bulk_load_status; query_bulk_load_metadata = other664.query_bulk_load_metadata; + remote_root_path = other664.remote_root_path; __isset = other664.__isset; return *this; } @@ -15634,6 +15679,7 @@ bulk_load_request &bulk_load_request::operator=(bulk_load_request &&other665) ballot = std::move(other665.ballot); meta_bulk_load_status = std::move(other665.meta_bulk_load_status); query_bulk_load_metadata = std::move(other665.query_bulk_load_metadata); + remote_root_path = std::move(other665.remote_root_path); __isset = std::move(other665.__isset); return *this; } @@ -15656,6 +15702,8 @@ void bulk_load_request::printTo(std::ostream &out) const << "meta_bulk_load_status=" << to_string(meta_bulk_load_status); out << ", " << "query_bulk_load_metadata=" << to_string(query_bulk_load_metadata); + out << ", " + << "remote_root_path=" << to_string(remote_root_path); out << ")"; } @@ -16046,6 +16094,11 @@ void group_bulk_load_request::__set_meta_bulk_load_status(const bulk_load_status this->meta_bulk_load_status = val; } +void group_bulk_load_request::__set_remote_root_path(const std::string &val) +{ + this->remote_root_path = val; +} + uint32_t group_bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -16115,6 +16168,14 @@ uint32_t group_bulk_load_request::read(::apache::thrift::protocol::TProtocol *ip xfer += iprot->skip(ftype); } break; + case 7: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->remote_root_path); + this->__isset.remote_root_path = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -16157,6 +16218,10 @@ uint32_t group_bulk_load_request::write(::apache::thrift::protocol::TProtocol *o xfer += oprot->writeI32((int32_t)this->meta_bulk_load_status); xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("remote_root_path", ::apache::thrift::protocol::T_STRING, 7); + xfer += oprot->writeString(this->remote_root_path); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -16171,6 +16236,7 @@ void swap(group_bulk_load_request &a, group_bulk_load_request &b) swap(a.provider_name, b.provider_name); swap(a.cluster_name, b.cluster_name); swap(a.meta_bulk_load_status, b.meta_bulk_load_status); + swap(a.remote_root_path, b.remote_root_path); swap(a.__isset, b.__isset); } @@ -16182,6 +16248,7 @@ group_bulk_load_request::group_bulk_load_request(const group_bulk_load_request & provider_name = other680.provider_name; cluster_name = other680.cluster_name; meta_bulk_load_status = other680.meta_bulk_load_status; + remote_root_path = other680.remote_root_path; __isset = other680.__isset; } group_bulk_load_request::group_bulk_load_request(group_bulk_load_request &&other681) @@ -16192,6 +16259,7 @@ group_bulk_load_request::group_bulk_load_request(group_bulk_load_request &&other provider_name = std::move(other681.provider_name); cluster_name = std::move(other681.cluster_name); meta_bulk_load_status = std::move(other681.meta_bulk_load_status); + remote_root_path = std::move(other681.remote_root_path); __isset = std::move(other681.__isset); } group_bulk_load_request &group_bulk_load_request::operator=(const group_bulk_load_request &other682) @@ -16202,6 +16270,7 @@ group_bulk_load_request &group_bulk_load_request::operator=(const group_bulk_loa provider_name = other682.provider_name; cluster_name = other682.cluster_name; meta_bulk_load_status = other682.meta_bulk_load_status; + remote_root_path = other682.remote_root_path; __isset = other682.__isset; return *this; } @@ -16213,6 +16282,7 @@ group_bulk_load_request &group_bulk_load_request::operator=(group_bulk_load_requ provider_name = std::move(other683.provider_name); cluster_name = std::move(other683.cluster_name); meta_bulk_load_status = std::move(other683.meta_bulk_load_status); + remote_root_path = std::move(other683.remote_root_path); __isset = std::move(other683.__isset); return *this; } @@ -16231,6 +16301,8 @@ void group_bulk_load_request::printTo(std::ostream &out) const << "cluster_name=" << to_string(cluster_name); out << ", " << "meta_bulk_load_status=" << to_string(meta_bulk_load_status); + out << ", " + << "remote_root_path=" << to_string(remote_root_path); out << ")"; } diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 456bb18ea7..3f0df95602 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -59,6 +59,7 @@ void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc) error_code e = check_bulk_load_request_params(request.app_name, request.cluster_name, request.file_provider_type, + request.remote_root_path, app->app_id, app->partition_count, hint_msg); @@ -86,6 +87,7 @@ void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc) error_code bulk_load_service::check_bulk_load_request_params(const std::string &app_name, const std::string &cluster_name, const std::string &file_provider, + const std::string &remote_root_path, const int32_t app_id, const int32_t partition_count, std::string &hint_msg) @@ -103,7 +105,8 @@ error_code bulk_load_service::check_bulk_load_request_params(const std::string & } // sync get bulk_load_info file_handler - const std::string remote_path = get_bulk_load_info_path(app_name, cluster_name); + const std::string remote_path = + get_bulk_load_info_path(app_name, cluster_name, remote_root_path); dsn::dist::block_service::create_file_request cf_req; cf_req.file_name = remote_path; cf_req.ignore_metadata = true; @@ -202,6 +205,7 @@ void bulk_load_service::create_app_bulk_load_dir(const std::string &app_name, ainfo.status = bulk_load_status::BLS_DOWNLOADING; ainfo.cluster_name = req.cluster_name; ainfo.file_provider_type = req.file_provider_type; + ainfo.remote_root_path = req.remote_root_path; blob value = dsn::json::json_forwarder::encode(ainfo); _meta_svc->get_meta_storage()->create_node( @@ -291,15 +295,17 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid); req->ballot = b; req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlocked(pid); + req->remote_root_path = ainfo.remote_root_path; ddebug_f("send bulk load request to node({}), app({}), partition({}), partition " - "status = {}, remote provider = {}, cluster_name = {}", + "status = {}, remote provider = {}, cluster_name = {}, remote_root_path = {}", primary_addr.to_string(), app_name, pid, dsn::enum_to_string(req->meta_bulk_load_status), req->remote_provider_name, - req->cluster_name); + req->cluster_name, + req->remote_root_path); bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0, pid.thread_hash()); rpc.call(primary_addr, _meta_svc->tracker(), [this, rpc](error_code err) mutable { diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h index 0fd0a4dc1e..b29b38ee3a 100644 --- a/src/meta/meta_bulk_load_service.h +++ b/src/meta/meta_bulk_load_service.h @@ -23,8 +23,14 @@ struct app_bulk_load_info std::string cluster_name; std::string file_provider_type; bulk_load_status::type status; - DEFINE_JSON_SERIALIZATION( - app_id, partition_count, app_name, cluster_name, file_provider_type, status) + std::string remote_root_path; + DEFINE_JSON_SERIALIZATION(app_id, + partition_count, + app_name, + cluster_name, + file_provider_type, + status, + remote_root_path) }; struct partition_bulk_load_info @@ -110,6 +116,7 @@ class bulk_load_service error_code check_bulk_load_request_params(const std::string &app_name, const std::string &cluster_name, const std::string &file_provider, + const std::string &remote_root_path, const int32_t app_id, const int32_t partition_count, std::string &hint_msg); @@ -264,13 +271,14 @@ class bulk_load_service /// helper functions /// // get bulk_load_info path on file provider - // ///bulk_load_info + // ///bulk_load_info inline std::string get_bulk_load_info_path(const std::string &app_name, - const std::string &cluster_name) const + const std::string &cluster_name, + const std::string &remote_root_path) const { std::ostringstream oss; - oss << _meta_svc->get_options().bulk_load_provider_root << "/" << cluster_name << "/" - << app_name << "/" << bulk_load_constant::BULK_LOAD_INFO; + oss << remote_root_path << "/" << cluster_name << "/" << app_name << "/" + << bulk_load_constant::BULK_LOAD_INFO; return oss.str(); } diff --git a/src/meta/test/meta_bulk_load_service_test.cpp b/src/meta/test/meta_bulk_load_service_test.cpp index 3d79c54377..820e5a8792 100644 --- a/src/meta/test/meta_bulk_load_service_test.cpp +++ b/src/meta/test/meta_bulk_load_service_test.cpp @@ -27,6 +27,7 @@ class bulk_load_service_test : public meta_test_base request->app_name = app_name; request->cluster_name = CLUSTER; request->file_provider_type = PROVIDER; + request->remote_root_path = ROOT_PATH; start_bulk_load_rpc rpc(std::move(request), RPC_CM_START_BULK_LOAD); bulk_svc().on_start_bulk_load(rpc); @@ -40,7 +41,7 @@ class bulk_load_service_test : public meta_test_base { std::string hint_msg; return bulk_svc().check_bulk_load_request_params( - APP_NAME, CLUSTER, provider, app_id, partition_count, hint_msg); + APP_NAME, CLUSTER, provider, ROOT_PATH, app_id, partition_count, hint_msg); } error_code control_bulk_load(int32_t app_id, @@ -298,6 +299,7 @@ class bulk_load_service_test : public meta_test_base int32_t PARTITION_COUNT = 8; std::string CLUSTER = "cluster"; std::string PROVIDER = "local_service"; + std::string ROOT_PATH = "bulk_load_root"; int64_t BALLOT = 4; }; @@ -789,6 +791,7 @@ class bulk_load_failover_test : public bulk_load_service_test ainfo.app_name = app_name; ainfo.cluster_name = CLUSTER; ainfo.file_provider_type = PROVIDER; + ainfo.remote_root_path = ROOT_PATH; ainfo.partition_count = partition_count; ainfo.status = status; _app_bulk_load_info_map[app_id] = ainfo; diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 08f273a5e1..5e36b65292 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -45,19 +45,21 @@ void replica_bulk_loader::on_bulk_load(const bulk_load_request &request, return; } - ddebug_replica( - "receive bulk load request, remote provider = {}, cluster_name = {}, app_name = {}, " - "meta_bulk_load_status = {}, local bulk_load_status = {}", - request.remote_provider_name, - request.cluster_name, - request.app_name, - enum_to_string(request.meta_bulk_load_status), - enum_to_string(_status)); + ddebug_replica("receive bulk load request, remote provider = {}, remote_root_path = {}, " + "cluster_name = {}, app_name = {}, " + "meta_bulk_load_status = {}, local bulk_load_status = {}", + request.remote_provider_name, + request.remote_root_path, + request.cluster_name, + request.app_name, + enum_to_string(request.meta_bulk_load_status), + enum_to_string(_status)); error_code ec = do_bulk_load(request.app_name, request.meta_bulk_load_status, request.cluster_name, - request.remote_provider_name); + request.remote_provider_name, + request.remote_root_path); if (ec != ERR_OK) { response.err = ec; response.primary_bulk_load_status = _status; @@ -104,6 +106,7 @@ void replica_bulk_loader::broadcast_group_bulk_load(const bulk_load_request &met request->cluster_name = meta_req.cluster_name; request->provider_name = meta_req.remote_provider_name; request->meta_bulk_load_status = meta_req.meta_bulk_load_status; + request->remote_root_path = meta_req.remote_root_path; ddebug_replica("send group_bulk_load_request to {}", addr.to_string()); @@ -158,7 +161,8 @@ void replica_bulk_loader::on_group_bulk_load(const group_bulk_load_request &requ error_code ec = do_bulk_load(request.app_name, request.meta_bulk_load_status, request.cluster_name, - request.provider_name); + request.provider_name, + request.remote_root_path); if (ec != ERR_OK) { response.err = ec; response.status = _status; @@ -216,7 +220,8 @@ void replica_bulk_loader::on_group_bulk_load_reply(error_code err, error_code replica_bulk_loader::do_bulk_load(const std::string &app_name, bulk_load_status::type meta_status, const std::string &cluster_name, - const std::string &provider_name) + const std::string &provider_name, + const std::string &remote_root_path) { if (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY) { return ERR_INVALID_STATE; @@ -237,7 +242,9 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name, local_status == bulk_load_status::BLS_PAUSED || local_status == bulk_load_status::BLS_INGESTING || local_status == bulk_load_status::BLS_SUCCEED) { - ec = start_download(app_name, cluster_name, provider_name); + const std::string remote_dir = get_remote_bulk_load_dir( + app_name, cluster_name, remote_root_path, get_gpid().get_partition_index()); + ec = start_download(remote_dir, provider_name); } break; case bulk_load_status::BLS_INGESTING: @@ -322,8 +329,7 @@ replica_bulk_loader::validate_status(const bulk_load_status::type meta_status, } // ThreadPool: THREAD_POOL_REPLICATION -error_code replica_bulk_loader::start_download(const std::string &app_name, - const std::string &cluster_name, +error_code replica_bulk_loader::start_download(const std::string &remote_dir, const std::string &provider_name) { if (_stub->_bulk_load_downloading_count.load() >= @@ -351,7 +357,7 @@ error_code replica_bulk_loader::start_download(const std::string &app_name, // start download _is_downloading.store(true); ddebug_replica("start to download sst files"); - error_code err = download_sst_files(app_name, cluster_name, provider_name); + error_code err = download_sst_files(remote_dir, provider_name); if (err != ERR_OK) { try_decrease_bulk_load_download_count(); } @@ -359,8 +365,7 @@ error_code replica_bulk_loader::start_download(const std::string &app_name, } // ThreadPool: THREAD_POOL_REPLICATION -error_code replica_bulk_loader::download_sst_files(const std::string &app_name, - const std::string &cluster_name, +error_code replica_bulk_loader::download_sst_files(const std::string &remote_dir, const std::string &provider_name) { FAIL_POINT_INJECT_F("replica_bulk_loader_download_sst_files", @@ -379,8 +384,6 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name, return ERR_FILE_OPERATION_FAILED; } - const std::string remote_dir = - get_remote_bulk_load_dir(app_name, cluster_name, get_gpid().get_partition_index()); dist::block_service::block_filesystem *fs = _stub->_block_service_manager.get_or_create_block_filesystem(provider_name); diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index 8c4fde84f5..d5de58cc6b 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ -31,7 +31,8 @@ class replica_bulk_loader : replica_base error_code do_bulk_load(const std::string &app_name, bulk_load_status::type meta_status, const std::string &cluster_name, - const std::string &provider_name); + const std::string &provider_name, + const std::string &remote_root_path); // compare meta bulk load status and local bulk load status // \return ERR_INVALID_STATE if local status is invalid @@ -43,18 +44,14 @@ class replica_bulk_loader : replica_base // replica start or restart download sst files from remote provider // \return ERR_BUSY if node has already had enough replica executing downloading // \return download errors by function `download_sst_files` - error_code start_download(const std::string &app_name, - const std::string &cluster_name, - const std::string &provider_name); + error_code start_download(const std::string &remote_dir, const std::string &provider_name); // download metadata and sst files from remote provider // metadata and sst files will be downloaded in {_dir}/.bulk_load directory // \return ERR_FILE_OPERATION_FAILED: create local bulk load dir failed // \return download metadata file error, see function `do_download` // \return parse metadata file error, see function `parse_bulk_load_metadata` - error_code download_sst_files(const std::string &app_name, - const std::string &cluster_name, - const std::string &provider_name); + error_code download_sst_files(const std::string &remote_dir, const std::string &provider_name); // \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed // \return ERR_CORRUPTION: parse failed @@ -94,18 +91,18 @@ class replica_bulk_loader : replica_base /// /// bulk load path on remote file provider: - /// ///{bulk_load_info} - /// //// - /// ////bulk_load_metadata + /// ///{bulk_load_info} + /// //// + /// ////bulk_load_metadata /// // get partition's file dir on remote file provider inline std::string get_remote_bulk_load_dir(const std::string &app_name, const std::string &cluster_name, + const std::string &remote_root_path, uint32_t pidx) const { std::ostringstream oss; - oss << _replica->_options->bulk_load_provider_root << "/" << cluster_name << "/" << app_name - << "/" << pidx; + oss << remote_root_path << "/" << cluster_name << "/" << app_name << "/" << pidx; return oss.str(); } diff --git a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp index 22852125a0..11a1d5931c 100644 --- a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp +++ b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp @@ -44,7 +44,9 @@ class replica_bulk_loader_test : public replica_test_base error_code test_start_downloading() { - return _bulk_loader->start_download(APP_NAME, CLUSTER, PROVIDER); + const std::string remote_dir = _bulk_loader->get_remote_bulk_load_dir( + APP_NAME, CLUSTER, ROOT_PATH, PID.get_partition_index()); + return _bulk_loader->start_download(remote_dir, PROVIDER); } void test_rollback_to_downloading(bulk_load_status::type cur_status) @@ -172,6 +174,7 @@ class replica_bulk_loader_test : public replica_test_base _req.meta_bulk_load_status = status; _req.pid = PID; _req.remote_provider_name = PROVIDER; + _req.remote_root_path = ROOT_PATH; stub->set_bulk_load_downloading_count(downloading_count); } @@ -407,6 +410,7 @@ class replica_bulk_loader_test : public replica_test_base std::string APP_NAME = "replica"; std::string CLUSTER = "cluster"; std::string PROVIDER = "local_service"; + std::string ROOT_PATH = "bulk_load_root"; gpid PID = gpid(1, 0); ballot BALLOT = 3; rpc_address PRIMARY = rpc_address("127.0.0.2", 34801); diff --git a/src/replication.thrift b/src/replication.thrift index 86463a95ef..bf39cbe3d0 100644 --- a/src/replication.thrift +++ b/src/replication.thrift @@ -918,9 +918,10 @@ struct bulk_load_metadata // client -> meta, start bulk load struct start_bulk_load_request { - 1:string app_name; - 2:string cluster_name; - 3:string file_provider_type; + 1:string app_name; + 2:string cluster_name; + 3:string file_provider_type; + 4:string remote_root_path; } struct start_bulk_load_response @@ -959,6 +960,7 @@ struct bulk_load_request 6:i64 ballot; 7:bulk_load_status meta_bulk_load_status; 8:bool query_bulk_load_metadata; + 9:string remote_root_path; } struct bulk_load_response @@ -991,6 +993,7 @@ struct group_bulk_load_request 4:string provider_name; 5:string cluster_name; 6:bulk_load_status meta_bulk_load_status; + 7:string remote_root_path; } struct group_bulk_load_response