From 48ad9b8d15786a857951bf4cb09bfacd060bfffa Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Wed, 28 Oct 2020 16:47:52 +0800 Subject: [PATCH] feat(split): add update_child_group_partition_count (#645) --- .../dsn/dist/replication/replication.codes.h | 1 + .../dsn/dist/replication/replication_types.h | 132 ++ src/common/replication_types.cpp | 1328 ++++++++++------- src/replica/replica.h | 2 +- src/replica/replica_stub.cpp | 13 + src/replica/replica_stub.h | 15 +- src/replica/split/replica_split_manager.cpp | 236 ++- src/replica/split/replica_split_manager.h | 22 + src/replica/split/test/replica_split_test.cpp | 154 ++ src/replication.thrift | 17 + 10 files changed, 1389 insertions(+), 531 deletions(-) diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 82824f583d..22bf3fef08 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -155,6 +155,7 @@ MAKE_EVENT_CODE(LPC_EXEC_COMMAND_ON_REPLICA, TASK_PRIORITY_LOW) MAKE_EVENT_CODE(LPC_PARTITION_SPLIT, TASK_PRIORITY_LOW) MAKE_EVENT_CODE(LPC_PARTITION_SPLIT_ERROR, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_SPLIT_NOTIFY_CATCH_UP, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_SPLIT_UPDATE_CHILD_PARTITION_COUNT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_GROUP_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_LOW, TASK_PRIORITY_LOW) diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 38e209f16d..00431ea27e 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -435,6 +435,10 @@ class notify_catch_up_request; class notify_cacth_up_response; +class update_child_group_partition_count_request; + +class update_child_group_partition_count_response; + class register_child_request; class register_child_response; @@ -6229,6 +6233,134 @@ inline std::ostream &operator<<(std::ostream &out, const notify_cacth_up_respons return out; } +typedef struct _update_child_group_partition_count_request__isset +{ + _update_child_group_partition_count_request__isset() + : target_address(false), new_partition_count(false), child_pid(false), ballot(false) + { + } + bool target_address : 1; + bool new_partition_count : 1; + bool child_pid : 1; + bool ballot : 1; +} _update_child_group_partition_count_request__isset; + +class update_child_group_partition_count_request +{ +public: + update_child_group_partition_count_request(const update_child_group_partition_count_request &); + update_child_group_partition_count_request(update_child_group_partition_count_request &&); + update_child_group_partition_count_request & + operator=(const update_child_group_partition_count_request &); + update_child_group_partition_count_request & + operator=(update_child_group_partition_count_request &&); + update_child_group_partition_count_request() : new_partition_count(0), ballot(0) {} + + virtual ~update_child_group_partition_count_request() throw(); + ::dsn::rpc_address target_address; + int32_t new_partition_count; + ::dsn::gpid child_pid; + int64_t ballot; + + _update_child_group_partition_count_request__isset __isset; + + void __set_target_address(const ::dsn::rpc_address &val); + + void __set_new_partition_count(const int32_t val); + + void __set_child_pid(const ::dsn::gpid &val); + + void __set_ballot(const int64_t val); + + bool operator==(const update_child_group_partition_count_request &rhs) const + { + if (!(target_address == rhs.target_address)) + return false; + if (!(new_partition_count == rhs.new_partition_count)) + return false; + if (!(child_pid == rhs.child_pid)) + return false; + if (!(ballot == rhs.ballot)) + return false; + return true; + } + bool operator!=(const update_child_group_partition_count_request &rhs) const + { + return !(*this == rhs); + } + + bool operator<(const update_child_group_partition_count_request &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(update_child_group_partition_count_request &a, + update_child_group_partition_count_request &b); + +inline std::ostream &operator<<(std::ostream &out, + const update_child_group_partition_count_request &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _update_child_group_partition_count_response__isset +{ + _update_child_group_partition_count_response__isset() : err(false) {} + bool err : 1; +} _update_child_group_partition_count_response__isset; + +class update_child_group_partition_count_response +{ +public: + update_child_group_partition_count_response( + const update_child_group_partition_count_response &); + update_child_group_partition_count_response(update_child_group_partition_count_response &&); + update_child_group_partition_count_response & + operator=(const update_child_group_partition_count_response &); + update_child_group_partition_count_response & + operator=(update_child_group_partition_count_response &&); + update_child_group_partition_count_response() {} + + virtual ~update_child_group_partition_count_response() throw(); + ::dsn::error_code err; + + _update_child_group_partition_count_response__isset __isset; + + void __set_err(const ::dsn::error_code &val); + + bool operator==(const update_child_group_partition_count_response &rhs) const + { + if (!(err == rhs.err)) + return false; + return true; + } + bool operator!=(const update_child_group_partition_count_response &rhs) const + { + return !(*this == rhs); + } + + bool operator<(const update_child_group_partition_count_response &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(update_child_group_partition_count_response &a, + update_child_group_partition_count_response &b); + +inline std::ostream &operator<<(std::ostream &out, + const update_child_group_partition_count_response &obj) +{ + obj.printTo(out); + return out; +} + typedef struct _register_child_request__isset { _register_child_request__isset() diff --git a/src/common/replication_types.cpp b/src/common/replication_types.cpp index 9585ec0799..f56d8cd7db 100644 --- a/src/common/replication_types.cpp +++ b/src/common/replication_types.cpp @@ -14466,6 +14466,290 @@ void notify_cacth_up_response::printTo(std::ostream &out) const out << ")"; } +update_child_group_partition_count_request::~update_child_group_partition_count_request() throw() {} + +void update_child_group_partition_count_request::__set_target_address(const ::dsn::rpc_address &val) +{ + this->target_address = val; +} + +void update_child_group_partition_count_request::__set_new_partition_count(const int32_t val) +{ + this->new_partition_count = val; +} + +void update_child_group_partition_count_request::__set_child_pid(const ::dsn::gpid &val) +{ + this->child_pid = val; +} + +void update_child_group_partition_count_request::__set_ballot(const int64_t val) +{ + this->ballot = val; +} + +uint32_t +update_child_group_partition_count_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->target_address.read(iprot); + this->__isset.target_address = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->new_partition_count); + this->__isset.new_partition_count = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->child_pid.read(iprot); + this->__isset.child_pid = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->ballot); + this->__isset.ballot = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t update_child_group_partition_count_request::write( + ::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("update_child_group_partition_count_request"); + + xfer += oprot->writeFieldBegin("target_address", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->target_address.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("new_partition_count", ::apache::thrift::protocol::T_I32, 2); + xfer += oprot->writeI32(this->new_partition_count); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("child_pid", ::apache::thrift::protocol::T_STRUCT, 3); + xfer += this->child_pid.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("ballot", ::apache::thrift::protocol::T_I64, 4); + xfer += oprot->writeI64(this->ballot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(update_child_group_partition_count_request &a, + update_child_group_partition_count_request &b) +{ + using ::std::swap; + swap(a.target_address, b.target_address); + swap(a.new_partition_count, b.new_partition_count); + swap(a.child_pid, b.child_pid); + swap(a.ballot, b.ballot); + swap(a.__isset, b.__isset); +} + +update_child_group_partition_count_request::update_child_group_partition_count_request( + const update_child_group_partition_count_request &other630) +{ + target_address = other630.target_address; + new_partition_count = other630.new_partition_count; + child_pid = other630.child_pid; + ballot = other630.ballot; + __isset = other630.__isset; +} +update_child_group_partition_count_request::update_child_group_partition_count_request( + update_child_group_partition_count_request &&other631) +{ + target_address = std::move(other631.target_address); + new_partition_count = std::move(other631.new_partition_count); + child_pid = std::move(other631.child_pid); + ballot = std::move(other631.ballot); + __isset = std::move(other631.__isset); +} +update_child_group_partition_count_request &update_child_group_partition_count_request:: +operator=(const update_child_group_partition_count_request &other632) +{ + target_address = other632.target_address; + new_partition_count = other632.new_partition_count; + child_pid = other632.child_pid; + ballot = other632.ballot; + __isset = other632.__isset; + return *this; +} +update_child_group_partition_count_request &update_child_group_partition_count_request:: +operator=(update_child_group_partition_count_request &&other633) +{ + target_address = std::move(other633.target_address); + new_partition_count = std::move(other633.new_partition_count); + child_pid = std::move(other633.child_pid); + ballot = std::move(other633.ballot); + __isset = std::move(other633.__isset); + return *this; +} +void update_child_group_partition_count_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "update_child_group_partition_count_request("; + out << "target_address=" << to_string(target_address); + out << ", " + << "new_partition_count=" << to_string(new_partition_count); + out << ", " + << "child_pid=" << to_string(child_pid); + out << ", " + << "ballot=" << to_string(ballot); + out << ")"; +} + +update_child_group_partition_count_response::~update_child_group_partition_count_response() throw() +{ +} + +void update_child_group_partition_count_response::__set_err(const ::dsn::error_code &val) +{ + this->err = val; +} + +uint32_t +update_child_group_partition_count_response::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->err.read(iprot); + this->__isset.err = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t update_child_group_partition_count_response::write( + ::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("update_child_group_partition_count_response"); + + xfer += oprot->writeFieldBegin("err", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->err.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(update_child_group_partition_count_response &a, + update_child_group_partition_count_response &b) +{ + using ::std::swap; + swap(a.err, b.err); + swap(a.__isset, b.__isset); +} + +update_child_group_partition_count_response::update_child_group_partition_count_response( + const update_child_group_partition_count_response &other634) +{ + err = other634.err; + __isset = other634.__isset; +} +update_child_group_partition_count_response::update_child_group_partition_count_response( + update_child_group_partition_count_response &&other635) +{ + err = std::move(other635.err); + __isset = std::move(other635.__isset); +} +update_child_group_partition_count_response &update_child_group_partition_count_response:: +operator=(const update_child_group_partition_count_response &other636) +{ + err = other636.err; + __isset = other636.__isset; + return *this; +} +update_child_group_partition_count_response &update_child_group_partition_count_response:: +operator=(update_child_group_partition_count_response &&other637) +{ + err = std::move(other637.err); + __isset = std::move(other637.__isset); + return *this; +} +void update_child_group_partition_count_response::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "update_child_group_partition_count_response("; + out << "err=" << to_string(err); + out << ")"; +} + register_child_request::~register_child_request() throw() {} void register_child_request::__set_app(const ::dsn::app_info &val) { this->app = val; } @@ -14585,38 +14869,38 @@ void swap(register_child_request &a, register_child_request &b) swap(a.__isset, b.__isset); } -register_child_request::register_child_request(const register_child_request &other630) +register_child_request::register_child_request(const register_child_request &other638) { - app = other630.app; - parent_config = other630.parent_config; - child_config = other630.child_config; - primary_address = other630.primary_address; - __isset = other630.__isset; + app = other638.app; + parent_config = other638.parent_config; + child_config = other638.child_config; + primary_address = other638.primary_address; + __isset = other638.__isset; } -register_child_request::register_child_request(register_child_request &&other631) +register_child_request::register_child_request(register_child_request &&other639) { - app = std::move(other631.app); - parent_config = std::move(other631.parent_config); - child_config = std::move(other631.child_config); - primary_address = std::move(other631.primary_address); - __isset = std::move(other631.__isset); + app = std::move(other639.app); + parent_config = std::move(other639.parent_config); + child_config = std::move(other639.child_config); + primary_address = std::move(other639.primary_address); + __isset = std::move(other639.__isset); } -register_child_request ®ister_child_request::operator=(const register_child_request &other632) +register_child_request ®ister_child_request::operator=(const register_child_request &other640) { - app = other632.app; - parent_config = other632.parent_config; - child_config = other632.child_config; - primary_address = other632.primary_address; - __isset = other632.__isset; + app = other640.app; + parent_config = other640.parent_config; + child_config = other640.child_config; + primary_address = other640.primary_address; + __isset = other640.__isset; return *this; } -register_child_request ®ister_child_request::operator=(register_child_request &&other633) +register_child_request ®ister_child_request::operator=(register_child_request &&other641) { - app = std::move(other633.app); - parent_config = std::move(other633.parent_config); - child_config = std::move(other633.child_config); - primary_address = std::move(other633.primary_address); - __isset = std::move(other633.__isset); + app = std::move(other641.app); + parent_config = std::move(other641.parent_config); + child_config = std::move(other641.child_config); + primary_address = std::move(other641.primary_address); + __isset = std::move(other641.__isset); return *this; } void register_child_request::printTo(std::ostream &out) const @@ -14749,38 +15033,38 @@ void swap(register_child_response &a, register_child_response &b) swap(a.__isset, b.__isset); } -register_child_response::register_child_response(const register_child_response &other634) +register_child_response::register_child_response(const register_child_response &other642) { - err = other634.err; - app = other634.app; - parent_config = other634.parent_config; - child_config = other634.child_config; - __isset = other634.__isset; + err = other642.err; + app = other642.app; + parent_config = other642.parent_config; + child_config = other642.child_config; + __isset = other642.__isset; } -register_child_response::register_child_response(register_child_response &&other635) +register_child_response::register_child_response(register_child_response &&other643) { - err = std::move(other635.err); - app = std::move(other635.app); - parent_config = std::move(other635.parent_config); - child_config = std::move(other635.child_config); - __isset = std::move(other635.__isset); + err = std::move(other643.err); + app = std::move(other643.app); + parent_config = std::move(other643.parent_config); + child_config = std::move(other643.child_config); + __isset = std::move(other643.__isset); } -register_child_response ®ister_child_response::operator=(const register_child_response &other636) +register_child_response ®ister_child_response::operator=(const register_child_response &other644) { - err = other636.err; - app = other636.app; - parent_config = other636.parent_config; - child_config = other636.child_config; - __isset = other636.__isset; + err = other644.err; + app = other644.app; + parent_config = other644.parent_config; + child_config = other644.child_config; + __isset = other644.__isset; return *this; } -register_child_response ®ister_child_response::operator=(register_child_response &&other637) +register_child_response ®ister_child_response::operator=(register_child_response &&other645) { - err = std::move(other637.err); - app = std::move(other637.app); - parent_config = std::move(other637.parent_config); - child_config = std::move(other637.child_config); - __isset = std::move(other637.__isset); + err = std::move(other645.err); + app = std::move(other645.app); + parent_config = std::move(other645.parent_config); + child_config = std::move(other645.child_config); + __isset = std::move(other645.__isset); return *this; } void register_child_response::printTo(std::ostream &out) const @@ -14826,13 +15110,13 @@ uint32_t bulk_load_metadata::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_LIST) { { this->files.clear(); - uint32_t _size638; - ::apache::thrift::protocol::TType _etype641; - xfer += iprot->readListBegin(_etype641, _size638); - this->files.resize(_size638); - uint32_t _i642; - for (_i642 = 0; _i642 < _size638; ++_i642) { - xfer += this->files[_i642].read(iprot); + uint32_t _size646; + ::apache::thrift::protocol::TType _etype649; + xfer += iprot->readListBegin(_etype649, _size646); + this->files.resize(_size646); + uint32_t _i650; + for (_i650 = 0; _i650 < _size646; ++_i650) { + xfer += this->files[_i650].read(iprot); } xfer += iprot->readListEnd(); } @@ -14871,9 +15155,9 @@ uint32_t bulk_load_metadata::write(::apache::thrift::protocol::TProtocol *oprot) { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->files.size())); - std::vector::const_iterator _iter643; - for (_iter643 = this->files.begin(); _iter643 != this->files.end(); ++_iter643) { - xfer += (*_iter643).write(oprot); + std::vector::const_iterator _iter651; + for (_iter651 = this->files.begin(); _iter651 != this->files.end(); ++_iter651) { + xfer += (*_iter651).write(oprot); } xfer += oprot->writeListEnd(); } @@ -14896,30 +15180,30 @@ void swap(bulk_load_metadata &a, bulk_load_metadata &b) swap(a.__isset, b.__isset); } -bulk_load_metadata::bulk_load_metadata(const bulk_load_metadata &other644) +bulk_load_metadata::bulk_load_metadata(const bulk_load_metadata &other652) { - files = other644.files; - file_total_size = other644.file_total_size; - __isset = other644.__isset; + files = other652.files; + file_total_size = other652.file_total_size; + __isset = other652.__isset; } -bulk_load_metadata::bulk_load_metadata(bulk_load_metadata &&other645) +bulk_load_metadata::bulk_load_metadata(bulk_load_metadata &&other653) { - files = std::move(other645.files); - file_total_size = std::move(other645.file_total_size); - __isset = std::move(other645.__isset); + files = std::move(other653.files); + file_total_size = std::move(other653.file_total_size); + __isset = std::move(other653.__isset); } -bulk_load_metadata &bulk_load_metadata::operator=(const bulk_load_metadata &other646) +bulk_load_metadata &bulk_load_metadata::operator=(const bulk_load_metadata &other654) { - files = other646.files; - file_total_size = other646.file_total_size; - __isset = other646.__isset; + files = other654.files; + file_total_size = other654.file_total_size; + __isset = other654.__isset; return *this; } -bulk_load_metadata &bulk_load_metadata::operator=(bulk_load_metadata &&other647) +bulk_load_metadata &bulk_load_metadata::operator=(bulk_load_metadata &&other655) { - files = std::move(other647.files); - file_total_size = std::move(other647.file_total_size); - __isset = std::move(other647.__isset); + files = std::move(other655.files); + file_total_size = std::move(other655.file_total_size); + __isset = std::move(other655.__isset); return *this; } void bulk_load_metadata::printTo(std::ostream &out) const @@ -15033,34 +15317,34 @@ void swap(start_bulk_load_request &a, start_bulk_load_request &b) swap(a.__isset, b.__isset); } -start_bulk_load_request::start_bulk_load_request(const start_bulk_load_request &other648) +start_bulk_load_request::start_bulk_load_request(const start_bulk_load_request &other656) { - app_name = other648.app_name; - cluster_name = other648.cluster_name; - file_provider_type = other648.file_provider_type; - __isset = other648.__isset; + app_name = other656.app_name; + cluster_name = other656.cluster_name; + file_provider_type = other656.file_provider_type; + __isset = other656.__isset; } -start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other649) +start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other657) { - app_name = std::move(other649.app_name); - cluster_name = std::move(other649.cluster_name); - file_provider_type = std::move(other649.file_provider_type); - __isset = std::move(other649.__isset); + app_name = std::move(other657.app_name); + cluster_name = std::move(other657.cluster_name); + file_provider_type = std::move(other657.file_provider_type); + __isset = std::move(other657.__isset); } -start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_load_request &other650) +start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_load_request &other658) { - app_name = other650.app_name; - cluster_name = other650.cluster_name; - file_provider_type = other650.file_provider_type; - __isset = other650.__isset; + app_name = other658.app_name; + cluster_name = other658.cluster_name; + file_provider_type = other658.file_provider_type; + __isset = other658.__isset; return *this; } -start_bulk_load_request &start_bulk_load_request::operator=(start_bulk_load_request &&other651) +start_bulk_load_request &start_bulk_load_request::operator=(start_bulk_load_request &&other659) { - app_name = std::move(other651.app_name); - cluster_name = std::move(other651.cluster_name); - file_provider_type = std::move(other651.file_provider_type); - __isset = std::move(other651.__isset); + app_name = std::move(other659.app_name); + cluster_name = std::move(other659.cluster_name); + file_provider_type = std::move(other659.file_provider_type); + __isset = std::move(other659.__isset); return *this; } void start_bulk_load_request::printTo(std::ostream &out) const @@ -15155,31 +15439,31 @@ void swap(start_bulk_load_response &a, start_bulk_load_response &b) swap(a.__isset, b.__isset); } -start_bulk_load_response::start_bulk_load_response(const start_bulk_load_response &other652) +start_bulk_load_response::start_bulk_load_response(const start_bulk_load_response &other660) { - err = other652.err; - hint_msg = other652.hint_msg; - __isset = other652.__isset; + err = other660.err; + hint_msg = other660.hint_msg; + __isset = other660.__isset; } -start_bulk_load_response::start_bulk_load_response(start_bulk_load_response &&other653) +start_bulk_load_response::start_bulk_load_response(start_bulk_load_response &&other661) { - err = std::move(other653.err); - hint_msg = std::move(other653.hint_msg); - __isset = std::move(other653.__isset); + err = std::move(other661.err); + hint_msg = std::move(other661.hint_msg); + __isset = std::move(other661.__isset); } start_bulk_load_response &start_bulk_load_response:: -operator=(const start_bulk_load_response &other654) +operator=(const start_bulk_load_response &other662) { - err = other654.err; - hint_msg = other654.hint_msg; - __isset = other654.__isset; + err = other662.err; + hint_msg = other662.hint_msg; + __isset = other662.__isset; return *this; } -start_bulk_load_response &start_bulk_load_response::operator=(start_bulk_load_response &&other655) +start_bulk_load_response &start_bulk_load_response::operator=(start_bulk_load_response &&other663) { - err = std::move(other655.err); - hint_msg = std::move(other655.hint_msg); - __isset = std::move(other655.__isset); + err = std::move(other663.err); + hint_msg = std::move(other663.hint_msg); + __isset = std::move(other663.__isset); return *this; } void start_bulk_load_response::printTo(std::ostream &out) const @@ -15261,9 +15545,9 @@ uint32_t partition_bulk_load_state::read(::apache::thrift::protocol::TProtocol * break; case 3: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast656; - xfer += iprot->readI32(ecast656); - this->ingest_status = (ingestion_status::type)ecast656; + int32_t ecast664; + xfer += iprot->readI32(ecast664); + this->ingest_status = (ingestion_status::type)ecast664; this->__isset.ingest_status = true; } else { xfer += iprot->skip(ftype); @@ -15344,44 +15628,44 @@ void swap(partition_bulk_load_state &a, partition_bulk_load_state &b) swap(a.__isset, b.__isset); } -partition_bulk_load_state::partition_bulk_load_state(const partition_bulk_load_state &other657) +partition_bulk_load_state::partition_bulk_load_state(const partition_bulk_load_state &other665) { - download_progress = other657.download_progress; - download_status = other657.download_status; - ingest_status = other657.ingest_status; - is_cleaned_up = other657.is_cleaned_up; - is_paused = other657.is_paused; - __isset = other657.__isset; + download_progress = other665.download_progress; + download_status = other665.download_status; + ingest_status = other665.ingest_status; + is_cleaned_up = other665.is_cleaned_up; + is_paused = other665.is_paused; + __isset = other665.__isset; } -partition_bulk_load_state::partition_bulk_load_state(partition_bulk_load_state &&other658) +partition_bulk_load_state::partition_bulk_load_state(partition_bulk_load_state &&other666) { - download_progress = std::move(other658.download_progress); - download_status = std::move(other658.download_status); - ingest_status = std::move(other658.ingest_status); - is_cleaned_up = std::move(other658.is_cleaned_up); - is_paused = std::move(other658.is_paused); - __isset = std::move(other658.__isset); + download_progress = std::move(other666.download_progress); + download_status = std::move(other666.download_status); + ingest_status = std::move(other666.ingest_status); + is_cleaned_up = std::move(other666.is_cleaned_up); + is_paused = std::move(other666.is_paused); + __isset = std::move(other666.__isset); } partition_bulk_load_state &partition_bulk_load_state:: -operator=(const partition_bulk_load_state &other659) -{ - download_progress = other659.download_progress; - download_status = other659.download_status; - ingest_status = other659.ingest_status; - is_cleaned_up = other659.is_cleaned_up; - is_paused = other659.is_paused; - __isset = other659.__isset; +operator=(const partition_bulk_load_state &other667) +{ + download_progress = other667.download_progress; + download_status = other667.download_status; + ingest_status = other667.ingest_status; + is_cleaned_up = other667.is_cleaned_up; + is_paused = other667.is_paused; + __isset = other667.__isset; return *this; } partition_bulk_load_state &partition_bulk_load_state:: -operator=(partition_bulk_load_state &&other660) -{ - download_progress = std::move(other660.download_progress); - download_status = std::move(other660.download_status); - ingest_status = std::move(other660.ingest_status); - is_cleaned_up = std::move(other660.is_cleaned_up); - is_paused = std::move(other660.is_paused); - __isset = std::move(other660.__isset); +operator=(partition_bulk_load_state &&other668) +{ + download_progress = std::move(other668.download_progress); + download_status = std::move(other668.download_status); + ingest_status = std::move(other668.ingest_status); + is_cleaned_up = std::move(other668.is_cleaned_up); + is_paused = std::move(other668.is_paused); + __isset = std::move(other668.__isset); return *this; } void partition_bulk_load_state::printTo(std::ostream &out) const @@ -15504,9 +15788,9 @@ uint32_t bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) break; case 7: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast661; - xfer += iprot->readI32(ecast661); - this->meta_bulk_load_status = (bulk_load_status::type)ecast661; + int32_t ecast669; + xfer += iprot->readI32(ecast669); + this->meta_bulk_load_status = (bulk_load_status::type)ecast669; this->__isset.meta_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -15590,54 +15874,54 @@ void swap(bulk_load_request &a, bulk_load_request &b) swap(a.__isset, b.__isset); } -bulk_load_request::bulk_load_request(const bulk_load_request &other662) -{ - pid = other662.pid; - app_name = other662.app_name; - primary_addr = other662.primary_addr; - remote_provider_name = other662.remote_provider_name; - cluster_name = other662.cluster_name; - ballot = other662.ballot; - meta_bulk_load_status = other662.meta_bulk_load_status; - query_bulk_load_metadata = other662.query_bulk_load_metadata; - __isset = other662.__isset; -} -bulk_load_request::bulk_load_request(bulk_load_request &&other663) -{ - pid = std::move(other663.pid); - app_name = std::move(other663.app_name); - primary_addr = std::move(other663.primary_addr); - remote_provider_name = std::move(other663.remote_provider_name); - cluster_name = std::move(other663.cluster_name); - ballot = std::move(other663.ballot); - meta_bulk_load_status = std::move(other663.meta_bulk_load_status); - query_bulk_load_metadata = std::move(other663.query_bulk_load_metadata); - __isset = std::move(other663.__isset); -} -bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other664) -{ - pid = other664.pid; - app_name = other664.app_name; - primary_addr = other664.primary_addr; - remote_provider_name = other664.remote_provider_name; - cluster_name = other664.cluster_name; - ballot = other664.ballot; - meta_bulk_load_status = other664.meta_bulk_load_status; - query_bulk_load_metadata = other664.query_bulk_load_metadata; - __isset = other664.__isset; +bulk_load_request::bulk_load_request(const bulk_load_request &other670) +{ + pid = other670.pid; + app_name = other670.app_name; + primary_addr = other670.primary_addr; + remote_provider_name = other670.remote_provider_name; + cluster_name = other670.cluster_name; + ballot = other670.ballot; + meta_bulk_load_status = other670.meta_bulk_load_status; + query_bulk_load_metadata = other670.query_bulk_load_metadata; + __isset = other670.__isset; +} +bulk_load_request::bulk_load_request(bulk_load_request &&other671) +{ + pid = std::move(other671.pid); + app_name = std::move(other671.app_name); + primary_addr = std::move(other671.primary_addr); + remote_provider_name = std::move(other671.remote_provider_name); + cluster_name = std::move(other671.cluster_name); + ballot = std::move(other671.ballot); + meta_bulk_load_status = std::move(other671.meta_bulk_load_status); + query_bulk_load_metadata = std::move(other671.query_bulk_load_metadata); + __isset = std::move(other671.__isset); +} +bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other672) +{ + pid = other672.pid; + app_name = other672.app_name; + primary_addr = other672.primary_addr; + remote_provider_name = other672.remote_provider_name; + cluster_name = other672.cluster_name; + ballot = other672.ballot; + meta_bulk_load_status = other672.meta_bulk_load_status; + query_bulk_load_metadata = other672.query_bulk_load_metadata; + __isset = other672.__isset; return *this; } -bulk_load_request &bulk_load_request::operator=(bulk_load_request &&other665) -{ - pid = std::move(other665.pid); - app_name = std::move(other665.app_name); - primary_addr = std::move(other665.primary_addr); - remote_provider_name = std::move(other665.remote_provider_name); - cluster_name = std::move(other665.cluster_name); - ballot = std::move(other665.ballot); - meta_bulk_load_status = std::move(other665.meta_bulk_load_status); - query_bulk_load_metadata = std::move(other665.query_bulk_load_metadata); - __isset = std::move(other665.__isset); +bulk_load_request &bulk_load_request::operator=(bulk_load_request &&other673) +{ + pid = std::move(other673.pid); + app_name = std::move(other673.app_name); + primary_addr = std::move(other673.primary_addr); + remote_provider_name = std::move(other673.remote_provider_name); + cluster_name = std::move(other673.cluster_name); + ballot = std::move(other673.ballot); + meta_bulk_load_status = std::move(other673.meta_bulk_load_status); + query_bulk_load_metadata = std::move(other673.query_bulk_load_metadata); + __isset = std::move(other673.__isset); return *this; } void bulk_load_request::printTo(std::ostream &out) const @@ -15756,9 +16040,9 @@ uint32_t bulk_load_response::read(::apache::thrift::protocol::TProtocol *iprot) break; case 4: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast666; - xfer += iprot->readI32(ecast666); - this->primary_bulk_load_status = (bulk_load_status::type)ecast666; + int32_t ecast674; + xfer += iprot->readI32(ecast674); + this->primary_bulk_load_status = (bulk_load_status::type)ecast674; this->__isset.primary_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -15768,16 +16052,16 @@ uint32_t bulk_load_response::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_MAP) { { this->group_bulk_load_state.clear(); - uint32_t _size667; - ::apache::thrift::protocol::TType _ktype668; - ::apache::thrift::protocol::TType _vtype669; - xfer += iprot->readMapBegin(_ktype668, _vtype669, _size667); - uint32_t _i671; - for (_i671 = 0; _i671 < _size667; ++_i671) { - ::dsn::rpc_address _key672; - xfer += _key672.read(iprot); - partition_bulk_load_state &_val673 = this->group_bulk_load_state[_key672]; - xfer += _val673.read(iprot); + uint32_t _size675; + ::apache::thrift::protocol::TType _ktype676; + ::apache::thrift::protocol::TType _vtype677; + xfer += iprot->readMapBegin(_ktype676, _vtype677, _size675); + uint32_t _i679; + for (_i679 = 0; _i679 < _size675; ++_i679) { + ::dsn::rpc_address _key680; + xfer += _key680.read(iprot); + partition_bulk_load_state &_val681 = this->group_bulk_load_state[_key680]; + xfer += _val681.read(iprot); } xfer += iprot->readMapEnd(); } @@ -15866,12 +16150,12 @@ uint32_t bulk_load_response::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_STRUCT, static_cast(this->group_bulk_load_state.size())); - std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter674; - for (_iter674 = this->group_bulk_load_state.begin(); - _iter674 != this->group_bulk_load_state.end(); - ++_iter674) { - xfer += _iter674->first.write(oprot); - xfer += _iter674->second.write(oprot); + std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter682; + for (_iter682 = this->group_bulk_load_state.begin(); + _iter682 != this->group_bulk_load_state.end(); + ++_iter682) { + xfer += _iter682->first.write(oprot); + xfer += _iter682->second.write(oprot); } xfer += oprot->writeMapEnd(); } @@ -15927,64 +16211,64 @@ void swap(bulk_load_response &a, bulk_load_response &b) swap(a.__isset, b.__isset); } -bulk_load_response::bulk_load_response(const bulk_load_response &other675) -{ - err = other675.err; - pid = other675.pid; - app_name = other675.app_name; - primary_bulk_load_status = other675.primary_bulk_load_status; - group_bulk_load_state = other675.group_bulk_load_state; - metadata = other675.metadata; - total_download_progress = other675.total_download_progress; - is_group_ingestion_finished = other675.is_group_ingestion_finished; - is_group_bulk_load_context_cleaned_up = other675.is_group_bulk_load_context_cleaned_up; - is_group_bulk_load_paused = other675.is_group_bulk_load_paused; - __isset = other675.__isset; -} -bulk_load_response::bulk_load_response(bulk_load_response &&other676) -{ - err = std::move(other676.err); - pid = std::move(other676.pid); - app_name = std::move(other676.app_name); - primary_bulk_load_status = std::move(other676.primary_bulk_load_status); - group_bulk_load_state = std::move(other676.group_bulk_load_state); - metadata = std::move(other676.metadata); - total_download_progress = std::move(other676.total_download_progress); - is_group_ingestion_finished = std::move(other676.is_group_ingestion_finished); +bulk_load_response::bulk_load_response(const bulk_load_response &other683) +{ + err = other683.err; + pid = other683.pid; + app_name = other683.app_name; + primary_bulk_load_status = other683.primary_bulk_load_status; + group_bulk_load_state = other683.group_bulk_load_state; + metadata = other683.metadata; + total_download_progress = other683.total_download_progress; + is_group_ingestion_finished = other683.is_group_ingestion_finished; + is_group_bulk_load_context_cleaned_up = other683.is_group_bulk_load_context_cleaned_up; + is_group_bulk_load_paused = other683.is_group_bulk_load_paused; + __isset = other683.__isset; +} +bulk_load_response::bulk_load_response(bulk_load_response &&other684) +{ + err = std::move(other684.err); + pid = std::move(other684.pid); + app_name = std::move(other684.app_name); + primary_bulk_load_status = std::move(other684.primary_bulk_load_status); + group_bulk_load_state = std::move(other684.group_bulk_load_state); + metadata = std::move(other684.metadata); + total_download_progress = std::move(other684.total_download_progress); + is_group_ingestion_finished = std::move(other684.is_group_ingestion_finished); is_group_bulk_load_context_cleaned_up = - std::move(other676.is_group_bulk_load_context_cleaned_up); - is_group_bulk_load_paused = std::move(other676.is_group_bulk_load_paused); - __isset = std::move(other676.__isset); -} -bulk_load_response &bulk_load_response::operator=(const bulk_load_response &other677) -{ - err = other677.err; - pid = other677.pid; - app_name = other677.app_name; - primary_bulk_load_status = other677.primary_bulk_load_status; - group_bulk_load_state = other677.group_bulk_load_state; - metadata = other677.metadata; - total_download_progress = other677.total_download_progress; - is_group_ingestion_finished = other677.is_group_ingestion_finished; - is_group_bulk_load_context_cleaned_up = other677.is_group_bulk_load_context_cleaned_up; - is_group_bulk_load_paused = other677.is_group_bulk_load_paused; - __isset = other677.__isset; + std::move(other684.is_group_bulk_load_context_cleaned_up); + is_group_bulk_load_paused = std::move(other684.is_group_bulk_load_paused); + __isset = std::move(other684.__isset); +} +bulk_load_response &bulk_load_response::operator=(const bulk_load_response &other685) +{ + err = other685.err; + pid = other685.pid; + app_name = other685.app_name; + primary_bulk_load_status = other685.primary_bulk_load_status; + group_bulk_load_state = other685.group_bulk_load_state; + metadata = other685.metadata; + total_download_progress = other685.total_download_progress; + is_group_ingestion_finished = other685.is_group_ingestion_finished; + is_group_bulk_load_context_cleaned_up = other685.is_group_bulk_load_context_cleaned_up; + is_group_bulk_load_paused = other685.is_group_bulk_load_paused; + __isset = other685.__isset; return *this; } -bulk_load_response &bulk_load_response::operator=(bulk_load_response &&other678) +bulk_load_response &bulk_load_response::operator=(bulk_load_response &&other686) { - err = std::move(other678.err); - pid = std::move(other678.pid); - app_name = std::move(other678.app_name); - primary_bulk_load_status = std::move(other678.primary_bulk_load_status); - group_bulk_load_state = std::move(other678.group_bulk_load_state); - metadata = std::move(other678.metadata); - total_download_progress = std::move(other678.total_download_progress); - is_group_ingestion_finished = std::move(other678.is_group_ingestion_finished); + err = std::move(other686.err); + pid = std::move(other686.pid); + app_name = std::move(other686.app_name); + primary_bulk_load_status = std::move(other686.primary_bulk_load_status); + group_bulk_load_state = std::move(other686.group_bulk_load_state); + metadata = std::move(other686.metadata); + total_download_progress = std::move(other686.total_download_progress); + is_group_ingestion_finished = std::move(other686.is_group_ingestion_finished); is_group_bulk_load_context_cleaned_up = - std::move(other678.is_group_bulk_load_context_cleaned_up); - is_group_bulk_load_paused = std::move(other678.is_group_bulk_load_paused); - __isset = std::move(other678.__isset); + std::move(other686.is_group_bulk_load_context_cleaned_up); + is_group_bulk_load_paused = std::move(other686.is_group_bulk_load_paused); + __isset = std::move(other686.__isset); return *this; } void bulk_load_response::printTo(std::ostream &out) const @@ -16110,9 +16394,9 @@ uint32_t group_bulk_load_request::read(::apache::thrift::protocol::TProtocol *ip break; case 6: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast679; - xfer += iprot->readI32(ecast679); - this->meta_bulk_load_status = (bulk_load_status::type)ecast679; + int32_t ecast687; + xfer += iprot->readI32(ecast687); + this->meta_bulk_load_status = (bulk_load_status::type)ecast687; this->__isset.meta_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -16177,46 +16461,46 @@ void swap(group_bulk_load_request &a, group_bulk_load_request &b) swap(a.__isset, b.__isset); } -group_bulk_load_request::group_bulk_load_request(const group_bulk_load_request &other680) +group_bulk_load_request::group_bulk_load_request(const group_bulk_load_request &other688) { - app_name = other680.app_name; - target_address = other680.target_address; - config = other680.config; - provider_name = other680.provider_name; - cluster_name = other680.cluster_name; - meta_bulk_load_status = other680.meta_bulk_load_status; - __isset = other680.__isset; + app_name = other688.app_name; + target_address = other688.target_address; + config = other688.config; + provider_name = other688.provider_name; + cluster_name = other688.cluster_name; + meta_bulk_load_status = other688.meta_bulk_load_status; + __isset = other688.__isset; } -group_bulk_load_request::group_bulk_load_request(group_bulk_load_request &&other681) +group_bulk_load_request::group_bulk_load_request(group_bulk_load_request &&other689) { - app_name = std::move(other681.app_name); - target_address = std::move(other681.target_address); - config = std::move(other681.config); - provider_name = std::move(other681.provider_name); - cluster_name = std::move(other681.cluster_name); - meta_bulk_load_status = std::move(other681.meta_bulk_load_status); - __isset = std::move(other681.__isset); + app_name = std::move(other689.app_name); + target_address = std::move(other689.target_address); + config = std::move(other689.config); + provider_name = std::move(other689.provider_name); + cluster_name = std::move(other689.cluster_name); + meta_bulk_load_status = std::move(other689.meta_bulk_load_status); + __isset = std::move(other689.__isset); } -group_bulk_load_request &group_bulk_load_request::operator=(const group_bulk_load_request &other682) +group_bulk_load_request &group_bulk_load_request::operator=(const group_bulk_load_request &other690) { - app_name = other682.app_name; - target_address = other682.target_address; - config = other682.config; - provider_name = other682.provider_name; - cluster_name = other682.cluster_name; - meta_bulk_load_status = other682.meta_bulk_load_status; - __isset = other682.__isset; + app_name = other690.app_name; + target_address = other690.target_address; + config = other690.config; + provider_name = other690.provider_name; + cluster_name = other690.cluster_name; + meta_bulk_load_status = other690.meta_bulk_load_status; + __isset = other690.__isset; return *this; } -group_bulk_load_request &group_bulk_load_request::operator=(group_bulk_load_request &&other683) +group_bulk_load_request &group_bulk_load_request::operator=(group_bulk_load_request &&other691) { - app_name = std::move(other683.app_name); - target_address = std::move(other683.target_address); - config = std::move(other683.config); - provider_name = std::move(other683.provider_name); - cluster_name = std::move(other683.cluster_name); - meta_bulk_load_status = std::move(other683.meta_bulk_load_status); - __isset = std::move(other683.__isset); + app_name = std::move(other691.app_name); + target_address = std::move(other691.target_address); + config = std::move(other691.config); + provider_name = std::move(other691.provider_name); + cluster_name = std::move(other691.cluster_name); + meta_bulk_load_status = std::move(other691.meta_bulk_load_status); + __isset = std::move(other691.__isset); return *this; } void group_bulk_load_request::printTo(std::ostream &out) const @@ -16280,9 +16564,9 @@ uint32_t group_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast684; - xfer += iprot->readI32(ecast684); - this->status = (bulk_load_status::type)ecast684; + int32_t ecast692; + xfer += iprot->readI32(ecast692); + this->status = (bulk_load_status::type)ecast692; this->__isset.status = true; } else { xfer += iprot->skip(ftype); @@ -16340,35 +16624,35 @@ void swap(group_bulk_load_response &a, group_bulk_load_response &b) swap(a.__isset, b.__isset); } -group_bulk_load_response::group_bulk_load_response(const group_bulk_load_response &other685) +group_bulk_load_response::group_bulk_load_response(const group_bulk_load_response &other693) { - err = other685.err; - status = other685.status; - bulk_load_state = other685.bulk_load_state; - __isset = other685.__isset; + err = other693.err; + status = other693.status; + bulk_load_state = other693.bulk_load_state; + __isset = other693.__isset; } -group_bulk_load_response::group_bulk_load_response(group_bulk_load_response &&other686) +group_bulk_load_response::group_bulk_load_response(group_bulk_load_response &&other694) { - err = std::move(other686.err); - status = std::move(other686.status); - bulk_load_state = std::move(other686.bulk_load_state); - __isset = std::move(other686.__isset); + err = std::move(other694.err); + status = std::move(other694.status); + bulk_load_state = std::move(other694.bulk_load_state); + __isset = std::move(other694.__isset); } group_bulk_load_response &group_bulk_load_response:: -operator=(const group_bulk_load_response &other687) +operator=(const group_bulk_load_response &other695) { - err = other687.err; - status = other687.status; - bulk_load_state = other687.bulk_load_state; - __isset = other687.__isset; + err = other695.err; + status = other695.status; + bulk_load_state = other695.bulk_load_state; + __isset = other695.__isset; return *this; } -group_bulk_load_response &group_bulk_load_response::operator=(group_bulk_load_response &&other688) +group_bulk_load_response &group_bulk_load_response::operator=(group_bulk_load_response &&other696) { - err = std::move(other688.err); - status = std::move(other688.status); - bulk_load_state = std::move(other688.bulk_load_state); - __isset = std::move(other688.__isset); + err = std::move(other696.err); + status = std::move(other696.status); + bulk_load_state = std::move(other696.bulk_load_state); + __isset = std::move(other696.__isset); return *this; } void group_bulk_load_response::printTo(std::ostream &out) const @@ -16463,30 +16747,30 @@ void swap(ingestion_request &a, ingestion_request &b) swap(a.__isset, b.__isset); } -ingestion_request::ingestion_request(const ingestion_request &other689) +ingestion_request::ingestion_request(const ingestion_request &other697) { - app_name = other689.app_name; - metadata = other689.metadata; - __isset = other689.__isset; + app_name = other697.app_name; + metadata = other697.metadata; + __isset = other697.__isset; } -ingestion_request::ingestion_request(ingestion_request &&other690) +ingestion_request::ingestion_request(ingestion_request &&other698) { - app_name = std::move(other690.app_name); - metadata = std::move(other690.metadata); - __isset = std::move(other690.__isset); + app_name = std::move(other698.app_name); + metadata = std::move(other698.metadata); + __isset = std::move(other698.__isset); } -ingestion_request &ingestion_request::operator=(const ingestion_request &other691) +ingestion_request &ingestion_request::operator=(const ingestion_request &other699) { - app_name = other691.app_name; - metadata = other691.metadata; - __isset = other691.__isset; + app_name = other699.app_name; + metadata = other699.metadata; + __isset = other699.__isset; return *this; } -ingestion_request &ingestion_request::operator=(ingestion_request &&other692) +ingestion_request &ingestion_request::operator=(ingestion_request &&other700) { - app_name = std::move(other692.app_name); - metadata = std::move(other692.metadata); - __isset = std::move(other692.__isset); + app_name = std::move(other700.app_name); + metadata = std::move(other700.metadata); + __isset = std::move(other700.__isset); return *this; } void ingestion_request::printTo(std::ostream &out) const @@ -16579,30 +16863,30 @@ void swap(ingestion_response &a, ingestion_response &b) swap(a.__isset, b.__isset); } -ingestion_response::ingestion_response(const ingestion_response &other693) +ingestion_response::ingestion_response(const ingestion_response &other701) { - err = other693.err; - rocksdb_error = other693.rocksdb_error; - __isset = other693.__isset; + err = other701.err; + rocksdb_error = other701.rocksdb_error; + __isset = other701.__isset; } -ingestion_response::ingestion_response(ingestion_response &&other694) +ingestion_response::ingestion_response(ingestion_response &&other702) { - err = std::move(other694.err); - rocksdb_error = std::move(other694.rocksdb_error); - __isset = std::move(other694.__isset); + err = std::move(other702.err); + rocksdb_error = std::move(other702.rocksdb_error); + __isset = std::move(other702.__isset); } -ingestion_response &ingestion_response::operator=(const ingestion_response &other695) +ingestion_response &ingestion_response::operator=(const ingestion_response &other703) { - err = other695.err; - rocksdb_error = other695.rocksdb_error; - __isset = other695.__isset; + err = other703.err; + rocksdb_error = other703.rocksdb_error; + __isset = other703.__isset; return *this; } -ingestion_response &ingestion_response::operator=(ingestion_response &&other696) +ingestion_response &ingestion_response::operator=(ingestion_response &&other704) { - err = std::move(other696.err); - rocksdb_error = std::move(other696.rocksdb_error); - __isset = std::move(other696.__isset); + err = std::move(other704.err); + rocksdb_error = std::move(other704.rocksdb_error); + __isset = std::move(other704.__isset); return *this; } void ingestion_response::printTo(std::ostream &out) const @@ -16653,9 +16937,9 @@ uint32_t control_bulk_load_request::read(::apache::thrift::protocol::TProtocol * break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast697; - xfer += iprot->readI32(ecast697); - this->type = (bulk_load_control_type::type)ecast697; + int32_t ecast705; + xfer += iprot->readI32(ecast705); + this->type = (bulk_load_control_type::type)ecast705; this->__isset.type = true; } else { xfer += iprot->skip(ftype); @@ -16700,32 +16984,32 @@ void swap(control_bulk_load_request &a, control_bulk_load_request &b) swap(a.__isset, b.__isset); } -control_bulk_load_request::control_bulk_load_request(const control_bulk_load_request &other698) +control_bulk_load_request::control_bulk_load_request(const control_bulk_load_request &other706) { - app_name = other698.app_name; - type = other698.type; - __isset = other698.__isset; + app_name = other706.app_name; + type = other706.type; + __isset = other706.__isset; } -control_bulk_load_request::control_bulk_load_request(control_bulk_load_request &&other699) +control_bulk_load_request::control_bulk_load_request(control_bulk_load_request &&other707) { - app_name = std::move(other699.app_name); - type = std::move(other699.type); - __isset = std::move(other699.__isset); + app_name = std::move(other707.app_name); + type = std::move(other707.type); + __isset = std::move(other707.__isset); } control_bulk_load_request &control_bulk_load_request:: -operator=(const control_bulk_load_request &other700) +operator=(const control_bulk_load_request &other708) { - app_name = other700.app_name; - type = other700.type; - __isset = other700.__isset; + app_name = other708.app_name; + type = other708.type; + __isset = other708.__isset; return *this; } control_bulk_load_request &control_bulk_load_request:: -operator=(control_bulk_load_request &&other701) +operator=(control_bulk_load_request &&other709) { - app_name = std::move(other701.app_name); - type = std::move(other701.type); - __isset = std::move(other701.__isset); + app_name = std::move(other709.app_name); + type = std::move(other709.type); + __isset = std::move(other709.__isset); return *this; } void control_bulk_load_request::printTo(std::ostream &out) const @@ -16823,32 +17107,32 @@ void swap(control_bulk_load_response &a, control_bulk_load_response &b) swap(a.__isset, b.__isset); } -control_bulk_load_response::control_bulk_load_response(const control_bulk_load_response &other702) +control_bulk_load_response::control_bulk_load_response(const control_bulk_load_response &other710) { - err = other702.err; - hint_msg = other702.hint_msg; - __isset = other702.__isset; + err = other710.err; + hint_msg = other710.hint_msg; + __isset = other710.__isset; } -control_bulk_load_response::control_bulk_load_response(control_bulk_load_response &&other703) +control_bulk_load_response::control_bulk_load_response(control_bulk_load_response &&other711) { - err = std::move(other703.err); - hint_msg = std::move(other703.hint_msg); - __isset = std::move(other703.__isset); + err = std::move(other711.err); + hint_msg = std::move(other711.hint_msg); + __isset = std::move(other711.__isset); } control_bulk_load_response &control_bulk_load_response:: -operator=(const control_bulk_load_response &other704) +operator=(const control_bulk_load_response &other712) { - err = other704.err; - hint_msg = other704.hint_msg; - __isset = other704.__isset; + err = other712.err; + hint_msg = other712.hint_msg; + __isset = other712.__isset; return *this; } control_bulk_load_response &control_bulk_load_response:: -operator=(control_bulk_load_response &&other705) +operator=(control_bulk_load_response &&other713) { - err = std::move(other705.err); - hint_msg = std::move(other705.hint_msg); - __isset = std::move(other705.__isset); + err = std::move(other713.err); + hint_msg = std::move(other713.hint_msg); + __isset = std::move(other713.__isset); return *this; } void control_bulk_load_response::printTo(std::ostream &out) const @@ -16927,26 +17211,26 @@ void swap(query_bulk_load_request &a, query_bulk_load_request &b) swap(a.__isset, b.__isset); } -query_bulk_load_request::query_bulk_load_request(const query_bulk_load_request &other706) +query_bulk_load_request::query_bulk_load_request(const query_bulk_load_request &other714) { - app_name = other706.app_name; - __isset = other706.__isset; + app_name = other714.app_name; + __isset = other714.__isset; } -query_bulk_load_request::query_bulk_load_request(query_bulk_load_request &&other707) +query_bulk_load_request::query_bulk_load_request(query_bulk_load_request &&other715) { - app_name = std::move(other707.app_name); - __isset = std::move(other707.__isset); + app_name = std::move(other715.app_name); + __isset = std::move(other715.__isset); } -query_bulk_load_request &query_bulk_load_request::operator=(const query_bulk_load_request &other708) +query_bulk_load_request &query_bulk_load_request::operator=(const query_bulk_load_request &other716) { - app_name = other708.app_name; - __isset = other708.__isset; + app_name = other716.app_name; + __isset = other716.__isset; return *this; } -query_bulk_load_request &query_bulk_load_request::operator=(query_bulk_load_request &&other709) +query_bulk_load_request &query_bulk_load_request::operator=(query_bulk_load_request &&other717) { - app_name = std::move(other709.app_name); - __isset = std::move(other709.__isset); + app_name = std::move(other717.app_name); + __isset = std::move(other717.__isset); return *this; } void query_bulk_load_request::printTo(std::ostream &out) const @@ -17028,9 +17312,9 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i break; case 3: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast710; - xfer += iprot->readI32(ecast710); - this->app_status = (bulk_load_status::type)ecast710; + int32_t ecast718; + xfer += iprot->readI32(ecast718); + this->app_status = (bulk_load_status::type)ecast718; this->__isset.app_status = true; } else { xfer += iprot->skip(ftype); @@ -17040,15 +17324,15 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i if (ftype == ::apache::thrift::protocol::T_LIST) { { this->partitions_status.clear(); - uint32_t _size711; - ::apache::thrift::protocol::TType _etype714; - xfer += iprot->readListBegin(_etype714, _size711); - this->partitions_status.resize(_size711); - uint32_t _i715; - for (_i715 = 0; _i715 < _size711; ++_i715) { - int32_t ecast716; - xfer += iprot->readI32(ecast716); - this->partitions_status[_i715] = (bulk_load_status::type)ecast716; + uint32_t _size719; + ::apache::thrift::protocol::TType _etype722; + xfer += iprot->readListBegin(_etype722, _size719); + this->partitions_status.resize(_size719); + uint32_t _i723; + for (_i723 = 0; _i723 < _size719; ++_i723) { + int32_t ecast724; + xfer += iprot->readI32(ecast724); + this->partitions_status[_i723] = (bulk_load_status::type)ecast724; } xfer += iprot->readListEnd(); } @@ -17069,25 +17353,25 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i if (ftype == ::apache::thrift::protocol::T_LIST) { { this->bulk_load_states.clear(); - uint32_t _size717; - ::apache::thrift::protocol::TType _etype720; - xfer += iprot->readListBegin(_etype720, _size717); - this->bulk_load_states.resize(_size717); - uint32_t _i721; - for (_i721 = 0; _i721 < _size717; ++_i721) { + uint32_t _size725; + ::apache::thrift::protocol::TType _etype728; + xfer += iprot->readListBegin(_etype728, _size725); + this->bulk_load_states.resize(_size725); + uint32_t _i729; + for (_i729 = 0; _i729 < _size725; ++_i729) { { - this->bulk_load_states[_i721].clear(); - uint32_t _size722; - ::apache::thrift::protocol::TType _ktype723; - ::apache::thrift::protocol::TType _vtype724; - xfer += iprot->readMapBegin(_ktype723, _vtype724, _size722); - uint32_t _i726; - for (_i726 = 0; _i726 < _size722; ++_i726) { - ::dsn::rpc_address _key727; - xfer += _key727.read(iprot); - partition_bulk_load_state &_val728 = - this->bulk_load_states[_i721][_key727]; - xfer += _val728.read(iprot); + this->bulk_load_states[_i729].clear(); + uint32_t _size730; + ::apache::thrift::protocol::TType _ktype731; + ::apache::thrift::protocol::TType _vtype732; + xfer += iprot->readMapBegin(_ktype731, _vtype732, _size730); + uint32_t _i734; + for (_i734 = 0; _i734 < _size730; ++_i734) { + ::dsn::rpc_address _key735; + xfer += _key735.read(iprot); + partition_bulk_load_state &_val736 = + this->bulk_load_states[_i729][_key735]; + xfer += _val736.read(iprot); } xfer += iprot->readMapEnd(); } @@ -17141,10 +17425,10 @@ uint32_t query_bulk_load_response::write(::apache::thrift::protocol::TProtocol * { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_I32, static_cast(this->partitions_status.size())); - std::vector::const_iterator _iter729; - for (_iter729 = this->partitions_status.begin(); _iter729 != this->partitions_status.end(); - ++_iter729) { - xfer += oprot->writeI32((int32_t)(*_iter729)); + std::vector::const_iterator _iter737; + for (_iter737 = this->partitions_status.begin(); _iter737 != this->partitions_status.end(); + ++_iter737) { + xfer += oprot->writeI32((int32_t)(*_iter737)); } xfer += oprot->writeListEnd(); } @@ -17159,17 +17443,17 @@ uint32_t query_bulk_load_response::write(::apache::thrift::protocol::TProtocol * xfer += oprot->writeListBegin(::apache::thrift::protocol::T_MAP, static_cast(this->bulk_load_states.size())); std::vector>::const_iterator - _iter730; - for (_iter730 = this->bulk_load_states.begin(); _iter730 != this->bulk_load_states.end(); - ++_iter730) { + _iter738; + for (_iter738 = this->bulk_load_states.begin(); _iter738 != this->bulk_load_states.end(); + ++_iter738) { { xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_STRUCT, - static_cast((*_iter730).size())); - std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter731; - for (_iter731 = (*_iter730).begin(); _iter731 != (*_iter730).end(); ++_iter731) { - xfer += _iter731->first.write(oprot); - xfer += _iter731->second.write(oprot); + static_cast((*_iter738).size())); + std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter739; + for (_iter739 = (*_iter738).begin(); _iter739 != (*_iter738).end(); ++_iter739) { + xfer += _iter739->first.write(oprot); + xfer += _iter739->second.write(oprot); } xfer += oprot->writeMapEnd(); } @@ -17201,51 +17485,51 @@ void swap(query_bulk_load_response &a, query_bulk_load_response &b) swap(a.__isset, b.__isset); } -query_bulk_load_response::query_bulk_load_response(const query_bulk_load_response &other732) +query_bulk_load_response::query_bulk_load_response(const query_bulk_load_response &other740) { - err = other732.err; - app_name = other732.app_name; - app_status = other732.app_status; - partitions_status = other732.partitions_status; - max_replica_count = other732.max_replica_count; - bulk_load_states = other732.bulk_load_states; - hint_msg = other732.hint_msg; - __isset = other732.__isset; + err = other740.err; + app_name = other740.app_name; + app_status = other740.app_status; + partitions_status = other740.partitions_status; + max_replica_count = other740.max_replica_count; + bulk_load_states = other740.bulk_load_states; + hint_msg = other740.hint_msg; + __isset = other740.__isset; } -query_bulk_load_response::query_bulk_load_response(query_bulk_load_response &&other733) +query_bulk_load_response::query_bulk_load_response(query_bulk_load_response &&other741) { - err = std::move(other733.err); - app_name = std::move(other733.app_name); - app_status = std::move(other733.app_status); - partitions_status = std::move(other733.partitions_status); - max_replica_count = std::move(other733.max_replica_count); - bulk_load_states = std::move(other733.bulk_load_states); - hint_msg = std::move(other733.hint_msg); - __isset = std::move(other733.__isset); + err = std::move(other741.err); + app_name = std::move(other741.app_name); + app_status = std::move(other741.app_status); + partitions_status = std::move(other741.partitions_status); + max_replica_count = std::move(other741.max_replica_count); + bulk_load_states = std::move(other741.bulk_load_states); + hint_msg = std::move(other741.hint_msg); + __isset = std::move(other741.__isset); } query_bulk_load_response &query_bulk_load_response:: -operator=(const query_bulk_load_response &other734) -{ - err = other734.err; - app_name = other734.app_name; - app_status = other734.app_status; - partitions_status = other734.partitions_status; - max_replica_count = other734.max_replica_count; - bulk_load_states = other734.bulk_load_states; - hint_msg = other734.hint_msg; - __isset = other734.__isset; +operator=(const query_bulk_load_response &other742) +{ + err = other742.err; + app_name = other742.app_name; + app_status = other742.app_status; + partitions_status = other742.partitions_status; + max_replica_count = other742.max_replica_count; + bulk_load_states = other742.bulk_load_states; + hint_msg = other742.hint_msg; + __isset = other742.__isset; return *this; } -query_bulk_load_response &query_bulk_load_response::operator=(query_bulk_load_response &&other735) +query_bulk_load_response &query_bulk_load_response::operator=(query_bulk_load_response &&other743) { - err = std::move(other735.err); - app_name = std::move(other735.app_name); - app_status = std::move(other735.app_status); - partitions_status = std::move(other735.partitions_status); - max_replica_count = std::move(other735.max_replica_count); - bulk_load_states = std::move(other735.bulk_load_states); - hint_msg = std::move(other735.hint_msg); - __isset = std::move(other735.__isset); + err = std::move(other743.err); + app_name = std::move(other743.app_name); + app_status = std::move(other743.app_status); + partitions_status = std::move(other743.partitions_status); + max_replica_count = std::move(other743.max_replica_count); + bulk_load_states = std::move(other743.bulk_load_states); + hint_msg = std::move(other743.hint_msg); + __isset = std::move(other743.__isset); return *this; } void query_bulk_load_response::printTo(std::ostream &out) const @@ -17298,9 +17582,9 @@ uint32_t detect_hotkey_request::read(::apache::thrift::protocol::TProtocol *ipro switch (fid) { case 1: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast736; - xfer += iprot->readI32(ecast736); - this->type = (hotkey_type::type)ecast736; + int32_t ecast744; + xfer += iprot->readI32(ecast744); + this->type = (hotkey_type::type)ecast744; this->__isset.type = true; } else { xfer += iprot->skip(ftype); @@ -17308,9 +17592,9 @@ uint32_t detect_hotkey_request::read(::apache::thrift::protocol::TProtocol *ipro break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast737; - xfer += iprot->readI32(ecast737); - this->action = (detect_action::type)ecast737; + int32_t ecast745; + xfer += iprot->readI32(ecast745); + this->action = (detect_action::type)ecast745; this->__isset.action = true; } else { xfer += iprot->skip(ftype); @@ -17368,34 +17652,34 @@ void swap(detect_hotkey_request &a, detect_hotkey_request &b) swap(a.__isset, b.__isset); } -detect_hotkey_request::detect_hotkey_request(const detect_hotkey_request &other738) +detect_hotkey_request::detect_hotkey_request(const detect_hotkey_request &other746) { - type = other738.type; - action = other738.action; - pid = other738.pid; - __isset = other738.__isset; + type = other746.type; + action = other746.action; + pid = other746.pid; + __isset = other746.__isset; } -detect_hotkey_request::detect_hotkey_request(detect_hotkey_request &&other739) +detect_hotkey_request::detect_hotkey_request(detect_hotkey_request &&other747) { - type = std::move(other739.type); - action = std::move(other739.action); - pid = std::move(other739.pid); - __isset = std::move(other739.__isset); + type = std::move(other747.type); + action = std::move(other747.action); + pid = std::move(other747.pid); + __isset = std::move(other747.__isset); } -detect_hotkey_request &detect_hotkey_request::operator=(const detect_hotkey_request &other740) +detect_hotkey_request &detect_hotkey_request::operator=(const detect_hotkey_request &other748) { - type = other740.type; - action = other740.action; - pid = other740.pid; - __isset = other740.__isset; + type = other748.type; + action = other748.action; + pid = other748.pid; + __isset = other748.__isset; return *this; } -detect_hotkey_request &detect_hotkey_request::operator=(detect_hotkey_request &&other741) +detect_hotkey_request &detect_hotkey_request::operator=(detect_hotkey_request &&other749) { - type = std::move(other741.type); - action = std::move(other741.action); - pid = std::move(other741.pid); - __isset = std::move(other741.__isset); + type = std::move(other749.type); + action = std::move(other749.action); + pid = std::move(other749.pid); + __isset = std::move(other749.__isset); return *this; } void detect_hotkey_request::printTo(std::ostream &out) const @@ -17495,30 +17779,30 @@ void swap(detect_hotkey_response &a, detect_hotkey_response &b) swap(a.__isset, b.__isset); } -detect_hotkey_response::detect_hotkey_response(const detect_hotkey_response &other742) +detect_hotkey_response::detect_hotkey_response(const detect_hotkey_response &other750) { - err = other742.err; - err_hint = other742.err_hint; - __isset = other742.__isset; + err = other750.err; + err_hint = other750.err_hint; + __isset = other750.__isset; } -detect_hotkey_response::detect_hotkey_response(detect_hotkey_response &&other743) +detect_hotkey_response::detect_hotkey_response(detect_hotkey_response &&other751) { - err = std::move(other743.err); - err_hint = std::move(other743.err_hint); - __isset = std::move(other743.__isset); + err = std::move(other751.err); + err_hint = std::move(other751.err_hint); + __isset = std::move(other751.__isset); } -detect_hotkey_response &detect_hotkey_response::operator=(const detect_hotkey_response &other744) +detect_hotkey_response &detect_hotkey_response::operator=(const detect_hotkey_response &other752) { - err = other744.err; - err_hint = other744.err_hint; - __isset = other744.__isset; + err = other752.err; + err_hint = other752.err_hint; + __isset = other752.__isset; return *this; } -detect_hotkey_response &detect_hotkey_response::operator=(detect_hotkey_response &&other745) +detect_hotkey_response &detect_hotkey_response::operator=(detect_hotkey_response &&other753) { - err = std::move(other745.err); - err_hint = std::move(other745.err_hint); - __isset = std::move(other745.__isset); + err = std::move(other753.err); + err_hint = std::move(other753.err_hint); + __isset = std::move(other753.__isset); return *this; } void detect_hotkey_response::printTo(std::ostream &out) const diff --git a/src/replica/replica.h b/src/replica/replica.h index a031aad721..b479f43034 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -423,7 +423,7 @@ class replica : public serverlet, public ref_counter, public replica_ba replica_stub *_stub; std::string _dir; replication_options *_options; - const app_info _app_info; + app_info _app_info; std::map _extra_envs; // uniq timestamp generator for this replica. diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index fe126db68c..8d1544dcb2 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -2709,6 +2709,19 @@ void replica_stub::on_notify_primary_split_catch_up(notify_catch_up_rpc rpc) } } +// ThreadPool: THREAD_POOL_REPLICATION +void replica_stub::on_update_child_group_partition_count(update_child_group_partition_count_rpc rpc) +{ + const auto &request = rpc.request(); + auto &response = rpc.response(); + replica_ptr replica = get_replica(request.child_pid); + if (replica != nullptr) { + replica->get_split_manager()->on_update_child_group_partition_count(request, response); + } else { + response.err = ERR_OBJECT_NOT_FOUND; + } +} + void replica_stub::update_disk_holding_replicas() { for (const auto &dir_node : _fs_manager._dir_nodes) { diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 634fef9bc0..cf1b3054d5 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -55,6 +55,9 @@ typedef rpc_holder copy_checkpoint_rpc; typedef rpc_holder query_disk_info_rpc; typedef rpc_holder query_app_info_rpc; typedef rpc_holder notify_catch_up_rpc; +typedef rpc_holder + update_child_group_partition_count_rpc; typedef rpc_holder group_bulk_load_rpc; typedef rpc_holder detect_hotkey_rpc; @@ -127,12 +130,6 @@ class replica_stub : public serverlet, public ref_counter void on_copy_checkpoint(copy_checkpoint_rpc rpc); void on_group_bulk_load(group_bulk_load_rpc rpc); - // - // functions while executing partition split - // - // on primary, child notify itself has been caught up parent - void on_notify_primary_split_catch_up(notify_catch_up_rpc rpc); - // // local messages // @@ -207,6 +204,12 @@ class replica_stub : public serverlet, public ref_counter // This function is used for partition split error handler void split_replica_error_handler(gpid pid, local_execution handler); + // on primary parent partition, child notify itself has been caught up parent + void on_notify_primary_split_catch_up(notify_catch_up_rpc rpc); + + // on child partition, update new partition count + void on_update_child_group_partition_count(update_child_group_partition_count_rpc rpc); + // TODO: (Tangyanzhao) add some comments void on_detect_hotkey(detect_hotkey_rpc rpc); diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index ad8e0bd574..dc9357475a 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -632,8 +632,7 @@ void replica_split_manager::parent_check_sync_point_commit(decree sync_point) // sync_point, _replica->_app->last_committed_decree()); if (_replica->_app->last_committed_decree() >= sync_point) { - // TODO(heyuchen): TBD - // update child replica group partition_count + update_child_group_partition_count(_replica->_app_info.partition_count * 2); } else { dwarn_replica("sync_point has not been committed, please wait and retry"); tasking::enqueue( @@ -645,9 +644,242 @@ void replica_split_manager::parent_check_sync_point_commit(decree sync_point) // } } +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::update_child_group_partition_count( + int new_partition_count) // on primary parent +{ + if (status() != partition_status::PS_PRIMARY || _split_status != split_status::SPLITTING) { + derror_replica( + "wrong partition status or wrong split status, partition_status={}, split_status={}", + enum_to_string(status()), + enum_to_string(_split_status)); + _stub->split_replica_error_handler( + _child_gpid, + std::bind(&replica_split_manager::child_handle_split_error, + std::placeholders::_1, + "update_child_group_partition_count failed, " + "wrong partition status or split status")); + _replica->_primary_states.sync_send_write_request = false; + parent_cleanup_split_context(); + return; + } + + if (!_replica->_primary_states.learners.empty() || + _replica->_primary_states.membership.secondaries.size() + 1 < + _replica->_primary_states.membership.max_replica_count) { + derror_replica("there are {} learners or not have enough secondaries(count is {})", + _replica->_primary_states.learners.size(), + _replica->_primary_states.membership.secondaries.size()); + _stub->split_replica_error_handler( + _child_gpid, + std::bind( + &replica_split_manager::child_handle_split_error, + std::placeholders::_1, + "update_child_group_partition_count failed, have learner or lack of secondary")); + _replica->_primary_states.sync_send_write_request = false; + parent_cleanup_split_context(); + return; + } + + auto not_replied_addresses = std::make_shared>(); + // _primary_states.statuses is a map structure: rpc address -> partition_status + for (const auto &kv : _replica->_primary_states.statuses) { + not_replied_addresses->insert(kv.first); + } + for (const auto &iter : _replica->_primary_states.statuses) { + parent_send_update_partition_count_request( + iter.first, new_partition_count, not_replied_addresses); + } +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::parent_send_update_partition_count_request( + const rpc_address &address, + int32_t new_partition_count, + std::shared_ptr> ¬_replied_addresses) // on primary parent +{ + FAIL_POINT_INJECT_F("replica_parent_update_partition_count_request", [](dsn::string_view) {}); + + dcheck_eq_replica(status(), partition_status::PS_PRIMARY); + + auto request = make_unique(); + request->new_partition_count = new_partition_count; + request->target_address = address; + request->child_pid = _child_gpid; + request->ballot = get_ballot(); + + ddebug_replica( + "send update child group partition count request to node({}), new partition_count = {}", + address.to_string(), + new_partition_count); + update_child_group_partition_count_rpc rpc(std::move(request), + RPC_SPLIT_UPDATE_CHILD_PARTITION_COUNT, + 0_ms, + 0, + get_gpid().thread_hash()); + rpc.call(address, tracker(), [this, rpc, not_replied_addresses](error_code ec) mutable { + on_update_child_group_partition_count_reply( + ec, rpc.request(), rpc.response(), not_replied_addresses); + }); +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::on_update_child_group_partition_count( + const update_child_group_partition_count_request &request, + update_child_group_partition_count_response &response) // on child partition +{ + if (request.ballot != get_ballot() || !_replica->_split_states.is_caught_up) { + derror_replica( + "receive outdated update child group_partition_count_request, request ballot={}, " + "local ballot={}, is_caught_up={}", + request.ballot, + get_ballot(), + _replica->_split_states.is_caught_up); + response.err = ERR_VERSION_OUTDATED; + return; + } + + if (_replica->_app_info.partition_count == request.new_partition_count && + _partition_version.load() == request.new_partition_count - 1) { + dwarn_replica("receive repeated update child group_partition_count_request, " + "partition_count = {}, ignore it", + request.new_partition_count); + response.err = ERR_OK; + return; + } + + dcheck_eq_replica(_replica->_app_info.partition_count * 2, request.new_partition_count); + update_local_partition_count(request.new_partition_count); + response.err = ERR_OK; +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::update_local_partition_count( + int32_t new_partition_count) // on all partitions +{ + // update _app_info and partition_version + auto info = _replica->_app_info; + // if app has not been split before, init_partition_count = -1 + // we should set init_partition_count to old_partition_count + if (info.init_partition_count < 0) { + info.init_partition_count = info.partition_count; + } + auto old_partition_count = info.partition_count; + info.partition_count = new_partition_count; + + replica_app_info new_info((app_info *)&info); + std::string info_path = utils::filesystem::path_combine(_replica->_dir, ".app-info"); + auto err = new_info.store(info_path.c_str()); + if (err != ERR_OK) { + info.partition_count = old_partition_count; + dassert_replica(false, "failed to save app_info to {}, error = {}", info_path, err); + return; + } + + _replica->_app_info = info; + ddebug_replica("update partition_count from {} to {}", + old_partition_count, + _replica->_app_info.partition_count); + + _replica->_app->set_partition_version(_replica->_app_info.partition_count - 1); + _partition_version.store(_replica->_app_info.partition_count - 1); +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::on_update_child_group_partition_count_reply( + error_code ec, + const update_child_group_partition_count_request &request, + const update_child_group_partition_count_response &response, + std::shared_ptr> ¬_replied_addresses) // on primary parent +{ + _replica->_checker.only_one_thread_access(); + + if (status() != partition_status::PS_PRIMARY || _split_status != split_status::SPLITTING) { + derror_replica( + "wrong partition status or wrong split status, partition_status={}, split_status={}", + enum_to_string(status()), + enum_to_string(_split_status)); + _stub->split_replica_error_handler( + _child_gpid, + std::bind(&replica_split_manager::child_handle_split_error, + std::placeholders::_1, + "on_update_child_group_partition_count_reply " + "failed, wrong partition status or split " + "status")); + _replica->_primary_states.sync_send_write_request = false; + parent_cleanup_split_context(); + return; + } + + if (request.ballot != get_ballot()) { + derror_replica( + "ballot changed, request ballot = {}, local ballot = {}", request.ballot, get_ballot()); + _stub->split_replica_error_handler( + _child_gpid, + std::bind(&replica_split_manager::child_handle_split_error, + std::placeholders::_1, + "on_update_child_group_partition_count_reply failed, ballot changed")); + _replica->_primary_states.sync_send_write_request = false; + parent_cleanup_split_context(); + return; + } + + error_code error = (ec == ERR_OK) ? response.err : ec; + if (error == ERR_TIMEOUT) { + dwarn_replica("failed to update child node({}) partition_count, error = {}, wait and retry", + request.target_address.to_string(), + error); + tasking::enqueue( + LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica_split_manager::parent_send_update_partition_count_request, + this, + request.target_address, + request.new_partition_count, + not_replied_addresses), + get_gpid().thread_hash(), + std::chrono::seconds(1)); + return; + } + + if (error != ERR_OK) { + derror_replica("failed to update child node({}) partition_count({}), error = {}", + request.target_address.to_string(), + request.new_partition_count, + error); + _stub->split_replica_error_handler( + _child_gpid, + std::bind(&replica_split_manager::child_handle_split_error, + std::placeholders::_1, + "on_update_child_group_partition_count_reply error")); + _replica->_primary_states.sync_send_write_request = false; + parent_cleanup_split_context(); + return; + } + + ddebug_replica("update node({}) child({}) partition_count({}) succeed", + request.target_address.to_string(), + request.child_pid, + request.new_partition_count); + + // update group partition_count succeed + not_replied_addresses->erase(request.target_address); + if (not_replied_addresses->empty()) { + ddebug_replica("update child({}) group partition_count, new_partition_count = {}", + request.child_pid, + request.new_partition_count); + register_child_on_meta(get_ballot()); + } else { + ddebug_replica("there are still {} replica not update partition count in child group", + not_replied_addresses->size()); + } +} + // ThreadPool: THREAD_POOL_REPLICATION void replica_split_manager::register_child_on_meta(ballot b) // on primary parent { + FAIL_POINT_INJECT_F("replica_register_child_on_meta", [](dsn::string_view) {}); + if (status() != partition_status::PS_PRIMARY || _split_status != split_status::SPLITTING) { derror_replica( "wrong partition status or wrong split status, partition_status={}, split_status={}", diff --git a/src/replica/split/replica_split_manager.h b/src/replica/split/replica_split_manager.h index 200e6c7882..f9e4839210 100644 --- a/src/replica/split/replica_split_manager.h +++ b/src/replica/split/replica_split_manager.h @@ -82,6 +82,28 @@ class replica_split_manager : replica_base // sync_point is the first decree after parent send write request to child synchronously void parent_check_sync_point_commit(decree sync_point); + // primary parent update child group partition count + void update_child_group_partition_count(int32_t new_partition_count); + + void parent_send_update_partition_count_request( + const rpc_address &address, + int32_t new_partition_count, + std::shared_ptr> ¬_replied_addresses); + + // child update its partition_count + void + on_update_child_group_partition_count(const update_child_group_partition_count_request &request, + update_child_group_partition_count_response &response); + + void on_update_child_group_partition_count_reply( + error_code ec, + const update_child_group_partition_count_request &request, + const update_child_group_partition_count_response &response, + std::shared_ptr> ¬_replied_addresses); + + // all replicas update partition_count in memory and disk + void update_local_partition_count(int32_t new_partition_count); + // primary parent register children on meta_server void register_child_on_meta(ballot b); void on_register_child_on_meta_reply(error_code ec, diff --git a/src/replica/split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp index e469d223f9..5cb9d3cd08 100644 --- a/src/replica/split/test/replica_split_test.cpp +++ b/src/replica/split/test/replica_split_test.cpp @@ -165,6 +165,29 @@ class replica_split_test : public replica_test_base mock_mutation_list(min_decree); } + void mock_parent_primary_configuration(bool lack_of_secondary = false) + { + partition_configuration config; + config.max_replica_count = 3; + config.pid = PARENT_GPID; + config.ballot = INIT_BALLOT; + config.primary = PRIMARY; + config.secondaries.emplace_back(SECONDARY); + if (!lack_of_secondary) { + config.secondaries.emplace_back(SECONDARY2); + } + _parent_replica->set_primary_partition_configuration(config); + } + + void mock_update_child_partition_count_request(update_child_group_partition_count_request &req, + ballot b) + { + req.child_pid = CHILD_GPID; + req.ballot = b; + req.target_address = PRIMARY; + req.new_partition_count = NEW_PARTITION_COUNT; + } + /// test functions void test_parent_start_split(ballot b, gpid req_child_gpid, split_status::type status) { @@ -245,6 +268,40 @@ class replica_split_test : public replica_test_base return resp.err; } + void test_update_child_group_partition_count() + { + _parent_split_mgr->update_child_group_partition_count(NEW_PARTITION_COUNT); + _parent_replica->tracker()->wait_outstanding_tasks(); + _child_replica->tracker()->wait_outstanding_tasks(); + } + + error_code test_on_update_child_group_partition_count(ballot b) + { + update_child_group_partition_count_request req; + mock_update_child_partition_count_request(req, b); + + update_child_group_partition_count_response resp; + _child_split_mgr->on_update_child_group_partition_count(req, resp); + _child_replica->tracker()->wait_outstanding_tasks(); + return resp.err; + } + + error_code test_on_update_child_group_partition_count_reply(error_code resp_err) + { + update_child_group_partition_count_request req; + mock_update_child_partition_count_request(req, INIT_BALLOT); + update_child_group_partition_count_response resp; + resp.err = resp_err; + auto not_replied_addresses = std::make_shared>(); + not_replied_addresses->insert(PRIMARY); + + _parent_split_mgr->on_update_child_group_partition_count_reply( + ERR_OK, req, resp, not_replied_addresses); + _parent_replica->tracker()->wait_outstanding_tasks(); + _child_replica->tracker()->wait_outstanding_tasks(); + return resp.err; + } + void test_register_child_on_meta() { parent_set_split_status(split_status::SPLITTING); @@ -516,6 +573,103 @@ TEST_F(replica_split_test, parent_handle_catch_up_test) } } +// update_child_group_partition_count tests +TEST_F(replica_split_test, update_child_group_partition_count_test) +{ + fail::cfg("replica_parent_update_partition_count_request", "return()"); + generate_child(); + + // Test cases: + // - wrong split status + // - primary has learner + // - update child group partition count succeed + struct update_child_group_partition_count_test + { + split_status::type parent_split_status; + bool parent_has_learner; + partition_status::type expected_child_status; + bool expected_sync_send_write_request; + bool is_parent_not_in_split; + + } tests[] = { + {split_status::NOT_SPLIT, false, partition_status::PS_ERROR, false, true}, + {split_status::SPLITTING, true, partition_status::PS_ERROR, false, true}, + {split_status::SPLITTING, false, partition_status::PS_PARTITION_SPLIT, true, false}, + }; + for (auto test : tests) { + mock_child_split_context(true, true); + mock_parent_primary_configuration(test.parent_has_learner); + mock_primary_parent_split_context(true); + parent_set_split_status(test.parent_split_status); + + test_update_child_group_partition_count(); + ASSERT_EQ(_child_replica->status(), test.expected_child_status); + ASSERT_EQ(parent_sync_send_write_request(), test.expected_sync_send_write_request); + ASSERT_EQ(is_parent_not_in_split(), test.is_parent_not_in_split); + } +} + +// on_update_child_group_partition_count tests +TEST_F(replica_split_test, child_update_partition_count_test) +{ + ballot WRONG_BALLOT = INIT_BALLOT + 1; + generate_child(); + + // Test cases: + // - request has wrong ballot + // - child not caught up + // - child update partition count succeed + struct on_update_child_partition_count_test + { + ballot req_ballot; + bool caught_up; + error_code expected_err; + int32_t expected_partition_version; + } tests[] = {{WRONG_BALLOT, true, ERR_VERSION_OUTDATED, OLD_PARTITION_COUNT - 1}, + {INIT_BALLOT, false, ERR_VERSION_OUTDATED, OLD_PARTITION_COUNT - 1}, + {INIT_BALLOT, true, ERR_OK, NEW_PARTITION_COUNT - 1}}; + for (auto test : tests) { + mock_child_split_context(true, test.caught_up); + ASSERT_EQ(_child_split_mgr->get_partition_version(), OLD_PARTITION_COUNT - 1); + ASSERT_EQ(test_on_update_child_group_partition_count(test.req_ballot), test.expected_err); + ASSERT_EQ(_child_split_mgr->get_partition_version(), test.expected_partition_version); + } +} + +// on_update_child_group_partition_count_reply tests +TEST_F(replica_split_test, parent_on_update_partition_reply_test) +{ + fail::cfg("replica_register_child_on_meta", "return()"); + generate_child(); + + // Test cases: + // - wrong split status + // - child update partition_count failed + // - child update partition_count succeed + struct on_update_child_partition_count_reply_test + { + split_status::type parent_split_status; + error_code resp_err; + partition_status::type expected_child_status; + bool expected_sync_send_write_request; + bool is_parent_not_in_split; + } tests[] = { + {split_status::NOT_SPLIT, ERR_OK, partition_status::PS_ERROR, false, true}, + {split_status::SPLITTING, ERR_VERSION_OUTDATED, partition_status::PS_ERROR, false, true}, + {split_status::SPLITTING, ERR_OK, partition_status::PS_PARTITION_SPLIT, true, false}, + }; + for (auto test : tests) { + mock_primary_parent_split_context(true); + parent_set_split_status(test.parent_split_status); + mock_child_split_context(true, true); + + test_on_update_child_group_partition_count_reply(test.resp_err); + ASSERT_EQ(_child_replica->status(), test.expected_child_status); + ASSERT_EQ(parent_sync_send_write_request(), test.expected_sync_send_write_request); + ASSERT_EQ(is_parent_not_in_split(), test.is_parent_not_in_split); + } +} + // register_child test TEST_F(replica_split_test, register_child_test) { diff --git a/src/replication.thrift b/src/replication.thrift index 5a525a4cc5..f8fb89794a 100644 --- a/src/replication.thrift +++ b/src/replication.thrift @@ -875,6 +875,23 @@ struct notify_cacth_up_response 1:dsn.error_code err; } +// primary parent -> child replicas to update partition count +struct update_child_group_partition_count_request +{ + 1:dsn.rpc_address target_address; + 2:i32 new_partition_count; + 3:dsn.gpid child_pid; + 4:i64 ballot; +} + +struct update_child_group_partition_count_response +{ + // Possible errors: + // - ERR_OBJECT_NOT_FOUND: replica can not be found + // - ERR_VERSION_OUTDATED: request is outdated + 1:dsn.error_code err; +} + // primary parent -> meta server, register child on meta_server struct register_child_request {