diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index b7f62f8fbb..8f02380e14 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -379,6 +379,10 @@ class register_child_response; class bulk_load_metadata; +class start_bulk_load_request; + +class start_bulk_load_response; + typedef struct _mutation_header__isset { _mutation_header__isset() @@ -6314,6 +6318,119 @@ inline std::ostream &operator<<(std::ostream &out, const bulk_load_metadata &obj obj.printTo(out); return out; } + +typedef struct _start_bulk_load_request__isset +{ + _start_bulk_load_request__isset() + : app_name(false), cluster_name(false), file_provider_type(false) + { + } + bool app_name : 1; + bool cluster_name : 1; + bool file_provider_type : 1; +} _start_bulk_load_request__isset; + +class start_bulk_load_request +{ +public: + start_bulk_load_request(const start_bulk_load_request &); + start_bulk_load_request(start_bulk_load_request &&); + start_bulk_load_request &operator=(const start_bulk_load_request &); + start_bulk_load_request &operator=(start_bulk_load_request &&); + start_bulk_load_request() : app_name(), cluster_name(), file_provider_type() {} + + virtual ~start_bulk_load_request() throw(); + std::string app_name; + std::string cluster_name; + std::string file_provider_type; + + _start_bulk_load_request__isset __isset; + + void __set_app_name(const std::string &val); + + void __set_cluster_name(const std::string &val); + + void __set_file_provider_type(const std::string &val); + + bool operator==(const start_bulk_load_request &rhs) const + { + if (!(app_name == rhs.app_name)) + return false; + if (!(cluster_name == rhs.cluster_name)) + return false; + if (!(file_provider_type == rhs.file_provider_type)) + return false; + return true; + } + bool operator!=(const start_bulk_load_request &rhs) const { return !(*this == rhs); } + + bool operator<(const start_bulk_load_request &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(start_bulk_load_request &a, start_bulk_load_request &b); + +inline std::ostream &operator<<(std::ostream &out, const start_bulk_load_request &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _start_bulk_load_response__isset +{ + _start_bulk_load_response__isset() : err(false), hint_msg(false) {} + bool err : 1; + bool hint_msg : 1; +} _start_bulk_load_response__isset; + +class start_bulk_load_response +{ +public: + start_bulk_load_response(const start_bulk_load_response &); + start_bulk_load_response(start_bulk_load_response &&); + start_bulk_load_response &operator=(const start_bulk_load_response &); + start_bulk_load_response &operator=(start_bulk_load_response &&); + start_bulk_load_response() : hint_msg() {} + + virtual ~start_bulk_load_response() throw(); + ::dsn::error_code err; + std::string hint_msg; + + _start_bulk_load_response__isset __isset; + + void __set_err(const ::dsn::error_code &val); + + void __set_hint_msg(const std::string &val); + + bool operator==(const start_bulk_load_response &rhs) const + { + if (!(err == rhs.err)) + return false; + if (!(hint_msg == rhs.hint_msg)) + return false; + return true; + } + bool operator!=(const start_bulk_load_response &rhs) const { return !(*this == rhs); } + + bool operator<(const start_bulk_load_response &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(start_bulk_load_response &a, start_bulk_load_response &b); + +inline std::ostream &operator<<(std::ostream &out, const start_bulk_load_response &obj) +{ + obj.printTo(out); + return out; +} } } // namespace diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index 59e3957ced..6b7d0a5840 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -516,7 +516,7 @@ void replication_options::initialize() bulk_load_provider_root = dsn_config_get_value_string("replication", "bulk_load_provider_root", - "bulk_load_root", + "bulk_load_provider_root", "bulk load root on remote file provider"); replica_helper::load_meta_servers(meta_servers); diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index ce7f68891a..67d24474c9 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -35,6 +35,8 @@ namespace replication { typedef std::unordered_map<::dsn::rpc_address, partition_status::type> node_statuses; typedef std::unordered_map<::dsn::rpc_address, dsn::task_ptr> node_tasks; +typedef rpc_holder start_bulk_load_rpc; + class replication_options { public: diff --git a/src/dist/replication/common/replication_types.cpp b/src/dist/replication/common/replication_types.cpp index 0b9cf81a21..985882a087 100644 --- a/src/dist/replication/common/replication_types.cpp +++ b/src/dist/replication/common/replication_types.cpp @@ -14880,5 +14880,265 @@ void bulk_load_metadata::printTo(std::ostream &out) const << "file_total_size=" << to_string(file_total_size); out << ")"; } + +start_bulk_load_request::~start_bulk_load_request() throw() {} + +void start_bulk_load_request::__set_app_name(const std::string &val) { this->app_name = val; } + +void start_bulk_load_request::__set_cluster_name(const std::string &val) +{ + this->cluster_name = val; +} + +void start_bulk_load_request::__set_file_provider_type(const std::string &val) +{ + this->file_provider_type = val; +} + +uint32_t start_bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->app_name); + this->__isset.app_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->cluster_name); + this->__isset.cluster_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->file_provider_type); + this->__isset.file_provider_type = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t start_bulk_load_request::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("start_bulk_load_request"); + + xfer += oprot->writeFieldBegin("app_name", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->app_name); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("cluster_name", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->cluster_name); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("file_provider_type", ::apache::thrift::protocol::T_STRING, 3); + xfer += oprot->writeString(this->file_provider_type); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(start_bulk_load_request &a, start_bulk_load_request &b) +{ + using ::std::swap; + swap(a.app_name, b.app_name); + swap(a.cluster_name, b.cluster_name); + swap(a.file_provider_type, b.file_provider_type); + swap(a.__isset, b.__isset); +} + +start_bulk_load_request::start_bulk_load_request(const start_bulk_load_request &other648) +{ + app_name = other648.app_name; + cluster_name = other648.cluster_name; + file_provider_type = other648.file_provider_type; + __isset = other648.__isset; +} +start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other649) +{ + app_name = std::move(other649.app_name); + cluster_name = std::move(other649.cluster_name); + file_provider_type = std::move(other649.file_provider_type); + __isset = std::move(other649.__isset); +} +start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_load_request &other650) +{ + app_name = other650.app_name; + cluster_name = other650.cluster_name; + file_provider_type = other650.file_provider_type; + __isset = other650.__isset; + return *this; +} +start_bulk_load_request &start_bulk_load_request::operator=(start_bulk_load_request &&other651) +{ + app_name = std::move(other651.app_name); + cluster_name = std::move(other651.cluster_name); + file_provider_type = std::move(other651.file_provider_type); + __isset = std::move(other651.__isset); + return *this; +} +void start_bulk_load_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "start_bulk_load_request("; + out << "app_name=" << to_string(app_name); + out << ", " + << "cluster_name=" << to_string(cluster_name); + out << ", " + << "file_provider_type=" << to_string(file_provider_type); + out << ")"; +} + +start_bulk_load_response::~start_bulk_load_response() throw() {} + +void start_bulk_load_response::__set_err(const ::dsn::error_code &val) { this->err = val; } + +void start_bulk_load_response::__set_hint_msg(const std::string &val) { this->hint_msg = val; } + +uint32_t start_bulk_load_response::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->err.read(iprot); + this->__isset.err = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->hint_msg); + this->__isset.hint_msg = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t start_bulk_load_response::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("start_bulk_load_response"); + + xfer += oprot->writeFieldBegin("err", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->err.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("hint_msg", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->hint_msg); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(start_bulk_load_response &a, start_bulk_load_response &b) +{ + using ::std::swap; + swap(a.err, b.err); + swap(a.hint_msg, b.hint_msg); + swap(a.__isset, b.__isset); +} + +start_bulk_load_response::start_bulk_load_response(const start_bulk_load_response &other652) +{ + err = other652.err; + hint_msg = other652.hint_msg; + __isset = other652.__isset; +} +start_bulk_load_response::start_bulk_load_response(start_bulk_load_response &&other653) +{ + err = std::move(other653.err); + hint_msg = std::move(other653.hint_msg); + __isset = std::move(other653.__isset); +} +start_bulk_load_response &start_bulk_load_response:: +operator=(const start_bulk_load_response &other654) +{ + err = other654.err; + hint_msg = other654.hint_msg; + __isset = other654.__isset; + return *this; +} +start_bulk_load_response &start_bulk_load_response::operator=(start_bulk_load_response &&other655) +{ + err = std::move(other655.err); + hint_msg = std::move(other655.hint_msg); + __isset = std::move(other655.__isset); + return *this; +} +void start_bulk_load_response::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "start_bulk_load_response("; + out << "err=" << to_string(err); + out << ", " + << "hint_msg=" << to_string(hint_msg); + out << ")"; +} } } // namespace diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index fce0f6968c..333f06eaa1 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. #include +#include #include "meta_bulk_load_service.h" @@ -29,6 +30,232 @@ void bulk_load_service::initialize_bulk_load_service() } } +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc) +{ + const auto &request = rpc.request(); + auto &response = rpc.response(); + response.err = ERR_OK; + + std::shared_ptr app; + { + zauto_read_lock l(app_lock()); + + app = _state->get_app(request.app_name); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + derror_f("app({}) is not existed or not available", request.app_name); + response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; + response.hint_msg = fmt::format( + "app {}", response.err == ERR_APP_NOT_EXIST ? "not existed" : "dropped"); + return; + } + + if (app->is_bulk_loading) { + derror_f("app({}) is already executing bulk load, please wait", app->app_name); + response.err = ERR_BUSY; + response.hint_msg = "app is already executing bulk load"; + return; + } + } + + std::string hint_msg; + error_code e = check_bulk_load_request_params(request.app_name, + request.cluster_name, + request.file_provider_type, + app->app_id, + app->partition_count, + hint_msg); + if (e != ERR_OK) { + response.err = e; + response.hint_msg = hint_msg; + return; + } + + ddebug_f("app({}) start bulk load, cluster_name = {}, provider = {}", + request.app_name, + request.cluster_name, + request.file_provider_type); + + // avoid possible load balancing + _meta_svc->set_function_level(meta_function_level::fl_steady); + + do_start_app_bulk_load(std::move(app), std::move(rpc)); +} + +// ThreadPool: THREAD_POOL_META_STATE +error_code bulk_load_service::check_bulk_load_request_params(const std::string &app_name, + const std::string &cluster_name, + const std::string &file_provider, + const int32_t app_id, + const int32_t partition_count, + std::string &hint_msg) +{ + FAIL_POINT_INJECT_F("meta_check_bulk_load_request_params", + [](dsn::string_view) -> error_code { return ERR_OK; }); + + // check file provider + dsn::dist::block_service::block_filesystem *blk_fs = + _meta_svc->get_block_service_manager().get_block_filesystem(file_provider); + if (blk_fs == nullptr) { + derror_f("invalid remote file provider type: {}", file_provider); + hint_msg = "invalid file_provider"; + return ERR_INVALID_PARAMETERS; + } + + // sync get bulk_load_info file_handler + const std::string remote_path = get_bulk_load_info_path(app_name, cluster_name); + dsn::dist::block_service::create_file_request cf_req; + cf_req.file_name = remote_path; + cf_req.ignore_metadata = true; + error_code err = ERR_OK; + dsn::dist::block_service::block_file_ptr file_handler = nullptr; + blk_fs + ->create_file( + cf_req, + TASK_CODE_EXEC_INLINED, + [&err, &file_handler](const dsn::dist::block_service::create_file_response &resp) { + err = resp.err; + file_handler = resp.file_handle; + }) + ->wait(); + if (err != ERR_OK || file_handler == nullptr) { + derror_f( + "failed to get file({}) handler on remote provider({})", remote_path, file_provider); + hint_msg = "file_provider error"; + return ERR_FILE_OPERATION_FAILED; + } + + // sync read bulk_load_info on file provider + dsn::dist::block_service::read_response r_resp; + file_handler + ->read(dsn::dist::block_service::read_request{0, -1}, + TASK_CODE_EXEC_INLINED, + [&r_resp](const dsn::dist::block_service::read_response &resp) { r_resp = resp; }) + ->wait(); + if (r_resp.err != ERR_OK) { + derror_f("failed to read file({}) on remote provider({}), error = {}", + file_provider, + remote_path, + r_resp.err.to_string()); + hint_msg = "read bulk_load_info failed"; + return r_resp.err; + } + + bulk_load_info bl_info; + if (!::dsn::json::json_forwarder::decode(r_resp.buffer, bl_info)) { + derror_f("file({}) is damaged on remote file provider({})", remote_path, file_provider); + hint_msg = "bulk_load_info damaged"; + return ERR_CORRUPTION; + } + + if (bl_info.app_id != app_id || bl_info.partition_count != partition_count) { + derror_f("app({}) information is inconsistent, local app_id({}) VS remote app_id({}), " + "local partition_count({}) VS remote partition_count({})", + app_name, + app_id, + bl_info.app_id, + partition_count, + bl_info.partition_count); + hint_msg = "app_id or partition_count is inconsistent"; + return ERR_INCONSISTENT_STATE; + } + + return ERR_OK; +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::do_start_app_bulk_load(std::shared_ptr app, + start_bulk_load_rpc rpc) +{ + app_info info = *app; + info.__set_is_bulk_loading(true); + + blob value = dsn::json::json_forwarder::encode(info); + _meta_svc->get_meta_storage()->set_data( + _state->get_app_path(*app), std::move(value), [app, rpc, this]() { + { + zauto_write_lock l(app_lock()); + app->is_bulk_loading = true; + } + { + zauto_write_lock l(_lock); + _bulk_load_app_id.insert(app->app_id); + _apps_in_progress_count[app->app_id] = app->partition_count; + } + create_app_bulk_load_dir( + app->app_name, app->app_id, app->partition_count, std::move(rpc)); + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::create_app_bulk_load_dir(const std::string &app_name, + int32_t app_id, + int32_t partition_count, + start_bulk_load_rpc rpc) +{ + const auto req = rpc.request(); + + app_bulk_load_info ainfo; + ainfo.app_id = app_id; + ainfo.app_name = app_name; + ainfo.partition_count = partition_count; + ainfo.status = bulk_load_status::BLS_DOWNLOADING; + ainfo.cluster_name = req.cluster_name; + ainfo.file_provider_type = req.file_provider_type; + blob value = dsn::json::json_forwarder::encode(ainfo); + + _meta_svc->get_meta_storage()->create_node( + get_app_bulk_load_path(app_id), std::move(value), [rpc, ainfo, this]() { + dinfo_f("create app({}) bulk load dir", ainfo.app_name); + { + zauto_write_lock l(_lock); + _app_bulk_load_info[ainfo.app_id] = ainfo; + _apps_pending_sync_flag[ainfo.app_id] = false; + } + for (int32_t i = 0; i < ainfo.partition_count; ++i) { + create_partition_bulk_load_dir( + ainfo.app_name, gpid(ainfo.app_id, i), ainfo.partition_count, std::move(rpc)); + } + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::create_partition_bulk_load_dir(const std::string &app_name, + const gpid &pid, + int32_t partition_count, + start_bulk_load_rpc rpc) +{ + partition_bulk_load_info pinfo; + pinfo.status = bulk_load_status::BLS_DOWNLOADING; + blob value = dsn::json::json_forwarder::encode(pinfo); + + _meta_svc->get_meta_storage()->create_node( + get_partition_bulk_load_path(pid), + std::move(value), + [app_name, pid, partition_count, rpc, pinfo, this]() { + dinfo_f("app({}) create partition({}) bulk_load_info", app_name, pid.to_string()); + { + zauto_write_lock l(_lock); + _partition_bulk_load_info[pid] = pinfo; + _partitions_pending_sync_flag[pid] = false; + if (--_apps_in_progress_count[pid.get_app_id()] == 0) { + ddebug_f("app({}) start bulk load succeed", app_name); + _apps_in_progress_count[pid.get_app_id()] = partition_count; + auto response = rpc.response(); + response.err = ERR_OK; + } + } + // start send bulk load to replica servers + partition_bulk_load(app_name, pid); + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::partition_bulk_load(const std::string &app_name, const gpid &pid) +{ + // TODO(heyuchen): TBD +} + // ThreadPool: THREAD_POOL_META_STATE void bulk_load_service::create_bulk_load_root_dir(error_code &err, task_tracker &tracker) { diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index 41b641559c..8578e675f9 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -85,13 +85,48 @@ class bulk_load_service void initialize_bulk_load_service(); + // client -> meta server to start bulk load + void on_start_bulk_load(start_bulk_load_rpc rpc); + private: - void create_bulk_load_root_dir(error_code &err, task_tracker &tracker); + // Called by `on_start_bulk_load`, check request params + // - ERR_OK: pass params check + // - ERR_INVALID_PARAMETERS: wrong file_provider type + // - ERR_FILE_OPERATION_FAILED: file_provider error + // - ERR_OBJECT_NOT_FOUND: bulk_load_info not exist, may wrong cluster_name or app_name + // - ERR_CORRUPTION: bulk_load_info is damaged on file_provider + // - ERR_INCONSISTENT_STATE: app_id or partition_count inconsistent + error_code check_bulk_load_request_params(const std::string &app_name, + const std::string &cluster_name, + const std::string &file_provider, + const int32_t app_id, + const int32_t partition_count, + std::string &hint_msg); + + void do_start_app_bulk_load(std::shared_ptr app, start_bulk_load_rpc rpc); + + void partition_bulk_load(const std::string &app_name, const gpid &pid); + + /// + /// update bulk load states to remote storage functions + /// + + void create_app_bulk_load_dir(const std::string &app_name, + int32_t app_id, + int32_t partition_count, + start_bulk_load_rpc rpc); + + void create_partition_bulk_load_dir(const std::string &app_name, + const gpid &pid, + int32_t partition_count, + start_bulk_load_rpc rpc); /// /// sync bulk load states from remote storage /// called when service initialized or meta server leader switch /// + void create_bulk_load_root_dir(error_code &err, task_tracker &tracker); + void sync_apps_bulk_load_from_remote_stroage(error_code &err, task_tracker &tracker); /// @@ -100,6 +135,46 @@ class bulk_load_service /// void try_to_continue_bulk_load(); + /// + /// helper functions + /// + // get bulk_load_info path on file provider + // ///bulk_load_info + inline std::string get_bulk_load_info_path(const std::string &app_name, + const std::string &cluster_name) const + { + std::ostringstream oss; + oss << _meta_svc->get_options().bulk_load_provider_root << "/" << cluster_name << "/" + << app_name << "/" << bulk_load_constant::BULK_LOAD_INFO; + return oss.str(); + } + + // get app_bulk_load_info path on remote stroage + // <_bulk_load_root>/ + inline std::string get_app_bulk_load_path(int32_t app_id) const + { + std::stringstream oss; + oss << _bulk_load_root << "/" << app_id; + return oss.str(); + } + + // get partition_bulk_load_info path on remote stroage + // <_bulk_load_root>// + inline std::string get_partition_bulk_load_path(const std::string &app_bulk_load_path, + int partition_id) const + { + std::stringstream oss; + oss << app_bulk_load_path << "/" << partition_id; + return oss.str(); + } + + inline std::string get_partition_bulk_load_path(const gpid &pid) const + { + std::stringstream oss; + oss << get_app_bulk_load_path(pid.get_app_id()) << "/" << pid.get_partition_index(); + return oss.str(); + } + private: friend class bulk_load_service_test; diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index 574824c2e2..fc1a6bdee9 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -387,6 +387,8 @@ void meta_service::register_rpc_handlers() register_rpc_handler_with_rpc_holder(RPC_CM_REGISTER_CHILD_REPLICA, "register_child_on_meta", &meta_service::on_register_child_on_meta); + register_rpc_handler_with_rpc_holder( + RPC_CM_START_BULK_LOAD, "start_bulk_load", &meta_service::on_start_bulk_load); } int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address) @@ -947,5 +949,21 @@ void meta_service::on_register_child_on_meta(register_child_rpc rpc) server_state::sStateHash); } +void meta_service::on_start_bulk_load(start_bulk_load_rpc rpc) +{ + auto &response = rpc.response(); + RPC_CHECK_STATUS(rpc.dsn_request(), response); + + if (!_bulk_load_svc) { + derror("meta doesn't support bulk load"); + response.err = ERR_SERVICE_NOT_ACTIVE; + } else { + tasking::enqueue(LPC_META_STATE_NORMAL, + tracker(), + [this, rpc]() { _bulk_load_svc->on_start_bulk_load(std::move(rpc)); }, + server_state::sStateHash); + } +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/meta_server/meta_service.h b/src/dist/replication/meta_server/meta_service.h index 54ff89ba29..dd2bdaa5bd 100644 --- a/src/dist/replication/meta_server/meta_service.h +++ b/src/dist/replication/meta_server/meta_service.h @@ -187,6 +187,9 @@ class meta_service : public serverlet void on_app_partition_split(app_partition_split_rpc rpc); void on_register_child_on_meta(register_child_rpc rpc); + // bulk load + void on_start_bulk_load(start_bulk_load_rpc rpc); + // common routines // ret: // 1. the meta is leader @@ -201,6 +204,7 @@ class meta_service : public serverlet friend class replication_checker; friend class test::test_checker; friend class meta_service_test_app; + friend class bulk_load_service_test; replication_options _opts; meta_options _meta_opts; diff --git a/src/dist/replication/meta_server/server_state.h b/src/dist/replication/meta_server/server_state.h index 6eaa6bbd04..fdf1e71305 100644 --- a/src/dist/replication/meta_server/server_state.h +++ b/src/dist/replication/meta_server/server_state.h @@ -305,6 +305,7 @@ class server_state friend class meta_duplication_service; friend class meta_split_service; friend class bulk_load_service; + friend class bulk_load_service_test; dsn::task_tracker _tracker; diff --git a/src/dist/replication/replication.thrift b/src/dist/replication/replication.thrift index 20c1ffba10..8ead061d22 100644 --- a/src/dist/replication/replication.thrift +++ b/src/dist/replication/replication.thrift @@ -904,6 +904,30 @@ struct bulk_load_metadata 2:i64 file_total_size; } +// client -> meta, start bulk load +struct start_bulk_load_request +{ + 1:string app_name; + 2:string cluster_name; + 3:string file_provider_type; +} + +struct start_bulk_load_response +{ + // Possible error: + // - ERR_OK: start bulk load succeed + // - ERR_APP_NOT_EXIST: app not exist + // - ERR_APP_DROPPED: app has been dropped + // - ERR_BUSY: app is already executing bulk load + // - ERR_INVALID_PARAMETERS: wrong file_provider type + // - ERR_FILE_OPERATION_FAILED: remote file_provider error + // - ERR_OBJECT_NOT_FOUND: bulk_load_info not exist on file_provider + // - ERR_CORRUPTION: bulk_load_info is damaged on file_provider + // - ERR_INCONSISTENT_STATE: app_id or partition_count inconsistent + 1:dsn.error_code err; + 2:string hint_msg; +} + /* service replica_s { diff --git a/src/dist/replication/test/meta_test/unit_test/CMakeLists.txt b/src/dist/replication/test/meta_test/unit_test/CMakeLists.txt index 05b5204474..33708072fb 100644 --- a/src/dist/replication/test/meta_test/unit_test/CMakeLists.txt +++ b/src/dist/replication/test/meta_test/unit_test/CMakeLists.txt @@ -31,7 +31,7 @@ set(MY_PROJ_LIBS crypto gtest) -set(MY_BOOST_LIBS Boost::system Boost::filesystem) +set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) # Extra files that will be installed set(MY_BINPLACES clear.sh run.sh config-ddl-test.ini config-test.ini suite1 suite2) diff --git a/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp new file mode 100644 index 0000000000..e6803176ec --- /dev/null +++ b/src/dist/replication/test/meta_test/unit_test/meta_bulk_load_service_test.cpp @@ -0,0 +1,106 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include +#include + +#include "meta_test_base.h" + +namespace dsn { +namespace replication { +class bulk_load_service_test : public meta_test_base +{ +public: + bulk_load_service_test() {} + + /// bulk load functions + + start_bulk_load_response start_bulk_load(const std::string &app_name) + { + auto request = dsn::make_unique(); + request->app_name = app_name; + request->cluster_name = CLUSTER; + request->file_provider_type = PROVIDER; + + start_bulk_load_rpc rpc(std::move(request), RPC_CM_START_BULK_LOAD); + bulk_svc().on_start_bulk_load(rpc); + wait_all(); + return rpc.response(); + } + + error_code check_start_bulk_load_request_params(const std::string provider, + int32_t app_id, + int32_t partition_count) + { + std::string hint_msg; + return bulk_svc().check_bulk_load_request_params( + APP_NAME, CLUSTER, provider, app_id, partition_count, hint_msg); + } + + bool app_is_bulk_loading(const std::string &app_name) + { + return find_app(app_name)->is_bulk_loading; + } + +public: + int32_t APP_ID = 1; + std::string APP_NAME = "bulk_load_test"; + int32_t PARTITION_COUNT = 8; + std::string CLUSTER = "cluster"; + std::string PROVIDER = "local_service"; + int64_t BALLOT = 4; +}; + +/// start bulk load unit tests +TEST_F(bulk_load_service_test, start_bulk_load_with_not_existed_app) +{ + auto resp = start_bulk_load("table_not_exist"); + ASSERT_EQ(resp.err, ERR_APP_NOT_EXIST); +} + +TEST_F(bulk_load_service_test, start_bulk_load_with_wrong_provider) +{ + create_app(APP_NAME); + error_code err = check_start_bulk_load_request_params("wrong_provider", 1, PARTITION_COUNT); + ASSERT_EQ(err, ERR_INVALID_PARAMETERS); +} + +TEST_F(bulk_load_service_test, start_bulk_load_succeed) +{ + create_app(APP_NAME); + fail::setup(); + fail::cfg("meta_check_bulk_load_request_params", "return()"); + fail::cfg("meta_bulk_load_partition_bulk_load", "return()"); + + auto resp = start_bulk_load(APP_NAME); + ASSERT_EQ(resp.err, ERR_OK); + ASSERT_TRUE(app_is_bulk_loading(APP_NAME)); + + fail::teardown(); +} +} // namespace replication +} // namespace dsn