diff --git a/include/dsn/dist/replication/duplication_common.h b/include/dsn/dist/replication/duplication_common.h index e2ea759ded..5cbd3276ea 100644 --- a/include/dsn/dist/replication/duplication_common.h +++ b/include/dsn/dist/replication/duplication_common.h @@ -44,6 +44,8 @@ typedef int32_t dupid_t; extern const char *duplication_status_to_string(duplication_status::type status); +extern const char *duplication_fail_mode_to_string(duplication_fail_mode::type); + inline bool is_duplication_status_valid(duplication_status::type status) { return status == duplication_status::DS_PAUSE || status == duplication_status::DS_START; diff --git a/include/dsn/dist/replication/replication_ddl_client.h b/include/dsn/dist/replication/replication_ddl_client.h index dd87fab3a9..5e3cd5733f 100644 --- a/include/dsn/dist/replication/replication_ddl_client.h +++ b/include/dsn/dist/replication/replication_ddl_client.h @@ -118,6 +118,8 @@ class replication_ddl_client error_with change_dup_status(std::string app_name, int dupid, duplication_status::type status); + error_with + update_dup_fail_mode(std::string app_name, int dupid, duplication_fail_mode::type fmode); error_with query_dup(std::string app_name); diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 3ace35e104..a259eede04 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -162,6 +162,18 @@ struct duplication_status extern const std::map _duplication_status_VALUES_TO_NAMES; +struct duplication_fail_mode +{ + enum type + { + FAIL_SLOW = 0, + FAIL_SKIP = 1, + FAIL_FAST = 2 + }; +}; + +extern const std::map _duplication_fail_mode_VALUES_TO_NAMES; + class mutation_header; class mutation_update; @@ -5017,10 +5029,14 @@ inline std::ostream &operator<<(std::ostream &out, const duplication_add_respons typedef struct _duplication_modify_request__isset { - _duplication_modify_request__isset() : app_name(false), dupid(false), status(false) {} + _duplication_modify_request__isset() + : app_name(false), dupid(false), status(false), fail_mode(false) + { + } bool app_name : 1; bool dupid : 1; bool status : 1; + bool fail_mode : 1; } _duplication_modify_request__isset; class duplication_modify_request @@ -5030,12 +5046,19 @@ class duplication_modify_request duplication_modify_request(duplication_modify_request &&); duplication_modify_request &operator=(const duplication_modify_request &); duplication_modify_request &operator=(duplication_modify_request &&); - duplication_modify_request() : app_name(), dupid(0), status((duplication_status::type)0) {} + duplication_modify_request() + : app_name(), + dupid(0), + status((duplication_status::type)0), + fail_mode((duplication_fail_mode::type)0) + { + } virtual ~duplication_modify_request() throw(); std::string app_name; int32_t dupid; duplication_status::type status; + duplication_fail_mode::type fail_mode; _duplication_modify_request__isset __isset; @@ -5045,6 +5068,8 @@ class duplication_modify_request void __set_status(const duplication_status::type val); + void __set_fail_mode(const duplication_fail_mode::type val); + bool operator==(const duplication_modify_request &rhs) const { if (!(app_name == rhs.app_name)) @@ -5055,6 +5080,10 @@ class duplication_modify_request return false; else if (__isset.status && !(status == rhs.status)) return false; + if (__isset.fail_mode != rhs.__isset.fail_mode) + return false; + else if (__isset.fail_mode && !(fail_mode == rhs.fail_mode)) + return false; return true; } bool operator!=(const duplication_modify_request &rhs) const { return !(*this == rhs); } @@ -5135,7 +5164,7 @@ typedef struct _duplication_entry__isset remote(false), create_ts(false), progress(false), - not_confirmed(false) + fail_mode(false) { } bool dupid : 1; @@ -5143,7 +5172,7 @@ typedef struct _duplication_entry__isset bool remote : 1; bool create_ts : 1; bool progress : 1; - bool not_confirmed : 1; + bool fail_mode : 1; } _duplication_entry__isset; class duplication_entry @@ -5153,7 +5182,14 @@ class duplication_entry duplication_entry(duplication_entry &&); duplication_entry &operator=(const duplication_entry &); duplication_entry &operator=(duplication_entry &&); - duplication_entry() : dupid(0), status((duplication_status::type)0), remote(), create_ts(0) {} + duplication_entry() + : dupid(0), + status((duplication_status::type)0), + remote(), + create_ts(0), + fail_mode((duplication_fail_mode::type)0) + { + } virtual ~duplication_entry() throw(); int32_t dupid; @@ -5161,7 +5197,7 @@ class duplication_entry std::string remote; int64_t create_ts; std::map progress; - std::map not_confirmed; + duplication_fail_mode::type fail_mode; _duplication_entry__isset __isset; @@ -5175,7 +5211,7 @@ class duplication_entry void __set_progress(const std::map &val); - void __set_not_confirmed(const std::map &val); + void __set_fail_mode(const duplication_fail_mode::type val); bool operator==(const duplication_entry &rhs) const { @@ -5191,9 +5227,9 @@ class duplication_entry return false; else if (__isset.progress && !(progress == rhs.progress)) return false; - if (__isset.not_confirmed != rhs.__isset.not_confirmed) + if (__isset.fail_mode != rhs.__isset.fail_mode) return false; - else if (__isset.not_confirmed && !(not_confirmed == rhs.not_confirmed)) + else if (__isset.fail_mode && !(fail_mode == rhs.fail_mode)) return false; return true; } diff --git a/src/dist/replication/common/duplication_common.cpp b/src/dist/replication/common/duplication_common.cpp index c3e97e06a7..d51d528b4d 100644 --- a/src/dist/replication/common/duplication_common.cpp +++ b/src/dist/replication/common/duplication_common.cpp @@ -43,6 +43,15 @@ namespace replication { return it->second; } +/*extern*/ const char *duplication_fail_mode_to_string(duplication_fail_mode::type fmode) +{ + auto it = _duplication_fail_mode_VALUES_TO_NAMES.find(fmode); + dassert(it != _duplication_fail_mode_VALUES_TO_NAMES.end(), + "unexpected type of duplication_fail_mode: %d", + fmode); + return it->second; +} + /*extern*/ const char *get_current_cluster_name() { static const char *cluster_name = @@ -122,6 +131,7 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent) {"create_ts", ts_buf}, {"remote", ent.remote}, {"status", duplication_status_to_string(ent.status)}, + {"fail_mode", duplication_fail_mode_to_string(ent.fail_mode)}, }; if (ent.__isset.progress) { nlohmann::json sub_json; diff --git a/src/dist/replication/common/replication_types.cpp b/src/dist/replication/common/replication_types.cpp index 6f7c053c97..e80d0f2b0a 100644 --- a/src/dist/replication/common/replication_types.cpp +++ b/src/dist/replication/common/replication_types.cpp @@ -137,6 +137,14 @@ const std::map _duplication_status_VALUES_TO_NAMES( ::apache::thrift::TEnumIterator(4, _kduplication_statusValues, _kduplication_statusNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +int _kduplication_fail_modeValues[] = {duplication_fail_mode::FAIL_SLOW, + duplication_fail_mode::FAIL_SKIP, + duplication_fail_mode::FAIL_FAST}; +const char *_kduplication_fail_modeNames[] = {"FAIL_SLOW", "FAIL_SKIP", "FAIL_FAST"}; +const std::map _duplication_fail_mode_VALUES_TO_NAMES( + ::apache::thrift::TEnumIterator(3, _kduplication_fail_modeValues, _kduplication_fail_modeNames), + ::apache::thrift::TEnumIterator(-1, NULL, NULL)); + mutation_header::~mutation_header() throw() {} void mutation_header::__set_pid(const ::dsn::gpid &val) { this->pid = val; } @@ -11746,6 +11754,12 @@ void duplication_modify_request::__set_status(const duplication_status::type val __isset.status = true; } +void duplication_modify_request::__set_fail_mode(const duplication_fail_mode::type val) +{ + this->fail_mode = val; + __isset.fail_mode = true; +} + uint32_t duplication_modify_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -11791,6 +11805,16 @@ uint32_t duplication_modify_request::read(::apache::thrift::protocol::TProtocol xfer += iprot->skip(ftype); } break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast499; + xfer += iprot->readI32(ecast499); + this->fail_mode = (duplication_fail_mode::type)ecast499; + this->__isset.fail_mode = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -11822,6 +11846,11 @@ uint32_t duplication_modify_request::write(::apache::thrift::protocol::TProtocol xfer += oprot->writeI32((int32_t)this->status); xfer += oprot->writeFieldEnd(); } + if (this->__isset.fail_mode) { + xfer += oprot->writeFieldBegin("fail_mode", ::apache::thrift::protocol::T_I32, 4); + xfer += oprot->writeI32((int32_t)this->fail_mode); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -11833,39 +11862,44 @@ void swap(duplication_modify_request &a, duplication_modify_request &b) swap(a.app_name, b.app_name); swap(a.dupid, b.dupid); swap(a.status, b.status); + swap(a.fail_mode, b.fail_mode); swap(a.__isset, b.__isset); } -duplication_modify_request::duplication_modify_request(const duplication_modify_request &other499) +duplication_modify_request::duplication_modify_request(const duplication_modify_request &other500) { - app_name = other499.app_name; - dupid = other499.dupid; - status = other499.status; - __isset = other499.__isset; + app_name = other500.app_name; + dupid = other500.dupid; + status = other500.status; + fail_mode = other500.fail_mode; + __isset = other500.__isset; } -duplication_modify_request::duplication_modify_request(duplication_modify_request &&other500) +duplication_modify_request::duplication_modify_request(duplication_modify_request &&other501) { - app_name = std::move(other500.app_name); - dupid = std::move(other500.dupid); - status = std::move(other500.status); - __isset = std::move(other500.__isset); + app_name = std::move(other501.app_name); + dupid = std::move(other501.dupid); + status = std::move(other501.status); + fail_mode = std::move(other501.fail_mode); + __isset = std::move(other501.__isset); } duplication_modify_request &duplication_modify_request:: -operator=(const duplication_modify_request &other501) +operator=(const duplication_modify_request &other502) { - app_name = other501.app_name; - dupid = other501.dupid; - status = other501.status; - __isset = other501.__isset; + app_name = other502.app_name; + dupid = other502.dupid; + status = other502.status; + fail_mode = other502.fail_mode; + __isset = other502.__isset; return *this; } duplication_modify_request &duplication_modify_request:: -operator=(duplication_modify_request &&other502) +operator=(duplication_modify_request &&other503) { - app_name = std::move(other502.app_name); - dupid = std::move(other502.dupid); - status = std::move(other502.status); - __isset = std::move(other502.__isset); + app_name = std::move(other503.app_name); + dupid = std::move(other503.dupid); + status = std::move(other503.status); + fail_mode = std::move(other503.fail_mode); + __isset = std::move(other503.__isset); return *this; } void duplication_modify_request::printTo(std::ostream &out) const @@ -11878,6 +11912,9 @@ void duplication_modify_request::printTo(std::ostream &out) const out << ", " << "status="; (__isset.status ? (out << to_string(status)) : (out << "")); + out << ", " + << "fail_mode="; + (__isset.fail_mode ? (out << to_string(fail_mode)) : (out << "")); out << ")"; } @@ -11962,32 +11999,32 @@ void swap(duplication_modify_response &a, duplication_modify_response &b) } duplication_modify_response::duplication_modify_response( - const duplication_modify_response &other503) + const duplication_modify_response &other504) { - err = other503.err; - appid = other503.appid; - __isset = other503.__isset; + err = other504.err; + appid = other504.appid; + __isset = other504.__isset; } -duplication_modify_response::duplication_modify_response(duplication_modify_response &&other504) +duplication_modify_response::duplication_modify_response(duplication_modify_response &&other505) { - err = std::move(other504.err); - appid = std::move(other504.appid); - __isset = std::move(other504.__isset); + err = std::move(other505.err); + appid = std::move(other505.appid); + __isset = std::move(other505.__isset); } duplication_modify_response &duplication_modify_response:: -operator=(const duplication_modify_response &other505) +operator=(const duplication_modify_response &other506) { - err = other505.err; - appid = other505.appid; - __isset = other505.__isset; + err = other506.err; + appid = other506.appid; + __isset = other506.__isset; return *this; } duplication_modify_response &duplication_modify_response:: -operator=(duplication_modify_response &&other506) +operator=(duplication_modify_response &&other507) { - err = std::move(other506.err); - appid = std::move(other506.appid); - __isset = std::move(other506.__isset); + err = std::move(other507.err); + appid = std::move(other507.appid); + __isset = std::move(other507.__isset); return *this; } void duplication_modify_response::printTo(std::ostream &out) const @@ -12016,10 +12053,10 @@ void duplication_entry::__set_progress(const std::map &val) __isset.progress = true; } -void duplication_entry::__set_not_confirmed(const std::map &val) +void duplication_entry::__set_fail_mode(const duplication_fail_mode::type val) { - this->not_confirmed = val; - __isset.not_confirmed = true; + this->fail_mode = val; + __isset.fail_mode = true; } uint32_t duplication_entry::read(::apache::thrift::protocol::TProtocol *iprot) @@ -12051,9 +12088,9 @@ uint32_t duplication_entry::read(::apache::thrift::protocol::TProtocol *iprot) break; case 2: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast507; - xfer += iprot->readI32(ecast507); - this->status = (duplication_status::type)ecast507; + int32_t ecast508; + xfer += iprot->readI32(ecast508); + this->status = (duplication_status::type)ecast508; this->__isset.status = true; } else { xfer += iprot->skip(ftype); @@ -12079,16 +12116,16 @@ uint32_t duplication_entry::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_MAP) { { this->progress.clear(); - uint32_t _size508; - ::apache::thrift::protocol::TType _ktype509; - ::apache::thrift::protocol::TType _vtype510; - xfer += iprot->readMapBegin(_ktype509, _vtype510, _size508); - uint32_t _i512; - for (_i512 = 0; _i512 < _size508; ++_i512) { - int32_t _key513; - xfer += iprot->readI32(_key513); - int64_t &_val514 = this->progress[_key513]; - xfer += iprot->readI64(_val514); + uint32_t _size509; + ::apache::thrift::protocol::TType _ktype510; + ::apache::thrift::protocol::TType _vtype511; + xfer += iprot->readMapBegin(_ktype510, _vtype511, _size509); + uint32_t _i513; + for (_i513 = 0; _i513 < _size509; ++_i513) { + int32_t _key514; + xfer += iprot->readI32(_key514); + int64_t &_val515 = this->progress[_key514]; + xfer += iprot->readI64(_val515); } xfer += iprot->readMapEnd(); } @@ -12097,24 +12134,12 @@ uint32_t duplication_entry::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; - case 6: - if (ftype == ::apache::thrift::protocol::T_MAP) { - { - this->not_confirmed.clear(); - uint32_t _size515; - ::apache::thrift::protocol::TType _ktype516; - ::apache::thrift::protocol::TType _vtype517; - xfer += iprot->readMapBegin(_ktype516, _vtype517, _size515); - uint32_t _i519; - for (_i519 = 0; _i519 < _size515; ++_i519) { - int32_t _key520; - xfer += iprot->readI32(_key520); - int64_t &_val521 = this->not_confirmed[_key520]; - xfer += iprot->readI64(_val521); - } - xfer += iprot->readMapEnd(); - } - this->__isset.not_confirmed = true; + case 7: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast516; + xfer += iprot->readI32(ecast516); + this->fail_mode = (duplication_fail_mode::type)ecast516; + this->__isset.fail_mode = true; } else { xfer += iprot->skip(ftype); } @@ -12159,29 +12184,18 @@ uint32_t duplication_entry::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, ::apache::thrift::protocol::T_I64, static_cast(this->progress.size())); - std::map::const_iterator _iter522; - for (_iter522 = this->progress.begin(); _iter522 != this->progress.end(); ++_iter522) { - xfer += oprot->writeI32(_iter522->first); - xfer += oprot->writeI64(_iter522->second); + std::map::const_iterator _iter517; + for (_iter517 = this->progress.begin(); _iter517 != this->progress.end(); ++_iter517) { + xfer += oprot->writeI32(_iter517->first); + xfer += oprot->writeI64(_iter517->second); } xfer += oprot->writeMapEnd(); } xfer += oprot->writeFieldEnd(); } - if (this->__isset.not_confirmed) { - xfer += oprot->writeFieldBegin("not_confirmed", ::apache::thrift::protocol::T_MAP, 6); - { - xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, - ::apache::thrift::protocol::T_I64, - static_cast(this->not_confirmed.size())); - std::map::const_iterator _iter523; - for (_iter523 = this->not_confirmed.begin(); _iter523 != this->not_confirmed.end(); - ++_iter523) { - xfer += oprot->writeI32(_iter523->first); - xfer += oprot->writeI64(_iter523->second); - } - xfer += oprot->writeMapEnd(); - } + if (this->__isset.fail_mode) { + xfer += oprot->writeFieldBegin("fail_mode", ::apache::thrift::protocol::T_I32, 7); + xfer += oprot->writeI32((int32_t)this->fail_mode); xfer += oprot->writeFieldEnd(); } xfer += oprot->writeFieldStop(); @@ -12197,50 +12211,50 @@ void swap(duplication_entry &a, duplication_entry &b) swap(a.remote, b.remote); swap(a.create_ts, b.create_ts); swap(a.progress, b.progress); - swap(a.not_confirmed, b.not_confirmed); + swap(a.fail_mode, b.fail_mode); swap(a.__isset, b.__isset); } -duplication_entry::duplication_entry(const duplication_entry &other524) +duplication_entry::duplication_entry(const duplication_entry &other518) { - dupid = other524.dupid; - status = other524.status; - remote = other524.remote; - create_ts = other524.create_ts; - progress = other524.progress; - not_confirmed = other524.not_confirmed; - __isset = other524.__isset; + dupid = other518.dupid; + status = other518.status; + remote = other518.remote; + create_ts = other518.create_ts; + progress = other518.progress; + fail_mode = other518.fail_mode; + __isset = other518.__isset; } -duplication_entry::duplication_entry(duplication_entry &&other525) +duplication_entry::duplication_entry(duplication_entry &&other519) { - dupid = std::move(other525.dupid); - status = std::move(other525.status); - remote = std::move(other525.remote); - create_ts = std::move(other525.create_ts); - progress = std::move(other525.progress); - not_confirmed = std::move(other525.not_confirmed); - __isset = std::move(other525.__isset); + dupid = std::move(other519.dupid); + status = std::move(other519.status); + remote = std::move(other519.remote); + create_ts = std::move(other519.create_ts); + progress = std::move(other519.progress); + fail_mode = std::move(other519.fail_mode); + __isset = std::move(other519.__isset); } -duplication_entry &duplication_entry::operator=(const duplication_entry &other526) +duplication_entry &duplication_entry::operator=(const duplication_entry &other520) { - dupid = other526.dupid; - status = other526.status; - remote = other526.remote; - create_ts = other526.create_ts; - progress = other526.progress; - not_confirmed = other526.not_confirmed; - __isset = other526.__isset; + dupid = other520.dupid; + status = other520.status; + remote = other520.remote; + create_ts = other520.create_ts; + progress = other520.progress; + fail_mode = other520.fail_mode; + __isset = other520.__isset; return *this; } -duplication_entry &duplication_entry::operator=(duplication_entry &&other527) +duplication_entry &duplication_entry::operator=(duplication_entry &&other521) { - dupid = std::move(other527.dupid); - status = std::move(other527.status); - remote = std::move(other527.remote); - create_ts = std::move(other527.create_ts); - progress = std::move(other527.progress); - not_confirmed = std::move(other527.not_confirmed); - __isset = std::move(other527.__isset); + dupid = std::move(other521.dupid); + status = std::move(other521.status); + remote = std::move(other521.remote); + create_ts = std::move(other521.create_ts); + progress = std::move(other521.progress); + fail_mode = std::move(other521.fail_mode); + __isset = std::move(other521.__isset); return *this; } void duplication_entry::printTo(std::ostream &out) const @@ -12258,8 +12272,8 @@ void duplication_entry::printTo(std::ostream &out) const << "progress="; (__isset.progress ? (out << to_string(progress)) : (out << "")); out << ", " - << "not_confirmed="; - (__isset.not_confirmed ? (out << to_string(not_confirmed)) : (out << "")); + << "fail_mode="; + (__isset.fail_mode ? (out << to_string(fail_mode)) : (out << "")); out << ")"; } @@ -12328,28 +12342,28 @@ void swap(duplication_query_request &a, duplication_query_request &b) swap(a.__isset, b.__isset); } -duplication_query_request::duplication_query_request(const duplication_query_request &other528) +duplication_query_request::duplication_query_request(const duplication_query_request &other522) { - app_name = other528.app_name; - __isset = other528.__isset; + app_name = other522.app_name; + __isset = other522.__isset; } -duplication_query_request::duplication_query_request(duplication_query_request &&other529) +duplication_query_request::duplication_query_request(duplication_query_request &&other523) { - app_name = std::move(other529.app_name); - __isset = std::move(other529.__isset); + app_name = std::move(other523.app_name); + __isset = std::move(other523.__isset); } duplication_query_request &duplication_query_request:: -operator=(const duplication_query_request &other530) +operator=(const duplication_query_request &other524) { - app_name = other530.app_name; - __isset = other530.__isset; + app_name = other524.app_name; + __isset = other524.__isset; return *this; } duplication_query_request &duplication_query_request:: -operator=(duplication_query_request &&other531) +operator=(duplication_query_request &&other525) { - app_name = std::move(other531.app_name); - __isset = std::move(other531.__isset); + app_name = std::move(other525.app_name); + __isset = std::move(other525.__isset); return *this; } void duplication_query_request::printTo(std::ostream &out) const @@ -12410,13 +12424,13 @@ uint32_t duplication_query_response::read(::apache::thrift::protocol::TProtocol if (ftype == ::apache::thrift::protocol::T_LIST) { { this->entry_list.clear(); - uint32_t _size532; - ::apache::thrift::protocol::TType _etype535; - xfer += iprot->readListBegin(_etype535, _size532); - this->entry_list.resize(_size532); - uint32_t _i536; - for (_i536 = 0; _i536 < _size532; ++_i536) { - xfer += this->entry_list[_i536].read(iprot); + uint32_t _size526; + ::apache::thrift::protocol::TType _etype529; + xfer += iprot->readListBegin(_etype529, _size526); + this->entry_list.resize(_size526); + uint32_t _i530; + for (_i530 = 0; _i530 < _size526; ++_i530) { + xfer += this->entry_list[_i530].read(iprot); } xfer += iprot->readListEnd(); } @@ -12455,9 +12469,9 @@ uint32_t duplication_query_response::write(::apache::thrift::protocol::TProtocol { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->entry_list.size())); - std::vector::const_iterator _iter537; - for (_iter537 = this->entry_list.begin(); _iter537 != this->entry_list.end(); ++_iter537) { - xfer += (*_iter537).write(oprot); + std::vector::const_iterator _iter531; + for (_iter531 = this->entry_list.begin(); _iter531 != this->entry_list.end(); ++_iter531) { + xfer += (*_iter531).write(oprot); } xfer += oprot->writeListEnd(); } @@ -12477,36 +12491,36 @@ void swap(duplication_query_response &a, duplication_query_response &b) swap(a.__isset, b.__isset); } -duplication_query_response::duplication_query_response(const duplication_query_response &other538) +duplication_query_response::duplication_query_response(const duplication_query_response &other532) { - err = other538.err; - appid = other538.appid; - entry_list = other538.entry_list; - __isset = other538.__isset; + err = other532.err; + appid = other532.appid; + entry_list = other532.entry_list; + __isset = other532.__isset; } -duplication_query_response::duplication_query_response(duplication_query_response &&other539) +duplication_query_response::duplication_query_response(duplication_query_response &&other533) { - err = std::move(other539.err); - appid = std::move(other539.appid); - entry_list = std::move(other539.entry_list); - __isset = std::move(other539.__isset); + err = std::move(other533.err); + appid = std::move(other533.appid); + entry_list = std::move(other533.entry_list); + __isset = std::move(other533.__isset); } duplication_query_response &duplication_query_response:: -operator=(const duplication_query_response &other540) +operator=(const duplication_query_response &other534) { - err = other540.err; - appid = other540.appid; - entry_list = other540.entry_list; - __isset = other540.__isset; + err = other534.err; + appid = other534.appid; + entry_list = other534.entry_list; + __isset = other534.__isset; return *this; } duplication_query_response &duplication_query_response:: -operator=(duplication_query_response &&other541) +operator=(duplication_query_response &&other535) { - err = std::move(other541.err); - appid = std::move(other541.appid); - entry_list = std::move(other541.entry_list); - __isset = std::move(other541.__isset); + err = std::move(other535.err); + appid = std::move(other535.appid); + entry_list = std::move(other535.entry_list); + __isset = std::move(other535.__isset); return *this; } void duplication_query_response::printTo(std::ostream &out) const @@ -12604,32 +12618,32 @@ void swap(duplication_confirm_entry &a, duplication_confirm_entry &b) swap(a.__isset, b.__isset); } -duplication_confirm_entry::duplication_confirm_entry(const duplication_confirm_entry &other542) +duplication_confirm_entry::duplication_confirm_entry(const duplication_confirm_entry &other536) { - dupid = other542.dupid; - confirmed_decree = other542.confirmed_decree; - __isset = other542.__isset; + dupid = other536.dupid; + confirmed_decree = other536.confirmed_decree; + __isset = other536.__isset; } -duplication_confirm_entry::duplication_confirm_entry(duplication_confirm_entry &&other543) +duplication_confirm_entry::duplication_confirm_entry(duplication_confirm_entry &&other537) { - dupid = std::move(other543.dupid); - confirmed_decree = std::move(other543.confirmed_decree); - __isset = std::move(other543.__isset); + dupid = std::move(other537.dupid); + confirmed_decree = std::move(other537.confirmed_decree); + __isset = std::move(other537.__isset); } duplication_confirm_entry &duplication_confirm_entry:: -operator=(const duplication_confirm_entry &other544) +operator=(const duplication_confirm_entry &other538) { - dupid = other544.dupid; - confirmed_decree = other544.confirmed_decree; - __isset = other544.__isset; + dupid = other538.dupid; + confirmed_decree = other538.confirmed_decree; + __isset = other538.__isset; return *this; } duplication_confirm_entry &duplication_confirm_entry:: -operator=(duplication_confirm_entry &&other545) +operator=(duplication_confirm_entry &&other539) { - dupid = std::move(other545.dupid); - confirmed_decree = std::move(other545.confirmed_decree); - __isset = std::move(other545.__isset); + dupid = std::move(other539.dupid); + confirmed_decree = std::move(other539.confirmed_decree); + __isset = std::move(other539.__isset); return *this; } void duplication_confirm_entry::printTo(std::ostream &out) const @@ -12683,25 +12697,25 @@ uint32_t duplication_sync_request::read(::apache::thrift::protocol::TProtocol *i if (ftype == ::apache::thrift::protocol::T_MAP) { { this->confirm_list.clear(); - uint32_t _size546; - ::apache::thrift::protocol::TType _ktype547; - ::apache::thrift::protocol::TType _vtype548; - xfer += iprot->readMapBegin(_ktype547, _vtype548, _size546); - uint32_t _i550; - for (_i550 = 0; _i550 < _size546; ++_i550) { - ::dsn::gpid _key551; - xfer += _key551.read(iprot); - std::vector &_val552 = - this->confirm_list[_key551]; + uint32_t _size540; + ::apache::thrift::protocol::TType _ktype541; + ::apache::thrift::protocol::TType _vtype542; + xfer += iprot->readMapBegin(_ktype541, _vtype542, _size540); + uint32_t _i544; + for (_i544 = 0; _i544 < _size540; ++_i544) { + ::dsn::gpid _key545; + xfer += _key545.read(iprot); + std::vector &_val546 = + this->confirm_list[_key545]; { - _val552.clear(); - uint32_t _size553; - ::apache::thrift::protocol::TType _etype556; - xfer += iprot->readListBegin(_etype556, _size553); - _val552.resize(_size553); - uint32_t _i557; - for (_i557 = 0; _i557 < _size553; ++_i557) { - xfer += _val552[_i557].read(iprot); + _val546.clear(); + uint32_t _size547; + ::apache::thrift::protocol::TType _etype550; + xfer += iprot->readListBegin(_etype550, _size547); + _val546.resize(_size547); + uint32_t _i551; + for (_i551 = 0; _i551 < _size547; ++_i551) { + xfer += _val546[_i551].read(iprot); } xfer += iprot->readListEnd(); } @@ -12740,17 +12754,17 @@ uint32_t duplication_sync_request::write(::apache::thrift::protocol::TProtocol * xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_LIST, static_cast(this->confirm_list.size())); - std::map<::dsn::gpid, std::vector>::const_iterator _iter558; - for (_iter558 = this->confirm_list.begin(); _iter558 != this->confirm_list.end(); - ++_iter558) { - xfer += _iter558->first.write(oprot); + std::map<::dsn::gpid, std::vector>::const_iterator _iter552; + for (_iter552 = this->confirm_list.begin(); _iter552 != this->confirm_list.end(); + ++_iter552) { + xfer += _iter552->first.write(oprot); { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, - static_cast(_iter558->second.size())); - std::vector::const_iterator _iter559; - for (_iter559 = _iter558->second.begin(); _iter559 != _iter558->second.end(); - ++_iter559) { - xfer += (*_iter559).write(oprot); + static_cast(_iter552->second.size())); + std::vector::const_iterator _iter553; + for (_iter553 = _iter552->second.begin(); _iter553 != _iter552->second.end(); + ++_iter553) { + xfer += (*_iter553).write(oprot); } xfer += oprot->writeListEnd(); } @@ -12772,31 +12786,31 @@ void swap(duplication_sync_request &a, duplication_sync_request &b) swap(a.__isset, b.__isset); } -duplication_sync_request::duplication_sync_request(const duplication_sync_request &other560) +duplication_sync_request::duplication_sync_request(const duplication_sync_request &other554) { - node = other560.node; - confirm_list = other560.confirm_list; - __isset = other560.__isset; + node = other554.node; + confirm_list = other554.confirm_list; + __isset = other554.__isset; } -duplication_sync_request::duplication_sync_request(duplication_sync_request &&other561) +duplication_sync_request::duplication_sync_request(duplication_sync_request &&other555) { - node = std::move(other561.node); - confirm_list = std::move(other561.confirm_list); - __isset = std::move(other561.__isset); + node = std::move(other555.node); + confirm_list = std::move(other555.confirm_list); + __isset = std::move(other555.__isset); } duplication_sync_request &duplication_sync_request:: -operator=(const duplication_sync_request &other562) +operator=(const duplication_sync_request &other556) { - node = other562.node; - confirm_list = other562.confirm_list; - __isset = other562.__isset; + node = other556.node; + confirm_list = other556.confirm_list; + __isset = other556.__isset; return *this; } -duplication_sync_request &duplication_sync_request::operator=(duplication_sync_request &&other563) +duplication_sync_request &duplication_sync_request::operator=(duplication_sync_request &&other557) { - node = std::move(other563.node); - confirm_list = std::move(other563.confirm_list); - __isset = std::move(other563.__isset); + node = std::move(other557.node); + confirm_list = std::move(other557.confirm_list); + __isset = std::move(other557.__isset); return *this; } void duplication_sync_request::printTo(std::ostream &out) const @@ -12850,27 +12864,27 @@ uint32_t duplication_sync_response::read(::apache::thrift::protocol::TProtocol * if (ftype == ::apache::thrift::protocol::T_MAP) { { this->dup_map.clear(); - uint32_t _size564; - ::apache::thrift::protocol::TType _ktype565; - ::apache::thrift::protocol::TType _vtype566; - xfer += iprot->readMapBegin(_ktype565, _vtype566, _size564); - uint32_t _i568; - for (_i568 = 0; _i568 < _size564; ++_i568) { - int32_t _key569; - xfer += iprot->readI32(_key569); - std::map &_val570 = this->dup_map[_key569]; + uint32_t _size558; + ::apache::thrift::protocol::TType _ktype559; + ::apache::thrift::protocol::TType _vtype560; + xfer += iprot->readMapBegin(_ktype559, _vtype560, _size558); + uint32_t _i562; + for (_i562 = 0; _i562 < _size558; ++_i562) { + int32_t _key563; + xfer += iprot->readI32(_key563); + std::map &_val564 = this->dup_map[_key563]; { - _val570.clear(); - uint32_t _size571; - ::apache::thrift::protocol::TType _ktype572; - ::apache::thrift::protocol::TType _vtype573; - xfer += iprot->readMapBegin(_ktype572, _vtype573, _size571); - uint32_t _i575; - for (_i575 = 0; _i575 < _size571; ++_i575) { - int32_t _key576; - xfer += iprot->readI32(_key576); - duplication_entry &_val577 = _val570[_key576]; - xfer += _val577.read(iprot); + _val564.clear(); + uint32_t _size565; + ::apache::thrift::protocol::TType _ktype566; + ::apache::thrift::protocol::TType _vtype567; + xfer += iprot->readMapBegin(_ktype566, _vtype567, _size565); + uint32_t _i569; + for (_i569 = 0; _i569 < _size565; ++_i569) { + int32_t _key570; + xfer += iprot->readI32(_key570); + duplication_entry &_val571 = _val564[_key570]; + xfer += _val571.read(iprot); } xfer += iprot->readMapEnd(); } @@ -12909,18 +12923,18 @@ uint32_t duplication_sync_response::write(::apache::thrift::protocol::TProtocol xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, ::apache::thrift::protocol::T_MAP, static_cast(this->dup_map.size())); - std::map>::const_iterator _iter578; - for (_iter578 = this->dup_map.begin(); _iter578 != this->dup_map.end(); ++_iter578) { - xfer += oprot->writeI32(_iter578->first); + std::map>::const_iterator _iter572; + for (_iter572 = this->dup_map.begin(); _iter572 != this->dup_map.end(); ++_iter572) { + xfer += oprot->writeI32(_iter572->first); { xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, ::apache::thrift::protocol::T_STRUCT, - static_cast(_iter578->second.size())); - std::map::const_iterator _iter579; - for (_iter579 = _iter578->second.begin(); _iter579 != _iter578->second.end(); - ++_iter579) { - xfer += oprot->writeI32(_iter579->first); - xfer += _iter579->second.write(oprot); + static_cast(_iter572->second.size())); + std::map::const_iterator _iter573; + for (_iter573 = _iter572->second.begin(); _iter573 != _iter572->second.end(); + ++_iter573) { + xfer += oprot->writeI32(_iter573->first); + xfer += _iter573->second.write(oprot); } xfer += oprot->writeMapEnd(); } @@ -12942,32 +12956,32 @@ void swap(duplication_sync_response &a, duplication_sync_response &b) swap(a.__isset, b.__isset); } -duplication_sync_response::duplication_sync_response(const duplication_sync_response &other580) +duplication_sync_response::duplication_sync_response(const duplication_sync_response &other574) { - err = other580.err; - dup_map = other580.dup_map; - __isset = other580.__isset; + err = other574.err; + dup_map = other574.dup_map; + __isset = other574.__isset; } -duplication_sync_response::duplication_sync_response(duplication_sync_response &&other581) +duplication_sync_response::duplication_sync_response(duplication_sync_response &&other575) { - err = std::move(other581.err); - dup_map = std::move(other581.dup_map); - __isset = std::move(other581.__isset); + err = std::move(other575.err); + dup_map = std::move(other575.dup_map); + __isset = std::move(other575.__isset); } duplication_sync_response &duplication_sync_response:: -operator=(const duplication_sync_response &other582) +operator=(const duplication_sync_response &other576) { - err = other582.err; - dup_map = other582.dup_map; - __isset = other582.__isset; + err = other576.err; + dup_map = other576.dup_map; + __isset = other576.__isset; return *this; } duplication_sync_response &duplication_sync_response:: -operator=(duplication_sync_response &&other583) +operator=(duplication_sync_response &&other577) { - err = std::move(other583.err); - dup_map = std::move(other583.dup_map); - __isset = std::move(other583.__isset); + err = std::move(other577.err); + dup_map = std::move(other577.dup_map); + __isset = std::move(other577.__isset); return *this; } void duplication_sync_response::printTo(std::ostream &out) const @@ -13045,26 +13059,26 @@ void swap(ddd_diagnose_request &a, ddd_diagnose_request &b) swap(a.__isset, b.__isset); } -ddd_diagnose_request::ddd_diagnose_request(const ddd_diagnose_request &other584) +ddd_diagnose_request::ddd_diagnose_request(const ddd_diagnose_request &other578) { - pid = other584.pid; - __isset = other584.__isset; + pid = other578.pid; + __isset = other578.__isset; } -ddd_diagnose_request::ddd_diagnose_request(ddd_diagnose_request &&other585) +ddd_diagnose_request::ddd_diagnose_request(ddd_diagnose_request &&other579) { - pid = std::move(other585.pid); - __isset = std::move(other585.__isset); + pid = std::move(other579.pid); + __isset = std::move(other579.__isset); } -ddd_diagnose_request &ddd_diagnose_request::operator=(const ddd_diagnose_request &other586) +ddd_diagnose_request &ddd_diagnose_request::operator=(const ddd_diagnose_request &other580) { - pid = other586.pid; - __isset = other586.__isset; + pid = other580.pid; + __isset = other580.__isset; return *this; } -ddd_diagnose_request &ddd_diagnose_request::operator=(ddd_diagnose_request &&other587) +ddd_diagnose_request &ddd_diagnose_request::operator=(ddd_diagnose_request &&other581) { - pid = std::move(other587.pid); - __isset = std::move(other587.__isset); + pid = std::move(other581.pid); + __isset = std::move(other581.__isset); return *this; } void ddd_diagnose_request::printTo(std::ostream &out) const @@ -13236,50 +13250,50 @@ void swap(ddd_node_info &a, ddd_node_info &b) swap(a.__isset, b.__isset); } -ddd_node_info::ddd_node_info(const ddd_node_info &other588) +ddd_node_info::ddd_node_info(const ddd_node_info &other582) { - node = other588.node; - drop_time_ms = other588.drop_time_ms; - is_alive = other588.is_alive; - is_collected = other588.is_collected; - ballot = other588.ballot; - last_committed_decree = other588.last_committed_decree; - last_prepared_decree = other588.last_prepared_decree; - __isset = other588.__isset; + node = other582.node; + drop_time_ms = other582.drop_time_ms; + is_alive = other582.is_alive; + is_collected = other582.is_collected; + ballot = other582.ballot; + last_committed_decree = other582.last_committed_decree; + last_prepared_decree = other582.last_prepared_decree; + __isset = other582.__isset; } -ddd_node_info::ddd_node_info(ddd_node_info &&other589) +ddd_node_info::ddd_node_info(ddd_node_info &&other583) { - node = std::move(other589.node); - drop_time_ms = std::move(other589.drop_time_ms); - is_alive = std::move(other589.is_alive); - is_collected = std::move(other589.is_collected); - ballot = std::move(other589.ballot); - last_committed_decree = std::move(other589.last_committed_decree); - last_prepared_decree = std::move(other589.last_prepared_decree); - __isset = std::move(other589.__isset); + node = std::move(other583.node); + drop_time_ms = std::move(other583.drop_time_ms); + is_alive = std::move(other583.is_alive); + is_collected = std::move(other583.is_collected); + ballot = std::move(other583.ballot); + last_committed_decree = std::move(other583.last_committed_decree); + last_prepared_decree = std::move(other583.last_prepared_decree); + __isset = std::move(other583.__isset); } -ddd_node_info &ddd_node_info::operator=(const ddd_node_info &other590) +ddd_node_info &ddd_node_info::operator=(const ddd_node_info &other584) { - node = other590.node; - drop_time_ms = other590.drop_time_ms; - is_alive = other590.is_alive; - is_collected = other590.is_collected; - ballot = other590.ballot; - last_committed_decree = other590.last_committed_decree; - last_prepared_decree = other590.last_prepared_decree; - __isset = other590.__isset; + node = other584.node; + drop_time_ms = other584.drop_time_ms; + is_alive = other584.is_alive; + is_collected = other584.is_collected; + ballot = other584.ballot; + last_committed_decree = other584.last_committed_decree; + last_prepared_decree = other584.last_prepared_decree; + __isset = other584.__isset; return *this; } -ddd_node_info &ddd_node_info::operator=(ddd_node_info &&other591) +ddd_node_info &ddd_node_info::operator=(ddd_node_info &&other585) { - node = std::move(other591.node); - drop_time_ms = std::move(other591.drop_time_ms); - is_alive = std::move(other591.is_alive); - is_collected = std::move(other591.is_collected); - ballot = std::move(other591.ballot); - last_committed_decree = std::move(other591.last_committed_decree); - last_prepared_decree = std::move(other591.last_prepared_decree); - __isset = std::move(other591.__isset); + node = std::move(other585.node); + drop_time_ms = std::move(other585.drop_time_ms); + is_alive = std::move(other585.is_alive); + is_collected = std::move(other585.is_collected); + ballot = std::move(other585.ballot); + last_committed_decree = std::move(other585.last_committed_decree); + last_prepared_decree = std::move(other585.last_prepared_decree); + __isset = std::move(other585.__isset); return *this; } void ddd_node_info::printTo(std::ostream &out) const @@ -13347,13 +13361,13 @@ uint32_t ddd_partition_info::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_LIST) { { this->dropped.clear(); - uint32_t _size592; - ::apache::thrift::protocol::TType _etype595; - xfer += iprot->readListBegin(_etype595, _size592); - this->dropped.resize(_size592); - uint32_t _i596; - for (_i596 = 0; _i596 < _size592; ++_i596) { - xfer += this->dropped[_i596].read(iprot); + uint32_t _size586; + ::apache::thrift::protocol::TType _etype589; + xfer += iprot->readListBegin(_etype589, _size586); + this->dropped.resize(_size586); + uint32_t _i590; + for (_i590 = 0; _i590 < _size586; ++_i590) { + xfer += this->dropped[_i590].read(iprot); } xfer += iprot->readListEnd(); } @@ -13396,9 +13410,9 @@ uint32_t ddd_partition_info::write(::apache::thrift::protocol::TProtocol *oprot) { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->dropped.size())); - std::vector::const_iterator _iter597; - for (_iter597 = this->dropped.begin(); _iter597 != this->dropped.end(); ++_iter597) { - xfer += (*_iter597).write(oprot); + std::vector::const_iterator _iter591; + for (_iter591 = this->dropped.begin(); _iter591 != this->dropped.end(); ++_iter591) { + xfer += (*_iter591).write(oprot); } xfer += oprot->writeListEnd(); } @@ -13422,34 +13436,34 @@ void swap(ddd_partition_info &a, ddd_partition_info &b) swap(a.__isset, b.__isset); } -ddd_partition_info::ddd_partition_info(const ddd_partition_info &other598) +ddd_partition_info::ddd_partition_info(const ddd_partition_info &other592) { - config = other598.config; - dropped = other598.dropped; - reason = other598.reason; - __isset = other598.__isset; + config = other592.config; + dropped = other592.dropped; + reason = other592.reason; + __isset = other592.__isset; } -ddd_partition_info::ddd_partition_info(ddd_partition_info &&other599) +ddd_partition_info::ddd_partition_info(ddd_partition_info &&other593) { - config = std::move(other599.config); - dropped = std::move(other599.dropped); - reason = std::move(other599.reason); - __isset = std::move(other599.__isset); + config = std::move(other593.config); + dropped = std::move(other593.dropped); + reason = std::move(other593.reason); + __isset = std::move(other593.__isset); } -ddd_partition_info &ddd_partition_info::operator=(const ddd_partition_info &other600) +ddd_partition_info &ddd_partition_info::operator=(const ddd_partition_info &other594) { - config = other600.config; - dropped = other600.dropped; - reason = other600.reason; - __isset = other600.__isset; + config = other594.config; + dropped = other594.dropped; + reason = other594.reason; + __isset = other594.__isset; return *this; } -ddd_partition_info &ddd_partition_info::operator=(ddd_partition_info &&other601) +ddd_partition_info &ddd_partition_info::operator=(ddd_partition_info &&other595) { - config = std::move(other601.config); - dropped = std::move(other601.dropped); - reason = std::move(other601.reason); - __isset = std::move(other601.__isset); + config = std::move(other595.config); + dropped = std::move(other595.dropped); + reason = std::move(other595.reason); + __isset = std::move(other595.__isset); return *this; } void ddd_partition_info::printTo(std::ostream &out) const @@ -13504,13 +13518,13 @@ uint32_t ddd_diagnose_response::read(::apache::thrift::protocol::TProtocol *ipro if (ftype == ::apache::thrift::protocol::T_LIST) { { this->partitions.clear(); - uint32_t _size602; - ::apache::thrift::protocol::TType _etype605; - xfer += iprot->readListBegin(_etype605, _size602); - this->partitions.resize(_size602); - uint32_t _i606; - for (_i606 = 0; _i606 < _size602; ++_i606) { - xfer += this->partitions[_i606].read(iprot); + uint32_t _size596; + ::apache::thrift::protocol::TType _etype599; + xfer += iprot->readListBegin(_etype599, _size596); + this->partitions.resize(_size596); + uint32_t _i600; + for (_i600 = 0; _i600 < _size596; ++_i600) { + xfer += this->partitions[_i600].read(iprot); } xfer += iprot->readListEnd(); } @@ -13545,9 +13559,9 @@ uint32_t ddd_diagnose_response::write(::apache::thrift::protocol::TProtocol *opr { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->partitions.size())); - std::vector::const_iterator _iter607; - for (_iter607 = this->partitions.begin(); _iter607 != this->partitions.end(); ++_iter607) { - xfer += (*_iter607).write(oprot); + std::vector::const_iterator _iter601; + for (_iter601 = this->partitions.begin(); _iter601 != this->partitions.end(); ++_iter601) { + xfer += (*_iter601).write(oprot); } xfer += oprot->writeListEnd(); } @@ -13566,30 +13580,30 @@ void swap(ddd_diagnose_response &a, ddd_diagnose_response &b) swap(a.__isset, b.__isset); } -ddd_diagnose_response::ddd_diagnose_response(const ddd_diagnose_response &other608) +ddd_diagnose_response::ddd_diagnose_response(const ddd_diagnose_response &other602) { - err = other608.err; - partitions = other608.partitions; - __isset = other608.__isset; + err = other602.err; + partitions = other602.partitions; + __isset = other602.__isset; } -ddd_diagnose_response::ddd_diagnose_response(ddd_diagnose_response &&other609) +ddd_diagnose_response::ddd_diagnose_response(ddd_diagnose_response &&other603) { - err = std::move(other609.err); - partitions = std::move(other609.partitions); - __isset = std::move(other609.__isset); + err = std::move(other603.err); + partitions = std::move(other603.partitions); + __isset = std::move(other603.__isset); } -ddd_diagnose_response &ddd_diagnose_response::operator=(const ddd_diagnose_response &other610) +ddd_diagnose_response &ddd_diagnose_response::operator=(const ddd_diagnose_response &other604) { - err = other610.err; - partitions = other610.partitions; - __isset = other610.__isset; + err = other604.err; + partitions = other604.partitions; + __isset = other604.__isset; return *this; } -ddd_diagnose_response &ddd_diagnose_response::operator=(ddd_diagnose_response &&other611) +ddd_diagnose_response &ddd_diagnose_response::operator=(ddd_diagnose_response &&other605) { - err = std::move(other611.err); - partitions = std::move(other611.partitions); - __isset = std::move(other611.__isset); + err = std::move(other605.err); + partitions = std::move(other605.partitions); + __isset = std::move(other605.__isset); return *this; } void ddd_diagnose_response::printTo(std::ostream &out) const @@ -13686,32 +13700,32 @@ void swap(app_partition_split_request &a, app_partition_split_request &b) } app_partition_split_request::app_partition_split_request( - const app_partition_split_request &other612) + const app_partition_split_request &other606) { - app_name = other612.app_name; - new_partition_count = other612.new_partition_count; - __isset = other612.__isset; + app_name = other606.app_name; + new_partition_count = other606.new_partition_count; + __isset = other606.__isset; } -app_partition_split_request::app_partition_split_request(app_partition_split_request &&other613) +app_partition_split_request::app_partition_split_request(app_partition_split_request &&other607) { - app_name = std::move(other613.app_name); - new_partition_count = std::move(other613.new_partition_count); - __isset = std::move(other613.__isset); + app_name = std::move(other607.app_name); + new_partition_count = std::move(other607.new_partition_count); + __isset = std::move(other607.__isset); } app_partition_split_request &app_partition_split_request:: -operator=(const app_partition_split_request &other614) +operator=(const app_partition_split_request &other608) { - app_name = other614.app_name; - new_partition_count = other614.new_partition_count; - __isset = other614.__isset; + app_name = other608.app_name; + new_partition_count = other608.new_partition_count; + __isset = other608.__isset; return *this; } app_partition_split_request &app_partition_split_request:: -operator=(app_partition_split_request &&other615) +operator=(app_partition_split_request &&other609) { - app_name = std::move(other615.app_name); - new_partition_count = std::move(other615.new_partition_count); - __isset = std::move(other615.__isset); + app_name = std::move(other609.app_name); + new_partition_count = std::move(other609.new_partition_count); + __isset = std::move(other609.__isset); return *this; } void app_partition_split_request::printTo(std::ostream &out) const @@ -13823,36 +13837,36 @@ void swap(app_partition_split_response &a, app_partition_split_response &b) } app_partition_split_response::app_partition_split_response( - const app_partition_split_response &other616) + const app_partition_split_response &other610) { - err = other616.err; - app_id = other616.app_id; - partition_count = other616.partition_count; - __isset = other616.__isset; + err = other610.err; + app_id = other610.app_id; + partition_count = other610.partition_count; + __isset = other610.__isset; } -app_partition_split_response::app_partition_split_response(app_partition_split_response &&other617) +app_partition_split_response::app_partition_split_response(app_partition_split_response &&other611) { - err = std::move(other617.err); - app_id = std::move(other617.app_id); - partition_count = std::move(other617.partition_count); - __isset = std::move(other617.__isset); + err = std::move(other611.err); + app_id = std::move(other611.app_id); + partition_count = std::move(other611.partition_count); + __isset = std::move(other611.__isset); } app_partition_split_response &app_partition_split_response:: -operator=(const app_partition_split_response &other618) +operator=(const app_partition_split_response &other612) { - err = other618.err; - app_id = other618.app_id; - partition_count = other618.partition_count; - __isset = other618.__isset; + err = other612.err; + app_id = other612.app_id; + partition_count = other612.partition_count; + __isset = other612.__isset; return *this; } app_partition_split_response &app_partition_split_response:: -operator=(app_partition_split_response &&other619) +operator=(app_partition_split_response &&other613) { - err = std::move(other619.err); - app_id = std::move(other619.app_id); - partition_count = std::move(other619.partition_count); - __isset = std::move(other619.__isset); + err = std::move(other613.err); + app_id = std::move(other613.app_id); + partition_count = std::move(other613.partition_count); + __isset = std::move(other613.__isset); return *this; } void app_partition_split_response::printTo(std::ostream &out) const @@ -13980,38 +13994,38 @@ void swap(notify_catch_up_request &a, notify_catch_up_request &b) swap(a.__isset, b.__isset); } -notify_catch_up_request::notify_catch_up_request(const notify_catch_up_request &other620) +notify_catch_up_request::notify_catch_up_request(const notify_catch_up_request &other614) { - parent_gpid = other620.parent_gpid; - child_gpid = other620.child_gpid; - child_ballot = other620.child_ballot; - child_address = other620.child_address; - __isset = other620.__isset; + parent_gpid = other614.parent_gpid; + child_gpid = other614.child_gpid; + child_ballot = other614.child_ballot; + child_address = other614.child_address; + __isset = other614.__isset; } -notify_catch_up_request::notify_catch_up_request(notify_catch_up_request &&other621) +notify_catch_up_request::notify_catch_up_request(notify_catch_up_request &&other615) { - parent_gpid = std::move(other621.parent_gpid); - child_gpid = std::move(other621.child_gpid); - child_ballot = std::move(other621.child_ballot); - child_address = std::move(other621.child_address); - __isset = std::move(other621.__isset); + parent_gpid = std::move(other615.parent_gpid); + child_gpid = std::move(other615.child_gpid); + child_ballot = std::move(other615.child_ballot); + child_address = std::move(other615.child_address); + __isset = std::move(other615.__isset); } -notify_catch_up_request ¬ify_catch_up_request::operator=(const notify_catch_up_request &other622) +notify_catch_up_request ¬ify_catch_up_request::operator=(const notify_catch_up_request &other616) { - parent_gpid = other622.parent_gpid; - child_gpid = other622.child_gpid; - child_ballot = other622.child_ballot; - child_address = other622.child_address; - __isset = other622.__isset; + parent_gpid = other616.parent_gpid; + child_gpid = other616.child_gpid; + child_ballot = other616.child_ballot; + child_address = other616.child_address; + __isset = other616.__isset; return *this; } -notify_catch_up_request ¬ify_catch_up_request::operator=(notify_catch_up_request &&other623) +notify_catch_up_request ¬ify_catch_up_request::operator=(notify_catch_up_request &&other617) { - parent_gpid = std::move(other623.parent_gpid); - child_gpid = std::move(other623.child_gpid); - child_ballot = std::move(other623.child_ballot); - child_address = std::move(other623.child_address); - __isset = std::move(other623.__isset); + parent_gpid = std::move(other617.parent_gpid); + child_gpid = std::move(other617.child_gpid); + child_ballot = std::move(other617.child_ballot); + child_address = std::move(other617.child_address); + __isset = std::move(other617.__isset); return *this; } void notify_catch_up_request::printTo(std::ostream &out) const @@ -14093,27 +14107,27 @@ void swap(notify_cacth_up_response &a, notify_cacth_up_response &b) swap(a.__isset, b.__isset); } -notify_cacth_up_response::notify_cacth_up_response(const notify_cacth_up_response &other624) +notify_cacth_up_response::notify_cacth_up_response(const notify_cacth_up_response &other618) { - err = other624.err; - __isset = other624.__isset; + err = other618.err; + __isset = other618.__isset; } -notify_cacth_up_response::notify_cacth_up_response(notify_cacth_up_response &&other625) +notify_cacth_up_response::notify_cacth_up_response(notify_cacth_up_response &&other619) { - err = std::move(other625.err); - __isset = std::move(other625.__isset); + err = std::move(other619.err); + __isset = std::move(other619.__isset); } notify_cacth_up_response ¬ify_cacth_up_response:: -operator=(const notify_cacth_up_response &other626) +operator=(const notify_cacth_up_response &other620) { - err = other626.err; - __isset = other626.__isset; + err = other620.err; + __isset = other620.__isset; return *this; } -notify_cacth_up_response ¬ify_cacth_up_response::operator=(notify_cacth_up_response &&other627) +notify_cacth_up_response ¬ify_cacth_up_response::operator=(notify_cacth_up_response &&other621) { - err = std::move(other627.err); - __isset = std::move(other627.__isset); + err = std::move(other621.err); + __isset = std::move(other621.__isset); return *this; } void notify_cacth_up_response::printTo(std::ostream &out) const diff --git a/src/dist/replication/ddl_lib/replication_ddl_client.cpp b/src/dist/replication/ddl_lib/replication_ddl_client.cpp index 3f184ae8d6..305c0efd63 100644 --- a/src/dist/replication/ddl_lib/replication_ddl_client.cpp +++ b/src/dist/replication/ddl_lib/replication_ddl_client.cpp @@ -1364,6 +1364,20 @@ error_with replication_ddl_client::change_dup_statu return call_rpc_sync(duplication_modify_rpc(std::move(req), RPC_CM_MODIFY_DUPLICATION)); } +error_with replication_ddl_client::update_dup_fail_mode( + std::string app_name, int dupid, duplication_fail_mode::type fmode) +{ + if (_duplication_fail_mode_VALUES_TO_NAMES.find(fmode) == + _duplication_fail_mode_VALUES_TO_NAMES.end()) { + return FMT_ERR(ERR_INVALID_PARAMETERS, "unexpected duplication_fail_mode {}", fmode); + } + auto req = make_unique(); + req->app_name = std::move(app_name); + req->dupid = dupid; + req->__set_fail_mode(fmode); + return call_rpc_sync(duplication_modify_rpc(std::move(req), RPC_CM_MODIFY_DUPLICATION)); +} + error_with replication_ddl_client::query_dup(std::string app_name) { auto req = make_unique(); diff --git a/src/dist/replication/lib/duplication/duplication_sync_timer.cpp b/src/dist/replication/lib/duplication/duplication_sync_timer.cpp index bce490179f..8ed13f3bc4 100644 --- a/src/dist/replication/lib/duplication/duplication_sync_timer.cpp +++ b/src/dist/replication/lib/duplication/duplication_sync_timer.cpp @@ -172,6 +172,7 @@ duplication_sync_timer::get_dup_states(int app_id, /*out*/ bool *app_found) state.duplicating = s.duplicating; state.not_confirmed = std::max(decree(0), last_committed_decree - s.confirmed_decree); state.not_duplicated = std::max(decree(0), last_committed_decree - s.last_decree); + state.fail_mode = s.fail_mode; result.emplace(std::make_pair(s.dupid, state)); } } diff --git a/src/dist/replication/lib/duplication/duplication_sync_timer.h b/src/dist/replication/lib/duplication/duplication_sync_timer.h index 83427cf18e..1acbdb121c 100644 --- a/src/dist/replication/lib/duplication/duplication_sync_timer.h +++ b/src/dist/replication/lib/duplication/duplication_sync_timer.h @@ -36,6 +36,7 @@ class duplication_sync_timer bool duplicating{false}; decree not_duplicated{0}; decree not_confirmed{0}; + duplication_fail_mode::type fail_mode{duplication_fail_mode::FAIL_SLOW}; }; std::multimap get_dup_states(int app_id, /*out*/ bool *app_found); diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index 696471e3b7..81939816cb 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -14,6 +14,19 @@ namespace dsn { namespace replication { /*static*/ constexpr int load_from_private_log::MAX_ALLOWED_BLOCK_REPEATS; +/*static*/ constexpr int load_from_private_log::MAX_ALLOWED_FILE_REPEATS; + +bool load_from_private_log::will_fail_skip() const +{ + return _err_file_repeats_num >= MAX_ALLOWED_FILE_REPEATS && + _duplicator->fail_mode() == duplication_fail_mode::FAIL_SKIP; +} + +bool load_from_private_log::will_fail_fast() const +{ + return _err_file_repeats_num >= MAX_ALLOWED_FILE_REPEATS && + _duplicator->fail_mode() == duplication_fail_mode::FAIL_FAST; +} // Fast path to next file. If next file (_current->index + 1) is invalid, // we try to list all files and select a new one to start (find_log_file_to_start). @@ -122,6 +135,13 @@ void load_from_private_log::replay_log_block() return; } + // Error handling on loading failure: + // - If block loading failed for `MAX_ALLOWED_REPEATS` times, it restarts reading the file. + // - If file loading failed for `MAX_ALLOWED_FILE_REPEATS` times, which means it + // met some permanent problem (maybe data corruption), there are 2 options for + // the next move: + // 1. skip this file, abandon the data, can be adopted by who allows minor data lost. + // 2. fail-slow, retry reading this file until human interference. _err_block_repeats_num++; if (_err_block_repeats_num >= MAX_ALLOWED_BLOCK_REPEATS) { derror_replica( @@ -131,6 +151,28 @@ void load_from_private_log::replay_log_block() _current->path(), _start_offset); _counter_dup_load_file_failed_count->increment(); + _err_file_repeats_num++; + if (dsn_unlikely(will_fail_skip())) { + // skip this file + derror_replica("failed loading for {} times, abandon file {} and try next", + _err_file_repeats_num, + _current->path()); + _err_file_repeats_num = 0; + + auto prev_offset = _current_global_end_offset; + if (switch_to_next_log_file()) { + // successfully skip to next file + auto skipped_bytes = _current_global_end_offset - prev_offset; + _counter_dup_load_skipped_bytes_count->add(skipped_bytes); + repeat(_repeat_delay); + return; + } + } else if (dsn_unlikely(will_fail_fast())) { + dassert_replica( + false, + "unable to load file {}, fail fast. please check if the file is corrupted", + _current->path()); + } // retry from file start find_log_file_to_start(); } @@ -165,6 +207,11 @@ load_from_private_log::load_from_private_log(replica *r, replica_duplicator *dup "dup.load_file_failed_count", COUNTER_TYPE_NUMBER, "the number of failures loading a private log file during duplication"); + _counter_dup_load_skipped_bytes_count.init_app_counter( + "eon.replica_stub", + "dup.load_skipped_bytes_count", + COUNTER_TYPE_NUMBER, + "bytes of mutations that were skipped because of failure during duplication"); } void load_from_private_log::set_start_decree(decree start_decree) diff --git a/src/dist/replication/lib/duplication/load_from_private_log.h b/src/dist/replication/lib/duplication/load_from_private_log.h index f68c17e8f1..82f02be5b4 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.h +++ b/src/dist/replication/lib/duplication/load_from_private_log.h @@ -49,14 +49,22 @@ class load_from_private_log : public replica_base, void start_from_log_file(log_file_ptr f); + bool will_fail_skip() const; + bool will_fail_fast() const; + void TEST_set_repeat_delay(std::chrono::milliseconds delay) { const_cast(_repeat_delay) = delay; } static constexpr int MAX_ALLOWED_BLOCK_REPEATS{3}; + static constexpr int MAX_ALLOWED_FILE_REPEATS{10}; private: friend class load_from_private_log_test; + friend class load_fail_mode_test; + FRIEND_TEST(load_fail_mode_test, fail_skip); + FRIEND_TEST(load_fail_mode_test, fail_slow); + FRIEND_TEST(load_fail_mode_test, fail_skip_real_corrupted_file); mutation_log_ptr _private_log; replica_duplicator *_duplicator; @@ -70,10 +78,13 @@ class load_from_private_log : public replica_base, // How many times it repeats reading from current block but failed. int _err_block_repeats_num{0}; + // How many times it repeats reading current log file but failed. + int _err_file_repeats_num{0}; decree _start_decree{0}; perf_counter_wrapper _counter_dup_load_file_failed_count; + perf_counter_wrapper _counter_dup_load_skipped_bytes_count; perf_counter_wrapper _counter_dup_log_read_bytes_rate; perf_counter_wrapper _counter_dup_log_read_mutations_rate; diff --git a/src/dist/replication/lib/duplication/replica_duplicator.h b/src/dist/replication/lib/duplication/replica_duplicator.h index 02953d0708..a77a9bf023 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator.h +++ b/src/dist/replication/lib/duplication/replica_duplicator.h @@ -65,6 +65,15 @@ class replica_duplicator : public replica_base, public pipeline::base // Not thread-safe. void update_status_if_needed(duplication_status::type next_status); + void update_fail_mode(duplication_fail_mode::type fmode) + { + _fail_mode.store(fmode, std::memory_order_relaxed); + } + duplication_fail_mode::type fail_mode() const + { + return _fail_mode.load(std::memory_order_relaxed); + } + dupid_t id() const { return _id; } const std::string &remote_cluster_name() const { return _remote_cluster_name; } @@ -121,6 +130,7 @@ class replica_duplicator : public replica_base, public pipeline::base dsn::task_tracker _tracker; duplication_status::type _status{duplication_status::DS_INIT}; + std::atomic _fail_mode{duplication_fail_mode::FAIL_SLOW}; // protect the access of _progress. mutable zrwlock_nr _lock; diff --git a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp index 85ae458577..9690efceea 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp +++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp @@ -57,6 +57,9 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent) duplication_progress newp = dup->progress().set_confirmed_decree(it->second); dcheck_eq_replica(dup->update_progress(newp), error_s::ok()); dup->update_status_if_needed(next_status); + if (ent.__isset.fail_mode) { + dup->update_fail_mode(ent.fail_mode); + } } } @@ -142,6 +145,7 @@ replica_duplicator_manager::get_dup_states() const auto progress = dup.second->progress(); state.last_decree = progress.last_decree; state.confirmed_decree = progress.confirmed_decree; + state.fail_mode = dup.second->fail_mode(); ret.emplace_back(state); } return ret; diff --git a/src/dist/replication/lib/duplication/replica_duplicator_manager.h b/src/dist/replication/lib/duplication/replica_duplicator_manager.h index 60de1a2fe6..ee99ec0bd5 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator_manager.h +++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.h @@ -64,6 +64,7 @@ class replica_duplicator_manager : public replica_base bool duplicating{false}; decree last_decree{invalid_decree}; decree confirmed_decree{invalid_decree}; + duplication_fail_mode::type fail_mode{duplication_fail_mode::FAIL_SLOW}; }; std::vector get_dup_states() const; diff --git a/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp index f21a79725a..47e6ff6dda 100644 --- a/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp +++ b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp @@ -28,6 +28,24 @@ class load_from_private_log_test : public duplication_test_base duplicator = create_test_duplicator(); } + // return number of entries written + int generate_multiple_log_files(uint files_num = 3) + { + // decree ranges from [1, files_num*10) + for (int f = 0; f < files_num; f++) { + // each round mlog will replay the former logs, and create new file + mutation_log_ptr mlog = create_private_log(); + for (int i = 1; i <= 10; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(10 * f + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + mlog->close(); + } + return static_cast(files_num * 10); + } + void test_find_log_file_to_start() { load_from_private_log load(_replica.get(), duplicator.get()); @@ -47,15 +65,7 @@ class load_from_private_log_test : public duplication_test_base load.find_log_file_to_start({}); ASSERT_FALSE(load._current); - { // writing mutations to log which will generate multiple files - for (int i = 0; i < 1000 * 50; i++) { - std::string msg = "hello!"; - mutations.push_back(msg); - mutation_ptr mu = create_test_mutation(2 + i, msg); - mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); - } - mlog->tracker()->wait_outstanding_tasks(); - } + int num_entries = generate_multiple_log_files(3); auto files = open_log_file_map(_log_dir); @@ -65,14 +75,14 @@ class load_from_private_log_test : public duplication_test_base ASSERT_EQ(load._current->index(), 1); load._current = nullptr; - load.set_start_decree(50); + load.set_start_decree(5); load.find_log_file_to_start(files); ASSERT_TRUE(load._current); ASSERT_EQ(load._current->index(), 1); int last_idx = files.rbegin()->first; load._current = nullptr; - load.set_start_decree(1000 * 50 + 200); + load.set_start_decree(num_entries + 200); load.find_log_file_to_start(files); ASSERT_TRUE(load._current); ASSERT_EQ(load._current->index(), last_idx); @@ -165,17 +175,7 @@ class load_from_private_log_test : public duplication_test_base { load_from_private_log load(_replica.get(), duplicator.get()); - // start duplication from a compacted plog dir. - // first log file is log.2.xxx - for (int f = 0; f < 2; f++) { - mutation_log_ptr mlog = create_private_log(); - for (int i = 0; i < 100; i++) { - std::string msg = "hello!"; - mutation_ptr mu = create_test_mutation(39000 + 100 * f + i, msg); - mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); - } - mlog->tracker()->wait_outstanding_tasks(); - } + int num_entries = generate_multiple_log_files(2); std::vector files; ASSERT_EQ(log_utils::list_all_files(_log_dir, files), error_s::ok()); @@ -371,5 +371,95 @@ TEST_F(load_from_private_log_test, ignore_useless) ASSERT_EQ(result.size(), 0); } +class load_fail_mode_test : public load_from_private_log_test +{ +public: + void SetUp() override + { + const int num_entries = generate_multiple_log_files(); + + // prepare loading pipeline + mlog = create_private_log(); + _replica->init_private_log(mlog); + duplicator = create_test_duplicator(1); + load = make_unique(_replica.get(), duplicator.get()); + load->TEST_set_repeat_delay(0_ms); // no delay + load->set_start_decree(duplicator->progress().last_decree + 1); + end_stage = make_unique( + [this, num_entries](decree &&d, mutation_tuple_set &&mutations) { + load->set_start_decree(d + 1); + if (d < num_entries - 1) { + load->run(); + } + }); + duplicator->from(*load).link(*end_stage); + } + + mutation_log_ptr mlog; + std::unique_ptr load; + + using end_stage_t = pipeline::do_when; + std::unique_ptr end_stage; +}; + +TEST_F(load_fail_mode_test, fail_skip) +{ + duplicator->update_fail_mode(duplication_fail_mode::FAIL_SKIP); + ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); + + // will trigger fail-skip and read the subsequent file, some mutations will be lost. + auto repeats = load->MAX_ALLOWED_BLOCK_REPEATS * load->MAX_ALLOWED_FILE_REPEATS; + fail::setup(); + fail::cfg("mutation_log_replay_block", fmt::format("100%{}*return()", repeats)); + duplicator->run_pipeline(); + duplicator->wait_all(); + fail::teardown(); + + ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), + load_from_private_log::MAX_ALLOWED_FILE_REPEATS); + ASSERT_GT(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); +} + +TEST_F(load_fail_mode_test, fail_slow) +{ + duplicator->update_fail_mode(duplication_fail_mode::FAIL_SLOW); + ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); + ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), 0); + + // will trigger fail-slow and retry infinitely + auto repeats = load->MAX_ALLOWED_BLOCK_REPEATS * load->MAX_ALLOWED_FILE_REPEATS; + fail::setup(); + fail::cfg("mutation_log_replay_block", fmt::format("100%{}*return()", repeats)); + duplicator->run_pipeline(); + duplicator->wait_all(); + fail::teardown(); + + ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), + load_from_private_log::MAX_ALLOWED_FILE_REPEATS); + ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); +} + +TEST_F(load_fail_mode_test, fail_skip_real_corrupted_file) +{ + { // inject some bad data in the middle of the first file + std::string log_path = _log_dir + "/log.1.0"; + auto file_size = boost::filesystem::file_size(log_path); + int fd = open(log_path.c_str(), O_WRONLY); + const char buf[] = "xxxxxx"; + auto written_size = pwrite(fd, buf, sizeof(buf), file_size / 2); + ASSERT_EQ(written_size, sizeof(buf)); + close(fd); + } + + duplicator->update_fail_mode(duplication_fail_mode::FAIL_SKIP); + duplicator->run_pipeline(); + duplicator->wait_all(); + + // ensure the bad file will be skipped + ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), + load_from_private_log::MAX_ALLOWED_FILE_REPEATS); + ASSERT_GT(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/duplication/test/replica_http_service_test.cpp b/src/dist/replication/lib/duplication/test/replica_http_service_test.cpp index 9049c0fe75..ee69ddafa4 100644 --- a/src/dist/replication/lib/duplication/test/replica_http_service_test.cpp +++ b/src/dist/replication/lib/duplication/test/replica_http_service_test.cpp @@ -48,7 +48,7 @@ TEST_F(replica_http_service_test, query_duplication_handler) resp.body, R"({)" R"("1583306653":)" - R"({"1.1":{"duplicating":false,"not_confirmed_mutations_num":100,"not_duplicated_mutations_num":50}})" + R"({"1.1":{"duplicating":false,"fail_mode":"FAIL_SLOW","not_confirmed_mutations_num":100,"not_duplicated_mutations_num":50}})" R"(})"); } diff --git a/src/dist/replication/lib/replica_http_service.cpp b/src/dist/replication/lib/replica_http_service.cpp index 72c3a1aa2a..7396974e1f 100644 --- a/src/dist/replication/lib/replica_http_service.cpp +++ b/src/dist/replication/lib/replica_http_service.cpp @@ -48,6 +48,7 @@ void replica_http_service::query_duplication_handler(const http_request &req, ht {"duplicating", s.second.duplicating}, {"not_confirmed_mutations_num", s.second.not_confirmed}, {"not_duplicated_mutations_num", s.second.not_duplicated}, + {"fail_mode", duplication_fail_mode_to_string(s.second.fail_mode)}, }; } resp.status_code = http_status_code::ok; diff --git a/src/dist/replication/meta_server/duplication/duplication_info.cpp b/src/dist/replication/meta_server/duplication/duplication_info.cpp index b875137ab9..1ed691fabd 100644 --- a/src/dist/replication/meta_server/duplication/duplication_info.cpp +++ b/src/dist/replication/meta_server/duplication/duplication_info.cpp @@ -63,8 +63,36 @@ namespace replication { return false; } +/*extern*/ void json_encode(dsn::json::JsonWriter &out, const duplication_fail_mode::type &fmode) +{ + json::json_encode(out, duplication_fail_mode_to_string(fmode)); +} + +/*extern*/ bool json_decode(const dsn::json::JsonObject &in, duplication_fail_mode::type &fmode) +{ + static const std::map + _duplication_fail_mode_NAMES_TO_VALUES = { + {"FAIL_SLOW", duplication_fail_mode::FAIL_SLOW}, + {"FAIL_SKIP", duplication_fail_mode::FAIL_SKIP}, + {"FAIL_FAST", duplication_fail_mode::FAIL_FAST}, + }; + + std::string name; + json::json_decode(in, name); + auto it = _duplication_fail_mode_NAMES_TO_VALUES.find(name); + if (it != _duplication_fail_mode_NAMES_TO_VALUES.end()) { + fmode = it->second; + return true; + } + derror_f("unexpected duplication_fail_mode name: {}", name); + // marked as default value. + fmode = duplication_fail_mode::FAIL_SLOW; + return false; +} + // lock held -error_code duplication_info::alter_status(duplication_status::type to_status) +error_code duplication_info::alter_status(duplication_status::type to_status, + duplication_fail_mode::type to_fail_mode) { if (_is_altering) { return ERR_BUSY; @@ -78,13 +106,14 @@ error_code duplication_info::alter_status(duplication_status::type to_status) return ERR_INVALID_PARAMETERS; } - if (_status == to_status) { + if (_status == to_status && _fail_mode == to_fail_mode) { return ERR_OK; } zauto_write_lock l(_lock); _is_altering = true; _next_status = to_status; + _next_fail_mode = to_fail_mode; return ERR_OK; } @@ -148,6 +177,7 @@ void duplication_info::persist_status() _is_altering = false; _status = _next_status; _next_status = duplication_status::DS_INIT; + _fail_mode = _next_fail_mode; } std::string duplication_info::to_string() const @@ -161,6 +191,7 @@ blob duplication_info::to_json_blob() const copy.create_timestamp_ms = create_timestamp_ms; copy.remote = remote; copy.status = _next_status; + copy.fail_mode = _next_fail_mode; return json::json_forwarder::encode(copy); } @@ -190,6 +221,7 @@ duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id, std::move(info.remote), std::move(store_path)); dup->_status = info.status; + dup->_fail_mode = info.fail_mode; return dup; } diff --git a/src/dist/replication/meta_server/duplication/duplication_info.h b/src/dist/replication/meta_server/duplication/duplication_info.h index 549ca10153..b5c8cc8e39 100644 --- a/src/dist/replication/meta_server/duplication/duplication_info.h +++ b/src/dist/replication/meta_server/duplication/duplication_info.h @@ -75,16 +75,17 @@ class duplication_info _next_status = duplication_status::DS_START; } - // change current status to `to_status`. // error will be returned if this state transition is not allowed. - error_code alter_status(duplication_status::type to_status); + error_code + alter_status(duplication_status::type to_status, + duplication_fail_mode::type to_fail_mode = duplication_fail_mode::FAIL_SLOW); - // persist current status to `next_status` // call this function after data has been persisted on meta storage. void persist_status(); // not thread-safe duplication_status::type status() const { return _status; } + duplication_fail_mode::type fail_mode() const { return _fail_mode; } // if this duplication is in valid status. bool is_valid() const { return is_duplication_status_valid(_status); } @@ -124,6 +125,7 @@ class duplication_info entry.create_ts = create_timestamp_ms; entry.remote = remote; entry.status = _status; + entry.__set_fail_mode(_fail_mode); entry.__isset.progress = true; for (const auto &kv : _progress) { if (!kv.second.is_inited) { @@ -175,13 +177,16 @@ class duplication_info duplication_status::type _status{duplication_status::DS_INIT}; duplication_status::type _next_status{duplication_status::DS_INIT}; + duplication_fail_mode::type _fail_mode{duplication_fail_mode::FAIL_SLOW}; + duplication_fail_mode::type _next_fail_mode{duplication_fail_mode::FAIL_SLOW}; struct json_helper { std::string remote; duplication_status::type status; int64_t create_timestamp_ms; + duplication_fail_mode::type fail_mode; - DEFINE_JSON_SERIALIZATION(remote, status, create_timestamp_ms); + DEFINE_JSON_SERIALIZATION(remote, status, create_timestamp_ms, fail_mode); }; public: @@ -196,6 +201,10 @@ extern void json_encode(dsn::json::JsonWriter &out, const duplication_status::ty extern bool json_decode(const dsn::json::JsonObject &in, duplication_status::type &s); +extern void json_encode(dsn::json::JsonWriter &out, const duplication_fail_mode::type &s); + +extern bool json_decode(const dsn::json::JsonObject &in, duplication_fail_mode::type &s); + // Macros for writing log message prefixed by appid and dupid. #define ddebug_dup(_dup_, ...) \ ddebug_f("[a{}d{}] {}", _dup_->app_id, _dup_->id, fmt::format(__VA_ARGS__)); diff --git a/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp b/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp index 8e28b6c20f..fb57ae9b37 100644 --- a/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp +++ b/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp @@ -65,9 +65,10 @@ void meta_duplication_service::modify_duplication(duplication_modify_rpc rpc) const auto &request = rpc.request(); auto &response = rpc.response(); - ddebug_f("modify duplication({}) to {} for app({})", + ddebug_f("modify duplication({}) to [status={},fail_mode={}] for app({})", request.dupid, request.__isset.status ? duplication_status_to_string(request.status) : "nil", + request.__isset.fail_mode ? duplication_fail_mode_to_string(request.fail_mode) : "nil", request.app_name); dupid_t dupid = request.dupid; @@ -86,7 +87,8 @@ void meta_duplication_service::modify_duplication(duplication_modify_rpc rpc) duplication_info_s_ptr dup = it->second; auto to_status = request.__isset.status ? request.status : dup->status(); - response.err = dup->alter_status(to_status); + auto to_fail_mode = request.__isset.fail_mode ? request.fail_mode : dup->fail_mode(); + response.err = dup->alter_status(to_status, to_fail_mode); if (response.err != ERR_OK) { return; } diff --git a/src/dist/replication/replication.thrift b/src/dist/replication/replication.thrift index 79b0bd0809..04892d7920 100644 --- a/src/dist/replication/replication.thrift +++ b/src/dist/replication/replication.thrift @@ -659,6 +659,22 @@ enum duplication_status DS_REMOVED, } +// How duplication reacts on permanent failure. +enum duplication_fail_mode +{ + // The default mode. If some permanent failure occurred that makes duplication + // blocked, it will retry forever until external interference. + FAIL_SLOW = 0, + + // Skip the writes that failed to duplicate, which means minor data loss on the remote cluster. + // This will certainly achieve better stability of the system. + FAIL_SKIP, + + // Stop immediately after it ensures itself unable to duplicate. + // WARN: this mode kills the server process, replicas on the server will all be effected. + FAIL_FAST +} + // This request is sent from client to meta. struct duplication_add_request { @@ -686,6 +702,7 @@ struct duplication_modify_request 1:string app_name; 2:i32 dupid; 3:optional duplication_status status; + 4:optional duplication_fail_mode fail_mode; } struct duplication_modify_response @@ -709,8 +726,7 @@ struct duplication_entry // partition_index => confirmed decree 5:optional map progress; - // partition_index => approximate number of mutations that are not confirmed yet - 6:optional map not_confirmed; + 7:optional duplication_fail_mode fail_mode; } // This request is sent from client to meta. diff --git a/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp index 5d75b30a32..495c927be4 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp @@ -86,6 +86,21 @@ class meta_duplication_service_test : public meta_test_base return rpc.response(); } + duplication_modify_response + update_fail_mode(const std::string &app_name, dupid_t dupid, duplication_fail_mode::type fmode) + { + auto req = make_unique(); + req->dupid = dupid; + req->app_name = app_name; + req->__set_fail_mode(fmode); + + duplication_modify_rpc rpc(std::move(req), RPC_CM_MODIFY_DUPLICATION); + dup_svc().modify_duplication(rpc); + wait_all(); + + return rpc.response(); + } + duplication_sync_response duplication_sync(const rpc_address &node, std::map> confirm_list) @@ -522,6 +537,7 @@ TEST_F(meta_duplication_service_test, duplication_sync) ASSERT_EQ(resp.dup_map[app->app_id][dupid].status, duplication_status::DS_START); ASSERT_EQ(resp.dup_map[app->app_id][dupid].create_ts, dup->create_timestamp_ms); ASSERT_EQ(resp.dup_map[app->app_id][dupid].remote, dup->remote); + ASSERT_EQ(resp.dup_map[app->app_id][dupid].fail_mode, dup->fail_mode()); auto progress_map = resp.dup_map[app->app_id][dupid].progress; ASSERT_EQ(progress_map.size(), 8); @@ -678,8 +694,56 @@ TEST_F(meta_duplication_service_test, query_duplication_handler) ASSERT_EQ(fake_resp.body, std::string() + R"({"1":{"create_ts":")" + ts_buf + R"(","dupid":)" + std::to_string(dup->id) + + R"(,"fail_mode":"FAIL_SLOW")" R"(,"remote":"slave-cluster","status":"DS_START"},"appid":2})"); } +TEST_F(meta_duplication_service_test, fail_mode) +{ + std::string test_app = "test-app"; + create_app(test_app); + auto app = find_app(test_app); + + auto dup_add_resp = create_dup(test_app); + auto dup = app->duplications[dup_add_resp.dupid]; + ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SLOW); + ASSERT_EQ(dup->status(), duplication_status::DS_START); + + auto resp = update_fail_mode(test_app, dup->id, duplication_fail_mode::FAIL_SKIP); + ASSERT_EQ(resp.err, ERR_OK); + ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SKIP); + ASSERT_EQ(dup->status(), duplication_status::DS_START); + + // change nothing + resp = update_fail_mode(test_app, dup->id, duplication_fail_mode::FAIL_SKIP); + ASSERT_EQ(resp.err, ERR_OK); + ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SKIP); + ASSERT_EQ(dup->status(), duplication_status::DS_START); + + // change status but fail mode not changed + resp = change_dup_status(test_app, dup->id, duplication_status::DS_PAUSE); + ASSERT_EQ(resp.err, ERR_OK); + ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SKIP); + ASSERT_EQ(dup->status(), duplication_status::DS_PAUSE); + + // ensure dup_sync will synchronize fail_mode + std::vector server_nodes = generate_node_list(3); + rpc_address node = server_nodes[0]; + for (partition_configuration &pc : app->partitions) { + pc.primary = server_nodes[0]; + } + initialize_node_state(); + duplication_sync_response sync_resp = duplication_sync(node, {}); + ASSERT_TRUE(sync_resp.dup_map[app->app_id][dup->id].__isset.fail_mode); + ASSERT_EQ(sync_resp.dup_map[app->app_id][dup->id].fail_mode, duplication_fail_mode::FAIL_SKIP); + + // ensure recovery will not lose fail_mode. + SetUp(); + recover_from_meta_state(); + app = find_app(test_app); + dup = app->duplications[dup->id]; + ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SKIP); +} + } // namespace replication } // namespace dsn