From 9b98a654b5af3a9ef6eb96914e0ff2db45541d9b Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Wed, 13 Jan 2021 11:20:11 +0800 Subject: [PATCH] feat(split): add meta control split (#679) --- .../dsn/dist/replication/replication.codes.h | 1 + .../dist/replication/replication_ddl_client.h | 11 + .../dsn/dist/replication/replication_types.h | 145 ++ src/client/replication_ddl_client.cpp | 35 + src/common/replication_common.h | 1 + src/common/replication_types.cpp | 1528 ++++++++++------- src/meta/meta_service.cpp | 20 + src/meta/meta_service.h | 1 + src/meta/meta_split_service.cpp | 136 ++ src/meta/meta_split_service.h | 22 + src/meta/test/meta_split_service_test.cpp | 223 +++ src/replication.thrift | 33 + 12 files changed, 1541 insertions(+), 615 deletions(-) diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 39c378b7e4..dd813f014b 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -106,6 +106,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_DUPLICATION_SYNC, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_UPDATE_APP_ENV, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_DDD_DIAGNOSE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_PARTITION_SPLIT, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_PARTITION_SPLIT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_REGISTER_CHILD_REPLICA, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON) diff --git a/include/dsn/dist/replication/replication_ddl_client.h b/include/dsn/dist/replication/replication_ddl_client.h index 837d3f570b..09ccb79f18 100644 --- a/include/dsn/dist/replication/replication_ddl_client.h +++ b/include/dsn/dist/replication/replication_ddl_client.h @@ -195,6 +195,17 @@ class replication_ddl_client // partition split error_with start_partition_split(const std::string &app_name, int partition_count); + error_with pause_partition_split(const std::string &app_name, + const int32_t parent_pidx); + error_with restart_partition_split(const std::string &app_name, + const int32_t parent_pidx); + error_with cancel_partition_split(const std::string &app_name, + const int32_t old_partition_count); + error_with + control_partition_split(const std::string &app_name, + split_control_type::type control_type, + const int32_t parent_pidx, + const int32_t old_partition_count); private: bool static valid_app_char(int c); diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 63806a765c..728e5f6861 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -189,6 +189,18 @@ struct duplication_fail_mode extern const std::map _duplication_fail_mode_VALUES_TO_NAMES; +struct split_control_type +{ + enum type + { + PAUSE = 0, + RESTART = 1, + CANCEL = 2 + }; +}; + +extern const std::map _split_control_type_VALUES_TO_NAMES; + struct bulk_load_status { enum type @@ -449,6 +461,10 @@ class start_partition_split_request; class start_partition_split_response; +class control_split_request; + +class control_split_response; + class notify_catch_up_request; class notify_cacth_up_response; @@ -6279,6 +6295,135 @@ inline std::ostream &operator<<(std::ostream &out, const start_partition_split_r return out; } +typedef struct _control_split_request__isset +{ + _control_split_request__isset() + : app_name(false), control_type(false), parent_pidx(false), old_partition_count(false) + { + } + bool app_name : 1; + bool control_type : 1; + bool parent_pidx : 1; + bool old_partition_count : 1; +} _control_split_request__isset; + +class control_split_request +{ +public: + control_split_request(const control_split_request &); + control_split_request(control_split_request &&); + control_split_request &operator=(const control_split_request &); + control_split_request &operator=(control_split_request &&); + control_split_request() + : app_name(), + control_type((split_control_type::type)0), + parent_pidx(0), + old_partition_count(0) + { + } + + virtual ~control_split_request() throw(); + std::string app_name; + split_control_type::type control_type; + int32_t parent_pidx; + int32_t old_partition_count; + + _control_split_request__isset __isset; + + void __set_app_name(const std::string &val); + + void __set_control_type(const split_control_type::type val); + + void __set_parent_pidx(const int32_t val); + + void __set_old_partition_count(const int32_t val); + + bool operator==(const control_split_request &rhs) const + { + if (!(app_name == rhs.app_name)) + return false; + if (!(control_type == rhs.control_type)) + return false; + if (!(parent_pidx == rhs.parent_pidx)) + return false; + if (__isset.old_partition_count != rhs.__isset.old_partition_count) + return false; + else if (__isset.old_partition_count && !(old_partition_count == rhs.old_partition_count)) + return false; + return true; + } + bool operator!=(const control_split_request &rhs) const { return !(*this == rhs); } + + bool operator<(const control_split_request &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(control_split_request &a, control_split_request &b); + +inline std::ostream &operator<<(std::ostream &out, const control_split_request &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _control_split_response__isset +{ + _control_split_response__isset() : err(false), hint_msg(false) {} + bool err : 1; + bool hint_msg : 1; +} _control_split_response__isset; + +class control_split_response +{ +public: + control_split_response(const control_split_response &); + control_split_response(control_split_response &&); + control_split_response &operator=(const control_split_response &); + control_split_response &operator=(control_split_response &&); + control_split_response() : hint_msg() {} + + virtual ~control_split_response() throw(); + ::dsn::error_code err; + std::string hint_msg; + + _control_split_response__isset __isset; + + void __set_err(const ::dsn::error_code &val); + + void __set_hint_msg(const std::string &val); + + bool operator==(const control_split_response &rhs) const + { + if (!(err == rhs.err)) + return false; + if (__isset.hint_msg != rhs.__isset.hint_msg) + return false; + else if (__isset.hint_msg && !(hint_msg == rhs.hint_msg)) + return false; + return true; + } + bool operator!=(const control_split_response &rhs) const { return !(*this == rhs); } + + bool operator<(const control_split_response &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(control_split_response &a, control_split_response &b); + +inline std::ostream &operator<<(std::ostream &out, const control_split_response &obj) +{ + obj.printTo(out); + return out; +} + typedef struct _notify_catch_up_request__isset { _notify_catch_up_request__isset() diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index 2f928fd371..9495e83959 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -1604,5 +1604,40 @@ replication_ddl_client::start_partition_split(const std::string &app_name, int n return call_rpc_sync(start_split_rpc(std::move(req), RPC_CM_START_PARTITION_SPLIT)); } +error_with +replication_ddl_client::pause_partition_split(const std::string &app_name, + const int32_t parent_pidx) +{ + return control_partition_split(app_name, split_control_type::PAUSE, parent_pidx, 0); +} + +error_with +replication_ddl_client::restart_partition_split(const std::string &app_name, + const int32_t parent_pidx) +{ + return control_partition_split(app_name, split_control_type::RESTART, parent_pidx, 0); +} + +error_with +replication_ddl_client::cancel_partition_split(const std::string &app_name, + const int32_t old_partition_count) +{ + return control_partition_split(app_name, split_control_type::CANCEL, -1, old_partition_count); +} + +error_with +replication_ddl_client::control_partition_split(const std::string &app_name, + split_control_type::type control_type, + const int32_t parent_pidx, + const int32_t old_partition_count) +{ + auto req = make_unique(); + req->__set_app_name(app_name); + req->__set_control_type(control_type); + req->__set_parent_pidx(parent_pidx); + req->__set_old_partition_count(old_partition_count); + return call_rpc_sync(control_split_rpc(std::move(req), RPC_CM_CONTROL_PARTITION_SPLIT)); +} + } // namespace replication } // namespace dsn diff --git a/src/common/replication_common.h b/src/common/replication_common.h index 370a802d09..7beec8de5d 100644 --- a/src/common/replication_common.h +++ b/src/common/replication_common.h @@ -41,6 +41,7 @@ typedef rpc_holder contro typedef rpc_holder query_bulk_load_rpc; typedef rpc_holder start_split_rpc; +typedef rpc_holder control_split_rpc; class replication_options { diff --git a/src/common/replication_types.cpp b/src/common/replication_types.cpp index dd31aed20d..870bffe5fb 100644 --- a/src/common/replication_types.cpp +++ b/src/common/replication_types.cpp @@ -157,6 +157,13 @@ const std::map _duplication_fail_mode_VALUES_TO_NAMES( ::apache::thrift::TEnumIterator(3, _kduplication_fail_modeValues, _kduplication_fail_modeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +int _ksplit_control_typeValues[] = { + split_control_type::PAUSE, split_control_type::RESTART, split_control_type::CANCEL}; +const char *_ksplit_control_typeNames[] = {"PAUSE", "RESTART", "CANCEL"}; +const std::map _split_control_type_VALUES_TO_NAMES( + ::apache::thrift::TEnumIterator(3, _ksplit_control_typeValues, _ksplit_control_typeNames), + ::apache::thrift::TEnumIterator(-1, NULL, NULL)); + int _kbulk_load_statusValues[] = {bulk_load_status::BLS_INVALID, bulk_load_status::BLS_DOWNLOADING, bulk_load_status::BLS_DOWNLOADED, @@ -14590,6 +14597,297 @@ void start_partition_split_response::printTo(std::ostream &out) const out << ")"; } +control_split_request::~control_split_request() throw() {} + +void control_split_request::__set_app_name(const std::string &val) { this->app_name = val; } + +void control_split_request::__set_control_type(const split_control_type::type val) +{ + this->control_type = val; +} + +void control_split_request::__set_parent_pidx(const int32_t val) { this->parent_pidx = val; } + +void control_split_request::__set_old_partition_count(const int32_t val) +{ + this->old_partition_count = val; + __isset.old_partition_count = true; +} + +uint32_t control_split_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->app_name); + this->__isset.app_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast646; + xfer += iprot->readI32(ecast646); + this->control_type = (split_control_type::type)ecast646; + this->__isset.control_type = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->parent_pidx); + this->__isset.parent_pidx = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->old_partition_count); + this->__isset.old_partition_count = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t control_split_request::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("control_split_request"); + + xfer += oprot->writeFieldBegin("app_name", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->app_name); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("control_type", ::apache::thrift::protocol::T_I32, 2); + xfer += oprot->writeI32((int32_t)this->control_type); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("parent_pidx", ::apache::thrift::protocol::T_I32, 3); + xfer += oprot->writeI32(this->parent_pidx); + xfer += oprot->writeFieldEnd(); + + if (this->__isset.old_partition_count) { + xfer += oprot->writeFieldBegin("old_partition_count", ::apache::thrift::protocol::T_I32, 4); + xfer += oprot->writeI32(this->old_partition_count); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(control_split_request &a, control_split_request &b) +{ + using ::std::swap; + swap(a.app_name, b.app_name); + swap(a.control_type, b.control_type); + swap(a.parent_pidx, b.parent_pidx); + swap(a.old_partition_count, b.old_partition_count); + swap(a.__isset, b.__isset); +} + +control_split_request::control_split_request(const control_split_request &other647) +{ + app_name = other647.app_name; + control_type = other647.control_type; + parent_pidx = other647.parent_pidx; + old_partition_count = other647.old_partition_count; + __isset = other647.__isset; +} +control_split_request::control_split_request(control_split_request &&other648) +{ + app_name = std::move(other648.app_name); + control_type = std::move(other648.control_type); + parent_pidx = std::move(other648.parent_pidx); + old_partition_count = std::move(other648.old_partition_count); + __isset = std::move(other648.__isset); +} +control_split_request &control_split_request::operator=(const control_split_request &other649) +{ + app_name = other649.app_name; + control_type = other649.control_type; + parent_pidx = other649.parent_pidx; + old_partition_count = other649.old_partition_count; + __isset = other649.__isset; + return *this; +} +control_split_request &control_split_request::operator=(control_split_request &&other650) +{ + app_name = std::move(other650.app_name); + control_type = std::move(other650.control_type); + parent_pidx = std::move(other650.parent_pidx); + old_partition_count = std::move(other650.old_partition_count); + __isset = std::move(other650.__isset); + return *this; +} +void control_split_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "control_split_request("; + out << "app_name=" << to_string(app_name); + out << ", " + << "control_type=" << to_string(control_type); + out << ", " + << "parent_pidx=" << to_string(parent_pidx); + out << ", " + << "old_partition_count="; + (__isset.old_partition_count ? (out << to_string(old_partition_count)) : (out << "")); + out << ")"; +} + +control_split_response::~control_split_response() throw() {} + +void control_split_response::__set_err(const ::dsn::error_code &val) { this->err = val; } + +void control_split_response::__set_hint_msg(const std::string &val) +{ + this->hint_msg = val; + __isset.hint_msg = true; +} + +uint32_t control_split_response::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->err.read(iprot); + this->__isset.err = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->hint_msg); + this->__isset.hint_msg = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t control_split_response::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("control_split_response"); + + xfer += oprot->writeFieldBegin("err", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->err.write(oprot); + xfer += oprot->writeFieldEnd(); + + if (this->__isset.hint_msg) { + xfer += oprot->writeFieldBegin("hint_msg", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->hint_msg); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(control_split_response &a, control_split_response &b) +{ + using ::std::swap; + swap(a.err, b.err); + swap(a.hint_msg, b.hint_msg); + swap(a.__isset, b.__isset); +} + +control_split_response::control_split_response(const control_split_response &other651) +{ + err = other651.err; + hint_msg = other651.hint_msg; + __isset = other651.__isset; +} +control_split_response::control_split_response(control_split_response &&other652) +{ + err = std::move(other652.err); + hint_msg = std::move(other652.hint_msg); + __isset = std::move(other652.__isset); +} +control_split_response &control_split_response::operator=(const control_split_response &other653) +{ + err = other653.err; + hint_msg = other653.hint_msg; + __isset = other653.__isset; + return *this; +} +control_split_response &control_split_response::operator=(control_split_response &&other654) +{ + err = std::move(other654.err); + hint_msg = std::move(other654.hint_msg); + __isset = std::move(other654.__isset); + return *this; +} +void control_split_response::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "control_split_response("; + out << "err=" << to_string(err); + out << ", " + << "hint_msg="; + (__isset.hint_msg ? (out << to_string(hint_msg)) : (out << "")); + out << ")"; +} + notify_catch_up_request::~notify_catch_up_request() throw() {} void notify_catch_up_request::__set_parent_gpid(const ::dsn::gpid &val) { this->parent_gpid = val; } @@ -14703,38 +15001,38 @@ void swap(notify_catch_up_request &a, notify_catch_up_request &b) swap(a.__isset, b.__isset); } -notify_catch_up_request::notify_catch_up_request(const notify_catch_up_request &other646) +notify_catch_up_request::notify_catch_up_request(const notify_catch_up_request &other655) { - parent_gpid = other646.parent_gpid; - child_gpid = other646.child_gpid; - child_ballot = other646.child_ballot; - child_address = other646.child_address; - __isset = other646.__isset; + parent_gpid = other655.parent_gpid; + child_gpid = other655.child_gpid; + child_ballot = other655.child_ballot; + child_address = other655.child_address; + __isset = other655.__isset; } -notify_catch_up_request::notify_catch_up_request(notify_catch_up_request &&other647) +notify_catch_up_request::notify_catch_up_request(notify_catch_up_request &&other656) { - parent_gpid = std::move(other647.parent_gpid); - child_gpid = std::move(other647.child_gpid); - child_ballot = std::move(other647.child_ballot); - child_address = std::move(other647.child_address); - __isset = std::move(other647.__isset); + parent_gpid = std::move(other656.parent_gpid); + child_gpid = std::move(other656.child_gpid); + child_ballot = std::move(other656.child_ballot); + child_address = std::move(other656.child_address); + __isset = std::move(other656.__isset); } -notify_catch_up_request ¬ify_catch_up_request::operator=(const notify_catch_up_request &other648) +notify_catch_up_request ¬ify_catch_up_request::operator=(const notify_catch_up_request &other657) { - parent_gpid = other648.parent_gpid; - child_gpid = other648.child_gpid; - child_ballot = other648.child_ballot; - child_address = other648.child_address; - __isset = other648.__isset; + parent_gpid = other657.parent_gpid; + child_gpid = other657.child_gpid; + child_ballot = other657.child_ballot; + child_address = other657.child_address; + __isset = other657.__isset; return *this; } -notify_catch_up_request ¬ify_catch_up_request::operator=(notify_catch_up_request &&other649) +notify_catch_up_request ¬ify_catch_up_request::operator=(notify_catch_up_request &&other658) { - parent_gpid = std::move(other649.parent_gpid); - child_gpid = std::move(other649.child_gpid); - child_ballot = std::move(other649.child_ballot); - child_address = std::move(other649.child_address); - __isset = std::move(other649.__isset); + parent_gpid = std::move(other658.parent_gpid); + child_gpid = std::move(other658.child_gpid); + child_ballot = std::move(other658.child_ballot); + child_address = std::move(other658.child_address); + __isset = std::move(other658.__isset); return *this; } void notify_catch_up_request::printTo(std::ostream &out) const @@ -14816,27 +15114,27 @@ void swap(notify_cacth_up_response &a, notify_cacth_up_response &b) swap(a.__isset, b.__isset); } -notify_cacth_up_response::notify_cacth_up_response(const notify_cacth_up_response &other650) +notify_cacth_up_response::notify_cacth_up_response(const notify_cacth_up_response &other659) { - err = other650.err; - __isset = other650.__isset; + err = other659.err; + __isset = other659.__isset; } -notify_cacth_up_response::notify_cacth_up_response(notify_cacth_up_response &&other651) +notify_cacth_up_response::notify_cacth_up_response(notify_cacth_up_response &&other660) { - err = std::move(other651.err); - __isset = std::move(other651.__isset); + err = std::move(other660.err); + __isset = std::move(other660.__isset); } notify_cacth_up_response ¬ify_cacth_up_response:: -operator=(const notify_cacth_up_response &other652) +operator=(const notify_cacth_up_response &other661) { - err = other652.err; - __isset = other652.__isset; + err = other661.err; + __isset = other661.__isset; return *this; } -notify_cacth_up_response ¬ify_cacth_up_response::operator=(notify_cacth_up_response &&other653) +notify_cacth_up_response ¬ify_cacth_up_response::operator=(notify_cacth_up_response &&other662) { - err = std::move(other653.err); - __isset = std::move(other653.__isset); + err = std::move(other662.err); + __isset = std::move(other662.__isset); return *this; } void notify_cacth_up_response::printTo(std::ostream &out) const @@ -14973,41 +15271,41 @@ void swap(update_child_group_partition_count_request &a, } update_child_group_partition_count_request::update_child_group_partition_count_request( - const update_child_group_partition_count_request &other654) + const update_child_group_partition_count_request &other663) { - target_address = other654.target_address; - new_partition_count = other654.new_partition_count; - child_pid = other654.child_pid; - ballot = other654.ballot; - __isset = other654.__isset; + target_address = other663.target_address; + new_partition_count = other663.new_partition_count; + child_pid = other663.child_pid; + ballot = other663.ballot; + __isset = other663.__isset; } update_child_group_partition_count_request::update_child_group_partition_count_request( - update_child_group_partition_count_request &&other655) + update_child_group_partition_count_request &&other664) { - target_address = std::move(other655.target_address); - new_partition_count = std::move(other655.new_partition_count); - child_pid = std::move(other655.child_pid); - ballot = std::move(other655.ballot); - __isset = std::move(other655.__isset); + target_address = std::move(other664.target_address); + new_partition_count = std::move(other664.new_partition_count); + child_pid = std::move(other664.child_pid); + ballot = std::move(other664.ballot); + __isset = std::move(other664.__isset); } update_child_group_partition_count_request &update_child_group_partition_count_request:: -operator=(const update_child_group_partition_count_request &other656) +operator=(const update_child_group_partition_count_request &other665) { - target_address = other656.target_address; - new_partition_count = other656.new_partition_count; - child_pid = other656.child_pid; - ballot = other656.ballot; - __isset = other656.__isset; + target_address = other665.target_address; + new_partition_count = other665.new_partition_count; + child_pid = other665.child_pid; + ballot = other665.ballot; + __isset = other665.__isset; return *this; } update_child_group_partition_count_request &update_child_group_partition_count_request:: -operator=(update_child_group_partition_count_request &&other657) +operator=(update_child_group_partition_count_request &&other666) { - target_address = std::move(other657.target_address); - new_partition_count = std::move(other657.new_partition_count); - child_pid = std::move(other657.child_pid); - ballot = std::move(other657.ballot); - __isset = std::move(other657.__isset); + target_address = std::move(other666.target_address); + new_partition_count = std::move(other666.new_partition_count); + child_pid = std::move(other666.child_pid); + ballot = std::move(other666.ballot); + __isset = std::move(other666.__isset); return *this; } void update_child_group_partition_count_request::printTo(std::ostream &out) const @@ -15098,29 +15396,29 @@ void swap(update_child_group_partition_count_response &a, } update_child_group_partition_count_response::update_child_group_partition_count_response( - const update_child_group_partition_count_response &other658) + const update_child_group_partition_count_response &other667) { - err = other658.err; - __isset = other658.__isset; + err = other667.err; + __isset = other667.__isset; } update_child_group_partition_count_response::update_child_group_partition_count_response( - update_child_group_partition_count_response &&other659) + update_child_group_partition_count_response &&other668) { - err = std::move(other659.err); - __isset = std::move(other659.__isset); + err = std::move(other668.err); + __isset = std::move(other668.__isset); } update_child_group_partition_count_response &update_child_group_partition_count_response:: -operator=(const update_child_group_partition_count_response &other660) +operator=(const update_child_group_partition_count_response &other669) { - err = other660.err; - __isset = other660.__isset; + err = other669.err; + __isset = other669.__isset; return *this; } update_child_group_partition_count_response &update_child_group_partition_count_response:: -operator=(update_child_group_partition_count_response &&other661) +operator=(update_child_group_partition_count_response &&other670) { - err = std::move(other661.err); - __isset = std::move(other661.__isset); + err = std::move(other670.err); + __isset = std::move(other670.__isset); return *this; } void update_child_group_partition_count_response::printTo(std::ostream &out) const @@ -15250,38 +15548,38 @@ void swap(register_child_request &a, register_child_request &b) swap(a.__isset, b.__isset); } -register_child_request::register_child_request(const register_child_request &other662) +register_child_request::register_child_request(const register_child_request &other671) { - app = other662.app; - parent_config = other662.parent_config; - child_config = other662.child_config; - primary_address = other662.primary_address; - __isset = other662.__isset; + app = other671.app; + parent_config = other671.parent_config; + child_config = other671.child_config; + primary_address = other671.primary_address; + __isset = other671.__isset; } -register_child_request::register_child_request(register_child_request &&other663) +register_child_request::register_child_request(register_child_request &&other672) { - app = std::move(other663.app); - parent_config = std::move(other663.parent_config); - child_config = std::move(other663.child_config); - primary_address = std::move(other663.primary_address); - __isset = std::move(other663.__isset); + app = std::move(other672.app); + parent_config = std::move(other672.parent_config); + child_config = std::move(other672.child_config); + primary_address = std::move(other672.primary_address); + __isset = std::move(other672.__isset); } -register_child_request ®ister_child_request::operator=(const register_child_request &other664) +register_child_request ®ister_child_request::operator=(const register_child_request &other673) { - app = other664.app; - parent_config = other664.parent_config; - child_config = other664.child_config; - primary_address = other664.primary_address; - __isset = other664.__isset; + app = other673.app; + parent_config = other673.parent_config; + child_config = other673.child_config; + primary_address = other673.primary_address; + __isset = other673.__isset; return *this; } -register_child_request ®ister_child_request::operator=(register_child_request &&other665) +register_child_request ®ister_child_request::operator=(register_child_request &&other674) { - app = std::move(other665.app); - parent_config = std::move(other665.parent_config); - child_config = std::move(other665.child_config); - primary_address = std::move(other665.primary_address); - __isset = std::move(other665.__isset); + app = std::move(other674.app); + parent_config = std::move(other674.parent_config); + child_config = std::move(other674.child_config); + primary_address = std::move(other674.primary_address); + __isset = std::move(other674.__isset); return *this; } void register_child_request::printTo(std::ostream &out) const @@ -15414,38 +15712,38 @@ void swap(register_child_response &a, register_child_response &b) swap(a.__isset, b.__isset); } -register_child_response::register_child_response(const register_child_response &other666) +register_child_response::register_child_response(const register_child_response &other675) { - err = other666.err; - app = other666.app; - parent_config = other666.parent_config; - child_config = other666.child_config; - __isset = other666.__isset; + err = other675.err; + app = other675.app; + parent_config = other675.parent_config; + child_config = other675.child_config; + __isset = other675.__isset; } -register_child_response::register_child_response(register_child_response &&other667) +register_child_response::register_child_response(register_child_response &&other676) { - err = std::move(other667.err); - app = std::move(other667.app); - parent_config = std::move(other667.parent_config); - child_config = std::move(other667.child_config); - __isset = std::move(other667.__isset); + err = std::move(other676.err); + app = std::move(other676.app); + parent_config = std::move(other676.parent_config); + child_config = std::move(other676.child_config); + __isset = std::move(other676.__isset); } -register_child_response ®ister_child_response::operator=(const register_child_response &other668) +register_child_response ®ister_child_response::operator=(const register_child_response &other677) { - err = other668.err; - app = other668.app; - parent_config = other668.parent_config; - child_config = other668.child_config; - __isset = other668.__isset; + err = other677.err; + app = other677.app; + parent_config = other677.parent_config; + child_config = other677.child_config; + __isset = other677.__isset; return *this; } -register_child_response ®ister_child_response::operator=(register_child_response &&other669) +register_child_response ®ister_child_response::operator=(register_child_response &&other678) { - err = std::move(other669.err); - app = std::move(other669.app); - parent_config = std::move(other669.parent_config); - child_config = std::move(other669.child_config); - __isset = std::move(other669.__isset); + err = std::move(other678.err); + app = std::move(other678.app); + parent_config = std::move(other678.parent_config); + child_config = std::move(other678.child_config); + __isset = std::move(other678.__isset); return *this; } void register_child_response::printTo(std::ostream &out) const @@ -15491,13 +15789,13 @@ uint32_t bulk_load_metadata::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_LIST) { { this->files.clear(); - uint32_t _size670; - ::apache::thrift::protocol::TType _etype673; - xfer += iprot->readListBegin(_etype673, _size670); - this->files.resize(_size670); - uint32_t _i674; - for (_i674 = 0; _i674 < _size670; ++_i674) { - xfer += this->files[_i674].read(iprot); + uint32_t _size679; + ::apache::thrift::protocol::TType _etype682; + xfer += iprot->readListBegin(_etype682, _size679); + this->files.resize(_size679); + uint32_t _i683; + for (_i683 = 0; _i683 < _size679; ++_i683) { + xfer += this->files[_i683].read(iprot); } xfer += iprot->readListEnd(); } @@ -15536,9 +15834,9 @@ uint32_t bulk_load_metadata::write(::apache::thrift::protocol::TProtocol *oprot) { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->files.size())); - std::vector::const_iterator _iter675; - for (_iter675 = this->files.begin(); _iter675 != this->files.end(); ++_iter675) { - xfer += (*_iter675).write(oprot); + std::vector::const_iterator _iter684; + for (_iter684 = this->files.begin(); _iter684 != this->files.end(); ++_iter684) { + xfer += (*_iter684).write(oprot); } xfer += oprot->writeListEnd(); } @@ -15561,30 +15859,30 @@ void swap(bulk_load_metadata &a, bulk_load_metadata &b) swap(a.__isset, b.__isset); } -bulk_load_metadata::bulk_load_metadata(const bulk_load_metadata &other676) +bulk_load_metadata::bulk_load_metadata(const bulk_load_metadata &other685) { - files = other676.files; - file_total_size = other676.file_total_size; - __isset = other676.__isset; + files = other685.files; + file_total_size = other685.file_total_size; + __isset = other685.__isset; } -bulk_load_metadata::bulk_load_metadata(bulk_load_metadata &&other677) +bulk_load_metadata::bulk_load_metadata(bulk_load_metadata &&other686) { - files = std::move(other677.files); - file_total_size = std::move(other677.file_total_size); - __isset = std::move(other677.__isset); + files = std::move(other686.files); + file_total_size = std::move(other686.file_total_size); + __isset = std::move(other686.__isset); } -bulk_load_metadata &bulk_load_metadata::operator=(const bulk_load_metadata &other678) +bulk_load_metadata &bulk_load_metadata::operator=(const bulk_load_metadata &other687) { - files = other678.files; - file_total_size = other678.file_total_size; - __isset = other678.__isset; + files = other687.files; + file_total_size = other687.file_total_size; + __isset = other687.__isset; return *this; } -bulk_load_metadata &bulk_load_metadata::operator=(bulk_load_metadata &&other679) +bulk_load_metadata &bulk_load_metadata::operator=(bulk_load_metadata &&other688) { - files = std::move(other679.files); - file_total_size = std::move(other679.file_total_size); - __isset = std::move(other679.__isset); + files = std::move(other688.files); + file_total_size = std::move(other688.file_total_size); + __isset = std::move(other688.__isset); return *this; } void bulk_load_metadata::printTo(std::ostream &out) const @@ -15716,38 +16014,38 @@ void swap(start_bulk_load_request &a, start_bulk_load_request &b) swap(a.__isset, b.__isset); } -start_bulk_load_request::start_bulk_load_request(const start_bulk_load_request &other680) +start_bulk_load_request::start_bulk_load_request(const start_bulk_load_request &other689) { - app_name = other680.app_name; - cluster_name = other680.cluster_name; - file_provider_type = other680.file_provider_type; - remote_root_path = other680.remote_root_path; - __isset = other680.__isset; + app_name = other689.app_name; + cluster_name = other689.cluster_name; + file_provider_type = other689.file_provider_type; + remote_root_path = other689.remote_root_path; + __isset = other689.__isset; } -start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other681) +start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other690) { - app_name = std::move(other681.app_name); - cluster_name = std::move(other681.cluster_name); - file_provider_type = std::move(other681.file_provider_type); - remote_root_path = std::move(other681.remote_root_path); - __isset = std::move(other681.__isset); + app_name = std::move(other690.app_name); + cluster_name = std::move(other690.cluster_name); + file_provider_type = std::move(other690.file_provider_type); + remote_root_path = std::move(other690.remote_root_path); + __isset = std::move(other690.__isset); } -start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_load_request &other682) +start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_load_request &other691) { - app_name = other682.app_name; - cluster_name = other682.cluster_name; - file_provider_type = other682.file_provider_type; - remote_root_path = other682.remote_root_path; - __isset = other682.__isset; + app_name = other691.app_name; + cluster_name = other691.cluster_name; + file_provider_type = other691.file_provider_type; + remote_root_path = other691.remote_root_path; + __isset = other691.__isset; return *this; } -start_bulk_load_request &start_bulk_load_request::operator=(start_bulk_load_request &&other683) +start_bulk_load_request &start_bulk_load_request::operator=(start_bulk_load_request &&other692) { - app_name = std::move(other683.app_name); - cluster_name = std::move(other683.cluster_name); - file_provider_type = std::move(other683.file_provider_type); - remote_root_path = std::move(other683.remote_root_path); - __isset = std::move(other683.__isset); + app_name = std::move(other692.app_name); + cluster_name = std::move(other692.cluster_name); + file_provider_type = std::move(other692.file_provider_type); + remote_root_path = std::move(other692.remote_root_path); + __isset = std::move(other692.__isset); return *this; } void start_bulk_load_request::printTo(std::ostream &out) const @@ -15844,31 +16142,31 @@ void swap(start_bulk_load_response &a, start_bulk_load_response &b) swap(a.__isset, b.__isset); } -start_bulk_load_response::start_bulk_load_response(const start_bulk_load_response &other684) +start_bulk_load_response::start_bulk_load_response(const start_bulk_load_response &other693) { - err = other684.err; - hint_msg = other684.hint_msg; - __isset = other684.__isset; + err = other693.err; + hint_msg = other693.hint_msg; + __isset = other693.__isset; } -start_bulk_load_response::start_bulk_load_response(start_bulk_load_response &&other685) +start_bulk_load_response::start_bulk_load_response(start_bulk_load_response &&other694) { - err = std::move(other685.err); - hint_msg = std::move(other685.hint_msg); - __isset = std::move(other685.__isset); + err = std::move(other694.err); + hint_msg = std::move(other694.hint_msg); + __isset = std::move(other694.__isset); } start_bulk_load_response &start_bulk_load_response:: -operator=(const start_bulk_load_response &other686) +operator=(const start_bulk_load_response &other695) { - err = other686.err; - hint_msg = other686.hint_msg; - __isset = other686.__isset; + err = other695.err; + hint_msg = other695.hint_msg; + __isset = other695.__isset; return *this; } -start_bulk_load_response &start_bulk_load_response::operator=(start_bulk_load_response &&other687) +start_bulk_load_response &start_bulk_load_response::operator=(start_bulk_load_response &&other696) { - err = std::move(other687.err); - hint_msg = std::move(other687.hint_msg); - __isset = std::move(other687.__isset); + err = std::move(other696.err); + hint_msg = std::move(other696.hint_msg); + __isset = std::move(other696.__isset); return *this; } void start_bulk_load_response::printTo(std::ostream &out) const @@ -15950,9 +16248,9 @@ uint32_t partition_bulk_load_state::read(::apache::thrift::protocol::TProtocol * break; case 3: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast688; - xfer += iprot->readI32(ecast688); - this->ingest_status = (ingestion_status::type)ecast688; + int32_t ecast697; + xfer += iprot->readI32(ecast697); + this->ingest_status = (ingestion_status::type)ecast697; this->__isset.ingest_status = true; } else { xfer += iprot->skip(ftype); @@ -16033,44 +16331,44 @@ void swap(partition_bulk_load_state &a, partition_bulk_load_state &b) swap(a.__isset, b.__isset); } -partition_bulk_load_state::partition_bulk_load_state(const partition_bulk_load_state &other689) +partition_bulk_load_state::partition_bulk_load_state(const partition_bulk_load_state &other698) { - download_progress = other689.download_progress; - download_status = other689.download_status; - ingest_status = other689.ingest_status; - is_cleaned_up = other689.is_cleaned_up; - is_paused = other689.is_paused; - __isset = other689.__isset; + download_progress = other698.download_progress; + download_status = other698.download_status; + ingest_status = other698.ingest_status; + is_cleaned_up = other698.is_cleaned_up; + is_paused = other698.is_paused; + __isset = other698.__isset; } -partition_bulk_load_state::partition_bulk_load_state(partition_bulk_load_state &&other690) +partition_bulk_load_state::partition_bulk_load_state(partition_bulk_load_state &&other699) { - download_progress = std::move(other690.download_progress); - download_status = std::move(other690.download_status); - ingest_status = std::move(other690.ingest_status); - is_cleaned_up = std::move(other690.is_cleaned_up); - is_paused = std::move(other690.is_paused); - __isset = std::move(other690.__isset); + download_progress = std::move(other699.download_progress); + download_status = std::move(other699.download_status); + ingest_status = std::move(other699.ingest_status); + is_cleaned_up = std::move(other699.is_cleaned_up); + is_paused = std::move(other699.is_paused); + __isset = std::move(other699.__isset); } partition_bulk_load_state &partition_bulk_load_state:: -operator=(const partition_bulk_load_state &other691) -{ - download_progress = other691.download_progress; - download_status = other691.download_status; - ingest_status = other691.ingest_status; - is_cleaned_up = other691.is_cleaned_up; - is_paused = other691.is_paused; - __isset = other691.__isset; +operator=(const partition_bulk_load_state &other700) +{ + download_progress = other700.download_progress; + download_status = other700.download_status; + ingest_status = other700.ingest_status; + is_cleaned_up = other700.is_cleaned_up; + is_paused = other700.is_paused; + __isset = other700.__isset; return *this; } partition_bulk_load_state &partition_bulk_load_state:: -operator=(partition_bulk_load_state &&other692) -{ - download_progress = std::move(other692.download_progress); - download_status = std::move(other692.download_status); - ingest_status = std::move(other692.ingest_status); - is_cleaned_up = std::move(other692.is_cleaned_up); - is_paused = std::move(other692.is_paused); - __isset = std::move(other692.__isset); +operator=(partition_bulk_load_state &&other701) +{ + download_progress = std::move(other701.download_progress); + download_status = std::move(other701.download_status); + ingest_status = std::move(other701.ingest_status); + is_cleaned_up = std::move(other701.is_cleaned_up); + is_paused = std::move(other701.is_paused); + __isset = std::move(other701.__isset); return *this; } void partition_bulk_load_state::printTo(std::ostream &out) const @@ -16198,9 +16496,9 @@ uint32_t bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) break; case 7: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast693; - xfer += iprot->readI32(ecast693); - this->meta_bulk_load_status = (bulk_load_status::type)ecast693; + int32_t ecast702; + xfer += iprot->readI32(ecast702); + this->meta_bulk_load_status = (bulk_load_status::type)ecast702; this->__isset.meta_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -16297,58 +16595,58 @@ void swap(bulk_load_request &a, bulk_load_request &b) swap(a.__isset, b.__isset); } -bulk_load_request::bulk_load_request(const bulk_load_request &other694) -{ - pid = other694.pid; - app_name = other694.app_name; - primary_addr = other694.primary_addr; - remote_provider_name = other694.remote_provider_name; - cluster_name = other694.cluster_name; - ballot = other694.ballot; - meta_bulk_load_status = other694.meta_bulk_load_status; - query_bulk_load_metadata = other694.query_bulk_load_metadata; - remote_root_path = other694.remote_root_path; - __isset = other694.__isset; -} -bulk_load_request::bulk_load_request(bulk_load_request &&other695) -{ - pid = std::move(other695.pid); - app_name = std::move(other695.app_name); - primary_addr = std::move(other695.primary_addr); - remote_provider_name = std::move(other695.remote_provider_name); - cluster_name = std::move(other695.cluster_name); - ballot = std::move(other695.ballot); - meta_bulk_load_status = std::move(other695.meta_bulk_load_status); - query_bulk_load_metadata = std::move(other695.query_bulk_load_metadata); - remote_root_path = std::move(other695.remote_root_path); - __isset = std::move(other695.__isset); -} -bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other696) -{ - pid = other696.pid; - app_name = other696.app_name; - primary_addr = other696.primary_addr; - remote_provider_name = other696.remote_provider_name; - cluster_name = other696.cluster_name; - ballot = other696.ballot; - meta_bulk_load_status = other696.meta_bulk_load_status; - query_bulk_load_metadata = other696.query_bulk_load_metadata; - remote_root_path = other696.remote_root_path; - __isset = other696.__isset; +bulk_load_request::bulk_load_request(const bulk_load_request &other703) +{ + pid = other703.pid; + app_name = other703.app_name; + primary_addr = other703.primary_addr; + remote_provider_name = other703.remote_provider_name; + cluster_name = other703.cluster_name; + ballot = other703.ballot; + meta_bulk_load_status = other703.meta_bulk_load_status; + query_bulk_load_metadata = other703.query_bulk_load_metadata; + remote_root_path = other703.remote_root_path; + __isset = other703.__isset; +} +bulk_load_request::bulk_load_request(bulk_load_request &&other704) +{ + pid = std::move(other704.pid); + app_name = std::move(other704.app_name); + primary_addr = std::move(other704.primary_addr); + remote_provider_name = std::move(other704.remote_provider_name); + cluster_name = std::move(other704.cluster_name); + ballot = std::move(other704.ballot); + meta_bulk_load_status = std::move(other704.meta_bulk_load_status); + query_bulk_load_metadata = std::move(other704.query_bulk_load_metadata); + remote_root_path = std::move(other704.remote_root_path); + __isset = std::move(other704.__isset); +} +bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other705) +{ + pid = other705.pid; + app_name = other705.app_name; + primary_addr = other705.primary_addr; + remote_provider_name = other705.remote_provider_name; + cluster_name = other705.cluster_name; + ballot = other705.ballot; + meta_bulk_load_status = other705.meta_bulk_load_status; + query_bulk_load_metadata = other705.query_bulk_load_metadata; + remote_root_path = other705.remote_root_path; + __isset = other705.__isset; return *this; } -bulk_load_request &bulk_load_request::operator=(bulk_load_request &&other697) -{ - pid = std::move(other697.pid); - app_name = std::move(other697.app_name); - primary_addr = std::move(other697.primary_addr); - remote_provider_name = std::move(other697.remote_provider_name); - cluster_name = std::move(other697.cluster_name); - ballot = std::move(other697.ballot); - meta_bulk_load_status = std::move(other697.meta_bulk_load_status); - query_bulk_load_metadata = std::move(other697.query_bulk_load_metadata); - remote_root_path = std::move(other697.remote_root_path); - __isset = std::move(other697.__isset); +bulk_load_request &bulk_load_request::operator=(bulk_load_request &&other706) +{ + pid = std::move(other706.pid); + app_name = std::move(other706.app_name); + primary_addr = std::move(other706.primary_addr); + remote_provider_name = std::move(other706.remote_provider_name); + cluster_name = std::move(other706.cluster_name); + ballot = std::move(other706.ballot); + meta_bulk_load_status = std::move(other706.meta_bulk_load_status); + query_bulk_load_metadata = std::move(other706.query_bulk_load_metadata); + remote_root_path = std::move(other706.remote_root_path); + __isset = std::move(other706.__isset); return *this; } void bulk_load_request::printTo(std::ostream &out) const @@ -16469,9 +16767,9 @@ uint32_t bulk_load_response::read(::apache::thrift::protocol::TProtocol *iprot) break; case 4: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast698; - xfer += iprot->readI32(ecast698); - this->primary_bulk_load_status = (bulk_load_status::type)ecast698; + int32_t ecast707; + xfer += iprot->readI32(ecast707); + this->primary_bulk_load_status = (bulk_load_status::type)ecast707; this->__isset.primary_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -16481,16 +16779,16 @@ uint32_t bulk_load_response::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_MAP) { { this->group_bulk_load_state.clear(); - uint32_t _size699; - ::apache::thrift::protocol::TType _ktype700; - ::apache::thrift::protocol::TType _vtype701; - xfer += iprot->readMapBegin(_ktype700, _vtype701, _size699); - uint32_t _i703; - for (_i703 = 0; _i703 < _size699; ++_i703) { - ::dsn::rpc_address _key704; - xfer += _key704.read(iprot); - partition_bulk_load_state &_val705 = this->group_bulk_load_state[_key704]; - xfer += _val705.read(iprot); + uint32_t _size708; + ::apache::thrift::protocol::TType _ktype709; + ::apache::thrift::protocol::TType _vtype710; + xfer += iprot->readMapBegin(_ktype709, _vtype710, _size708); + uint32_t _i712; + for (_i712 = 0; _i712 < _size708; ++_i712) { + ::dsn::rpc_address _key713; + xfer += _key713.read(iprot); + partition_bulk_load_state &_val714 = this->group_bulk_load_state[_key713]; + xfer += _val714.read(iprot); } xfer += iprot->readMapEnd(); } @@ -16579,12 +16877,12 @@ uint32_t bulk_load_response::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_STRUCT, static_cast(this->group_bulk_load_state.size())); - std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter706; - for (_iter706 = this->group_bulk_load_state.begin(); - _iter706 != this->group_bulk_load_state.end(); - ++_iter706) { - xfer += _iter706->first.write(oprot); - xfer += _iter706->second.write(oprot); + std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter715; + for (_iter715 = this->group_bulk_load_state.begin(); + _iter715 != this->group_bulk_load_state.end(); + ++_iter715) { + xfer += _iter715->first.write(oprot); + xfer += _iter715->second.write(oprot); } xfer += oprot->writeMapEnd(); } @@ -16640,64 +16938,64 @@ void swap(bulk_load_response &a, bulk_load_response &b) swap(a.__isset, b.__isset); } -bulk_load_response::bulk_load_response(const bulk_load_response &other707) -{ - err = other707.err; - pid = other707.pid; - app_name = other707.app_name; - primary_bulk_load_status = other707.primary_bulk_load_status; - group_bulk_load_state = other707.group_bulk_load_state; - metadata = other707.metadata; - total_download_progress = other707.total_download_progress; - is_group_ingestion_finished = other707.is_group_ingestion_finished; - is_group_bulk_load_context_cleaned_up = other707.is_group_bulk_load_context_cleaned_up; - is_group_bulk_load_paused = other707.is_group_bulk_load_paused; - __isset = other707.__isset; -} -bulk_load_response::bulk_load_response(bulk_load_response &&other708) -{ - err = std::move(other708.err); - pid = std::move(other708.pid); - app_name = std::move(other708.app_name); - primary_bulk_load_status = std::move(other708.primary_bulk_load_status); - group_bulk_load_state = std::move(other708.group_bulk_load_state); - metadata = std::move(other708.metadata); - total_download_progress = std::move(other708.total_download_progress); - is_group_ingestion_finished = std::move(other708.is_group_ingestion_finished); +bulk_load_response::bulk_load_response(const bulk_load_response &other716) +{ + err = other716.err; + pid = other716.pid; + app_name = other716.app_name; + primary_bulk_load_status = other716.primary_bulk_load_status; + group_bulk_load_state = other716.group_bulk_load_state; + metadata = other716.metadata; + total_download_progress = other716.total_download_progress; + is_group_ingestion_finished = other716.is_group_ingestion_finished; + is_group_bulk_load_context_cleaned_up = other716.is_group_bulk_load_context_cleaned_up; + is_group_bulk_load_paused = other716.is_group_bulk_load_paused; + __isset = other716.__isset; +} +bulk_load_response::bulk_load_response(bulk_load_response &&other717) +{ + err = std::move(other717.err); + pid = std::move(other717.pid); + app_name = std::move(other717.app_name); + primary_bulk_load_status = std::move(other717.primary_bulk_load_status); + group_bulk_load_state = std::move(other717.group_bulk_load_state); + metadata = std::move(other717.metadata); + total_download_progress = std::move(other717.total_download_progress); + is_group_ingestion_finished = std::move(other717.is_group_ingestion_finished); is_group_bulk_load_context_cleaned_up = - std::move(other708.is_group_bulk_load_context_cleaned_up); - is_group_bulk_load_paused = std::move(other708.is_group_bulk_load_paused); - __isset = std::move(other708.__isset); -} -bulk_load_response &bulk_load_response::operator=(const bulk_load_response &other709) -{ - err = other709.err; - pid = other709.pid; - app_name = other709.app_name; - primary_bulk_load_status = other709.primary_bulk_load_status; - group_bulk_load_state = other709.group_bulk_load_state; - metadata = other709.metadata; - total_download_progress = other709.total_download_progress; - is_group_ingestion_finished = other709.is_group_ingestion_finished; - is_group_bulk_load_context_cleaned_up = other709.is_group_bulk_load_context_cleaned_up; - is_group_bulk_load_paused = other709.is_group_bulk_load_paused; - __isset = other709.__isset; + std::move(other717.is_group_bulk_load_context_cleaned_up); + is_group_bulk_load_paused = std::move(other717.is_group_bulk_load_paused); + __isset = std::move(other717.__isset); +} +bulk_load_response &bulk_load_response::operator=(const bulk_load_response &other718) +{ + err = other718.err; + pid = other718.pid; + app_name = other718.app_name; + primary_bulk_load_status = other718.primary_bulk_load_status; + group_bulk_load_state = other718.group_bulk_load_state; + metadata = other718.metadata; + total_download_progress = other718.total_download_progress; + is_group_ingestion_finished = other718.is_group_ingestion_finished; + is_group_bulk_load_context_cleaned_up = other718.is_group_bulk_load_context_cleaned_up; + is_group_bulk_load_paused = other718.is_group_bulk_load_paused; + __isset = other718.__isset; return *this; } -bulk_load_response &bulk_load_response::operator=(bulk_load_response &&other710) +bulk_load_response &bulk_load_response::operator=(bulk_load_response &&other719) { - err = std::move(other710.err); - pid = std::move(other710.pid); - app_name = std::move(other710.app_name); - primary_bulk_load_status = std::move(other710.primary_bulk_load_status); - group_bulk_load_state = std::move(other710.group_bulk_load_state); - metadata = std::move(other710.metadata); - total_download_progress = std::move(other710.total_download_progress); - is_group_ingestion_finished = std::move(other710.is_group_ingestion_finished); + err = std::move(other719.err); + pid = std::move(other719.pid); + app_name = std::move(other719.app_name); + primary_bulk_load_status = std::move(other719.primary_bulk_load_status); + group_bulk_load_state = std::move(other719.group_bulk_load_state); + metadata = std::move(other719.metadata); + total_download_progress = std::move(other719.total_download_progress); + is_group_ingestion_finished = std::move(other719.is_group_ingestion_finished); is_group_bulk_load_context_cleaned_up = - std::move(other710.is_group_bulk_load_context_cleaned_up); - is_group_bulk_load_paused = std::move(other710.is_group_bulk_load_paused); - __isset = std::move(other710.__isset); + std::move(other719.is_group_bulk_load_context_cleaned_up); + is_group_bulk_load_paused = std::move(other719.is_group_bulk_load_paused); + __isset = std::move(other719.__isset); return *this; } void bulk_load_response::printTo(std::ostream &out) const @@ -16828,9 +17126,9 @@ uint32_t group_bulk_load_request::read(::apache::thrift::protocol::TProtocol *ip break; case 6: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast711; - xfer += iprot->readI32(ecast711); - this->meta_bulk_load_status = (bulk_load_status::type)ecast711; + int32_t ecast720; + xfer += iprot->readI32(ecast720); + this->meta_bulk_load_status = (bulk_load_status::type)ecast720; this->__isset.meta_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -16908,50 +17206,50 @@ void swap(group_bulk_load_request &a, group_bulk_load_request &b) swap(a.__isset, b.__isset); } -group_bulk_load_request::group_bulk_load_request(const group_bulk_load_request &other712) -{ - app_name = other712.app_name; - target_address = other712.target_address; - config = other712.config; - provider_name = other712.provider_name; - cluster_name = other712.cluster_name; - meta_bulk_load_status = other712.meta_bulk_load_status; - remote_root_path = other712.remote_root_path; - __isset = other712.__isset; -} -group_bulk_load_request::group_bulk_load_request(group_bulk_load_request &&other713) -{ - app_name = std::move(other713.app_name); - target_address = std::move(other713.target_address); - config = std::move(other713.config); - provider_name = std::move(other713.provider_name); - cluster_name = std::move(other713.cluster_name); - meta_bulk_load_status = std::move(other713.meta_bulk_load_status); - remote_root_path = std::move(other713.remote_root_path); - __isset = std::move(other713.__isset); -} -group_bulk_load_request &group_bulk_load_request::operator=(const group_bulk_load_request &other714) -{ - app_name = other714.app_name; - target_address = other714.target_address; - config = other714.config; - provider_name = other714.provider_name; - cluster_name = other714.cluster_name; - meta_bulk_load_status = other714.meta_bulk_load_status; - remote_root_path = other714.remote_root_path; - __isset = other714.__isset; +group_bulk_load_request::group_bulk_load_request(const group_bulk_load_request &other721) +{ + app_name = other721.app_name; + target_address = other721.target_address; + config = other721.config; + provider_name = other721.provider_name; + cluster_name = other721.cluster_name; + meta_bulk_load_status = other721.meta_bulk_load_status; + remote_root_path = other721.remote_root_path; + __isset = other721.__isset; +} +group_bulk_load_request::group_bulk_load_request(group_bulk_load_request &&other722) +{ + app_name = std::move(other722.app_name); + target_address = std::move(other722.target_address); + config = std::move(other722.config); + provider_name = std::move(other722.provider_name); + cluster_name = std::move(other722.cluster_name); + meta_bulk_load_status = std::move(other722.meta_bulk_load_status); + remote_root_path = std::move(other722.remote_root_path); + __isset = std::move(other722.__isset); +} +group_bulk_load_request &group_bulk_load_request::operator=(const group_bulk_load_request &other723) +{ + app_name = other723.app_name; + target_address = other723.target_address; + config = other723.config; + provider_name = other723.provider_name; + cluster_name = other723.cluster_name; + meta_bulk_load_status = other723.meta_bulk_load_status; + remote_root_path = other723.remote_root_path; + __isset = other723.__isset; return *this; } -group_bulk_load_request &group_bulk_load_request::operator=(group_bulk_load_request &&other715) +group_bulk_load_request &group_bulk_load_request::operator=(group_bulk_load_request &&other724) { - app_name = std::move(other715.app_name); - target_address = std::move(other715.target_address); - config = std::move(other715.config); - provider_name = std::move(other715.provider_name); - cluster_name = std::move(other715.cluster_name); - meta_bulk_load_status = std::move(other715.meta_bulk_load_status); - remote_root_path = std::move(other715.remote_root_path); - __isset = std::move(other715.__isset); + app_name = std::move(other724.app_name); + target_address = std::move(other724.target_address); + config = std::move(other724.config); + provider_name = std::move(other724.provider_name); + cluster_name = std::move(other724.cluster_name); + meta_bulk_load_status = std::move(other724.meta_bulk_load_status); + remote_root_path = std::move(other724.remote_root_path); + __isset = std::move(other724.__isset); return *this; } void group_bulk_load_request::printTo(std::ostream &out) const @@ -17017,9 +17315,9 @@ uint32_t group_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast716; - xfer += iprot->readI32(ecast716); - this->status = (bulk_load_status::type)ecast716; + int32_t ecast725; + xfer += iprot->readI32(ecast725); + this->status = (bulk_load_status::type)ecast725; this->__isset.status = true; } else { xfer += iprot->skip(ftype); @@ -17077,35 +17375,35 @@ void swap(group_bulk_load_response &a, group_bulk_load_response &b) swap(a.__isset, b.__isset); } -group_bulk_load_response::group_bulk_load_response(const group_bulk_load_response &other717) +group_bulk_load_response::group_bulk_load_response(const group_bulk_load_response &other726) { - err = other717.err; - status = other717.status; - bulk_load_state = other717.bulk_load_state; - __isset = other717.__isset; + err = other726.err; + status = other726.status; + bulk_load_state = other726.bulk_load_state; + __isset = other726.__isset; } -group_bulk_load_response::group_bulk_load_response(group_bulk_load_response &&other718) +group_bulk_load_response::group_bulk_load_response(group_bulk_load_response &&other727) { - err = std::move(other718.err); - status = std::move(other718.status); - bulk_load_state = std::move(other718.bulk_load_state); - __isset = std::move(other718.__isset); + err = std::move(other727.err); + status = std::move(other727.status); + bulk_load_state = std::move(other727.bulk_load_state); + __isset = std::move(other727.__isset); } group_bulk_load_response &group_bulk_load_response:: -operator=(const group_bulk_load_response &other719) +operator=(const group_bulk_load_response &other728) { - err = other719.err; - status = other719.status; - bulk_load_state = other719.bulk_load_state; - __isset = other719.__isset; + err = other728.err; + status = other728.status; + bulk_load_state = other728.bulk_load_state; + __isset = other728.__isset; return *this; } -group_bulk_load_response &group_bulk_load_response::operator=(group_bulk_load_response &&other720) +group_bulk_load_response &group_bulk_load_response::operator=(group_bulk_load_response &&other729) { - err = std::move(other720.err); - status = std::move(other720.status); - bulk_load_state = std::move(other720.bulk_load_state); - __isset = std::move(other720.__isset); + err = std::move(other729.err); + status = std::move(other729.status); + bulk_load_state = std::move(other729.bulk_load_state); + __isset = std::move(other729.__isset); return *this; } void group_bulk_load_response::printTo(std::ostream &out) const @@ -17200,30 +17498,30 @@ void swap(ingestion_request &a, ingestion_request &b) swap(a.__isset, b.__isset); } -ingestion_request::ingestion_request(const ingestion_request &other721) +ingestion_request::ingestion_request(const ingestion_request &other730) { - app_name = other721.app_name; - metadata = other721.metadata; - __isset = other721.__isset; + app_name = other730.app_name; + metadata = other730.metadata; + __isset = other730.__isset; } -ingestion_request::ingestion_request(ingestion_request &&other722) +ingestion_request::ingestion_request(ingestion_request &&other731) { - app_name = std::move(other722.app_name); - metadata = std::move(other722.metadata); - __isset = std::move(other722.__isset); + app_name = std::move(other731.app_name); + metadata = std::move(other731.metadata); + __isset = std::move(other731.__isset); } -ingestion_request &ingestion_request::operator=(const ingestion_request &other723) +ingestion_request &ingestion_request::operator=(const ingestion_request &other732) { - app_name = other723.app_name; - metadata = other723.metadata; - __isset = other723.__isset; + app_name = other732.app_name; + metadata = other732.metadata; + __isset = other732.__isset; return *this; } -ingestion_request &ingestion_request::operator=(ingestion_request &&other724) +ingestion_request &ingestion_request::operator=(ingestion_request &&other733) { - app_name = std::move(other724.app_name); - metadata = std::move(other724.metadata); - __isset = std::move(other724.__isset); + app_name = std::move(other733.app_name); + metadata = std::move(other733.metadata); + __isset = std::move(other733.__isset); return *this; } void ingestion_request::printTo(std::ostream &out) const @@ -17316,30 +17614,30 @@ void swap(ingestion_response &a, ingestion_response &b) swap(a.__isset, b.__isset); } -ingestion_response::ingestion_response(const ingestion_response &other725) +ingestion_response::ingestion_response(const ingestion_response &other734) { - err = other725.err; - rocksdb_error = other725.rocksdb_error; - __isset = other725.__isset; + err = other734.err; + rocksdb_error = other734.rocksdb_error; + __isset = other734.__isset; } -ingestion_response::ingestion_response(ingestion_response &&other726) +ingestion_response::ingestion_response(ingestion_response &&other735) { - err = std::move(other726.err); - rocksdb_error = std::move(other726.rocksdb_error); - __isset = std::move(other726.__isset); + err = std::move(other735.err); + rocksdb_error = std::move(other735.rocksdb_error); + __isset = std::move(other735.__isset); } -ingestion_response &ingestion_response::operator=(const ingestion_response &other727) +ingestion_response &ingestion_response::operator=(const ingestion_response &other736) { - err = other727.err; - rocksdb_error = other727.rocksdb_error; - __isset = other727.__isset; + err = other736.err; + rocksdb_error = other736.rocksdb_error; + __isset = other736.__isset; return *this; } -ingestion_response &ingestion_response::operator=(ingestion_response &&other728) +ingestion_response &ingestion_response::operator=(ingestion_response &&other737) { - err = std::move(other728.err); - rocksdb_error = std::move(other728.rocksdb_error); - __isset = std::move(other728.__isset); + err = std::move(other737.err); + rocksdb_error = std::move(other737.rocksdb_error); + __isset = std::move(other737.__isset); return *this; } void ingestion_response::printTo(std::ostream &out) const @@ -17390,9 +17688,9 @@ uint32_t control_bulk_load_request::read(::apache::thrift::protocol::TProtocol * break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast729; - xfer += iprot->readI32(ecast729); - this->type = (bulk_load_control_type::type)ecast729; + int32_t ecast738; + xfer += iprot->readI32(ecast738); + this->type = (bulk_load_control_type::type)ecast738; this->__isset.type = true; } else { xfer += iprot->skip(ftype); @@ -17437,32 +17735,32 @@ void swap(control_bulk_load_request &a, control_bulk_load_request &b) swap(a.__isset, b.__isset); } -control_bulk_load_request::control_bulk_load_request(const control_bulk_load_request &other730) +control_bulk_load_request::control_bulk_load_request(const control_bulk_load_request &other739) { - app_name = other730.app_name; - type = other730.type; - __isset = other730.__isset; + app_name = other739.app_name; + type = other739.type; + __isset = other739.__isset; } -control_bulk_load_request::control_bulk_load_request(control_bulk_load_request &&other731) +control_bulk_load_request::control_bulk_load_request(control_bulk_load_request &&other740) { - app_name = std::move(other731.app_name); - type = std::move(other731.type); - __isset = std::move(other731.__isset); + app_name = std::move(other740.app_name); + type = std::move(other740.type); + __isset = std::move(other740.__isset); } control_bulk_load_request &control_bulk_load_request:: -operator=(const control_bulk_load_request &other732) +operator=(const control_bulk_load_request &other741) { - app_name = other732.app_name; - type = other732.type; - __isset = other732.__isset; + app_name = other741.app_name; + type = other741.type; + __isset = other741.__isset; return *this; } control_bulk_load_request &control_bulk_load_request:: -operator=(control_bulk_load_request &&other733) +operator=(control_bulk_load_request &&other742) { - app_name = std::move(other733.app_name); - type = std::move(other733.type); - __isset = std::move(other733.__isset); + app_name = std::move(other742.app_name); + type = std::move(other742.type); + __isset = std::move(other742.__isset); return *this; } void control_bulk_load_request::printTo(std::ostream &out) const @@ -17560,32 +17858,32 @@ void swap(control_bulk_load_response &a, control_bulk_load_response &b) swap(a.__isset, b.__isset); } -control_bulk_load_response::control_bulk_load_response(const control_bulk_load_response &other734) +control_bulk_load_response::control_bulk_load_response(const control_bulk_load_response &other743) { - err = other734.err; - hint_msg = other734.hint_msg; - __isset = other734.__isset; + err = other743.err; + hint_msg = other743.hint_msg; + __isset = other743.__isset; } -control_bulk_load_response::control_bulk_load_response(control_bulk_load_response &&other735) +control_bulk_load_response::control_bulk_load_response(control_bulk_load_response &&other744) { - err = std::move(other735.err); - hint_msg = std::move(other735.hint_msg); - __isset = std::move(other735.__isset); + err = std::move(other744.err); + hint_msg = std::move(other744.hint_msg); + __isset = std::move(other744.__isset); } control_bulk_load_response &control_bulk_load_response:: -operator=(const control_bulk_load_response &other736) +operator=(const control_bulk_load_response &other745) { - err = other736.err; - hint_msg = other736.hint_msg; - __isset = other736.__isset; + err = other745.err; + hint_msg = other745.hint_msg; + __isset = other745.__isset; return *this; } control_bulk_load_response &control_bulk_load_response:: -operator=(control_bulk_load_response &&other737) +operator=(control_bulk_load_response &&other746) { - err = std::move(other737.err); - hint_msg = std::move(other737.hint_msg); - __isset = std::move(other737.__isset); + err = std::move(other746.err); + hint_msg = std::move(other746.hint_msg); + __isset = std::move(other746.__isset); return *this; } void control_bulk_load_response::printTo(std::ostream &out) const @@ -17664,26 +17962,26 @@ void swap(query_bulk_load_request &a, query_bulk_load_request &b) swap(a.__isset, b.__isset); } -query_bulk_load_request::query_bulk_load_request(const query_bulk_load_request &other738) +query_bulk_load_request::query_bulk_load_request(const query_bulk_load_request &other747) { - app_name = other738.app_name; - __isset = other738.__isset; + app_name = other747.app_name; + __isset = other747.__isset; } -query_bulk_load_request::query_bulk_load_request(query_bulk_load_request &&other739) +query_bulk_load_request::query_bulk_load_request(query_bulk_load_request &&other748) { - app_name = std::move(other739.app_name); - __isset = std::move(other739.__isset); + app_name = std::move(other748.app_name); + __isset = std::move(other748.__isset); } -query_bulk_load_request &query_bulk_load_request::operator=(const query_bulk_load_request &other740) +query_bulk_load_request &query_bulk_load_request::operator=(const query_bulk_load_request &other749) { - app_name = other740.app_name; - __isset = other740.__isset; + app_name = other749.app_name; + __isset = other749.__isset; return *this; } -query_bulk_load_request &query_bulk_load_request::operator=(query_bulk_load_request &&other741) +query_bulk_load_request &query_bulk_load_request::operator=(query_bulk_load_request &&other750) { - app_name = std::move(other741.app_name); - __isset = std::move(other741.__isset); + app_name = std::move(other750.app_name); + __isset = std::move(other750.__isset); return *this; } void query_bulk_load_request::printTo(std::ostream &out) const @@ -17765,9 +18063,9 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i break; case 3: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast742; - xfer += iprot->readI32(ecast742); - this->app_status = (bulk_load_status::type)ecast742; + int32_t ecast751; + xfer += iprot->readI32(ecast751); + this->app_status = (bulk_load_status::type)ecast751; this->__isset.app_status = true; } else { xfer += iprot->skip(ftype); @@ -17777,15 +18075,15 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i if (ftype == ::apache::thrift::protocol::T_LIST) { { this->partitions_status.clear(); - uint32_t _size743; - ::apache::thrift::protocol::TType _etype746; - xfer += iprot->readListBegin(_etype746, _size743); - this->partitions_status.resize(_size743); - uint32_t _i747; - for (_i747 = 0; _i747 < _size743; ++_i747) { - int32_t ecast748; - xfer += iprot->readI32(ecast748); - this->partitions_status[_i747] = (bulk_load_status::type)ecast748; + uint32_t _size752; + ::apache::thrift::protocol::TType _etype755; + xfer += iprot->readListBegin(_etype755, _size752); + this->partitions_status.resize(_size752); + uint32_t _i756; + for (_i756 = 0; _i756 < _size752; ++_i756) { + int32_t ecast757; + xfer += iprot->readI32(ecast757); + this->partitions_status[_i756] = (bulk_load_status::type)ecast757; } xfer += iprot->readListEnd(); } @@ -17806,25 +18104,25 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i if (ftype == ::apache::thrift::protocol::T_LIST) { { this->bulk_load_states.clear(); - uint32_t _size749; - ::apache::thrift::protocol::TType _etype752; - xfer += iprot->readListBegin(_etype752, _size749); - this->bulk_load_states.resize(_size749); - uint32_t _i753; - for (_i753 = 0; _i753 < _size749; ++_i753) { + uint32_t _size758; + ::apache::thrift::protocol::TType _etype761; + xfer += iprot->readListBegin(_etype761, _size758); + this->bulk_load_states.resize(_size758); + uint32_t _i762; + for (_i762 = 0; _i762 < _size758; ++_i762) { { - this->bulk_load_states[_i753].clear(); - uint32_t _size754; - ::apache::thrift::protocol::TType _ktype755; - ::apache::thrift::protocol::TType _vtype756; - xfer += iprot->readMapBegin(_ktype755, _vtype756, _size754); - uint32_t _i758; - for (_i758 = 0; _i758 < _size754; ++_i758) { - ::dsn::rpc_address _key759; - xfer += _key759.read(iprot); - partition_bulk_load_state &_val760 = - this->bulk_load_states[_i753][_key759]; - xfer += _val760.read(iprot); + this->bulk_load_states[_i762].clear(); + uint32_t _size763; + ::apache::thrift::protocol::TType _ktype764; + ::apache::thrift::protocol::TType _vtype765; + xfer += iprot->readMapBegin(_ktype764, _vtype765, _size763); + uint32_t _i767; + for (_i767 = 0; _i767 < _size763; ++_i767) { + ::dsn::rpc_address _key768; + xfer += _key768.read(iprot); + partition_bulk_load_state &_val769 = + this->bulk_load_states[_i762][_key768]; + xfer += _val769.read(iprot); } xfer += iprot->readMapEnd(); } @@ -17878,10 +18176,10 @@ uint32_t query_bulk_load_response::write(::apache::thrift::protocol::TProtocol * { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_I32, static_cast(this->partitions_status.size())); - std::vector::const_iterator _iter761; - for (_iter761 = this->partitions_status.begin(); _iter761 != this->partitions_status.end(); - ++_iter761) { - xfer += oprot->writeI32((int32_t)(*_iter761)); + std::vector::const_iterator _iter770; + for (_iter770 = this->partitions_status.begin(); _iter770 != this->partitions_status.end(); + ++_iter770) { + xfer += oprot->writeI32((int32_t)(*_iter770)); } xfer += oprot->writeListEnd(); } @@ -17896,17 +18194,17 @@ uint32_t query_bulk_load_response::write(::apache::thrift::protocol::TProtocol * xfer += oprot->writeListBegin(::apache::thrift::protocol::T_MAP, static_cast(this->bulk_load_states.size())); std::vector>::const_iterator - _iter762; - for (_iter762 = this->bulk_load_states.begin(); _iter762 != this->bulk_load_states.end(); - ++_iter762) { + _iter771; + for (_iter771 = this->bulk_load_states.begin(); _iter771 != this->bulk_load_states.end(); + ++_iter771) { { xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_STRUCT, - static_cast((*_iter762).size())); - std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter763; - for (_iter763 = (*_iter762).begin(); _iter763 != (*_iter762).end(); ++_iter763) { - xfer += _iter763->first.write(oprot); - xfer += _iter763->second.write(oprot); + static_cast((*_iter771).size())); + std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter772; + for (_iter772 = (*_iter771).begin(); _iter772 != (*_iter771).end(); ++_iter772) { + xfer += _iter772->first.write(oprot); + xfer += _iter772->second.write(oprot); } xfer += oprot->writeMapEnd(); } @@ -17938,51 +18236,51 @@ void swap(query_bulk_load_response &a, query_bulk_load_response &b) swap(a.__isset, b.__isset); } -query_bulk_load_response::query_bulk_load_response(const query_bulk_load_response &other764) +query_bulk_load_response::query_bulk_load_response(const query_bulk_load_response &other773) { - err = other764.err; - app_name = other764.app_name; - app_status = other764.app_status; - partitions_status = other764.partitions_status; - max_replica_count = other764.max_replica_count; - bulk_load_states = other764.bulk_load_states; - hint_msg = other764.hint_msg; - __isset = other764.__isset; + err = other773.err; + app_name = other773.app_name; + app_status = other773.app_status; + partitions_status = other773.partitions_status; + max_replica_count = other773.max_replica_count; + bulk_load_states = other773.bulk_load_states; + hint_msg = other773.hint_msg; + __isset = other773.__isset; } -query_bulk_load_response::query_bulk_load_response(query_bulk_load_response &&other765) +query_bulk_load_response::query_bulk_load_response(query_bulk_load_response &&other774) { - err = std::move(other765.err); - app_name = std::move(other765.app_name); - app_status = std::move(other765.app_status); - partitions_status = std::move(other765.partitions_status); - max_replica_count = std::move(other765.max_replica_count); - bulk_load_states = std::move(other765.bulk_load_states); - hint_msg = std::move(other765.hint_msg); - __isset = std::move(other765.__isset); + err = std::move(other774.err); + app_name = std::move(other774.app_name); + app_status = std::move(other774.app_status); + partitions_status = std::move(other774.partitions_status); + max_replica_count = std::move(other774.max_replica_count); + bulk_load_states = std::move(other774.bulk_load_states); + hint_msg = std::move(other774.hint_msg); + __isset = std::move(other774.__isset); } query_bulk_load_response &query_bulk_load_response:: -operator=(const query_bulk_load_response &other766) -{ - err = other766.err; - app_name = other766.app_name; - app_status = other766.app_status; - partitions_status = other766.partitions_status; - max_replica_count = other766.max_replica_count; - bulk_load_states = other766.bulk_load_states; - hint_msg = other766.hint_msg; - __isset = other766.__isset; +operator=(const query_bulk_load_response &other775) +{ + err = other775.err; + app_name = other775.app_name; + app_status = other775.app_status; + partitions_status = other775.partitions_status; + max_replica_count = other775.max_replica_count; + bulk_load_states = other775.bulk_load_states; + hint_msg = other775.hint_msg; + __isset = other775.__isset; return *this; } -query_bulk_load_response &query_bulk_load_response::operator=(query_bulk_load_response &&other767) +query_bulk_load_response &query_bulk_load_response::operator=(query_bulk_load_response &&other776) { - err = std::move(other767.err); - app_name = std::move(other767.app_name); - app_status = std::move(other767.app_status); - partitions_status = std::move(other767.partitions_status); - max_replica_count = std::move(other767.max_replica_count); - bulk_load_states = std::move(other767.bulk_load_states); - hint_msg = std::move(other767.hint_msg); - __isset = std::move(other767.__isset); + err = std::move(other776.err); + app_name = std::move(other776.app_name); + app_status = std::move(other776.app_status); + partitions_status = std::move(other776.partitions_status); + max_replica_count = std::move(other776.max_replica_count); + bulk_load_states = std::move(other776.bulk_load_states); + hint_msg = std::move(other776.hint_msg); + __isset = std::move(other776.__isset); return *this; } void query_bulk_load_response::printTo(std::ostream &out) const @@ -18035,9 +18333,9 @@ uint32_t detect_hotkey_request::read(::apache::thrift::protocol::TProtocol *ipro switch (fid) { case 1: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast768; - xfer += iprot->readI32(ecast768); - this->type = (hotkey_type::type)ecast768; + int32_t ecast777; + xfer += iprot->readI32(ecast777); + this->type = (hotkey_type::type)ecast777; this->__isset.type = true; } else { xfer += iprot->skip(ftype); @@ -18045,9 +18343,9 @@ uint32_t detect_hotkey_request::read(::apache::thrift::protocol::TProtocol *ipro break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast769; - xfer += iprot->readI32(ecast769); - this->action = (detect_action::type)ecast769; + int32_t ecast778; + xfer += iprot->readI32(ecast778); + this->action = (detect_action::type)ecast778; this->__isset.action = true; } else { xfer += iprot->skip(ftype); @@ -18105,34 +18403,34 @@ void swap(detect_hotkey_request &a, detect_hotkey_request &b) swap(a.__isset, b.__isset); } -detect_hotkey_request::detect_hotkey_request(const detect_hotkey_request &other770) +detect_hotkey_request::detect_hotkey_request(const detect_hotkey_request &other779) { - type = other770.type; - action = other770.action; - pid = other770.pid; - __isset = other770.__isset; + type = other779.type; + action = other779.action; + pid = other779.pid; + __isset = other779.__isset; } -detect_hotkey_request::detect_hotkey_request(detect_hotkey_request &&other771) +detect_hotkey_request::detect_hotkey_request(detect_hotkey_request &&other780) { - type = std::move(other771.type); - action = std::move(other771.action); - pid = std::move(other771.pid); - __isset = std::move(other771.__isset); + type = std::move(other780.type); + action = std::move(other780.action); + pid = std::move(other780.pid); + __isset = std::move(other780.__isset); } -detect_hotkey_request &detect_hotkey_request::operator=(const detect_hotkey_request &other772) +detect_hotkey_request &detect_hotkey_request::operator=(const detect_hotkey_request &other781) { - type = other772.type; - action = other772.action; - pid = other772.pid; - __isset = other772.__isset; + type = other781.type; + action = other781.action; + pid = other781.pid; + __isset = other781.__isset; return *this; } -detect_hotkey_request &detect_hotkey_request::operator=(detect_hotkey_request &&other773) +detect_hotkey_request &detect_hotkey_request::operator=(detect_hotkey_request &&other782) { - type = std::move(other773.type); - action = std::move(other773.action); - pid = std::move(other773.pid); - __isset = std::move(other773.__isset); + type = std::move(other782.type); + action = std::move(other782.action); + pid = std::move(other782.pid); + __isset = std::move(other782.__isset); return *this; } void detect_hotkey_request::printTo(std::ostream &out) const @@ -18252,34 +18550,34 @@ void swap(detect_hotkey_response &a, detect_hotkey_response &b) swap(a.__isset, b.__isset); } -detect_hotkey_response::detect_hotkey_response(const detect_hotkey_response &other774) +detect_hotkey_response::detect_hotkey_response(const detect_hotkey_response &other783) { - err = other774.err; - err_hint = other774.err_hint; - hotkey_result = other774.hotkey_result; - __isset = other774.__isset; + err = other783.err; + err_hint = other783.err_hint; + hotkey_result = other783.hotkey_result; + __isset = other783.__isset; } -detect_hotkey_response::detect_hotkey_response(detect_hotkey_response &&other775) +detect_hotkey_response::detect_hotkey_response(detect_hotkey_response &&other784) { - err = std::move(other775.err); - err_hint = std::move(other775.err_hint); - hotkey_result = std::move(other775.hotkey_result); - __isset = std::move(other775.__isset); + err = std::move(other784.err); + err_hint = std::move(other784.err_hint); + hotkey_result = std::move(other784.hotkey_result); + __isset = std::move(other784.__isset); } -detect_hotkey_response &detect_hotkey_response::operator=(const detect_hotkey_response &other776) +detect_hotkey_response &detect_hotkey_response::operator=(const detect_hotkey_response &other785) { - err = other776.err; - err_hint = other776.err_hint; - hotkey_result = other776.hotkey_result; - __isset = other776.__isset; + err = other785.err; + err_hint = other785.err_hint; + hotkey_result = other785.hotkey_result; + __isset = other785.__isset; return *this; } -detect_hotkey_response &detect_hotkey_response::operator=(detect_hotkey_response &&other777) +detect_hotkey_response &detect_hotkey_response::operator=(detect_hotkey_response &&other786) { - err = std::move(other777.err); - err_hint = std::move(other777.err_hint); - hotkey_result = std::move(other777.hotkey_result); - __isset = std::move(other777.__isset); + err = std::move(other786.err); + err_hint = std::move(other786.err_hint); + hotkey_result = std::move(other786.hotkey_result); + __isset = std::move(other786.__isset); return *this; } void detect_hotkey_response::printTo(std::ostream &out) const diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index a6bc8418da..f2c0b90851 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -471,6 +471,9 @@ void meta_service::register_rpc_handlers() register_rpc_handler_with_rpc_holder(RPC_CM_START_PARTITION_SPLIT, "start_partition_split", &meta_service::on_start_partition_split); + register_rpc_handler_with_rpc_holder(RPC_CM_CONTROL_PARTITION_SPLIT, + "control_partition_split(pause/restart/cancel)", + &meta_service::on_control_partition_split); register_rpc_handler_with_rpc_holder(RPC_CM_REGISTER_CHILD_REPLICA, "register_child_on_meta", &meta_service::on_register_child_on_meta); @@ -1009,6 +1012,23 @@ void meta_service::on_start_partition_split(start_split_rpc rpc) server_state::sStateHash); } +void meta_service::on_control_partition_split(control_split_rpc rpc) +{ + if (!check_status(rpc)) { + return; + } + + if (_split_svc == nullptr) { + derror_f("meta doesn't support partition split"); + rpc.response().err = ERR_SERVICE_NOT_ACTIVE; + return; + } + tasking::enqueue(LPC_META_STATE_NORMAL, + tracker(), + [this, rpc]() { _split_svc->control_partition_split(std::move(rpc)); }, + server_state::sStateHash); +} + void meta_service::on_register_child_on_meta(register_child_rpc rpc) { if (!check_status(rpc)) { diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index fa8d4a503a..ced331430f 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -183,6 +183,7 @@ class meta_service : public serverlet // split void on_start_partition_split(start_split_rpc rpc); + void on_control_partition_split(control_split_rpc rpc); void on_register_child_on_meta(register_child_rpc rpc); // bulk load diff --git a/src/meta/meta_split_service.cpp b/src/meta/meta_split_service.cpp index 69b5e11197..a5412a026a 100644 --- a/src/meta/meta_split_service.cpp +++ b/src/meta/meta_split_service.cpp @@ -293,5 +293,141 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec, parent_context.stage = config_status::not_pending; } +void meta_split_service::control_partition_split(control_split_rpc rpc) +{ + const auto &req = rpc.request(); + const auto &control_type = req.control_type; + auto &response = rpc.response(); + + zauto_write_lock l(app_lock()); + std::shared_ptr app = _state->get_app(req.app_name); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; + response.__set_hint_msg(fmt::format( + "app {}", response.err == ERR_APP_NOT_EXIST ? "not existed" : "dropped", req.app_name)); + derror_f("{} split failed, {}", control_type_str(control_type), response.hint_msg); + return; + } + + if (app->helpers->split_states.splitting_count <= 0) { + response.err = ERR_INVALID_STATE; + response.__set_hint_msg(fmt::format("app({}) is not splitting", req.app_name)); + derror_f("{} split failed, {}", control_type_str(control_type), response.hint_msg); + return; + } + + if (req.parent_pidx >= 0 && (control_type == split_control_type::PAUSE || + control_type == split_control_type::RESTART)) { + do_control_single(std::move(app), std::move(rpc)); + } else { + do_control_all(std::move(app), std::move(rpc)); + } +} + +void meta_split_service::do_control_single(std::shared_ptr app, control_split_rpc rpc) +{ + const auto &req = rpc.request(); + const std::string &app_name = req.app_name; + const int32_t &parent_pidx = req.parent_pidx; + const auto &control_type = req.control_type; + auto &response = rpc.response(); + + if (parent_pidx >= app->partition_count / 2) { + response.err = ERR_INVALID_PARAMETERS; + response.__set_hint_msg(fmt::format("invalid parent partition index({})", parent_pidx)); + derror_f("{} split for app({}) failed, {}", + control_type_str(control_type), + app_name, + response.hint_msg); + return; + } + + auto iter = app->helpers->split_states.status.find(parent_pidx); + if (iter == app->helpers->split_states.status.end()) { + response.err = + control_type == split_control_type::PAUSE ? ERR_CHILD_REGISTERED : ERR_INVALID_STATE; + response.__set_hint_msg(fmt::format("partition[{}] is not splitting", parent_pidx)); + derror_f("{} split for app({}) failed, {}", + control_type_str(control_type), + app_name, + response.hint_msg); + return; + } + + split_status::type old_status = + control_type == split_control_type::PAUSE ? split_status::SPLITTING : split_status::PAUSED; + split_status::type target_status = + control_type == split_control_type::PAUSE ? split_status::PAUSING : split_status::SPLITTING; + if (iter->second == old_status) { + iter->second = target_status; + response.err = ERR_OK; + ddebug_f("app({}) partition[{}] {} split succeed", + app_name, + parent_pidx, + control_type_str(control_type)); + } else { + response.err = ERR_INVALID_STATE; + response.__set_hint_msg(fmt::format("partition[{}] wrong split_status({})", + parent_pidx, + dsn::enum_to_string(iter->second))); + derror_f("{} split for app({}) failed, {}", + control_type_str(control_type), + app_name, + response.hint_msg); + } +} + +void meta_split_service::do_control_all(std::shared_ptr app, control_split_rpc rpc) +{ + const auto &req = rpc.request(); + const auto &control_type = req.control_type; + auto &response = rpc.response(); + + if (control_type == split_control_type::CANCEL) { + if (req.old_partition_count != app->partition_count / 2) { + response.err = ERR_INVALID_PARAMETERS; + response.__set_hint_msg( + fmt::format("wrong partition_count, should be {}", app->partition_count / 2)); + derror_f("cancel split for app({}) failed, wrong partition count: partition count({}) " + "VS req partition_count({})", + app->app_name, + app->partition_count, + req.old_partition_count); + return; + } + + if (app->helpers->split_states.splitting_count != req.old_partition_count) { + response.err = ERR_CHILD_REGISTERED; + response.__set_hint_msg("some partitions have already finished split"); + derror_f("cancel split for app({}) failed, {}", app->app_name, response.hint_msg); + return; + } + + for (auto &kv : app->helpers->split_states.status) { + ddebug_f("app({}) partition({}) cancel split, old status = {}", + app->app_name, + kv.first, + dsn::enum_to_string(kv.second)); + kv.second = split_status::CANCELING; + } + return; + } + + split_status::type old_status = + control_type == split_control_type::PAUSE ? split_status::SPLITTING : split_status::PAUSED; + split_status::type target_status = + control_type == split_control_type::PAUSE ? split_status::PAUSING : split_status::SPLITTING; + for (auto &kv : app->helpers->split_states.status) { + if (kv.second == old_status) { + kv.second = target_status; + ddebug_f("app({}) partition[{}] {} split succeed", + app->app_name, + kv.first, + control_type_str(control_type)); + } + } + response.err = ERR_OK; +} + } // namespace replication } // namespace dsn diff --git a/src/meta/meta_split_service.h b/src/meta/meta_split_service.h index ac6d0fe586..f85356bc89 100644 --- a/src/meta/meta_split_service.h +++ b/src/meta/meta_split_service.h @@ -38,6 +38,15 @@ class meta_split_service // client -> meta to start split void start_partition_split(start_split_rpc rpc); + // client -> meta to pause/restart/cancel split + void control_partition_split(control_split_rpc rpc); + + // pause/restart specific one partition + void do_control_single(std::shared_ptr app, control_split_rpc rpc); + + // pause all splitting partitions or restart all paused partitions or cancel all partitions + void do_control_all(std::shared_ptr app, control_split_rpc rpc); + // primary parent -> meta_server to register child void register_child_on_meta(register_child_rpc rpc); @@ -48,6 +57,19 @@ class meta_split_service void on_add_child_on_remote_storage_reply(error_code ec, register_child_rpc rpc, bool create_new); + static const std::string control_type_str(split_control_type::type type) + { + std::string str = ""; + if (type == split_control_type::PAUSE) { + str = "pause"; + } else if (type == split_control_type::RESTART) { + str = "restart"; + } else if (type == split_control_type::CANCEL) { + str = "cancel"; + } + return str; + } + private: friend class meta_service; friend class meta_split_service_test; diff --git a/src/meta/test/meta_split_service_test.cpp b/src/meta/test/meta_split_service_test.cpp index 2ac41a7daf..9605563249 100644 --- a/src/meta/test/meta_split_service_test.cpp +++ b/src/meta/test/meta_split_service_test.cpp @@ -63,6 +63,24 @@ class meta_split_service_test : public meta_test_base return rpc.response().err; } + error_code control_partition_split(const std::string &app_name, + split_control_type::type type, + const int32_t pidx, + const int32_t old_partition_count = 0) + { + auto req = make_unique(); + req->__set_app_name(app_name); + req->__set_control_type(type); + req->__set_parent_pidx(pidx); + req->__set_old_partition_count(old_partition_count); + + control_split_rpc rpc(std::move(req), RPC_CM_CONTROL_PARTITION_SPLIT); + split_svc().control_partition_split(rpc); + wait_all(); + + return rpc.response().err; + } + error_code register_child(int32_t parent_index, ballot req_parent_ballot, bool wait_zk) { partition_configuration parent_config; @@ -131,6 +149,15 @@ class meta_split_service_test : public meta_test_base } } + void clear_app_partition_split_context() + { + app->partition_count = PARTITION_COUNT; + app->partitions.resize(app->partition_count); + app->helpers->contexts.resize(app->partition_count); + app->helpers->split_states.splitting_count = 0; + app->helpers->split_states.status.clear(); + } + void mock_child_registered() { app->partitions[CHILD_INDEX].ballot = PARENT_BALLOT; @@ -138,6 +165,33 @@ class meta_split_service_test : public meta_test_base app->helpers->split_states.status.erase(PARENT_INDEX); } + void mock_split_states(split_status::type status, int32_t parent_index = -1) + { + if (parent_index != -1) { + app->helpers->split_states.status[parent_index] = status; + } else { + auto partition_count = app->partition_count; + for (auto i = 0; i < partition_count / 2; ++i) { + app->helpers->split_states.status[i] = status; + } + } + } + + bool check_split_status(split_status::type expected_status, int32_t parent_index = -1) + { + auto app = find_app(NAME); + if (parent_index != -1) { + return (app->helpers->split_states.status[parent_index] == expected_status); + } else { + for (const auto kv : app->helpers->split_states.status) { + if (kv.second != expected_status) { + return false; + } + } + return true; + } + } + const std::string NAME = "split_table"; const int32_t PARTITION_COUNT = 4; const int32_t NEW_PARTITION_COUNT = 8; @@ -264,5 +318,174 @@ TEST_F(meta_split_service_test, on_config_sync_test) drop_app("not_splitting_app"); } +/// control split unit tests +TEST_F(meta_split_service_test, pause_or_restart_single_partition_test) +{ + // Test case: + // - pause with wrong pidx + // - pause with partition is not splitting + // - pause with partition split_status = pausing + // - pause with partition split_status = paused + // - pause with partition split_status = canceling + // - pause with partition split_status = splitting + // - restart with partition is not splitting + // - restart with partition split_status = pausing + // - restart with partition split_status = paused + // - restart with partition split_status = canceling + // - restart with partition split_status = splitting + struct control_single_partition_test + { + int32_t pidx; + split_status::type cur_status; + split_control_type::type control_type; + error_code expected_err; + split_status::type expected_status; + } tests[] = {{NEW_PARTITION_COUNT, + split_status::SPLITTING, + split_control_type::PAUSE, + ERR_INVALID_PARAMETERS, + split_status::SPLITTING}, + {PARENT_INDEX, + split_status::NOT_SPLIT, + split_control_type::PAUSE, + ERR_CHILD_REGISTERED, + split_status::NOT_SPLIT}, + {PARENT_INDEX, + split_status::PAUSING, + split_control_type::PAUSE, + ERR_INVALID_STATE, + split_status::PAUSING}, + {PARENT_INDEX, + split_status::PAUSED, + split_control_type::PAUSE, + ERR_INVALID_STATE, + split_status::PAUSED}, + {PARENT_INDEX, + split_status::CANCELING, + split_control_type::PAUSE, + ERR_INVALID_STATE, + split_status::CANCELING}, + {PARENT_INDEX, + split_status::SPLITTING, + split_control_type::PAUSE, + ERR_OK, + split_status::PAUSING}, + {PARENT_INDEX, + split_status::NOT_SPLIT, + split_control_type::RESTART, + ERR_INVALID_STATE, + split_status::NOT_SPLIT}, + {PARENT_INDEX, + split_status::PAUSING, + split_control_type::RESTART, + ERR_INVALID_STATE, + split_status::PAUSING}, + {PARENT_INDEX, + split_status::PAUSED, + split_control_type::RESTART, + ERR_OK, + split_status::SPLITTING}, + {PARENT_INDEX, + split_status::CANCELING, + split_control_type::RESTART, + ERR_INVALID_STATE, + split_status::CANCELING}, + {PARENT_INDEX, + split_status::SPLITTING, + split_control_type::RESTART, + ERR_INVALID_STATE, + split_status::SPLITTING}}; + + for (auto test : tests) { + mock_app_partition_split_context(); + if (test.cur_status == split_status::NOT_SPLIT) { + mock_child_registered(); + } else { + mock_split_states(test.cur_status, PARENT_INDEX); + } + ASSERT_EQ(control_partition_split(NAME, test.control_type, test.pidx, PARTITION_COUNT), + test.expected_err); + if (test.expected_err == ERR_OK) { + ASSERT_TRUE(check_split_status(test.expected_status, test.pidx)); + } + clear_app_partition_split_context(); + } +} + +TEST_F(meta_split_service_test, pause_or_restart_multi_partitions_test) +{ + // Test case: + // - app not existed + // - app is not splitting + // - pausing all splitting partitions succeed + // - restart all paused partitions succeed + struct control_multi_partitions_test + { + bool mock_split_context; + std::string app_name; + split_control_type::type control_type; + error_code expected_err; + } tests[] = {{false, "table_not_exist", split_control_type::PAUSE, ERR_APP_NOT_EXIST}, + {false, NAME, split_control_type::RESTART, ERR_INVALID_STATE}, + {true, NAME, split_control_type::PAUSE, ERR_OK}, + {true, NAME, split_control_type::RESTART, ERR_OK}}; + + for (auto test : tests) { + if (test.mock_split_context) { + mock_app_partition_split_context(); + if (test.control_type == split_control_type::RESTART) { + mock_split_states(split_status::PAUSED, -1); + } + } + error_code ec = + control_partition_split(test.app_name, test.control_type, -1, PARTITION_COUNT); + ASSERT_EQ(ec, test.expected_err); + if (test.expected_err == ERR_OK) { + split_status::type expected_status = test.control_type == split_control_type::PAUSE + ? split_status::PAUSING + : split_status::SPLITTING; + ASSERT_TRUE(check_split_status(expected_status, -1)); + } + if (test.mock_split_context) { + clear_app_partition_split_context(); + } + } +} + +TEST_F(meta_split_service_test, cancel_split_test) +{ + // Test case: + // - wrong partition count + // - cancel split with child registered + // - cancel succeed + struct cancel_test + { + int32_t old_partition_count; + bool mock_child_registered; + error_code expected_err; + bool check_status; + } tests[] = {{NEW_PARTITION_COUNT, false, ERR_INVALID_PARAMETERS, false}, + {PARTITION_COUNT, true, ERR_CHILD_REGISTERED, false}, + {PARTITION_COUNT, false, ERR_OK, true}}; + + for (auto test : tests) { + mock_app_partition_split_context(); + if (test.mock_child_registered) { + mock_child_registered(); + } + + ASSERT_EQ( + control_partition_split(NAME, split_control_type::CANCEL, -1, test.old_partition_count), + test.expected_err); + if (test.check_status) { + auto app = find_app(NAME); + ASSERT_EQ(app->partition_count, NEW_PARTITION_COUNT); + ASSERT_EQ(app->helpers->split_states.splitting_count, PARTITION_COUNT); + check_split_status(split_status::CANCELING, -1); + } + clear_app_partition_split_context(); + } +} + } // namespace replication } // namespace dsn diff --git a/src/replication.thrift b/src/replication.thrift index a1f4226082..04749631bc 100644 --- a/src/replication.thrift +++ b/src/replication.thrift @@ -890,6 +890,39 @@ struct start_partition_split_response 2:string hint_msg; } +enum split_control_type +{ + PAUSE, + RESTART, + CANCEL +} + +// client to meta server to control partition split +// support three control type: pause, restart, cancel +struct control_split_request +{ + 1:string app_name; + 2:split_control_type control_type + // for pause, parent_pidx >= 0, pause specific partition, parent_pidx = -1, pause all splitting partition + // for restart, parent_pidx >= 0, restart specific partition, parent_pidx = -1, restart all paused partition + // for cancel, parent_pidx will always be -1 + 3:i32 parent_pidx; + // only used for cancel + 4:optional i32 old_partition_count; +} + +struct control_split_response +{ + // Possible errors: + // - ERR_APP_NOT_EXIST: app not exist + // - ERR_APP_DROPPED: app has been dropped + // - ERR_INVALID_STATE: wrong partition split_status + // - ERR_INVALID_PARAMETERS: invalid parent_pidx or old_partition_count + // - ERR_CHILD_REGISTERED: child partition has been registered, pause partition split or cancel split failed + 1:dsn.error_code err; + 2:optional string hint_msg; +} + // child to primary parent, notifying that itself has caught up with parent struct notify_catch_up_request {