From 00c5922b34c7ba9971c11c0a3da14c3e13c3c1a0 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 19 Jan 2021 14:55:57 +0800 Subject: [PATCH 1/2] feat(split): add notify_stop_split --- .../dsn/dist/replication/replication.codes.h | 1 + .../dsn/dist/replication/replication_types.h | 120 ++ src/common/replication_common.h | 1 + src/common/replication_types.cpp | 1258 ++++++++++------- src/meta/meta_service.cpp | 18 + src/meta/meta_service.h | 1 + src/meta/meta_split_service.cpp | 83 ++ src/meta/meta_split_service.h | 4 + src/meta/test/meta_split_service_test.cpp | 128 ++ src/replica/split/replica_split_manager.cpp | 21 +- src/replication.thrift | 16 + 11 files changed, 1155 insertions(+), 496 deletions(-) diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index dd813f014b..c58e0be416 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -108,6 +108,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_DDD_DIAGNOSE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_PARTITION_SPLIT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_PARTITION_SPLIT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_REGISTER_CHILD_REPLICA, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_CM_NOTIFY_STOP_SPLIT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON) diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index d86fb9f376..1a0a176c27 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -477,6 +477,10 @@ class register_child_request; class register_child_response; +class notify_stop_split_request; + +class notify_stop_split_response; + class bulk_load_metadata; class start_bulk_load_request; @@ -6809,6 +6813,122 @@ inline std::ostream &operator<<(std::ostream &out, const register_child_response return out; } +typedef struct _notify_stop_split_request__isset +{ + _notify_stop_split_request__isset() + : app_name(false), parent_gpid(false), meta_split_status(false), partition_count(false) + { + } + bool app_name : 1; + bool parent_gpid : 1; + bool meta_split_status : 1; + bool partition_count : 1; +} _notify_stop_split_request__isset; + +class notify_stop_split_request +{ +public: + notify_stop_split_request(const notify_stop_split_request &); + notify_stop_split_request(notify_stop_split_request &&); + notify_stop_split_request &operator=(const notify_stop_split_request &); + notify_stop_split_request &operator=(notify_stop_split_request &&); + notify_stop_split_request() + : app_name(), meta_split_status((split_status::type)0), partition_count(0) + { + } + + virtual ~notify_stop_split_request() throw(); + std::string app_name; + ::dsn::gpid parent_gpid; + split_status::type meta_split_status; + int32_t partition_count; + + _notify_stop_split_request__isset __isset; + + void __set_app_name(const std::string &val); + + void __set_parent_gpid(const ::dsn::gpid &val); + + void __set_meta_split_status(const split_status::type val); + + void __set_partition_count(const int32_t val); + + bool operator==(const notify_stop_split_request &rhs) const + { + if (!(app_name == rhs.app_name)) + return false; + if (!(parent_gpid == rhs.parent_gpid)) + return false; + if (!(meta_split_status == rhs.meta_split_status)) + return false; + if (!(partition_count == rhs.partition_count)) + return false; + return true; + } + bool operator!=(const notify_stop_split_request &rhs) const { return !(*this == rhs); } + + bool operator<(const notify_stop_split_request &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(notify_stop_split_request &a, notify_stop_split_request &b); + +inline std::ostream &operator<<(std::ostream &out, const notify_stop_split_request &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _notify_stop_split_response__isset +{ + _notify_stop_split_response__isset() : err(false) {} + bool err : 1; +} _notify_stop_split_response__isset; + +class notify_stop_split_response +{ +public: + notify_stop_split_response(const notify_stop_split_response &); + notify_stop_split_response(notify_stop_split_response &&); + notify_stop_split_response &operator=(const notify_stop_split_response &); + notify_stop_split_response &operator=(notify_stop_split_response &&); + notify_stop_split_response() {} + + virtual ~notify_stop_split_response() throw(); + ::dsn::error_code err; + + _notify_stop_split_response__isset __isset; + + void __set_err(const ::dsn::error_code &val); + + bool operator==(const notify_stop_split_response &rhs) const + { + if (!(err == rhs.err)) + return false; + return true; + } + bool operator!=(const notify_stop_split_response &rhs) const { return !(*this == rhs); } + + bool operator<(const notify_stop_split_response &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(notify_stop_split_response &a, notify_stop_split_response &b); + +inline std::ostream &operator<<(std::ostream &out, const notify_stop_split_response &obj) +{ + obj.printTo(out); + return out; +} + typedef struct _bulk_load_metadata__isset { _bulk_load_metadata__isset() : files(false), file_total_size(false) {} diff --git a/src/common/replication_common.h b/src/common/replication_common.h index 7beec8de5d..6c1612329f 100644 --- a/src/common/replication_common.h +++ b/src/common/replication_common.h @@ -42,6 +42,7 @@ typedef rpc_holder query_bulk typedef rpc_holder start_split_rpc; typedef rpc_holder control_split_rpc; +typedef rpc_holder notify_stop_split_rpc; class replication_options { diff --git a/src/common/replication_types.cpp b/src/common/replication_types.cpp index 91e6c9229b..9c1dd54b46 100644 --- a/src/common/replication_types.cpp +++ b/src/common/replication_types.cpp @@ -15787,6 +15787,274 @@ void register_child_response::printTo(std::ostream &out) const out << ")"; } +notify_stop_split_request::~notify_stop_split_request() throw() {} + +void notify_stop_split_request::__set_app_name(const std::string &val) { this->app_name = val; } + +void notify_stop_split_request::__set_parent_gpid(const ::dsn::gpid &val) +{ + this->parent_gpid = val; +} + +void notify_stop_split_request::__set_meta_split_status(const split_status::type val) +{ + this->meta_split_status = val; +} + +void notify_stop_split_request::__set_partition_count(const int32_t val) +{ + this->partition_count = val; +} + +uint32_t notify_stop_split_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->app_name); + this->__isset.app_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->parent_gpid.read(iprot); + this->__isset.parent_gpid = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast679; + xfer += iprot->readI32(ecast679); + this->meta_split_status = (split_status::type)ecast679; + this->__isset.meta_split_status = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->partition_count); + this->__isset.partition_count = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t notify_stop_split_request::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("notify_stop_split_request"); + + xfer += oprot->writeFieldBegin("app_name", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->app_name); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("parent_gpid", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->parent_gpid.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("meta_split_status", ::apache::thrift::protocol::T_I32, 3); + xfer += oprot->writeI32((int32_t)this->meta_split_status); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("partition_count", ::apache::thrift::protocol::T_I32, 4); + xfer += oprot->writeI32(this->partition_count); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(notify_stop_split_request &a, notify_stop_split_request &b) +{ + using ::std::swap; + swap(a.app_name, b.app_name); + swap(a.parent_gpid, b.parent_gpid); + swap(a.meta_split_status, b.meta_split_status); + swap(a.partition_count, b.partition_count); + swap(a.__isset, b.__isset); +} + +notify_stop_split_request::notify_stop_split_request(const notify_stop_split_request &other680) +{ + app_name = other680.app_name; + parent_gpid = other680.parent_gpid; + meta_split_status = other680.meta_split_status; + partition_count = other680.partition_count; + __isset = other680.__isset; +} +notify_stop_split_request::notify_stop_split_request(notify_stop_split_request &&other681) +{ + app_name = std::move(other681.app_name); + parent_gpid = std::move(other681.parent_gpid); + meta_split_status = std::move(other681.meta_split_status); + partition_count = std::move(other681.partition_count); + __isset = std::move(other681.__isset); +} +notify_stop_split_request ¬ify_stop_split_request:: +operator=(const notify_stop_split_request &other682) +{ + app_name = other682.app_name; + parent_gpid = other682.parent_gpid; + meta_split_status = other682.meta_split_status; + partition_count = other682.partition_count; + __isset = other682.__isset; + return *this; +} +notify_stop_split_request ¬ify_stop_split_request:: +operator=(notify_stop_split_request &&other683) +{ + app_name = std::move(other683.app_name); + parent_gpid = std::move(other683.parent_gpid); + meta_split_status = std::move(other683.meta_split_status); + partition_count = std::move(other683.partition_count); + __isset = std::move(other683.__isset); + return *this; +} +void notify_stop_split_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "notify_stop_split_request("; + out << "app_name=" << to_string(app_name); + out << ", " + << "parent_gpid=" << to_string(parent_gpid); + out << ", " + << "meta_split_status=" << to_string(meta_split_status); + out << ", " + << "partition_count=" << to_string(partition_count); + out << ")"; +} + +notify_stop_split_response::~notify_stop_split_response() throw() {} + +void notify_stop_split_response::__set_err(const ::dsn::error_code &val) { this->err = val; } + +uint32_t notify_stop_split_response::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->err.read(iprot); + this->__isset.err = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t notify_stop_split_response::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("notify_stop_split_response"); + + xfer += oprot->writeFieldBegin("err", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->err.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(notify_stop_split_response &a, notify_stop_split_response &b) +{ + using ::std::swap; + swap(a.err, b.err); + swap(a.__isset, b.__isset); +} + +notify_stop_split_response::notify_stop_split_response(const notify_stop_split_response &other684) +{ + err = other684.err; + __isset = other684.__isset; +} +notify_stop_split_response::notify_stop_split_response(notify_stop_split_response &&other685) +{ + err = std::move(other685.err); + __isset = std::move(other685.__isset); +} +notify_stop_split_response ¬ify_stop_split_response:: +operator=(const notify_stop_split_response &other686) +{ + err = other686.err; + __isset = other686.__isset; + return *this; +} +notify_stop_split_response ¬ify_stop_split_response:: +operator=(notify_stop_split_response &&other687) +{ + err = std::move(other687.err); + __isset = std::move(other687.__isset); + return *this; +} +void notify_stop_split_response::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "notify_stop_split_response("; + out << "err=" << to_string(err); + out << ")"; +} + bulk_load_metadata::~bulk_load_metadata() throw() {} void bulk_load_metadata::__set_files(const std::vector &val) { this->files = val; } @@ -15816,13 +16084,13 @@ uint32_t bulk_load_metadata::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_LIST) { { this->files.clear(); - uint32_t _size679; - ::apache::thrift::protocol::TType _etype682; - xfer += iprot->readListBegin(_etype682, _size679); - this->files.resize(_size679); - uint32_t _i683; - for (_i683 = 0; _i683 < _size679; ++_i683) { - xfer += this->files[_i683].read(iprot); + uint32_t _size688; + ::apache::thrift::protocol::TType _etype691; + xfer += iprot->readListBegin(_etype691, _size688); + this->files.resize(_size688); + uint32_t _i692; + for (_i692 = 0; _i692 < _size688; ++_i692) { + xfer += this->files[_i692].read(iprot); } xfer += iprot->readListEnd(); } @@ -15861,9 +16129,9 @@ uint32_t bulk_load_metadata::write(::apache::thrift::protocol::TProtocol *oprot) { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->files.size())); - std::vector::const_iterator _iter684; - for (_iter684 = this->files.begin(); _iter684 != this->files.end(); ++_iter684) { - xfer += (*_iter684).write(oprot); + std::vector::const_iterator _iter693; + for (_iter693 = this->files.begin(); _iter693 != this->files.end(); ++_iter693) { + xfer += (*_iter693).write(oprot); } xfer += oprot->writeListEnd(); } @@ -15886,30 +16154,30 @@ void swap(bulk_load_metadata &a, bulk_load_metadata &b) swap(a.__isset, b.__isset); } -bulk_load_metadata::bulk_load_metadata(const bulk_load_metadata &other685) +bulk_load_metadata::bulk_load_metadata(const bulk_load_metadata &other694) { - files = other685.files; - file_total_size = other685.file_total_size; - __isset = other685.__isset; + files = other694.files; + file_total_size = other694.file_total_size; + __isset = other694.__isset; } -bulk_load_metadata::bulk_load_metadata(bulk_load_metadata &&other686) +bulk_load_metadata::bulk_load_metadata(bulk_load_metadata &&other695) { - files = std::move(other686.files); - file_total_size = std::move(other686.file_total_size); - __isset = std::move(other686.__isset); + files = std::move(other695.files); + file_total_size = std::move(other695.file_total_size); + __isset = std::move(other695.__isset); } -bulk_load_metadata &bulk_load_metadata::operator=(const bulk_load_metadata &other687) +bulk_load_metadata &bulk_load_metadata::operator=(const bulk_load_metadata &other696) { - files = other687.files; - file_total_size = other687.file_total_size; - __isset = other687.__isset; + files = other696.files; + file_total_size = other696.file_total_size; + __isset = other696.__isset; return *this; } -bulk_load_metadata &bulk_load_metadata::operator=(bulk_load_metadata &&other688) +bulk_load_metadata &bulk_load_metadata::operator=(bulk_load_metadata &&other697) { - files = std::move(other688.files); - file_total_size = std::move(other688.file_total_size); - __isset = std::move(other688.__isset); + files = std::move(other697.files); + file_total_size = std::move(other697.file_total_size); + __isset = std::move(other697.__isset); return *this; } void bulk_load_metadata::printTo(std::ostream &out) const @@ -16041,38 +16309,38 @@ void swap(start_bulk_load_request &a, start_bulk_load_request &b) swap(a.__isset, b.__isset); } -start_bulk_load_request::start_bulk_load_request(const start_bulk_load_request &other689) +start_bulk_load_request::start_bulk_load_request(const start_bulk_load_request &other698) { - app_name = other689.app_name; - cluster_name = other689.cluster_name; - file_provider_type = other689.file_provider_type; - remote_root_path = other689.remote_root_path; - __isset = other689.__isset; + app_name = other698.app_name; + cluster_name = other698.cluster_name; + file_provider_type = other698.file_provider_type; + remote_root_path = other698.remote_root_path; + __isset = other698.__isset; } -start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other690) +start_bulk_load_request::start_bulk_load_request(start_bulk_load_request &&other699) { - app_name = std::move(other690.app_name); - cluster_name = std::move(other690.cluster_name); - file_provider_type = std::move(other690.file_provider_type); - remote_root_path = std::move(other690.remote_root_path); - __isset = std::move(other690.__isset); + app_name = std::move(other699.app_name); + cluster_name = std::move(other699.cluster_name); + file_provider_type = std::move(other699.file_provider_type); + remote_root_path = std::move(other699.remote_root_path); + __isset = std::move(other699.__isset); } -start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_load_request &other691) +start_bulk_load_request &start_bulk_load_request::operator=(const start_bulk_load_request &other700) { - app_name = other691.app_name; - cluster_name = other691.cluster_name; - file_provider_type = other691.file_provider_type; - remote_root_path = other691.remote_root_path; - __isset = other691.__isset; + app_name = other700.app_name; + cluster_name = other700.cluster_name; + file_provider_type = other700.file_provider_type; + remote_root_path = other700.remote_root_path; + __isset = other700.__isset; return *this; } -start_bulk_load_request &start_bulk_load_request::operator=(start_bulk_load_request &&other692) +start_bulk_load_request &start_bulk_load_request::operator=(start_bulk_load_request &&other701) { - app_name = std::move(other692.app_name); - cluster_name = std::move(other692.cluster_name); - file_provider_type = std::move(other692.file_provider_type); - remote_root_path = std::move(other692.remote_root_path); - __isset = std::move(other692.__isset); + app_name = std::move(other701.app_name); + cluster_name = std::move(other701.cluster_name); + file_provider_type = std::move(other701.file_provider_type); + remote_root_path = std::move(other701.remote_root_path); + __isset = std::move(other701.__isset); return *this; } void start_bulk_load_request::printTo(std::ostream &out) const @@ -16169,31 +16437,31 @@ void swap(start_bulk_load_response &a, start_bulk_load_response &b) swap(a.__isset, b.__isset); } -start_bulk_load_response::start_bulk_load_response(const start_bulk_load_response &other693) +start_bulk_load_response::start_bulk_load_response(const start_bulk_load_response &other702) { - err = other693.err; - hint_msg = other693.hint_msg; - __isset = other693.__isset; + err = other702.err; + hint_msg = other702.hint_msg; + __isset = other702.__isset; } -start_bulk_load_response::start_bulk_load_response(start_bulk_load_response &&other694) +start_bulk_load_response::start_bulk_load_response(start_bulk_load_response &&other703) { - err = std::move(other694.err); - hint_msg = std::move(other694.hint_msg); - __isset = std::move(other694.__isset); + err = std::move(other703.err); + hint_msg = std::move(other703.hint_msg); + __isset = std::move(other703.__isset); } start_bulk_load_response &start_bulk_load_response:: -operator=(const start_bulk_load_response &other695) +operator=(const start_bulk_load_response &other704) { - err = other695.err; - hint_msg = other695.hint_msg; - __isset = other695.__isset; + err = other704.err; + hint_msg = other704.hint_msg; + __isset = other704.__isset; return *this; } -start_bulk_load_response &start_bulk_load_response::operator=(start_bulk_load_response &&other696) +start_bulk_load_response &start_bulk_load_response::operator=(start_bulk_load_response &&other705) { - err = std::move(other696.err); - hint_msg = std::move(other696.hint_msg); - __isset = std::move(other696.__isset); + err = std::move(other705.err); + hint_msg = std::move(other705.hint_msg); + __isset = std::move(other705.__isset); return *this; } void start_bulk_load_response::printTo(std::ostream &out) const @@ -16275,9 +16543,9 @@ uint32_t partition_bulk_load_state::read(::apache::thrift::protocol::TProtocol * break; case 3: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast697; - xfer += iprot->readI32(ecast697); - this->ingest_status = (ingestion_status::type)ecast697; + int32_t ecast706; + xfer += iprot->readI32(ecast706); + this->ingest_status = (ingestion_status::type)ecast706; this->__isset.ingest_status = true; } else { xfer += iprot->skip(ftype); @@ -16358,44 +16626,44 @@ void swap(partition_bulk_load_state &a, partition_bulk_load_state &b) swap(a.__isset, b.__isset); } -partition_bulk_load_state::partition_bulk_load_state(const partition_bulk_load_state &other698) +partition_bulk_load_state::partition_bulk_load_state(const partition_bulk_load_state &other707) { - download_progress = other698.download_progress; - download_status = other698.download_status; - ingest_status = other698.ingest_status; - is_cleaned_up = other698.is_cleaned_up; - is_paused = other698.is_paused; - __isset = other698.__isset; + download_progress = other707.download_progress; + download_status = other707.download_status; + ingest_status = other707.ingest_status; + is_cleaned_up = other707.is_cleaned_up; + is_paused = other707.is_paused; + __isset = other707.__isset; } -partition_bulk_load_state::partition_bulk_load_state(partition_bulk_load_state &&other699) +partition_bulk_load_state::partition_bulk_load_state(partition_bulk_load_state &&other708) { - download_progress = std::move(other699.download_progress); - download_status = std::move(other699.download_status); - ingest_status = std::move(other699.ingest_status); - is_cleaned_up = std::move(other699.is_cleaned_up); - is_paused = std::move(other699.is_paused); - __isset = std::move(other699.__isset); + download_progress = std::move(other708.download_progress); + download_status = std::move(other708.download_status); + ingest_status = std::move(other708.ingest_status); + is_cleaned_up = std::move(other708.is_cleaned_up); + is_paused = std::move(other708.is_paused); + __isset = std::move(other708.__isset); } partition_bulk_load_state &partition_bulk_load_state:: -operator=(const partition_bulk_load_state &other700) -{ - download_progress = other700.download_progress; - download_status = other700.download_status; - ingest_status = other700.ingest_status; - is_cleaned_up = other700.is_cleaned_up; - is_paused = other700.is_paused; - __isset = other700.__isset; +operator=(const partition_bulk_load_state &other709) +{ + download_progress = other709.download_progress; + download_status = other709.download_status; + ingest_status = other709.ingest_status; + is_cleaned_up = other709.is_cleaned_up; + is_paused = other709.is_paused; + __isset = other709.__isset; return *this; } partition_bulk_load_state &partition_bulk_load_state:: -operator=(partition_bulk_load_state &&other701) -{ - download_progress = std::move(other701.download_progress); - download_status = std::move(other701.download_status); - ingest_status = std::move(other701.ingest_status); - is_cleaned_up = std::move(other701.is_cleaned_up); - is_paused = std::move(other701.is_paused); - __isset = std::move(other701.__isset); +operator=(partition_bulk_load_state &&other710) +{ + download_progress = std::move(other710.download_progress); + download_status = std::move(other710.download_status); + ingest_status = std::move(other710.ingest_status); + is_cleaned_up = std::move(other710.is_cleaned_up); + is_paused = std::move(other710.is_paused); + __isset = std::move(other710.__isset); return *this; } void partition_bulk_load_state::printTo(std::ostream &out) const @@ -16523,9 +16791,9 @@ uint32_t bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) break; case 7: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast702; - xfer += iprot->readI32(ecast702); - this->meta_bulk_load_status = (bulk_load_status::type)ecast702; + int32_t ecast711; + xfer += iprot->readI32(ecast711); + this->meta_bulk_load_status = (bulk_load_status::type)ecast711; this->__isset.meta_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -16622,58 +16890,58 @@ void swap(bulk_load_request &a, bulk_load_request &b) swap(a.__isset, b.__isset); } -bulk_load_request::bulk_load_request(const bulk_load_request &other703) -{ - pid = other703.pid; - app_name = other703.app_name; - primary_addr = other703.primary_addr; - remote_provider_name = other703.remote_provider_name; - cluster_name = other703.cluster_name; - ballot = other703.ballot; - meta_bulk_load_status = other703.meta_bulk_load_status; - query_bulk_load_metadata = other703.query_bulk_load_metadata; - remote_root_path = other703.remote_root_path; - __isset = other703.__isset; -} -bulk_load_request::bulk_load_request(bulk_load_request &&other704) -{ - pid = std::move(other704.pid); - app_name = std::move(other704.app_name); - primary_addr = std::move(other704.primary_addr); - remote_provider_name = std::move(other704.remote_provider_name); - cluster_name = std::move(other704.cluster_name); - ballot = std::move(other704.ballot); - meta_bulk_load_status = std::move(other704.meta_bulk_load_status); - query_bulk_load_metadata = std::move(other704.query_bulk_load_metadata); - remote_root_path = std::move(other704.remote_root_path); - __isset = std::move(other704.__isset); -} -bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other705) -{ - pid = other705.pid; - app_name = other705.app_name; - primary_addr = other705.primary_addr; - remote_provider_name = other705.remote_provider_name; - cluster_name = other705.cluster_name; - ballot = other705.ballot; - meta_bulk_load_status = other705.meta_bulk_load_status; - query_bulk_load_metadata = other705.query_bulk_load_metadata; - remote_root_path = other705.remote_root_path; - __isset = other705.__isset; +bulk_load_request::bulk_load_request(const bulk_load_request &other712) +{ + pid = other712.pid; + app_name = other712.app_name; + primary_addr = other712.primary_addr; + remote_provider_name = other712.remote_provider_name; + cluster_name = other712.cluster_name; + ballot = other712.ballot; + meta_bulk_load_status = other712.meta_bulk_load_status; + query_bulk_load_metadata = other712.query_bulk_load_metadata; + remote_root_path = other712.remote_root_path; + __isset = other712.__isset; +} +bulk_load_request::bulk_load_request(bulk_load_request &&other713) +{ + pid = std::move(other713.pid); + app_name = std::move(other713.app_name); + primary_addr = std::move(other713.primary_addr); + remote_provider_name = std::move(other713.remote_provider_name); + cluster_name = std::move(other713.cluster_name); + ballot = std::move(other713.ballot); + meta_bulk_load_status = std::move(other713.meta_bulk_load_status); + query_bulk_load_metadata = std::move(other713.query_bulk_load_metadata); + remote_root_path = std::move(other713.remote_root_path); + __isset = std::move(other713.__isset); +} +bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other714) +{ + pid = other714.pid; + app_name = other714.app_name; + primary_addr = other714.primary_addr; + remote_provider_name = other714.remote_provider_name; + cluster_name = other714.cluster_name; + ballot = other714.ballot; + meta_bulk_load_status = other714.meta_bulk_load_status; + query_bulk_load_metadata = other714.query_bulk_load_metadata; + remote_root_path = other714.remote_root_path; + __isset = other714.__isset; return *this; } -bulk_load_request &bulk_load_request::operator=(bulk_load_request &&other706) -{ - pid = std::move(other706.pid); - app_name = std::move(other706.app_name); - primary_addr = std::move(other706.primary_addr); - remote_provider_name = std::move(other706.remote_provider_name); - cluster_name = std::move(other706.cluster_name); - ballot = std::move(other706.ballot); - meta_bulk_load_status = std::move(other706.meta_bulk_load_status); - query_bulk_load_metadata = std::move(other706.query_bulk_load_metadata); - remote_root_path = std::move(other706.remote_root_path); - __isset = std::move(other706.__isset); +bulk_load_request &bulk_load_request::operator=(bulk_load_request &&other715) +{ + pid = std::move(other715.pid); + app_name = std::move(other715.app_name); + primary_addr = std::move(other715.primary_addr); + remote_provider_name = std::move(other715.remote_provider_name); + cluster_name = std::move(other715.cluster_name); + ballot = std::move(other715.ballot); + meta_bulk_load_status = std::move(other715.meta_bulk_load_status); + query_bulk_load_metadata = std::move(other715.query_bulk_load_metadata); + remote_root_path = std::move(other715.remote_root_path); + __isset = std::move(other715.__isset); return *this; } void bulk_load_request::printTo(std::ostream &out) const @@ -16794,9 +17062,9 @@ uint32_t bulk_load_response::read(::apache::thrift::protocol::TProtocol *iprot) break; case 4: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast707; - xfer += iprot->readI32(ecast707); - this->primary_bulk_load_status = (bulk_load_status::type)ecast707; + int32_t ecast716; + xfer += iprot->readI32(ecast716); + this->primary_bulk_load_status = (bulk_load_status::type)ecast716; this->__isset.primary_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -16806,16 +17074,16 @@ uint32_t bulk_load_response::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_MAP) { { this->group_bulk_load_state.clear(); - uint32_t _size708; - ::apache::thrift::protocol::TType _ktype709; - ::apache::thrift::protocol::TType _vtype710; - xfer += iprot->readMapBegin(_ktype709, _vtype710, _size708); - uint32_t _i712; - for (_i712 = 0; _i712 < _size708; ++_i712) { - ::dsn::rpc_address _key713; - xfer += _key713.read(iprot); - partition_bulk_load_state &_val714 = this->group_bulk_load_state[_key713]; - xfer += _val714.read(iprot); + uint32_t _size717; + ::apache::thrift::protocol::TType _ktype718; + ::apache::thrift::protocol::TType _vtype719; + xfer += iprot->readMapBegin(_ktype718, _vtype719, _size717); + uint32_t _i721; + for (_i721 = 0; _i721 < _size717; ++_i721) { + ::dsn::rpc_address _key722; + xfer += _key722.read(iprot); + partition_bulk_load_state &_val723 = this->group_bulk_load_state[_key722]; + xfer += _val723.read(iprot); } xfer += iprot->readMapEnd(); } @@ -16904,12 +17172,12 @@ uint32_t bulk_load_response::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_STRUCT, static_cast(this->group_bulk_load_state.size())); - std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter715; - for (_iter715 = this->group_bulk_load_state.begin(); - _iter715 != this->group_bulk_load_state.end(); - ++_iter715) { - xfer += _iter715->first.write(oprot); - xfer += _iter715->second.write(oprot); + std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter724; + for (_iter724 = this->group_bulk_load_state.begin(); + _iter724 != this->group_bulk_load_state.end(); + ++_iter724) { + xfer += _iter724->first.write(oprot); + xfer += _iter724->second.write(oprot); } xfer += oprot->writeMapEnd(); } @@ -16965,64 +17233,64 @@ void swap(bulk_load_response &a, bulk_load_response &b) swap(a.__isset, b.__isset); } -bulk_load_response::bulk_load_response(const bulk_load_response &other716) -{ - err = other716.err; - pid = other716.pid; - app_name = other716.app_name; - primary_bulk_load_status = other716.primary_bulk_load_status; - group_bulk_load_state = other716.group_bulk_load_state; - metadata = other716.metadata; - total_download_progress = other716.total_download_progress; - is_group_ingestion_finished = other716.is_group_ingestion_finished; - is_group_bulk_load_context_cleaned_up = other716.is_group_bulk_load_context_cleaned_up; - is_group_bulk_load_paused = other716.is_group_bulk_load_paused; - __isset = other716.__isset; -} -bulk_load_response::bulk_load_response(bulk_load_response &&other717) -{ - err = std::move(other717.err); - pid = std::move(other717.pid); - app_name = std::move(other717.app_name); - primary_bulk_load_status = std::move(other717.primary_bulk_load_status); - group_bulk_load_state = std::move(other717.group_bulk_load_state); - metadata = std::move(other717.metadata); - total_download_progress = std::move(other717.total_download_progress); - is_group_ingestion_finished = std::move(other717.is_group_ingestion_finished); +bulk_load_response::bulk_load_response(const bulk_load_response &other725) +{ + err = other725.err; + pid = other725.pid; + app_name = other725.app_name; + primary_bulk_load_status = other725.primary_bulk_load_status; + group_bulk_load_state = other725.group_bulk_load_state; + metadata = other725.metadata; + total_download_progress = other725.total_download_progress; + is_group_ingestion_finished = other725.is_group_ingestion_finished; + is_group_bulk_load_context_cleaned_up = other725.is_group_bulk_load_context_cleaned_up; + is_group_bulk_load_paused = other725.is_group_bulk_load_paused; + __isset = other725.__isset; +} +bulk_load_response::bulk_load_response(bulk_load_response &&other726) +{ + err = std::move(other726.err); + pid = std::move(other726.pid); + app_name = std::move(other726.app_name); + primary_bulk_load_status = std::move(other726.primary_bulk_load_status); + group_bulk_load_state = std::move(other726.group_bulk_load_state); + metadata = std::move(other726.metadata); + total_download_progress = std::move(other726.total_download_progress); + is_group_ingestion_finished = std::move(other726.is_group_ingestion_finished); is_group_bulk_load_context_cleaned_up = - std::move(other717.is_group_bulk_load_context_cleaned_up); - is_group_bulk_load_paused = std::move(other717.is_group_bulk_load_paused); - __isset = std::move(other717.__isset); -} -bulk_load_response &bulk_load_response::operator=(const bulk_load_response &other718) -{ - err = other718.err; - pid = other718.pid; - app_name = other718.app_name; - primary_bulk_load_status = other718.primary_bulk_load_status; - group_bulk_load_state = other718.group_bulk_load_state; - metadata = other718.metadata; - total_download_progress = other718.total_download_progress; - is_group_ingestion_finished = other718.is_group_ingestion_finished; - is_group_bulk_load_context_cleaned_up = other718.is_group_bulk_load_context_cleaned_up; - is_group_bulk_load_paused = other718.is_group_bulk_load_paused; - __isset = other718.__isset; + std::move(other726.is_group_bulk_load_context_cleaned_up); + is_group_bulk_load_paused = std::move(other726.is_group_bulk_load_paused); + __isset = std::move(other726.__isset); +} +bulk_load_response &bulk_load_response::operator=(const bulk_load_response &other727) +{ + err = other727.err; + pid = other727.pid; + app_name = other727.app_name; + primary_bulk_load_status = other727.primary_bulk_load_status; + group_bulk_load_state = other727.group_bulk_load_state; + metadata = other727.metadata; + total_download_progress = other727.total_download_progress; + is_group_ingestion_finished = other727.is_group_ingestion_finished; + is_group_bulk_load_context_cleaned_up = other727.is_group_bulk_load_context_cleaned_up; + is_group_bulk_load_paused = other727.is_group_bulk_load_paused; + __isset = other727.__isset; return *this; } -bulk_load_response &bulk_load_response::operator=(bulk_load_response &&other719) +bulk_load_response &bulk_load_response::operator=(bulk_load_response &&other728) { - err = std::move(other719.err); - pid = std::move(other719.pid); - app_name = std::move(other719.app_name); - primary_bulk_load_status = std::move(other719.primary_bulk_load_status); - group_bulk_load_state = std::move(other719.group_bulk_load_state); - metadata = std::move(other719.metadata); - total_download_progress = std::move(other719.total_download_progress); - is_group_ingestion_finished = std::move(other719.is_group_ingestion_finished); + err = std::move(other728.err); + pid = std::move(other728.pid); + app_name = std::move(other728.app_name); + primary_bulk_load_status = std::move(other728.primary_bulk_load_status); + group_bulk_load_state = std::move(other728.group_bulk_load_state); + metadata = std::move(other728.metadata); + total_download_progress = std::move(other728.total_download_progress); + is_group_ingestion_finished = std::move(other728.is_group_ingestion_finished); is_group_bulk_load_context_cleaned_up = - std::move(other719.is_group_bulk_load_context_cleaned_up); - is_group_bulk_load_paused = std::move(other719.is_group_bulk_load_paused); - __isset = std::move(other719.__isset); + std::move(other728.is_group_bulk_load_context_cleaned_up); + is_group_bulk_load_paused = std::move(other728.is_group_bulk_load_paused); + __isset = std::move(other728.__isset); return *this; } void bulk_load_response::printTo(std::ostream &out) const @@ -17153,9 +17421,9 @@ uint32_t group_bulk_load_request::read(::apache::thrift::protocol::TProtocol *ip break; case 6: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast720; - xfer += iprot->readI32(ecast720); - this->meta_bulk_load_status = (bulk_load_status::type)ecast720; + int32_t ecast729; + xfer += iprot->readI32(ecast729); + this->meta_bulk_load_status = (bulk_load_status::type)ecast729; this->__isset.meta_bulk_load_status = true; } else { xfer += iprot->skip(ftype); @@ -17233,50 +17501,50 @@ void swap(group_bulk_load_request &a, group_bulk_load_request &b) swap(a.__isset, b.__isset); } -group_bulk_load_request::group_bulk_load_request(const group_bulk_load_request &other721) -{ - app_name = other721.app_name; - target_address = other721.target_address; - config = other721.config; - provider_name = other721.provider_name; - cluster_name = other721.cluster_name; - meta_bulk_load_status = other721.meta_bulk_load_status; - remote_root_path = other721.remote_root_path; - __isset = other721.__isset; -} -group_bulk_load_request::group_bulk_load_request(group_bulk_load_request &&other722) -{ - app_name = std::move(other722.app_name); - target_address = std::move(other722.target_address); - config = std::move(other722.config); - provider_name = std::move(other722.provider_name); - cluster_name = std::move(other722.cluster_name); - meta_bulk_load_status = std::move(other722.meta_bulk_load_status); - remote_root_path = std::move(other722.remote_root_path); - __isset = std::move(other722.__isset); -} -group_bulk_load_request &group_bulk_load_request::operator=(const group_bulk_load_request &other723) -{ - app_name = other723.app_name; - target_address = other723.target_address; - config = other723.config; - provider_name = other723.provider_name; - cluster_name = other723.cluster_name; - meta_bulk_load_status = other723.meta_bulk_load_status; - remote_root_path = other723.remote_root_path; - __isset = other723.__isset; +group_bulk_load_request::group_bulk_load_request(const group_bulk_load_request &other730) +{ + app_name = other730.app_name; + target_address = other730.target_address; + config = other730.config; + provider_name = other730.provider_name; + cluster_name = other730.cluster_name; + meta_bulk_load_status = other730.meta_bulk_load_status; + remote_root_path = other730.remote_root_path; + __isset = other730.__isset; +} +group_bulk_load_request::group_bulk_load_request(group_bulk_load_request &&other731) +{ + app_name = std::move(other731.app_name); + target_address = std::move(other731.target_address); + config = std::move(other731.config); + provider_name = std::move(other731.provider_name); + cluster_name = std::move(other731.cluster_name); + meta_bulk_load_status = std::move(other731.meta_bulk_load_status); + remote_root_path = std::move(other731.remote_root_path); + __isset = std::move(other731.__isset); +} +group_bulk_load_request &group_bulk_load_request::operator=(const group_bulk_load_request &other732) +{ + app_name = other732.app_name; + target_address = other732.target_address; + config = other732.config; + provider_name = other732.provider_name; + cluster_name = other732.cluster_name; + meta_bulk_load_status = other732.meta_bulk_load_status; + remote_root_path = other732.remote_root_path; + __isset = other732.__isset; return *this; } -group_bulk_load_request &group_bulk_load_request::operator=(group_bulk_load_request &&other724) +group_bulk_load_request &group_bulk_load_request::operator=(group_bulk_load_request &&other733) { - app_name = std::move(other724.app_name); - target_address = std::move(other724.target_address); - config = std::move(other724.config); - provider_name = std::move(other724.provider_name); - cluster_name = std::move(other724.cluster_name); - meta_bulk_load_status = std::move(other724.meta_bulk_load_status); - remote_root_path = std::move(other724.remote_root_path); - __isset = std::move(other724.__isset); + app_name = std::move(other733.app_name); + target_address = std::move(other733.target_address); + config = std::move(other733.config); + provider_name = std::move(other733.provider_name); + cluster_name = std::move(other733.cluster_name); + meta_bulk_load_status = std::move(other733.meta_bulk_load_status); + remote_root_path = std::move(other733.remote_root_path); + __isset = std::move(other733.__isset); return *this; } void group_bulk_load_request::printTo(std::ostream &out) const @@ -17342,9 +17610,9 @@ uint32_t group_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast725; - xfer += iprot->readI32(ecast725); - this->status = (bulk_load_status::type)ecast725; + int32_t ecast734; + xfer += iprot->readI32(ecast734); + this->status = (bulk_load_status::type)ecast734; this->__isset.status = true; } else { xfer += iprot->skip(ftype); @@ -17402,35 +17670,35 @@ void swap(group_bulk_load_response &a, group_bulk_load_response &b) swap(a.__isset, b.__isset); } -group_bulk_load_response::group_bulk_load_response(const group_bulk_load_response &other726) +group_bulk_load_response::group_bulk_load_response(const group_bulk_load_response &other735) { - err = other726.err; - status = other726.status; - bulk_load_state = other726.bulk_load_state; - __isset = other726.__isset; + err = other735.err; + status = other735.status; + bulk_load_state = other735.bulk_load_state; + __isset = other735.__isset; } -group_bulk_load_response::group_bulk_load_response(group_bulk_load_response &&other727) +group_bulk_load_response::group_bulk_load_response(group_bulk_load_response &&other736) { - err = std::move(other727.err); - status = std::move(other727.status); - bulk_load_state = std::move(other727.bulk_load_state); - __isset = std::move(other727.__isset); + err = std::move(other736.err); + status = std::move(other736.status); + bulk_load_state = std::move(other736.bulk_load_state); + __isset = std::move(other736.__isset); } group_bulk_load_response &group_bulk_load_response:: -operator=(const group_bulk_load_response &other728) +operator=(const group_bulk_load_response &other737) { - err = other728.err; - status = other728.status; - bulk_load_state = other728.bulk_load_state; - __isset = other728.__isset; + err = other737.err; + status = other737.status; + bulk_load_state = other737.bulk_load_state; + __isset = other737.__isset; return *this; } -group_bulk_load_response &group_bulk_load_response::operator=(group_bulk_load_response &&other729) +group_bulk_load_response &group_bulk_load_response::operator=(group_bulk_load_response &&other738) { - err = std::move(other729.err); - status = std::move(other729.status); - bulk_load_state = std::move(other729.bulk_load_state); - __isset = std::move(other729.__isset); + err = std::move(other738.err); + status = std::move(other738.status); + bulk_load_state = std::move(other738.bulk_load_state); + __isset = std::move(other738.__isset); return *this; } void group_bulk_load_response::printTo(std::ostream &out) const @@ -17525,30 +17793,30 @@ void swap(ingestion_request &a, ingestion_request &b) swap(a.__isset, b.__isset); } -ingestion_request::ingestion_request(const ingestion_request &other730) +ingestion_request::ingestion_request(const ingestion_request &other739) { - app_name = other730.app_name; - metadata = other730.metadata; - __isset = other730.__isset; + app_name = other739.app_name; + metadata = other739.metadata; + __isset = other739.__isset; } -ingestion_request::ingestion_request(ingestion_request &&other731) +ingestion_request::ingestion_request(ingestion_request &&other740) { - app_name = std::move(other731.app_name); - metadata = std::move(other731.metadata); - __isset = std::move(other731.__isset); + app_name = std::move(other740.app_name); + metadata = std::move(other740.metadata); + __isset = std::move(other740.__isset); } -ingestion_request &ingestion_request::operator=(const ingestion_request &other732) +ingestion_request &ingestion_request::operator=(const ingestion_request &other741) { - app_name = other732.app_name; - metadata = other732.metadata; - __isset = other732.__isset; + app_name = other741.app_name; + metadata = other741.metadata; + __isset = other741.__isset; return *this; } -ingestion_request &ingestion_request::operator=(ingestion_request &&other733) +ingestion_request &ingestion_request::operator=(ingestion_request &&other742) { - app_name = std::move(other733.app_name); - metadata = std::move(other733.metadata); - __isset = std::move(other733.__isset); + app_name = std::move(other742.app_name); + metadata = std::move(other742.metadata); + __isset = std::move(other742.__isset); return *this; } void ingestion_request::printTo(std::ostream &out) const @@ -17641,30 +17909,30 @@ void swap(ingestion_response &a, ingestion_response &b) swap(a.__isset, b.__isset); } -ingestion_response::ingestion_response(const ingestion_response &other734) +ingestion_response::ingestion_response(const ingestion_response &other743) { - err = other734.err; - rocksdb_error = other734.rocksdb_error; - __isset = other734.__isset; + err = other743.err; + rocksdb_error = other743.rocksdb_error; + __isset = other743.__isset; } -ingestion_response::ingestion_response(ingestion_response &&other735) +ingestion_response::ingestion_response(ingestion_response &&other744) { - err = std::move(other735.err); - rocksdb_error = std::move(other735.rocksdb_error); - __isset = std::move(other735.__isset); + err = std::move(other744.err); + rocksdb_error = std::move(other744.rocksdb_error); + __isset = std::move(other744.__isset); } -ingestion_response &ingestion_response::operator=(const ingestion_response &other736) +ingestion_response &ingestion_response::operator=(const ingestion_response &other745) { - err = other736.err; - rocksdb_error = other736.rocksdb_error; - __isset = other736.__isset; + err = other745.err; + rocksdb_error = other745.rocksdb_error; + __isset = other745.__isset; return *this; } -ingestion_response &ingestion_response::operator=(ingestion_response &&other737) +ingestion_response &ingestion_response::operator=(ingestion_response &&other746) { - err = std::move(other737.err); - rocksdb_error = std::move(other737.rocksdb_error); - __isset = std::move(other737.__isset); + err = std::move(other746.err); + rocksdb_error = std::move(other746.rocksdb_error); + __isset = std::move(other746.__isset); return *this; } void ingestion_response::printTo(std::ostream &out) const @@ -17715,9 +17983,9 @@ uint32_t control_bulk_load_request::read(::apache::thrift::protocol::TProtocol * break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast738; - xfer += iprot->readI32(ecast738); - this->type = (bulk_load_control_type::type)ecast738; + int32_t ecast747; + xfer += iprot->readI32(ecast747); + this->type = (bulk_load_control_type::type)ecast747; this->__isset.type = true; } else { xfer += iprot->skip(ftype); @@ -17762,32 +18030,32 @@ void swap(control_bulk_load_request &a, control_bulk_load_request &b) swap(a.__isset, b.__isset); } -control_bulk_load_request::control_bulk_load_request(const control_bulk_load_request &other739) +control_bulk_load_request::control_bulk_load_request(const control_bulk_load_request &other748) { - app_name = other739.app_name; - type = other739.type; - __isset = other739.__isset; + app_name = other748.app_name; + type = other748.type; + __isset = other748.__isset; } -control_bulk_load_request::control_bulk_load_request(control_bulk_load_request &&other740) +control_bulk_load_request::control_bulk_load_request(control_bulk_load_request &&other749) { - app_name = std::move(other740.app_name); - type = std::move(other740.type); - __isset = std::move(other740.__isset); + app_name = std::move(other749.app_name); + type = std::move(other749.type); + __isset = std::move(other749.__isset); } control_bulk_load_request &control_bulk_load_request:: -operator=(const control_bulk_load_request &other741) +operator=(const control_bulk_load_request &other750) { - app_name = other741.app_name; - type = other741.type; - __isset = other741.__isset; + app_name = other750.app_name; + type = other750.type; + __isset = other750.__isset; return *this; } control_bulk_load_request &control_bulk_load_request:: -operator=(control_bulk_load_request &&other742) +operator=(control_bulk_load_request &&other751) { - app_name = std::move(other742.app_name); - type = std::move(other742.type); - __isset = std::move(other742.__isset); + app_name = std::move(other751.app_name); + type = std::move(other751.type); + __isset = std::move(other751.__isset); return *this; } void control_bulk_load_request::printTo(std::ostream &out) const @@ -17885,32 +18153,32 @@ void swap(control_bulk_load_response &a, control_bulk_load_response &b) swap(a.__isset, b.__isset); } -control_bulk_load_response::control_bulk_load_response(const control_bulk_load_response &other743) +control_bulk_load_response::control_bulk_load_response(const control_bulk_load_response &other752) { - err = other743.err; - hint_msg = other743.hint_msg; - __isset = other743.__isset; + err = other752.err; + hint_msg = other752.hint_msg; + __isset = other752.__isset; } -control_bulk_load_response::control_bulk_load_response(control_bulk_load_response &&other744) +control_bulk_load_response::control_bulk_load_response(control_bulk_load_response &&other753) { - err = std::move(other744.err); - hint_msg = std::move(other744.hint_msg); - __isset = std::move(other744.__isset); + err = std::move(other753.err); + hint_msg = std::move(other753.hint_msg); + __isset = std::move(other753.__isset); } control_bulk_load_response &control_bulk_load_response:: -operator=(const control_bulk_load_response &other745) +operator=(const control_bulk_load_response &other754) { - err = other745.err; - hint_msg = other745.hint_msg; - __isset = other745.__isset; + err = other754.err; + hint_msg = other754.hint_msg; + __isset = other754.__isset; return *this; } control_bulk_load_response &control_bulk_load_response:: -operator=(control_bulk_load_response &&other746) +operator=(control_bulk_load_response &&other755) { - err = std::move(other746.err); - hint_msg = std::move(other746.hint_msg); - __isset = std::move(other746.__isset); + err = std::move(other755.err); + hint_msg = std::move(other755.hint_msg); + __isset = std::move(other755.__isset); return *this; } void control_bulk_load_response::printTo(std::ostream &out) const @@ -17989,26 +18257,26 @@ void swap(query_bulk_load_request &a, query_bulk_load_request &b) swap(a.__isset, b.__isset); } -query_bulk_load_request::query_bulk_load_request(const query_bulk_load_request &other747) +query_bulk_load_request::query_bulk_load_request(const query_bulk_load_request &other756) { - app_name = other747.app_name; - __isset = other747.__isset; + app_name = other756.app_name; + __isset = other756.__isset; } -query_bulk_load_request::query_bulk_load_request(query_bulk_load_request &&other748) +query_bulk_load_request::query_bulk_load_request(query_bulk_load_request &&other757) { - app_name = std::move(other748.app_name); - __isset = std::move(other748.__isset); + app_name = std::move(other757.app_name); + __isset = std::move(other757.__isset); } -query_bulk_load_request &query_bulk_load_request::operator=(const query_bulk_load_request &other749) +query_bulk_load_request &query_bulk_load_request::operator=(const query_bulk_load_request &other758) { - app_name = other749.app_name; - __isset = other749.__isset; + app_name = other758.app_name; + __isset = other758.__isset; return *this; } -query_bulk_load_request &query_bulk_load_request::operator=(query_bulk_load_request &&other750) +query_bulk_load_request &query_bulk_load_request::operator=(query_bulk_load_request &&other759) { - app_name = std::move(other750.app_name); - __isset = std::move(other750.__isset); + app_name = std::move(other759.app_name); + __isset = std::move(other759.__isset); return *this; } void query_bulk_load_request::printTo(std::ostream &out) const @@ -18090,9 +18358,9 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i break; case 3: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast751; - xfer += iprot->readI32(ecast751); - this->app_status = (bulk_load_status::type)ecast751; + int32_t ecast760; + xfer += iprot->readI32(ecast760); + this->app_status = (bulk_load_status::type)ecast760; this->__isset.app_status = true; } else { xfer += iprot->skip(ftype); @@ -18102,15 +18370,15 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i if (ftype == ::apache::thrift::protocol::T_LIST) { { this->partitions_status.clear(); - uint32_t _size752; - ::apache::thrift::protocol::TType _etype755; - xfer += iprot->readListBegin(_etype755, _size752); - this->partitions_status.resize(_size752); - uint32_t _i756; - for (_i756 = 0; _i756 < _size752; ++_i756) { - int32_t ecast757; - xfer += iprot->readI32(ecast757); - this->partitions_status[_i756] = (bulk_load_status::type)ecast757; + uint32_t _size761; + ::apache::thrift::protocol::TType _etype764; + xfer += iprot->readListBegin(_etype764, _size761); + this->partitions_status.resize(_size761); + uint32_t _i765; + for (_i765 = 0; _i765 < _size761; ++_i765) { + int32_t ecast766; + xfer += iprot->readI32(ecast766); + this->partitions_status[_i765] = (bulk_load_status::type)ecast766; } xfer += iprot->readListEnd(); } @@ -18131,25 +18399,25 @@ uint32_t query_bulk_load_response::read(::apache::thrift::protocol::TProtocol *i if (ftype == ::apache::thrift::protocol::T_LIST) { { this->bulk_load_states.clear(); - uint32_t _size758; - ::apache::thrift::protocol::TType _etype761; - xfer += iprot->readListBegin(_etype761, _size758); - this->bulk_load_states.resize(_size758); - uint32_t _i762; - for (_i762 = 0; _i762 < _size758; ++_i762) { + uint32_t _size767; + ::apache::thrift::protocol::TType _etype770; + xfer += iprot->readListBegin(_etype770, _size767); + this->bulk_load_states.resize(_size767); + uint32_t _i771; + for (_i771 = 0; _i771 < _size767; ++_i771) { { - this->bulk_load_states[_i762].clear(); - uint32_t _size763; - ::apache::thrift::protocol::TType _ktype764; - ::apache::thrift::protocol::TType _vtype765; - xfer += iprot->readMapBegin(_ktype764, _vtype765, _size763); - uint32_t _i767; - for (_i767 = 0; _i767 < _size763; ++_i767) { - ::dsn::rpc_address _key768; - xfer += _key768.read(iprot); - partition_bulk_load_state &_val769 = - this->bulk_load_states[_i762][_key768]; - xfer += _val769.read(iprot); + this->bulk_load_states[_i771].clear(); + uint32_t _size772; + ::apache::thrift::protocol::TType _ktype773; + ::apache::thrift::protocol::TType _vtype774; + xfer += iprot->readMapBegin(_ktype773, _vtype774, _size772); + uint32_t _i776; + for (_i776 = 0; _i776 < _size772; ++_i776) { + ::dsn::rpc_address _key777; + xfer += _key777.read(iprot); + partition_bulk_load_state &_val778 = + this->bulk_load_states[_i771][_key777]; + xfer += _val778.read(iprot); } xfer += iprot->readMapEnd(); } @@ -18203,10 +18471,10 @@ uint32_t query_bulk_load_response::write(::apache::thrift::protocol::TProtocol * { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_I32, static_cast(this->partitions_status.size())); - std::vector::const_iterator _iter770; - for (_iter770 = this->partitions_status.begin(); _iter770 != this->partitions_status.end(); - ++_iter770) { - xfer += oprot->writeI32((int32_t)(*_iter770)); + std::vector::const_iterator _iter779; + for (_iter779 = this->partitions_status.begin(); _iter779 != this->partitions_status.end(); + ++_iter779) { + xfer += oprot->writeI32((int32_t)(*_iter779)); } xfer += oprot->writeListEnd(); } @@ -18221,17 +18489,17 @@ uint32_t query_bulk_load_response::write(::apache::thrift::protocol::TProtocol * xfer += oprot->writeListBegin(::apache::thrift::protocol::T_MAP, static_cast(this->bulk_load_states.size())); std::vector>::const_iterator - _iter771; - for (_iter771 = this->bulk_load_states.begin(); _iter771 != this->bulk_load_states.end(); - ++_iter771) { + _iter780; + for (_iter780 = this->bulk_load_states.begin(); _iter780 != this->bulk_load_states.end(); + ++_iter780) { { xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_STRUCT, - static_cast((*_iter771).size())); - std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter772; - for (_iter772 = (*_iter771).begin(); _iter772 != (*_iter771).end(); ++_iter772) { - xfer += _iter772->first.write(oprot); - xfer += _iter772->second.write(oprot); + static_cast((*_iter780).size())); + std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter781; + for (_iter781 = (*_iter780).begin(); _iter781 != (*_iter780).end(); ++_iter781) { + xfer += _iter781->first.write(oprot); + xfer += _iter781->second.write(oprot); } xfer += oprot->writeMapEnd(); } @@ -18263,51 +18531,51 @@ void swap(query_bulk_load_response &a, query_bulk_load_response &b) swap(a.__isset, b.__isset); } -query_bulk_load_response::query_bulk_load_response(const query_bulk_load_response &other773) +query_bulk_load_response::query_bulk_load_response(const query_bulk_load_response &other782) { - err = other773.err; - app_name = other773.app_name; - app_status = other773.app_status; - partitions_status = other773.partitions_status; - max_replica_count = other773.max_replica_count; - bulk_load_states = other773.bulk_load_states; - hint_msg = other773.hint_msg; - __isset = other773.__isset; + err = other782.err; + app_name = other782.app_name; + app_status = other782.app_status; + partitions_status = other782.partitions_status; + max_replica_count = other782.max_replica_count; + bulk_load_states = other782.bulk_load_states; + hint_msg = other782.hint_msg; + __isset = other782.__isset; } -query_bulk_load_response::query_bulk_load_response(query_bulk_load_response &&other774) +query_bulk_load_response::query_bulk_load_response(query_bulk_load_response &&other783) { - err = std::move(other774.err); - app_name = std::move(other774.app_name); - app_status = std::move(other774.app_status); - partitions_status = std::move(other774.partitions_status); - max_replica_count = std::move(other774.max_replica_count); - bulk_load_states = std::move(other774.bulk_load_states); - hint_msg = std::move(other774.hint_msg); - __isset = std::move(other774.__isset); + err = std::move(other783.err); + app_name = std::move(other783.app_name); + app_status = std::move(other783.app_status); + partitions_status = std::move(other783.partitions_status); + max_replica_count = std::move(other783.max_replica_count); + bulk_load_states = std::move(other783.bulk_load_states); + hint_msg = std::move(other783.hint_msg); + __isset = std::move(other783.__isset); } query_bulk_load_response &query_bulk_load_response:: -operator=(const query_bulk_load_response &other775) -{ - err = other775.err; - app_name = other775.app_name; - app_status = other775.app_status; - partitions_status = other775.partitions_status; - max_replica_count = other775.max_replica_count; - bulk_load_states = other775.bulk_load_states; - hint_msg = other775.hint_msg; - __isset = other775.__isset; +operator=(const query_bulk_load_response &other784) +{ + err = other784.err; + app_name = other784.app_name; + app_status = other784.app_status; + partitions_status = other784.partitions_status; + max_replica_count = other784.max_replica_count; + bulk_load_states = other784.bulk_load_states; + hint_msg = other784.hint_msg; + __isset = other784.__isset; return *this; } -query_bulk_load_response &query_bulk_load_response::operator=(query_bulk_load_response &&other776) +query_bulk_load_response &query_bulk_load_response::operator=(query_bulk_load_response &&other785) { - err = std::move(other776.err); - app_name = std::move(other776.app_name); - app_status = std::move(other776.app_status); - partitions_status = std::move(other776.partitions_status); - max_replica_count = std::move(other776.max_replica_count); - bulk_load_states = std::move(other776.bulk_load_states); - hint_msg = std::move(other776.hint_msg); - __isset = std::move(other776.__isset); + err = std::move(other785.err); + app_name = std::move(other785.app_name); + app_status = std::move(other785.app_status); + partitions_status = std::move(other785.partitions_status); + max_replica_count = std::move(other785.max_replica_count); + bulk_load_states = std::move(other785.bulk_load_states); + hint_msg = std::move(other785.hint_msg); + __isset = std::move(other785.__isset); return *this; } void query_bulk_load_response::printTo(std::ostream &out) const @@ -18360,9 +18628,9 @@ uint32_t detect_hotkey_request::read(::apache::thrift::protocol::TProtocol *ipro switch (fid) { case 1: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast777; - xfer += iprot->readI32(ecast777); - this->type = (hotkey_type::type)ecast777; + int32_t ecast786; + xfer += iprot->readI32(ecast786); + this->type = (hotkey_type::type)ecast786; this->__isset.type = true; } else { xfer += iprot->skip(ftype); @@ -18370,9 +18638,9 @@ uint32_t detect_hotkey_request::read(::apache::thrift::protocol::TProtocol *ipro break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast778; - xfer += iprot->readI32(ecast778); - this->action = (detect_action::type)ecast778; + int32_t ecast787; + xfer += iprot->readI32(ecast787); + this->action = (detect_action::type)ecast787; this->__isset.action = true; } else { xfer += iprot->skip(ftype); @@ -18430,34 +18698,34 @@ void swap(detect_hotkey_request &a, detect_hotkey_request &b) swap(a.__isset, b.__isset); } -detect_hotkey_request::detect_hotkey_request(const detect_hotkey_request &other779) +detect_hotkey_request::detect_hotkey_request(const detect_hotkey_request &other788) { - type = other779.type; - action = other779.action; - pid = other779.pid; - __isset = other779.__isset; + type = other788.type; + action = other788.action; + pid = other788.pid; + __isset = other788.__isset; } -detect_hotkey_request::detect_hotkey_request(detect_hotkey_request &&other780) +detect_hotkey_request::detect_hotkey_request(detect_hotkey_request &&other789) { - type = std::move(other780.type); - action = std::move(other780.action); - pid = std::move(other780.pid); - __isset = std::move(other780.__isset); + type = std::move(other789.type); + action = std::move(other789.action); + pid = std::move(other789.pid); + __isset = std::move(other789.__isset); } -detect_hotkey_request &detect_hotkey_request::operator=(const detect_hotkey_request &other781) +detect_hotkey_request &detect_hotkey_request::operator=(const detect_hotkey_request &other790) { - type = other781.type; - action = other781.action; - pid = other781.pid; - __isset = other781.__isset; + type = other790.type; + action = other790.action; + pid = other790.pid; + __isset = other790.__isset; return *this; } -detect_hotkey_request &detect_hotkey_request::operator=(detect_hotkey_request &&other782) +detect_hotkey_request &detect_hotkey_request::operator=(detect_hotkey_request &&other791) { - type = std::move(other782.type); - action = std::move(other782.action); - pid = std::move(other782.pid); - __isset = std::move(other782.__isset); + type = std::move(other791.type); + action = std::move(other791.action); + pid = std::move(other791.pid); + __isset = std::move(other791.__isset); return *this; } void detect_hotkey_request::printTo(std::ostream &out) const @@ -18577,34 +18845,34 @@ void swap(detect_hotkey_response &a, detect_hotkey_response &b) swap(a.__isset, b.__isset); } -detect_hotkey_response::detect_hotkey_response(const detect_hotkey_response &other783) +detect_hotkey_response::detect_hotkey_response(const detect_hotkey_response &other792) { - err = other783.err; - err_hint = other783.err_hint; - hotkey_result = other783.hotkey_result; - __isset = other783.__isset; + err = other792.err; + err_hint = other792.err_hint; + hotkey_result = other792.hotkey_result; + __isset = other792.__isset; } -detect_hotkey_response::detect_hotkey_response(detect_hotkey_response &&other784) +detect_hotkey_response::detect_hotkey_response(detect_hotkey_response &&other793) { - err = std::move(other784.err); - err_hint = std::move(other784.err_hint); - hotkey_result = std::move(other784.hotkey_result); - __isset = std::move(other784.__isset); + err = std::move(other793.err); + err_hint = std::move(other793.err_hint); + hotkey_result = std::move(other793.hotkey_result); + __isset = std::move(other793.__isset); } -detect_hotkey_response &detect_hotkey_response::operator=(const detect_hotkey_response &other785) +detect_hotkey_response &detect_hotkey_response::operator=(const detect_hotkey_response &other794) { - err = other785.err; - err_hint = other785.err_hint; - hotkey_result = other785.hotkey_result; - __isset = other785.__isset; + err = other794.err; + err_hint = other794.err_hint; + hotkey_result = other794.hotkey_result; + __isset = other794.__isset; return *this; } -detect_hotkey_response &detect_hotkey_response::operator=(detect_hotkey_response &&other786) +detect_hotkey_response &detect_hotkey_response::operator=(detect_hotkey_response &&other795) { - err = std::move(other786.err); - err_hint = std::move(other786.err_hint); - hotkey_result = std::move(other786.hotkey_result); - __isset = std::move(other786.__isset); + err = std::move(other795.err); + err_hint = std::move(other795.err_hint); + hotkey_result = std::move(other795.hotkey_result); + __isset = std::move(other795.__isset); return *this; } void detect_hotkey_response::printTo(std::ostream &out) const diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index f2c0b90851..42f6b771e8 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -477,6 +477,8 @@ void meta_service::register_rpc_handlers() register_rpc_handler_with_rpc_holder(RPC_CM_REGISTER_CHILD_REPLICA, "register_child_on_meta", &meta_service::on_register_child_on_meta); + register_rpc_handler_with_rpc_holder( + RPC_CM_NOTIFY_STOP_SPLIT, "notify_stop_split", &meta_service::on_notify_stop_split); register_rpc_handler_with_rpc_holder( RPC_CM_START_BULK_LOAD, "start_bulk_load", &meta_service::on_start_bulk_load); register_rpc_handler_with_rpc_holder( @@ -1041,6 +1043,22 @@ void meta_service::on_register_child_on_meta(register_child_rpc rpc) server_state::sStateHash); } +void meta_service::on_notify_stop_split(notify_stop_split_rpc rpc) +{ + if (!check_status(rpc)) { + return; + } + if (_split_svc == nullptr) { + derror_f("meta doesn't support partition split"); + rpc.response().err = ERR_SERVICE_NOT_ACTIVE; + return; + } + tasking::enqueue(LPC_META_STATE_NORMAL, + tracker(), + [this, rpc]() { _split_svc->notify_stop_split(std::move(rpc)); }, + server_state::sStateHash); +} + void meta_service::on_start_bulk_load(start_bulk_load_rpc rpc) { if (!check_status(rpc)) { diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index ced331430f..dcc46204c2 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -185,6 +185,7 @@ class meta_service : public serverlet void on_start_partition_split(start_split_rpc rpc); void on_control_partition_split(control_split_rpc rpc); void on_register_child_on_meta(register_child_rpc rpc); + void on_notify_stop_split(notify_stop_split_rpc rpc); // bulk load void on_start_bulk_load(start_bulk_load_rpc rpc); diff --git a/src/meta/meta_split_service.cpp b/src/meta/meta_split_service.cpp index a5412a026a..ae0e3b97cf 100644 --- a/src/meta/meta_split_service.cpp +++ b/src/meta/meta_split_service.cpp @@ -429,5 +429,88 @@ void meta_split_service::do_control_all(std::shared_ptr app, control_ response.err = ERR_OK; } +void meta_split_service::notify_stop_split(notify_stop_split_rpc rpc) +{ + const auto &request = rpc.request(); + auto &response = rpc.response(); + zauto_write_lock(app_lock()); + std::shared_ptr app = _state->get_app(request.app_name); + dassert_f(app != nullptr, "app({}) is not existed", request.app_name); + dassert_f(app->is_stateful, "app({}) is stateless currently", request.app_name); + dassert_f(request.meta_split_status == split_status::PAUSING || + request.meta_split_status == split_status::CANCELING, + "invalid split_status({})", + dsn::enum_to_string(request.meta_split_status)); + + const std::string stop_type = + rpc.request().meta_split_status == split_status::PAUSING ? "pause" : "cancel"; + auto iter = app->helpers->split_states.status.find(request.parent_gpid.get_partition_index()); + if (iter == app->helpers->split_states.status.end()) { + dwarn_f("app({}) partition({}) is not executing partition split, ignore out-dated {} split " + "request", + app->app_name, + request.parent_gpid, + stop_type); + response.err = ERR_INVALID_VERSION; + return; + } + + if (iter->second != request.meta_split_status) { + dwarn_f("app({}) partition({}) split_status = {}, ignore out-dated {} split request", + app->app_name, + request.parent_gpid, + dsn::enum_to_string(iter->second), + stop_type); + response.err = ERR_INVALID_VERSION; + return; + } + + ddebug_f("app({}) partition({}) notify {} split succeed", + app->app_name, + request.parent_gpid, + stop_type); + + // pausing split + if (iter->second == split_status::PAUSING) { + iter->second = split_status::PAUSED; + response.err = ERR_OK; + return; + } + + // canceling split + dassert_f(request.partition_count * 2 == app->partition_count, + "wrong partition_count, request({}) vs meta({})", + request.partition_count, + app->partition_count); + app->helpers->split_states.status.erase(request.parent_gpid.get_partition_index()); + response.err = ERR_OK; + // when all partitions finish, partition_count should be updated + if (--app->helpers->split_states.splitting_count == 0) { + do_cancel_partition_split(std::move(app), rpc); + } +} + +void meta_split_service::do_cancel_partition_split(std::shared_ptr app, + notify_stop_split_rpc rpc) +{ + auto on_write_storage_complete = [app, rpc, this]() { + ddebug_f("app({}) update partition count on remote storage, new partition count is {}", + app->app_name, + app->partition_count / 2); + zauto_write_lock l(app_lock()); + app->partition_count /= 2; + for (int i = app->partition_count; i < app->partition_count * 2; ++i) { + app->partitions.erase(app->partitions.cbegin() + i); + app->helpers->contexts.erase(app->helpers->contexts.cbegin() + i); + } + }; + + auto copy = *app; + copy.partition_count = rpc.request().partition_count; + blob value = dsn::json::json_forwarder::encode(copy); + _meta_svc->get_meta_storage()->set_data( + _state->get_app_path(*app), std::move(value), on_write_storage_complete); +} + } // namespace replication } // namespace dsn diff --git a/src/meta/meta_split_service.h b/src/meta/meta_split_service.h index f85356bc89..439d2382c8 100644 --- a/src/meta/meta_split_service.h +++ b/src/meta/meta_split_service.h @@ -57,6 +57,10 @@ class meta_split_service void on_add_child_on_remote_storage_reply(error_code ec, register_child_rpc rpc, bool create_new); + // primary replica -> meta to notify group pause or cancel split succeed + void notify_stop_split(notify_stop_split_rpc rpc); + void do_cancel_partition_split(std::shared_ptr app, notify_stop_split_rpc rpc); + static const std::string control_type_str(split_control_type::type type) { std::string str = ""; diff --git a/src/meta/test/meta_split_service_test.cpp b/src/meta/test/meta_split_service_test.cpp index 9605563249..7ad072dfbc 100644 --- a/src/meta/test/meta_split_service_test.cpp +++ b/src/meta/test/meta_split_service_test.cpp @@ -115,6 +115,21 @@ class meta_split_service_test : public meta_test_base return rpc.response().err; } + error_code notify_stop_split(split_status::type req_split_status) + { + auto req = make_unique(); + req->__set_app_name(NAME); + req->__set_parent_gpid(dsn::gpid(app->app_id, PARENT_INDEX)); + req->__set_meta_split_status(req_split_status); + req->__set_partition_count(PARTITION_COUNT); + + notify_stop_split_rpc rpc(std::move(req), RPC_CM_NOTIFY_STOP_SPLIT); + split_svc().notify_stop_split(rpc); + wait_all(); + + return rpc.response().err; + } + int32_t on_config_sync(configuration_query_by_node_request req) { auto request = make_unique(req); @@ -158,6 +173,25 @@ class meta_split_service_test : public meta_test_base app->helpers->split_states.status.clear(); } + void mock_only_one_partition_split(split_status::type split_status) + { + app->partition_count = NEW_PARTITION_COUNT; + app->partitions.resize(app->partition_count); + app->helpers->contexts.resize(app->partition_count); + for (int i = 0; i < app->partition_count; ++i) { + app->helpers->contexts[i].config_owner = &app->partitions[i]; + app->partitions[i].pid = dsn::gpid(app->app_id, i); + if (i >= app->partition_count / 2) { + app->partitions[i].ballot = invalid_ballot; + } else { + app->partitions[i].ballot = PARENT_BALLOT; + app->helpers->contexts[i].stage = config_status::not_pending; + } + } + app->helpers->split_states.splitting_count = 1; + app->helpers->split_states.status[PARENT_INDEX] = split_status; + } + void mock_child_registered() { app->partitions[CHILD_INDEX].ballot = PARENT_BALLOT; @@ -487,5 +521,99 @@ TEST_F(meta_split_service_test, cancel_split_test) } } +// notify stop split unit tests +TEST_F(meta_split_service_test, notify_stop_split_test) +{ + // Test case: + // - request split pausing, meta not_split + // - request split pausing, meta paused + // - request split pausing, meta splitting + // - request split pausing, meta pausing + // - request split pausing, meta canceling + // - request split canceling, meta not_split + // - request split canceling, meta paused + // - request split canceling, meta splitting + // - request split canceling, meta pausing + // - request split canceling, meta canceling + // - request split canceling, meta canceling, last cancel request + struct notify_stop_split_test + { + split_status::type req_split_status; + split_status::type meta_split_status; + bool last_canceled; + error_code expected_err; + split_status::type expected_status; + } tests[] = { + {split_status::PAUSING, + split_status::NOT_SPLIT, + false, + ERR_INVALID_VERSION, + split_status::NOT_SPLIT}, + {split_status::PAUSING, + split_status::PAUSED, + false, + ERR_INVALID_VERSION, + split_status::PAUSED}, + {split_status::PAUSING, + split_status::SPLITTING, + false, + ERR_INVALID_VERSION, + split_status::SPLITTING}, + {split_status::PAUSING, split_status::PAUSING, false, ERR_OK, split_status::PAUSING}, + {split_status::PAUSING, + split_status::CANCELING, + false, + ERR_INVALID_VERSION, + split_status::CANCELING}, + {split_status::CANCELING, + split_status::NOT_SPLIT, + false, + ERR_INVALID_VERSION, + split_status::NOT_SPLIT}, + {split_status::CANCELING, + split_status::PAUSED, + false, + ERR_INVALID_VERSION, + split_status::PAUSED}, + {split_status::CANCELING, + split_status::SPLITTING, + false, + ERR_INVALID_VERSION, + split_status::SPLITTING}, + {split_status::CANCELING, + split_status::PAUSING, + false, + ERR_INVALID_VERSION, + split_status::PAUSING}, + {split_status::CANCELING, split_status::CANCELING, false, ERR_OK, split_status::NOT_SPLIT}, + {split_status::CANCELING, split_status::CANCELING, true, ERR_OK, split_status::NOT_SPLIT}}; + + for (auto test : tests) { + if (test.last_canceled) { + mock_only_one_partition_split(split_status::CANCELING); + } else { + mock_app_partition_split_context(); + if (test.meta_split_status == split_status::NOT_SPLIT) { + mock_child_registered(); + } else { + mock_split_states(test.meta_split_status, PARENT_INDEX); + } + } + + ASSERT_EQ(notify_stop_split(test.req_split_status), test.expected_err); + if (test.last_canceled) { + auto app = find_app(NAME); + ASSERT_EQ(app->partition_count, PARTITION_COUNT); + ASSERT_EQ(app->helpers->split_states.splitting_count, 0); + } else if (test.expected_status != split_status::NOT_SPLIT) { + auto app = find_app(NAME); + ASSERT_EQ(app->partition_count, NEW_PARTITION_COUNT); + check_split_status(test.expected_status, PARENT_INDEX); + } + + clear_app_partition_split_context(); + } +} + } // namespace replication } // namespace dsn diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index b0aa34fd8d..463587e0e5 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -1346,7 +1346,26 @@ void replica_split_manager::parent_send_notify_stop_request( split_status::type meta_split_status) // on primary parent { FAIL_POINT_INJECT_F("replica_parent_send_notify_stop_request", [](dsn::string_view) {}); - // TODO(hyc): TBD + rpc_address meta_address(_stub->_failure_detector->get_servers()); + std::unique_ptr req = make_unique(); + req->app_name = _replica->_app_info.app_name; + req->parent_gpid = get_gpid(); + req->meta_split_status = meta_split_status; + req->partition_count = _replica->_app_info.partition_count; + + ddebug_replica("group {} split succeed, send notify_stop_request to meta server({})", + meta_split_status == split_status::PAUSING ? "pause" : "cancel", + meta_address.to_string()); + notify_stop_split_rpc rpc( + std::move(req), RPC_CM_NOTIFY_STOP_SPLIT, 0_ms, 0, get_gpid().thread_hash()); + rpc.call(meta_address, tracker(), [this, rpc](error_code ec) mutable { + error_code err = ec == ERR_OK ? rpc.response().err : ec; + const std::string type = + rpc.request().meta_split_status == split_status::PAUSING ? "pause" : "cancel"; + if (err != ERR_OK) { + dwarn_replica("notify {} split failed, error = {}, wait for next round", type, err); + } + }); } } // namespace replication diff --git a/src/replication.thrift b/src/replication.thrift index d1dd23ce9f..ac85aec21b 100644 --- a/src/replication.thrift +++ b/src/replication.thrift @@ -982,6 +982,22 @@ struct register_child_response 4:dsn.layer2.partition_configuration child_config; } +// primary -> meta to report pause or cancel split succeed +struct notify_stop_split_request +{ + 1:string app_name; + 2:dsn.gpid parent_gpid; + 3:split_status meta_split_status; + 4:i32 partition_count; +} + +struct notify_stop_split_response +{ + // Possible errors: + // - ERR_INVALID_VERSION: request is out-dated + 1:dsn.error_code err; +} + /////////////////// bulk-load-related structs //////////////////// // app partition bulk load status From 5885150af8638dbfede7e66d6973ced8721eff3c Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 21 Jan 2021 09:46:42 +0800 Subject: [PATCH 2/2] update by code review --- src/meta/meta_split_service.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/meta/meta_split_service.cpp b/src/meta/meta_split_service.cpp index ae0e3b97cf..a4821a3caf 100644 --- a/src/meta/meta_split_service.cpp +++ b/src/meta/meta_split_service.cpp @@ -442,9 +442,10 @@ void meta_split_service::notify_stop_split(notify_stop_split_rpc rpc) "invalid split_status({})", dsn::enum_to_string(request.meta_split_status)); - const std::string stop_type = + const std::string &stop_type = rpc.request().meta_split_status == split_status::PAUSING ? "pause" : "cancel"; - auto iter = app->helpers->split_states.status.find(request.parent_gpid.get_partition_index()); + const auto iter = + app->helpers->split_states.status.find(request.parent_gpid.get_partition_index()); if (iter == app->helpers->split_states.status.end()) { dwarn_f("app({}) partition({}) is not executing partition split, ignore out-dated {} split " "request", @@ -499,10 +500,8 @@ void meta_split_service::do_cancel_partition_split(std::shared_ptr ap app->partition_count / 2); zauto_write_lock l(app_lock()); app->partition_count /= 2; - for (int i = app->partition_count; i < app->partition_count * 2; ++i) { - app->partitions.erase(app->partitions.cbegin() + i); - app->helpers->contexts.erase(app->helpers->contexts.cbegin() + i); - } + app->helpers->contexts.resize(app->partition_count); + app->partitions.resize(app->partition_count); }; auto copy = *app;