diff --git a/include/dsn/dist/replication/duplication_common.h b/include/dsn/dist/replication/duplication_common.h index 7ba3e7f98d..fcfaeb16ef 100644 --- a/include/dsn/dist/replication/duplication_common.h +++ b/include/dsn/dist/replication/duplication_common.h @@ -68,9 +68,12 @@ extern const char *get_current_cluster_name(); /// The returned cluster id of get_duplication_cluster_id("wuhan-mi-srv-ad") is 3. extern error_with get_duplication_cluster_id(const std::string &cluster_name); -/// Returns a displayable string for this duplication_entry. +/// Returns a json string. extern std::string duplication_entry_to_string(const duplication_entry &dup); +/// Returns a json string. +extern std::string duplication_query_response_to_string(const duplication_query_response &); + /// Returns a mapping from cluster_name to cluster_id. extern const std::map &get_duplication_group(); diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index dedc570195..1fac0cf674 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -4917,7 +4917,12 @@ inline std::ostream &operator<<(std::ostream &out, const duplication_status_chan typedef struct _duplication_entry__isset { _duplication_entry__isset() - : dupid(false), status(false), remote(false), create_ts(false), progress(false) + : dupid(false), + status(false), + remote(false), + create_ts(false), + progress(false), + not_confirmed(false) { } bool dupid : 1; @@ -4925,6 +4930,7 @@ typedef struct _duplication_entry__isset bool remote : 1; bool create_ts : 1; bool progress : 1; + bool not_confirmed : 1; } _duplication_entry__isset; class duplication_entry @@ -4942,6 +4948,7 @@ class duplication_entry std::string remote; int64_t create_ts; std::map progress; + std::map not_confirmed; _duplication_entry__isset __isset; @@ -4955,6 +4962,8 @@ class duplication_entry void __set_progress(const std::map &val); + void __set_not_confirmed(const std::map &val); + bool operator==(const duplication_entry &rhs) const { if (!(dupid == rhs.dupid)) @@ -4965,7 +4974,13 @@ class duplication_entry return false; if (!(create_ts == rhs.create_ts)) return false; - if (!(progress == rhs.progress)) + if (__isset.progress != rhs.__isset.progress) + return false; + else if (__isset.progress && !(progress == rhs.progress)) + return false; + if (__isset.not_confirmed != rhs.__isset.not_confirmed) + return false; + else if (__isset.not_confirmed && !(not_confirmed == rhs.not_confirmed)) return false; return true; } diff --git a/src/dist/http/http_server.cpp b/src/dist/http/http_server.cpp index e03c3a9bb9..208be25af6 100644 --- a/src/dist/http/http_server.cpp +++ b/src/dist/http/http_server.cpp @@ -127,23 +127,29 @@ void http_server::add_service(http_service *service) unresolved_query = decoded_unresolved_query.get_value(); } + // remove tailing '\0' + if (!unresolved_path.empty() && *unresolved_path.crbegin() == '\0') { + unresolved_path.pop_back(); + } std::vector args; boost::split(args, unresolved_path, boost::is_any_of("/")); - std::vector real_args; - for (string_view arg : args) { - if (!arg.empty() && strlen(arg.data()) != 0) { - real_args.emplace_back(string_view(arg.data())); + std::vector real_args; + for (std::string &arg : args) { + if (!arg.empty()) { + real_args.emplace_back(std::move(arg)); } } - if (real_args.size() > 2) { - return error_s::make(ERR_INVALID_PARAMETERS); - } - if (real_args.size() == 1) { - ret.service_method = {std::string(real_args[0]), ""}; - } else if (real_args.size() == 0) { + if (real_args.size() == 0) { ret.service_method = {"", ""}; + } else if (real_args.size() == 1) { + ret.service_method = {std::move(real_args[0]), ""}; } else { - ret.service_method = {std::string(real_args[0]), std::string(real_args[1])}; + std::string method = std::move(real_args[1]); + for (int i = 2; i < real_args.size(); i++) { + method += '/'; + method += real_args[i]; + } + ret.service_method = {std::move(real_args[0]), std::move(method)}; } // find if there are method args (://?=&=) diff --git a/src/dist/http/test/http_server_test.cpp b/src/dist/http/test/http_server_test.cpp index 7dd024a1c7..596aea92f7 100644 --- a/src/dist/http/test/http_server_test.cpp +++ b/src/dist/http/test/http_server_test.cpp @@ -27,7 +27,8 @@ TEST(http_server, parse_url) {"http://127.0.0.1:34601/threads/", ERR_OK, {"threads", ""}}, {"http://127.0.0.1:34601//pprof/heap/", ERR_OK, {"pprof", "heap"}}, {"http://127.0.0.1:34601//pprof///heap", ERR_OK, {"pprof", "heap"}}, - {"http://127.0.0.1:34601/pprof/heap/arg/", ERR_INVALID_PARAMETERS, {}}, + {"http://127.0.0.1:34601/pprof/heap/arg/", ERR_OK, {"pprof", "heap/arg"}}, + {"http://127.0.0.1:34601/pprof///heap///arg/", ERR_OK, {"pprof", "heap/arg"}}, }; for (auto tt : tests) { diff --git a/src/dist/replication/common/duplication_common.cpp b/src/dist/replication/common/duplication_common.cpp index b0c50c0d3a..ea7ed5acc6 100644 --- a/src/dist/replication/common/duplication_common.cpp +++ b/src/dist/replication/common/duplication_common.cpp @@ -28,9 +28,8 @@ #include #include #include -#include -#include -#include +#include +#include namespace dsn { namespace replication { @@ -114,32 +113,48 @@ class duplication_group_registry : public utils::singleton(ent.create_ts), ts_buf, sizeof(ts_buf)); + nlohmann::json json{ + {"dupid", ent.dupid}, + {"create_ts", ts_buf}, + {"remote", ent.remote}, + {"status", duplication_status_to_string(ent.status)}, + }; + if (ent.__isset.not_confirmed) { + nlohmann::json sub_json; + for (const auto &p : ent.not_confirmed) { + sub_json[std::to_string(p.first)] = p.second; + } + json["not_confirmed_mutations_num"] = sub_json; + } + if (ent.__isset.progress) { + nlohmann::json sub_json; + for (const auto &p : ent.progress) { + sub_json[std::to_string(p.first)] = p.second; + } + json["progress"] = sub_json; } + return json; +} - rapidjson::StringBuffer sb; - rapidjson::Writer writer(sb); - doc.Accept(writer); - return sb.GetString(); +/*extern*/ std::string duplication_entry_to_string(const duplication_entry &ent) +{ + return duplication_entry_to_json(ent).dump(); +} + +/*extern*/ std::string duplication_query_response_to_string(const duplication_query_response &resp) +{ + nlohmann::json json; + int i = 1; + for (const auto &ent : resp.entry_list) { + json["appid"] = resp.appid; + json[std::to_string(i)] = duplication_entry_to_json(ent); + i++; + } + return json.dump(); } /*extern*/ const std::map &get_duplication_group() diff --git a/src/dist/replication/common/replication_types.cpp b/src/dist/replication/common/replication_types.cpp index 42afdc9243..f8b4047470 100644 --- a/src/dist/replication/common/replication_types.cpp +++ b/src/dist/replication/common/replication_types.cpp @@ -11456,6 +11456,13 @@ void duplication_entry::__set_create_ts(const int64_t val) { this->create_ts = v void duplication_entry::__set_progress(const std::map &val) { this->progress = val; + __isset.progress = true; +} + +void duplication_entry::__set_not_confirmed(const std::map &val) +{ + this->not_confirmed = val; + __isset.not_confirmed = true; } uint32_t duplication_entry::read(::apache::thrift::protocol::TProtocol *iprot) @@ -11533,6 +11540,28 @@ uint32_t duplication_entry::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; + case 6: + if (ftype == ::apache::thrift::protocol::T_MAP) { + { + this->not_confirmed.clear(); + uint32_t _size481; + ::apache::thrift::protocol::TType _ktype482; + ::apache::thrift::protocol::TType _vtype483; + xfer += iprot->readMapBegin(_ktype482, _vtype483, _size481); + uint32_t _i485; + for (_i485 = 0; _i485 < _size481; ++_i485) { + int32_t _key486; + xfer += iprot->readI32(_key486); + int64_t &_val487 = this->not_confirmed[_key486]; + xfer += iprot->readI64(_val487); + } + xfer += iprot->readMapEnd(); + } + this->__isset.not_confirmed = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -11567,20 +11596,37 @@ uint32_t duplication_entry::write(::apache::thrift::protocol::TProtocol *oprot) xfer += oprot->writeI64(this->create_ts); xfer += oprot->writeFieldEnd(); - xfer += oprot->writeFieldBegin("progress", ::apache::thrift::protocol::T_MAP, 5); - { - xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, - ::apache::thrift::protocol::T_I64, - static_cast(this->progress.size())); - std::map::const_iterator _iter481; - for (_iter481 = this->progress.begin(); _iter481 != this->progress.end(); ++_iter481) { - xfer += oprot->writeI32(_iter481->first); - xfer += oprot->writeI64(_iter481->second); + if (this->__isset.progress) { + xfer += oprot->writeFieldBegin("progress", ::apache::thrift::protocol::T_MAP, 5); + { + xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, + ::apache::thrift::protocol::T_I64, + static_cast(this->progress.size())); + std::map::const_iterator _iter488; + for (_iter488 = this->progress.begin(); _iter488 != this->progress.end(); ++_iter488) { + xfer += oprot->writeI32(_iter488->first); + xfer += oprot->writeI64(_iter488->second); + } + xfer += oprot->writeMapEnd(); } - xfer += oprot->writeMapEnd(); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.not_confirmed) { + xfer += oprot->writeFieldBegin("not_confirmed", ::apache::thrift::protocol::T_MAP, 6); + { + xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, + ::apache::thrift::protocol::T_I64, + static_cast(this->not_confirmed.size())); + std::map::const_iterator _iter489; + for (_iter489 = this->not_confirmed.begin(); _iter489 != this->not_confirmed.end(); + ++_iter489) { + xfer += oprot->writeI32(_iter489->first); + xfer += oprot->writeI64(_iter489->second); + } + xfer += oprot->writeMapEnd(); + } + xfer += oprot->writeFieldEnd(); } - xfer += oprot->writeFieldEnd(); - xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -11594,45 +11640,50 @@ void swap(duplication_entry &a, duplication_entry &b) swap(a.remote, b.remote); swap(a.create_ts, b.create_ts); swap(a.progress, b.progress); + swap(a.not_confirmed, b.not_confirmed); swap(a.__isset, b.__isset); } -duplication_entry::duplication_entry(const duplication_entry &other482) +duplication_entry::duplication_entry(const duplication_entry &other490) { - dupid = other482.dupid; - status = other482.status; - remote = other482.remote; - create_ts = other482.create_ts; - progress = other482.progress; - __isset = other482.__isset; + dupid = other490.dupid; + status = other490.status; + remote = other490.remote; + create_ts = other490.create_ts; + progress = other490.progress; + not_confirmed = other490.not_confirmed; + __isset = other490.__isset; } -duplication_entry::duplication_entry(duplication_entry &&other483) +duplication_entry::duplication_entry(duplication_entry &&other491) { - dupid = std::move(other483.dupid); - status = std::move(other483.status); - remote = std::move(other483.remote); - create_ts = std::move(other483.create_ts); - progress = std::move(other483.progress); - __isset = std::move(other483.__isset); + dupid = std::move(other491.dupid); + status = std::move(other491.status); + remote = std::move(other491.remote); + create_ts = std::move(other491.create_ts); + progress = std::move(other491.progress); + not_confirmed = std::move(other491.not_confirmed); + __isset = std::move(other491.__isset); } -duplication_entry &duplication_entry::operator=(const duplication_entry &other484) +duplication_entry &duplication_entry::operator=(const duplication_entry &other492) { - dupid = other484.dupid; - status = other484.status; - remote = other484.remote; - create_ts = other484.create_ts; - progress = other484.progress; - __isset = other484.__isset; + dupid = other492.dupid; + status = other492.status; + remote = other492.remote; + create_ts = other492.create_ts; + progress = other492.progress; + not_confirmed = other492.not_confirmed; + __isset = other492.__isset; return *this; } -duplication_entry &duplication_entry::operator=(duplication_entry &&other485) +duplication_entry &duplication_entry::operator=(duplication_entry &&other493) { - dupid = std::move(other485.dupid); - status = std::move(other485.status); - remote = std::move(other485.remote); - create_ts = std::move(other485.create_ts); - progress = std::move(other485.progress); - __isset = std::move(other485.__isset); + dupid = std::move(other493.dupid); + status = std::move(other493.status); + remote = std::move(other493.remote); + create_ts = std::move(other493.create_ts); + progress = std::move(other493.progress); + not_confirmed = std::move(other493.not_confirmed); + __isset = std::move(other493.__isset); return *this; } void duplication_entry::printTo(std::ostream &out) const @@ -11647,7 +11698,11 @@ void duplication_entry::printTo(std::ostream &out) const out << ", " << "create_ts=" << to_string(create_ts); out << ", " - << "progress=" << to_string(progress); + << "progress="; + (__isset.progress ? (out << to_string(progress)) : (out << "")); + out << ", " + << "not_confirmed="; + (__isset.not_confirmed ? (out << to_string(not_confirmed)) : (out << "")); out << ")"; } @@ -11716,28 +11771,28 @@ void swap(duplication_query_request &a, duplication_query_request &b) swap(a.__isset, b.__isset); } -duplication_query_request::duplication_query_request(const duplication_query_request &other486) +duplication_query_request::duplication_query_request(const duplication_query_request &other494) { - app_name = other486.app_name; - __isset = other486.__isset; + app_name = other494.app_name; + __isset = other494.__isset; } -duplication_query_request::duplication_query_request(duplication_query_request &&other487) +duplication_query_request::duplication_query_request(duplication_query_request &&other495) { - app_name = std::move(other487.app_name); - __isset = std::move(other487.__isset); + app_name = std::move(other495.app_name); + __isset = std::move(other495.__isset); } duplication_query_request &duplication_query_request:: -operator=(const duplication_query_request &other488) +operator=(const duplication_query_request &other496) { - app_name = other488.app_name; - __isset = other488.__isset; + app_name = other496.app_name; + __isset = other496.__isset; return *this; } duplication_query_request &duplication_query_request:: -operator=(duplication_query_request &&other489) +operator=(duplication_query_request &&other497) { - app_name = std::move(other489.app_name); - __isset = std::move(other489.__isset); + app_name = std::move(other497.app_name); + __isset = std::move(other497.__isset); return *this; } void duplication_query_request::printTo(std::ostream &out) const @@ -11798,13 +11853,13 @@ uint32_t duplication_query_response::read(::apache::thrift::protocol::TProtocol if (ftype == ::apache::thrift::protocol::T_LIST) { { this->entry_list.clear(); - uint32_t _size490; - ::apache::thrift::protocol::TType _etype493; - xfer += iprot->readListBegin(_etype493, _size490); - this->entry_list.resize(_size490); - uint32_t _i494; - for (_i494 = 0; _i494 < _size490; ++_i494) { - xfer += this->entry_list[_i494].read(iprot); + uint32_t _size498; + ::apache::thrift::protocol::TType _etype501; + xfer += iprot->readListBegin(_etype501, _size498); + this->entry_list.resize(_size498); + uint32_t _i502; + for (_i502 = 0; _i502 < _size498; ++_i502) { + xfer += this->entry_list[_i502].read(iprot); } xfer += iprot->readListEnd(); } @@ -11843,9 +11898,9 @@ uint32_t duplication_query_response::write(::apache::thrift::protocol::TProtocol { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->entry_list.size())); - std::vector::const_iterator _iter495; - for (_iter495 = this->entry_list.begin(); _iter495 != this->entry_list.end(); ++_iter495) { - xfer += (*_iter495).write(oprot); + std::vector::const_iterator _iter503; + for (_iter503 = this->entry_list.begin(); _iter503 != this->entry_list.end(); ++_iter503) { + xfer += (*_iter503).write(oprot); } xfer += oprot->writeListEnd(); } @@ -11865,36 +11920,36 @@ void swap(duplication_query_response &a, duplication_query_response &b) swap(a.__isset, b.__isset); } -duplication_query_response::duplication_query_response(const duplication_query_response &other496) +duplication_query_response::duplication_query_response(const duplication_query_response &other504) { - err = other496.err; - appid = other496.appid; - entry_list = other496.entry_list; - __isset = other496.__isset; + err = other504.err; + appid = other504.appid; + entry_list = other504.entry_list; + __isset = other504.__isset; } -duplication_query_response::duplication_query_response(duplication_query_response &&other497) +duplication_query_response::duplication_query_response(duplication_query_response &&other505) { - err = std::move(other497.err); - appid = std::move(other497.appid); - entry_list = std::move(other497.entry_list); - __isset = std::move(other497.__isset); + err = std::move(other505.err); + appid = std::move(other505.appid); + entry_list = std::move(other505.entry_list); + __isset = std::move(other505.__isset); } duplication_query_response &duplication_query_response:: -operator=(const duplication_query_response &other498) +operator=(const duplication_query_response &other506) { - err = other498.err; - appid = other498.appid; - entry_list = other498.entry_list; - __isset = other498.__isset; + err = other506.err; + appid = other506.appid; + entry_list = other506.entry_list; + __isset = other506.__isset; return *this; } duplication_query_response &duplication_query_response:: -operator=(duplication_query_response &&other499) +operator=(duplication_query_response &&other507) { - err = std::move(other499.err); - appid = std::move(other499.appid); - entry_list = std::move(other499.entry_list); - __isset = std::move(other499.__isset); + err = std::move(other507.err); + appid = std::move(other507.appid); + entry_list = std::move(other507.entry_list); + __isset = std::move(other507.__isset); return *this; } void duplication_query_response::printTo(std::ostream &out) const @@ -11992,32 +12047,32 @@ void swap(duplication_confirm_entry &a, duplication_confirm_entry &b) swap(a.__isset, b.__isset); } -duplication_confirm_entry::duplication_confirm_entry(const duplication_confirm_entry &other500) +duplication_confirm_entry::duplication_confirm_entry(const duplication_confirm_entry &other508) { - dupid = other500.dupid; - confirmed_decree = other500.confirmed_decree; - __isset = other500.__isset; + dupid = other508.dupid; + confirmed_decree = other508.confirmed_decree; + __isset = other508.__isset; } -duplication_confirm_entry::duplication_confirm_entry(duplication_confirm_entry &&other501) +duplication_confirm_entry::duplication_confirm_entry(duplication_confirm_entry &&other509) { - dupid = std::move(other501.dupid); - confirmed_decree = std::move(other501.confirmed_decree); - __isset = std::move(other501.__isset); + dupid = std::move(other509.dupid); + confirmed_decree = std::move(other509.confirmed_decree); + __isset = std::move(other509.__isset); } duplication_confirm_entry &duplication_confirm_entry:: -operator=(const duplication_confirm_entry &other502) +operator=(const duplication_confirm_entry &other510) { - dupid = other502.dupid; - confirmed_decree = other502.confirmed_decree; - __isset = other502.__isset; + dupid = other510.dupid; + confirmed_decree = other510.confirmed_decree; + __isset = other510.__isset; return *this; } duplication_confirm_entry &duplication_confirm_entry:: -operator=(duplication_confirm_entry &&other503) +operator=(duplication_confirm_entry &&other511) { - dupid = std::move(other503.dupid); - confirmed_decree = std::move(other503.confirmed_decree); - __isset = std::move(other503.__isset); + dupid = std::move(other511.dupid); + confirmed_decree = std::move(other511.confirmed_decree); + __isset = std::move(other511.__isset); return *this; } void duplication_confirm_entry::printTo(std::ostream &out) const @@ -12071,25 +12126,25 @@ uint32_t duplication_sync_request::read(::apache::thrift::protocol::TProtocol *i if (ftype == ::apache::thrift::protocol::T_MAP) { { this->confirm_list.clear(); - uint32_t _size504; - ::apache::thrift::protocol::TType _ktype505; - ::apache::thrift::protocol::TType _vtype506; - xfer += iprot->readMapBegin(_ktype505, _vtype506, _size504); - uint32_t _i508; - for (_i508 = 0; _i508 < _size504; ++_i508) { - ::dsn::gpid _key509; - xfer += _key509.read(iprot); - std::vector &_val510 = - this->confirm_list[_key509]; + uint32_t _size512; + ::apache::thrift::protocol::TType _ktype513; + ::apache::thrift::protocol::TType _vtype514; + xfer += iprot->readMapBegin(_ktype513, _vtype514, _size512); + uint32_t _i516; + for (_i516 = 0; _i516 < _size512; ++_i516) { + ::dsn::gpid _key517; + xfer += _key517.read(iprot); + std::vector &_val518 = + this->confirm_list[_key517]; { - _val510.clear(); - uint32_t _size511; - ::apache::thrift::protocol::TType _etype514; - xfer += iprot->readListBegin(_etype514, _size511); - _val510.resize(_size511); - uint32_t _i515; - for (_i515 = 0; _i515 < _size511; ++_i515) { - xfer += _val510[_i515].read(iprot); + _val518.clear(); + uint32_t _size519; + ::apache::thrift::protocol::TType _etype522; + xfer += iprot->readListBegin(_etype522, _size519); + _val518.resize(_size519); + uint32_t _i523; + for (_i523 = 0; _i523 < _size519; ++_i523) { + xfer += _val518[_i523].read(iprot); } xfer += iprot->readListEnd(); } @@ -12128,17 +12183,17 @@ uint32_t duplication_sync_request::write(::apache::thrift::protocol::TProtocol * xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_LIST, static_cast(this->confirm_list.size())); - std::map<::dsn::gpid, std::vector>::const_iterator _iter516; - for (_iter516 = this->confirm_list.begin(); _iter516 != this->confirm_list.end(); - ++_iter516) { - xfer += _iter516->first.write(oprot); + std::map<::dsn::gpid, std::vector>::const_iterator _iter524; + for (_iter524 = this->confirm_list.begin(); _iter524 != this->confirm_list.end(); + ++_iter524) { + xfer += _iter524->first.write(oprot); { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, - static_cast(_iter516->second.size())); - std::vector::const_iterator _iter517; - for (_iter517 = _iter516->second.begin(); _iter517 != _iter516->second.end(); - ++_iter517) { - xfer += (*_iter517).write(oprot); + static_cast(_iter524->second.size())); + std::vector::const_iterator _iter525; + for (_iter525 = _iter524->second.begin(); _iter525 != _iter524->second.end(); + ++_iter525) { + xfer += (*_iter525).write(oprot); } xfer += oprot->writeListEnd(); } @@ -12160,31 +12215,31 @@ void swap(duplication_sync_request &a, duplication_sync_request &b) swap(a.__isset, b.__isset); } -duplication_sync_request::duplication_sync_request(const duplication_sync_request &other518) +duplication_sync_request::duplication_sync_request(const duplication_sync_request &other526) { - node = other518.node; - confirm_list = other518.confirm_list; - __isset = other518.__isset; + node = other526.node; + confirm_list = other526.confirm_list; + __isset = other526.__isset; } -duplication_sync_request::duplication_sync_request(duplication_sync_request &&other519) +duplication_sync_request::duplication_sync_request(duplication_sync_request &&other527) { - node = std::move(other519.node); - confirm_list = std::move(other519.confirm_list); - __isset = std::move(other519.__isset); + node = std::move(other527.node); + confirm_list = std::move(other527.confirm_list); + __isset = std::move(other527.__isset); } duplication_sync_request &duplication_sync_request:: -operator=(const duplication_sync_request &other520) +operator=(const duplication_sync_request &other528) { - node = other520.node; - confirm_list = other520.confirm_list; - __isset = other520.__isset; + node = other528.node; + confirm_list = other528.confirm_list; + __isset = other528.__isset; return *this; } -duplication_sync_request &duplication_sync_request::operator=(duplication_sync_request &&other521) +duplication_sync_request &duplication_sync_request::operator=(duplication_sync_request &&other529) { - node = std::move(other521.node); - confirm_list = std::move(other521.confirm_list); - __isset = std::move(other521.__isset); + node = std::move(other529.node); + confirm_list = std::move(other529.confirm_list); + __isset = std::move(other529.__isset); return *this; } void duplication_sync_request::printTo(std::ostream &out) const @@ -12238,27 +12293,27 @@ uint32_t duplication_sync_response::read(::apache::thrift::protocol::TProtocol * if (ftype == ::apache::thrift::protocol::T_MAP) { { this->dup_map.clear(); - uint32_t _size522; - ::apache::thrift::protocol::TType _ktype523; - ::apache::thrift::protocol::TType _vtype524; - xfer += iprot->readMapBegin(_ktype523, _vtype524, _size522); - uint32_t _i526; - for (_i526 = 0; _i526 < _size522; ++_i526) { - int32_t _key527; - xfer += iprot->readI32(_key527); - std::map &_val528 = this->dup_map[_key527]; + uint32_t _size530; + ::apache::thrift::protocol::TType _ktype531; + ::apache::thrift::protocol::TType _vtype532; + xfer += iprot->readMapBegin(_ktype531, _vtype532, _size530); + uint32_t _i534; + for (_i534 = 0; _i534 < _size530; ++_i534) { + int32_t _key535; + xfer += iprot->readI32(_key535); + std::map &_val536 = this->dup_map[_key535]; { - _val528.clear(); - uint32_t _size529; - ::apache::thrift::protocol::TType _ktype530; - ::apache::thrift::protocol::TType _vtype531; - xfer += iprot->readMapBegin(_ktype530, _vtype531, _size529); - uint32_t _i533; - for (_i533 = 0; _i533 < _size529; ++_i533) { - int32_t _key534; - xfer += iprot->readI32(_key534); - duplication_entry &_val535 = _val528[_key534]; - xfer += _val535.read(iprot); + _val536.clear(); + uint32_t _size537; + ::apache::thrift::protocol::TType _ktype538; + ::apache::thrift::protocol::TType _vtype539; + xfer += iprot->readMapBegin(_ktype538, _vtype539, _size537); + uint32_t _i541; + for (_i541 = 0; _i541 < _size537; ++_i541) { + int32_t _key542; + xfer += iprot->readI32(_key542); + duplication_entry &_val543 = _val536[_key542]; + xfer += _val543.read(iprot); } xfer += iprot->readMapEnd(); } @@ -12297,18 +12352,18 @@ uint32_t duplication_sync_response::write(::apache::thrift::protocol::TProtocol xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, ::apache::thrift::protocol::T_MAP, static_cast(this->dup_map.size())); - std::map>::const_iterator _iter536; - for (_iter536 = this->dup_map.begin(); _iter536 != this->dup_map.end(); ++_iter536) { - xfer += oprot->writeI32(_iter536->first); + std::map>::const_iterator _iter544; + for (_iter544 = this->dup_map.begin(); _iter544 != this->dup_map.end(); ++_iter544) { + xfer += oprot->writeI32(_iter544->first); { xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_I32, ::apache::thrift::protocol::T_STRUCT, - static_cast(_iter536->second.size())); - std::map::const_iterator _iter537; - for (_iter537 = _iter536->second.begin(); _iter537 != _iter536->second.end(); - ++_iter537) { - xfer += oprot->writeI32(_iter537->first); - xfer += _iter537->second.write(oprot); + static_cast(_iter544->second.size())); + std::map::const_iterator _iter545; + for (_iter545 = _iter544->second.begin(); _iter545 != _iter544->second.end(); + ++_iter545) { + xfer += oprot->writeI32(_iter545->first); + xfer += _iter545->second.write(oprot); } xfer += oprot->writeMapEnd(); } @@ -12330,32 +12385,32 @@ void swap(duplication_sync_response &a, duplication_sync_response &b) swap(a.__isset, b.__isset); } -duplication_sync_response::duplication_sync_response(const duplication_sync_response &other538) +duplication_sync_response::duplication_sync_response(const duplication_sync_response &other546) { - err = other538.err; - dup_map = other538.dup_map; - __isset = other538.__isset; + err = other546.err; + dup_map = other546.dup_map; + __isset = other546.__isset; } -duplication_sync_response::duplication_sync_response(duplication_sync_response &&other539) +duplication_sync_response::duplication_sync_response(duplication_sync_response &&other547) { - err = std::move(other539.err); - dup_map = std::move(other539.dup_map); - __isset = std::move(other539.__isset); + err = std::move(other547.err); + dup_map = std::move(other547.dup_map); + __isset = std::move(other547.__isset); } duplication_sync_response &duplication_sync_response:: -operator=(const duplication_sync_response &other540) +operator=(const duplication_sync_response &other548) { - err = other540.err; - dup_map = other540.dup_map; - __isset = other540.__isset; + err = other548.err; + dup_map = other548.dup_map; + __isset = other548.__isset; return *this; } duplication_sync_response &duplication_sync_response:: -operator=(duplication_sync_response &&other541) +operator=(duplication_sync_response &&other549) { - err = std::move(other541.err); - dup_map = std::move(other541.dup_map); - __isset = std::move(other541.__isset); + err = std::move(other549.err); + dup_map = std::move(other549.dup_map); + __isset = std::move(other549.__isset); return *this; } void duplication_sync_response::printTo(std::ostream &out) const @@ -12433,26 +12488,26 @@ void swap(ddd_diagnose_request &a, ddd_diagnose_request &b) swap(a.__isset, b.__isset); } -ddd_diagnose_request::ddd_diagnose_request(const ddd_diagnose_request &other542) +ddd_diagnose_request::ddd_diagnose_request(const ddd_diagnose_request &other550) { - pid = other542.pid; - __isset = other542.__isset; + pid = other550.pid; + __isset = other550.__isset; } -ddd_diagnose_request::ddd_diagnose_request(ddd_diagnose_request &&other543) +ddd_diagnose_request::ddd_diagnose_request(ddd_diagnose_request &&other551) { - pid = std::move(other543.pid); - __isset = std::move(other543.__isset); + pid = std::move(other551.pid); + __isset = std::move(other551.__isset); } -ddd_diagnose_request &ddd_diagnose_request::operator=(const ddd_diagnose_request &other544) +ddd_diagnose_request &ddd_diagnose_request::operator=(const ddd_diagnose_request &other552) { - pid = other544.pid; - __isset = other544.__isset; + pid = other552.pid; + __isset = other552.__isset; return *this; } -ddd_diagnose_request &ddd_diagnose_request::operator=(ddd_diagnose_request &&other545) +ddd_diagnose_request &ddd_diagnose_request::operator=(ddd_diagnose_request &&other553) { - pid = std::move(other545.pid); - __isset = std::move(other545.__isset); + pid = std::move(other553.pid); + __isset = std::move(other553.__isset); return *this; } void ddd_diagnose_request::printTo(std::ostream &out) const @@ -12624,50 +12679,50 @@ void swap(ddd_node_info &a, ddd_node_info &b) swap(a.__isset, b.__isset); } -ddd_node_info::ddd_node_info(const ddd_node_info &other546) -{ - node = other546.node; - drop_time_ms = other546.drop_time_ms; - is_alive = other546.is_alive; - is_collected = other546.is_collected; - ballot = other546.ballot; - last_committed_decree = other546.last_committed_decree; - last_prepared_decree = other546.last_prepared_decree; - __isset = other546.__isset; -} -ddd_node_info::ddd_node_info(ddd_node_info &&other547) -{ - node = std::move(other547.node); - drop_time_ms = std::move(other547.drop_time_ms); - is_alive = std::move(other547.is_alive); - is_collected = std::move(other547.is_collected); - ballot = std::move(other547.ballot); - last_committed_decree = std::move(other547.last_committed_decree); - last_prepared_decree = std::move(other547.last_prepared_decree); - __isset = std::move(other547.__isset); -} -ddd_node_info &ddd_node_info::operator=(const ddd_node_info &other548) -{ - node = other548.node; - drop_time_ms = other548.drop_time_ms; - is_alive = other548.is_alive; - is_collected = other548.is_collected; - ballot = other548.ballot; - last_committed_decree = other548.last_committed_decree; - last_prepared_decree = other548.last_prepared_decree; - __isset = other548.__isset; +ddd_node_info::ddd_node_info(const ddd_node_info &other554) +{ + node = other554.node; + drop_time_ms = other554.drop_time_ms; + is_alive = other554.is_alive; + is_collected = other554.is_collected; + ballot = other554.ballot; + last_committed_decree = other554.last_committed_decree; + last_prepared_decree = other554.last_prepared_decree; + __isset = other554.__isset; +} +ddd_node_info::ddd_node_info(ddd_node_info &&other555) +{ + node = std::move(other555.node); + drop_time_ms = std::move(other555.drop_time_ms); + is_alive = std::move(other555.is_alive); + is_collected = std::move(other555.is_collected); + ballot = std::move(other555.ballot); + last_committed_decree = std::move(other555.last_committed_decree); + last_prepared_decree = std::move(other555.last_prepared_decree); + __isset = std::move(other555.__isset); +} +ddd_node_info &ddd_node_info::operator=(const ddd_node_info &other556) +{ + node = other556.node; + drop_time_ms = other556.drop_time_ms; + is_alive = other556.is_alive; + is_collected = other556.is_collected; + ballot = other556.ballot; + last_committed_decree = other556.last_committed_decree; + last_prepared_decree = other556.last_prepared_decree; + __isset = other556.__isset; return *this; } -ddd_node_info &ddd_node_info::operator=(ddd_node_info &&other549) +ddd_node_info &ddd_node_info::operator=(ddd_node_info &&other557) { - node = std::move(other549.node); - drop_time_ms = std::move(other549.drop_time_ms); - is_alive = std::move(other549.is_alive); - is_collected = std::move(other549.is_collected); - ballot = std::move(other549.ballot); - last_committed_decree = std::move(other549.last_committed_decree); - last_prepared_decree = std::move(other549.last_prepared_decree); - __isset = std::move(other549.__isset); + node = std::move(other557.node); + drop_time_ms = std::move(other557.drop_time_ms); + is_alive = std::move(other557.is_alive); + is_collected = std::move(other557.is_collected); + ballot = std::move(other557.ballot); + last_committed_decree = std::move(other557.last_committed_decree); + last_prepared_decree = std::move(other557.last_prepared_decree); + __isset = std::move(other557.__isset); return *this; } void ddd_node_info::printTo(std::ostream &out) const @@ -12735,13 +12790,13 @@ uint32_t ddd_partition_info::read(::apache::thrift::protocol::TProtocol *iprot) if (ftype == ::apache::thrift::protocol::T_LIST) { { this->dropped.clear(); - uint32_t _size550; - ::apache::thrift::protocol::TType _etype553; - xfer += iprot->readListBegin(_etype553, _size550); - this->dropped.resize(_size550); - uint32_t _i554; - for (_i554 = 0; _i554 < _size550; ++_i554) { - xfer += this->dropped[_i554].read(iprot); + uint32_t _size558; + ::apache::thrift::protocol::TType _etype561; + xfer += iprot->readListBegin(_etype561, _size558); + this->dropped.resize(_size558); + uint32_t _i562; + for (_i562 = 0; _i562 < _size558; ++_i562) { + xfer += this->dropped[_i562].read(iprot); } xfer += iprot->readListEnd(); } @@ -12784,9 +12839,9 @@ uint32_t ddd_partition_info::write(::apache::thrift::protocol::TProtocol *oprot) { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->dropped.size())); - std::vector::const_iterator _iter555; - for (_iter555 = this->dropped.begin(); _iter555 != this->dropped.end(); ++_iter555) { - xfer += (*_iter555).write(oprot); + std::vector::const_iterator _iter563; + for (_iter563 = this->dropped.begin(); _iter563 != this->dropped.end(); ++_iter563) { + xfer += (*_iter563).write(oprot); } xfer += oprot->writeListEnd(); } @@ -12810,34 +12865,34 @@ void swap(ddd_partition_info &a, ddd_partition_info &b) swap(a.__isset, b.__isset); } -ddd_partition_info::ddd_partition_info(const ddd_partition_info &other556) +ddd_partition_info::ddd_partition_info(const ddd_partition_info &other564) { - config = other556.config; - dropped = other556.dropped; - reason = other556.reason; - __isset = other556.__isset; + config = other564.config; + dropped = other564.dropped; + reason = other564.reason; + __isset = other564.__isset; } -ddd_partition_info::ddd_partition_info(ddd_partition_info &&other557) +ddd_partition_info::ddd_partition_info(ddd_partition_info &&other565) { - config = std::move(other557.config); - dropped = std::move(other557.dropped); - reason = std::move(other557.reason); - __isset = std::move(other557.__isset); + config = std::move(other565.config); + dropped = std::move(other565.dropped); + reason = std::move(other565.reason); + __isset = std::move(other565.__isset); } -ddd_partition_info &ddd_partition_info::operator=(const ddd_partition_info &other558) +ddd_partition_info &ddd_partition_info::operator=(const ddd_partition_info &other566) { - config = other558.config; - dropped = other558.dropped; - reason = other558.reason; - __isset = other558.__isset; + config = other566.config; + dropped = other566.dropped; + reason = other566.reason; + __isset = other566.__isset; return *this; } -ddd_partition_info &ddd_partition_info::operator=(ddd_partition_info &&other559) +ddd_partition_info &ddd_partition_info::operator=(ddd_partition_info &&other567) { - config = std::move(other559.config); - dropped = std::move(other559.dropped); - reason = std::move(other559.reason); - __isset = std::move(other559.__isset); + config = std::move(other567.config); + dropped = std::move(other567.dropped); + reason = std::move(other567.reason); + __isset = std::move(other567.__isset); return *this; } void ddd_partition_info::printTo(std::ostream &out) const @@ -12892,13 +12947,13 @@ uint32_t ddd_diagnose_response::read(::apache::thrift::protocol::TProtocol *ipro if (ftype == ::apache::thrift::protocol::T_LIST) { { this->partitions.clear(); - uint32_t _size560; - ::apache::thrift::protocol::TType _etype563; - xfer += iprot->readListBegin(_etype563, _size560); - this->partitions.resize(_size560); - uint32_t _i564; - for (_i564 = 0; _i564 < _size560; ++_i564) { - xfer += this->partitions[_i564].read(iprot); + uint32_t _size568; + ::apache::thrift::protocol::TType _etype571; + xfer += iprot->readListBegin(_etype571, _size568); + this->partitions.resize(_size568); + uint32_t _i572; + for (_i572 = 0; _i572 < _size568; ++_i572) { + xfer += this->partitions[_i572].read(iprot); } xfer += iprot->readListEnd(); } @@ -12933,9 +12988,9 @@ uint32_t ddd_diagnose_response::write(::apache::thrift::protocol::TProtocol *opr { xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast(this->partitions.size())); - std::vector::const_iterator _iter565; - for (_iter565 = this->partitions.begin(); _iter565 != this->partitions.end(); ++_iter565) { - xfer += (*_iter565).write(oprot); + std::vector::const_iterator _iter573; + for (_iter573 = this->partitions.begin(); _iter573 != this->partitions.end(); ++_iter573) { + xfer += (*_iter573).write(oprot); } xfer += oprot->writeListEnd(); } @@ -12954,30 +13009,30 @@ void swap(ddd_diagnose_response &a, ddd_diagnose_response &b) swap(a.__isset, b.__isset); } -ddd_diagnose_response::ddd_diagnose_response(const ddd_diagnose_response &other566) +ddd_diagnose_response::ddd_diagnose_response(const ddd_diagnose_response &other574) { - err = other566.err; - partitions = other566.partitions; - __isset = other566.__isset; + err = other574.err; + partitions = other574.partitions; + __isset = other574.__isset; } -ddd_diagnose_response::ddd_diagnose_response(ddd_diagnose_response &&other567) +ddd_diagnose_response::ddd_diagnose_response(ddd_diagnose_response &&other575) { - err = std::move(other567.err); - partitions = std::move(other567.partitions); - __isset = std::move(other567.__isset); + err = std::move(other575.err); + partitions = std::move(other575.partitions); + __isset = std::move(other575.__isset); } -ddd_diagnose_response &ddd_diagnose_response::operator=(const ddd_diagnose_response &other568) +ddd_diagnose_response &ddd_diagnose_response::operator=(const ddd_diagnose_response &other576) { - err = other568.err; - partitions = other568.partitions; - __isset = other568.__isset; + err = other576.err; + partitions = other576.partitions; + __isset = other576.__isset; return *this; } -ddd_diagnose_response &ddd_diagnose_response::operator=(ddd_diagnose_response &&other569) +ddd_diagnose_response &ddd_diagnose_response::operator=(ddd_diagnose_response &&other577) { - err = std::move(other569.err); - partitions = std::move(other569.partitions); - __isset = std::move(other569.__isset); + err = std::move(other577.err); + partitions = std::move(other577.partitions); + __isset = std::move(other577.__isset); return *this; } void ddd_diagnose_response::printTo(std::ostream &out) const @@ -13074,32 +13129,32 @@ void swap(app_partition_split_request &a, app_partition_split_request &b) } app_partition_split_request::app_partition_split_request( - const app_partition_split_request &other570) + const app_partition_split_request &other578) { - app_name = other570.app_name; - new_partition_count = other570.new_partition_count; - __isset = other570.__isset; + app_name = other578.app_name; + new_partition_count = other578.new_partition_count; + __isset = other578.__isset; } -app_partition_split_request::app_partition_split_request(app_partition_split_request &&other571) +app_partition_split_request::app_partition_split_request(app_partition_split_request &&other579) { - app_name = std::move(other571.app_name); - new_partition_count = std::move(other571.new_partition_count); - __isset = std::move(other571.__isset); + app_name = std::move(other579.app_name); + new_partition_count = std::move(other579.new_partition_count); + __isset = std::move(other579.__isset); } app_partition_split_request &app_partition_split_request:: -operator=(const app_partition_split_request &other572) +operator=(const app_partition_split_request &other580) { - app_name = other572.app_name; - new_partition_count = other572.new_partition_count; - __isset = other572.__isset; + app_name = other580.app_name; + new_partition_count = other580.new_partition_count; + __isset = other580.__isset; return *this; } app_partition_split_request &app_partition_split_request:: -operator=(app_partition_split_request &&other573) +operator=(app_partition_split_request &&other581) { - app_name = std::move(other573.app_name); - new_partition_count = std::move(other573.new_partition_count); - __isset = std::move(other573.__isset); + app_name = std::move(other581.app_name); + new_partition_count = std::move(other581.new_partition_count); + __isset = std::move(other581.__isset); return *this; } void app_partition_split_request::printTo(std::ostream &out) const @@ -13211,36 +13266,36 @@ void swap(app_partition_split_response &a, app_partition_split_response &b) } app_partition_split_response::app_partition_split_response( - const app_partition_split_response &other574) + const app_partition_split_response &other582) { - err = other574.err; - app_id = other574.app_id; - partition_count = other574.partition_count; - __isset = other574.__isset; + err = other582.err; + app_id = other582.app_id; + partition_count = other582.partition_count; + __isset = other582.__isset; } -app_partition_split_response::app_partition_split_response(app_partition_split_response &&other575) +app_partition_split_response::app_partition_split_response(app_partition_split_response &&other583) { - err = std::move(other575.err); - app_id = std::move(other575.app_id); - partition_count = std::move(other575.partition_count); - __isset = std::move(other575.__isset); + err = std::move(other583.err); + app_id = std::move(other583.app_id); + partition_count = std::move(other583.partition_count); + __isset = std::move(other583.__isset); } app_partition_split_response &app_partition_split_response:: -operator=(const app_partition_split_response &other576) +operator=(const app_partition_split_response &other584) { - err = other576.err; - app_id = other576.app_id; - partition_count = other576.partition_count; - __isset = other576.__isset; + err = other584.err; + app_id = other584.app_id; + partition_count = other584.partition_count; + __isset = other584.__isset; return *this; } app_partition_split_response &app_partition_split_response:: -operator=(app_partition_split_response &&other577) +operator=(app_partition_split_response &&other585) { - err = std::move(other577.err); - app_id = std::move(other577.app_id); - partition_count = std::move(other577.partition_count); - __isset = std::move(other577.__isset); + err = std::move(other585.err); + app_id = std::move(other585.app_id); + partition_count = std::move(other585.partition_count); + __isset = std::move(other585.__isset); return *this; } void app_partition_split_response::printTo(std::ostream &out) const diff --git a/src/dist/replication/meta_server/duplication/duplication_info.cpp b/src/dist/replication/meta_server/duplication/duplication_info.cpp index bbb1129a4f..10d7c0eee4 100644 --- a/src/dist/replication/meta_server/duplication/duplication_info.cpp +++ b/src/dist/replication/meta_server/duplication/duplication_info.cpp @@ -191,5 +191,25 @@ duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id, return dup; } +void duplication_info::append_if_valid_for_query( + const app_state &app, + /*out*/ std::vector &entry_list) const +{ + zauto_read_lock l(_lock); + + entry_list.emplace_back(to_duplication_entry()); + duplication_entry &ent = entry_list.back(); + ent.__isset.not_confirmed = true; + // the confirmed decree is not useful for displaying + // the overall state of duplication, instead we show pending mutations. + ent.__isset.progress = false; + for (const partition_configuration &part : app.partitions) { + int pid = part.pid.get_partition_index(); + auto it = _progress.find(pid); + int64_t pending = part.last_committed_decree - it->second.stored_decree; + ent.not_confirmed[pid] = std::max(pending, int64_t(0)); + } +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/meta_server/duplication/duplication_info.h b/src/dist/replication/meta_server/duplication/duplication_info.h index 78b59eeb72..7ab6ccab9f 100644 --- a/src/dist/replication/meta_server/duplication/duplication_info.h +++ b/src/dist/replication/meta_server/duplication/duplication_info.h @@ -115,12 +115,8 @@ class duplication_info // duplication_query_rpc is handled in THREAD_POOL_META_SERVER, // which is not thread safe for read. - void append_if_valid_for_query( - /*out*/ std::vector &entry_list) const - { - zauto_read_lock l(_lock); - entry_list.emplace_back(to_duplication_entry()); - } + void append_if_valid_for_query(const app_state &app, + /*out*/ std::vector &entry_list) const; duplication_entry to_duplication_entry() const { @@ -129,6 +125,7 @@ class duplication_info entry.create_ts = create_timestamp_ms; entry.remote = remote; entry.status = _status; + entry.__isset.progress = true; for (const auto &kv : _progress) { if (!kv.second.is_inited) { continue; diff --git a/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp b/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp index 8b6deb42d1..0ac4921d0b 100644 --- a/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp +++ b/src/dist/replication/meta_server/duplication/meta_duplication_service.cpp @@ -53,7 +53,7 @@ void meta_duplication_service::query_duplication_info(const duplication_query_re response.appid = app->app_id; for (auto &dup_id_to_info : app->duplications) { const duplication_info_s_ptr &dup = dup_id_to_info.second; - dup->append_if_valid_for_query(response.entry_list); + dup->append_if_valid_for_query(*app, response.entry_list); } } } diff --git a/src/dist/replication/meta_server/meta_http_service.cpp b/src/dist/replication/meta_server/meta_http_service.cpp index 0579361fef..abe1f63cad 100644 --- a/src/dist/replication/meta_server/meta_http_service.cpp +++ b/src/dist/replication/meta_server/meta_http_service.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -16,6 +17,7 @@ #include "meta_server_failure_detector.h" #include "server_load_balancer.h" #include "server_state.h" +#include "duplication/meta_duplication_service.h" namespace dsn { namespace replication { @@ -573,8 +575,39 @@ void meta_http_service::query_backup_policy_handler(const http_request &req, htt tp_query_backup_policy.output(out, dsn::utils::table_printer::output_format::kJsonCompact); resp.body = out.str(); resp.status_code = http_status_code::ok; +} - return; +void meta_http_service::query_duplication_handler(const http_request &req, http_response &resp) +{ + if (!redirect_if_not_primary(req, resp)) { + return; + } + if (_service->_dup_svc == nullptr) { + resp.body = "duplication is not enabled [duplication_enabled=false]"; + resp.status_code = http_status_code::not_found; + return; + } + duplication_query_request rpc_req; + auto it = req.query_args.find("name"); + if (it == req.query_args.end()) { + resp.body = "name should not be empty"; + resp.status_code = http_status_code::bad_request; + return; + } + rpc_req.app_name = it->second; + duplication_query_response rpc_resp; + _service->_dup_svc->query_duplication_info(rpc_req, rpc_resp); + if (rpc_resp.err != ERR_OK) { + resp.body = rpc_resp.err.to_string(); + if (rpc_resp.err == ERR_APP_NOT_EXIST) { + resp.status_code = http_status_code::not_found; + } else { + resp.status_code = http_status_code::internal_server_error; + } + return; + } + resp.status_code = http_status_code::ok; + resp.body = duplication_query_response_to_string(rpc_resp); } bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp) diff --git a/src/dist/replication/meta_server/meta_http_service.h b/src/dist/replication/meta_server/meta_http_service.h index ae7a96b846..494f4c976e 100644 --- a/src/dist/replication/meta_server/meta_http_service.h +++ b/src/dist/replication/meta_server/meta_http_service.h @@ -23,6 +23,12 @@ class meta_http_service : public http_service std::placeholders::_1, std::placeholders::_2), "ip:port/meta/app?app_name=temp"); + register_handler("app/duplication", + std::bind(&meta_http_service::query_duplication_handler, + this, + std::placeholders::_1, + std::placeholders::_2), + "ip:port/meta/app/duplication?name="); register_handler("apps", std::bind(&meta_http_service::list_app_handler, this, @@ -63,6 +69,7 @@ class meta_http_service : public http_service void get_cluster_info_handler(const http_request &req, http_response &resp); void get_app_envs_handler(const http_request &req, http_response &resp); void query_backup_policy_handler(const http_request &req, http_response &resp); + void query_duplication_handler(const http_request &req, http_response &resp); private: // set redirect location if current server is not primary diff --git a/src/dist/replication/replication.thrift b/src/dist/replication/replication.thrift index 98b166c91d..f258281d23 100644 --- a/src/dist/replication/replication.thrift +++ b/src/dist/replication/replication.thrift @@ -678,7 +678,10 @@ struct duplication_entry 4:i64 create_ts; // partition_index => confirmed decree - 5:map progress; + 5:optional map progress; + + // partition_index => approximate number of mutations that are not confirmed yet + 6:optional map not_confirmed; } // This request is sent from client to meta. diff --git a/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp index bacba95b9e..dbeb816b1f 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp @@ -26,9 +26,11 @@ #include #include +#include #include "dist/replication/meta_server/server_load_balancer.h" #include "dist/replication/meta_server/meta_server_failure_detector.h" +#include "dist/replication/meta_server/meta_http_service.h" #include "dist/replication/meta_server/duplication/meta_duplication_service.h" #include "dist/replication/test/meta_test/misc/misc.h" @@ -650,5 +652,36 @@ TEST_F(meta_duplication_service_test, recover_from_corrupted_meta_data) test_recover_from_corrupted_meta_data(); } +TEST_F(meta_duplication_service_test, query_duplication_handler) +{ + std::string test_app = "test-app"; + create_app(test_app); + create_dup(test_app); + meta_http_service mhs(_ms.get()); + + http_request fake_req; + http_response fake_resp; + fake_req.query_args["name"] = test_app + "not-found"; + mhs.query_duplication_handler(fake_req, fake_resp); + ASSERT_EQ(fake_resp.status_code, http_status_code::not_found); + + const auto &duplications = find_app(test_app)->duplications; + ASSERT_EQ(duplications.size(), 1); + auto dup = duplications.begin()->second; + + fake_req.query_args["name"] = test_app; + mhs.query_duplication_handler(fake_req, fake_resp); + ASSERT_EQ(fake_resp.status_code, http_status_code::ok); + char ts_buf[32]; + utils::time_ms_to_date_time( + static_cast(dup->create_timestamp_ms), ts_buf, sizeof(ts_buf)); + ASSERT_EQ( + fake_resp.body, + std::string() + + R"({"1":{"create_ts":")" + ts_buf + R"(","dupid":)" + std::to_string(dup->id) + + R"(,"not_confirmed_mutations_num":{"0":1,"1":1,"2":1,"3":1,"4":1,"5":1,"6":1,"7":1})" + R"(,"remote":"slave-cluster","status":"DS_START"},"appid":2})"); +} + } // namespace replication } // namespace dsn diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index 85725928a1..49b32def80 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -312,3 +312,10 @@ if [ ! -d $TP_OUTPUT/include/prometheus ]; then else echo "skip build prometheus-cpp" fi + +#build nlohmann_json +if [ ! -d $TP_OUTPUT/include/nlohmann ]; then + mv $TP_SRC/nlohmann_json-3.7.3/include/nlohmann $TP_OUTPUT/include +else + echo "skip build nlohmann_json" +fi diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh index af2d52dcba..efa0e7e977 100755 --- a/thirdparty/download-thirdparty.sh +++ b/thirdparty/download-thirdparty.sh @@ -280,6 +280,15 @@ check_and_download "${CURL_PKG}"\ "${CURL_NAME}" exit_if_fail $? +# nlohmann_json +# from: https://github.com/nlohmann/json/releases/download/v3.7.3/include.zip +NLOHMANN_JSON_NAME=nlohmann_json-3.7.3 +NLOHMANN_JSON_PKG=${NLOHMANN_JSON_NAME}.zip +check_and_download "${NLOHMANN_JSON_PKG}" \ + "${OSS_URL_PREFIX}/${NLOHMANN_JSON_PKG}" \ + "7249387593792b565dcb30d87bca0de3" \ + "${NLOHMANN_JSON_NAME}" + # s2geometry # from: https://github.com/google/s2geometry/archive/0239455c1e260d6d2c843649385b4fb9f5b28dba.zip S2GEOMETRY_NAME=s2geometry-0239455c1e260d6d2c843649385b4fb9f5b28dba