From b771f99f53142bbb1496dbcddccc5defaaaa8dd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Guti=C3=A9rrez=20Segal=C3=A9s?= Date: Mon, 25 Mar 2019 10:40:35 -0700 Subject: [PATCH] ZooKeeper proxy filter (#5991) This filter decodes the ZooKeeper wire protocol and emits stats & metadata about requests, responses and events. This wire protocol parsing is based on: https://github.com/twitter/zktraffic https://github.com/rgs1/zktraffic-cpp The actual filter structure is based on the Mysql proxy filter. Signed-off-by: Raul Gutierrez Segales --- CODEOWNERS | 2 + .../network/zookeeper_proxy/v1alpha1/BUILD | 8 + .../v1alpha1/zookeeper_proxy.proto | 33 + .../network_filters/network_filters.rst | 1 + .../zookeeper_proxy_filter.rst | 92 ++ .../well_known_dynamic_metadata.rst | 1 + docs/root/intro/version_history.rst | 2 + source/common/common/enum_to_int.h | 7 +- source/extensions/extensions_build_config.bzl | 2 + .../filters/network/well_known_names.h | 2 + .../filters/network/zookeeper_proxy/BUILD | 47 + .../zookeeper_proxy/zookeeper_config.cc | 47 + .../zookeeper_proxy/zookeeper_config.h | 33 + .../zookeeper_proxy/zookeeper_decoder.cc | 415 +++++++++ .../zookeeper_proxy/zookeeper_decoder.h | 150 +++ .../zookeeper_proxy/zookeeper_filter.cc | 239 +++++ .../zookeeper_proxy/zookeeper_filter.h | 141 +++ .../zookeeper_proxy/zookeeper_utils.cc | 71 ++ .../network/zookeeper_proxy/zookeeper_utils.h | 45 + .../filters/network/zookeeper_proxy/BUILD | 27 + .../zookeeper_proxy/zookeeper_filter_test.cc | 874 ++++++++++++++++++ tools/spelling_dictionary.txt | 3 + 22 files changed, 2241 insertions(+), 1 deletion(-) create mode 100644 api/envoy/config/filter/network/zookeeper_proxy/v1alpha1/BUILD create mode 100644 api/envoy/config/filter/network/zookeeper_proxy/v1alpha1/zookeeper_proxy.proto create mode 100644 docs/root/configuration/network_filters/zookeeper_proxy_filter.rst create mode 100644 source/extensions/filters/network/zookeeper_proxy/BUILD create mode 100644 source/extensions/filters/network/zookeeper_proxy/zookeeper_config.cc create mode 100644 source/extensions/filters/network/zookeeper_proxy/zookeeper_config.h create mode 100644 source/extensions/filters/network/zookeeper_proxy/zookeeper_decoder.cc create mode 100644 source/extensions/filters/network/zookeeper_proxy/zookeeper_decoder.h create mode 100644 source/extensions/filters/network/zookeeper_proxy/zookeeper_filter.cc create mode 100644 source/extensions/filters/network/zookeeper_proxy/zookeeper_filter.h create mode 100644 source/extensions/filters/network/zookeeper_proxy/zookeeper_utils.cc create mode 100644 source/extensions/filters/network/zookeeper_proxy/zookeeper_utils.h create mode 100644 test/extensions/filters/network/zookeeper_proxy/BUILD create mode 100644 test/extensions/filters/network/zookeeper_proxy/zookeeper_filter_test.cc diff --git a/CODEOWNERS b/CODEOWNERS index abdf25c16f86..0f81b447d285 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -22,3 +22,5 @@ /*/extensions/filters/network/mysql_proxy @rshriram @venilnoronha @mattklein123 # quic extension /*/extensions/quic_listeners/ @alyssawilk @danzh2010 @mattklein123 @mpwarres @wu-bin +# zookeeper_proxy extension +/*/extensions/filters/network/zookeeper_proxy @rgs1 @snowp diff --git a/api/envoy/config/filter/network/zookeeper_proxy/v1alpha1/BUILD b/api/envoy/config/filter/network/zookeeper_proxy/v1alpha1/BUILD new file mode 100644 index 000000000000..a29ebf3a8848 --- /dev/null +++ b/api/envoy/config/filter/network/zookeeper_proxy/v1alpha1/BUILD @@ -0,0 +1,8 @@ +load("//bazel:api_build_system.bzl", "api_proto_library_internal") + +licenses(["notice"]) # Apache 2 + +api_proto_library_internal( + name = "zookeeper_proxy", + srcs = ["zookeeper_proxy.proto"], +) diff --git a/api/envoy/config/filter/network/zookeeper_proxy/v1alpha1/zookeeper_proxy.proto b/api/envoy/config/filter/network/zookeeper_proxy/v1alpha1/zookeeper_proxy.proto new file mode 100644 index 000000000000..6a8afdd12ec0 --- /dev/null +++ b/api/envoy/config/filter/network/zookeeper_proxy/v1alpha1/zookeeper_proxy.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; + +package envoy.config.filter.network.zookeeper_proxy.v1alpha1; + +option java_outer_classname = "ZookeeperProxyProto"; +option java_multiple_files = true; +option java_package = "io.envoyproxy.envoy.config.filter.network.zookeeper_proxy.v1alpha1"; +option go_package = "v1alpha1"; + +import "validate/validate.proto"; +import "google/protobuf/wrappers.proto"; + +// [#protodoc-title: ZooKeeper proxy] +// ZooKeeper Proxy :ref:`configuration overview `. +message ZooKeeperProxy { + // The human readable prefix to use when emitting :ref:`statistics + // `. + string stat_prefix = 1 [(validate.rules).string.min_bytes = 1]; + + // [#not-implemented-hide:] The optional path to use for writing ZooKeeper access logs. + // If the access log field is empty, access logs will not be written. + string access_log = 2; + + // Messages — requests, responses and events — that are bigger than this value will + // be ignored. If it is not set, the default value is 1Mb. + // + // The value here should match the jute.maxbuffer property in your cluster configuration: + // + // https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#Unsafe+Options + // + // if that is set. If it isn't, ZooKeeper's default is also 1Mb. + google.protobuf.UInt32Value max_packet_bytes = 3; +} diff --git a/docs/root/configuration/network_filters/network_filters.rst b/docs/root/configuration/network_filters/network_filters.rst index dd559ddd6689..91693bc40ab0 100644 --- a/docs/root/configuration/network_filters/network_filters.rst +++ b/docs/root/configuration/network_filters/network_filters.rst @@ -21,3 +21,4 @@ filters. tcp_proxy_filter thrift_proxy_filter sni_cluster_filter + zookeeper_proxy_filter diff --git a/docs/root/configuration/network_filters/zookeeper_proxy_filter.rst b/docs/root/configuration/network_filters/zookeeper_proxy_filter.rst new file mode 100644 index 000000000000..cf8e1c9716a7 --- /dev/null +++ b/docs/root/configuration/network_filters/zookeeper_proxy_filter.rst @@ -0,0 +1,92 @@ +.. _config_network_filters_zookeeper_proxy: + +ZooKeeper proxy +=============== + +The ZooKeeper proxy filter decodes the client protocol for +`Apache ZooKeeper `_. It decodes the requests, +responses and events in the payload. Most opcodes known in +`ZooKeeper 3.5 `_ +are supported. The unsupported ones are related to SASL authentication. + +.. attention:: + + The zookeeper_proxy filter is experimental and is currently under active + development. Capabilities will be expanded over time and the + configuration structures are likely to change. + +.. _config_network_filters_zookeeper_proxy_config: + +Configuration +------------- + +The ZooKeeper proxy filter should be chained with the TCP proxy filter as shown +in the configuration snippet below: + +.. code-block:: yaml + + filter_chains: + - filters: + - name: envoy.filters.network.zookeeper_proxy + config: + stat_prefix: zookeeper + - name: envoy.tcp_proxy + config: + stat_prefix: tcp + cluster: ... + + +.. _config_network_filters_zookeeper_proxy_stats: + +Statistics +---------- + +Every configured ZooKeeper proxy filter has statistics rooted at *zookeeper..* with the +following statistics: + +.. csv-table:: + :header: Name, Type, Description + :widths: 1, 1, 2 + + decoder_error, Counter, Number of times a message wasn't decoded + request_bytes, Counter, Number of bytes in decoded request messages + connect_rq, Counter, Number of regular connect (non-readonly) requests + connect_readonly_rq, Counter, Number of connect requests with the readonly flag set + ping_rq, Counter, Number of ping requests + auth._rq, Counter, Number of auth requests for a given type + getdata_rq, Counter, Number of getdata requests + create_rq, Counter, Number of create requests + create2_rq, Counter, Number of create2 requests + setdata_rq, Counter, Number of setdata requests + getchildren_rq, Counter, Number of getchildren requests + getchildren2_rq, Counter, Number of getchildren2 requests + remove_rq, Counter, Number of delete requests + exists_rq, Counter, Number of stat requests + getacl_rq, Counter, Number of getacl requests + setacl_rq, Counter, Number of setacl requests + sync_rq, Counter, Number of sync requests + multi_rq, Counter, Number of multi transaction requests + reconfig_rq, Counter, Number of reconfig requests + close_rq, Counter, Number of close requests + setwatches_rq, Counter, Number of setwatches requests + checkwatches_rq, Counter, Number of checkwatches requests + removewatches_rq, Counter, Number of removewatches requests + check_rq, Counter, Number of check requests + +.. _config_network_filters_zookeeper_proxy_dynamic_metadata: + +Dynamic Metadata +---------------- + +The ZooKeeper filter emits the following dynamic metadata for each message parsed: + +.. csv-table:: + :header: Name, Type, Description + :widths: 1, 1, 2 + + , string, "The path associated with the request, response or event" + , string, "The opname for the request, response or event" + , string, "The string representation of the flags applied to the znode" + , string, "The size of the request message in bytes" + , string, "True if a watch is being set, false otherwise" + , string, "The version parameter, if any, given with the request" diff --git a/docs/root/configuration/well_known_dynamic_metadata.rst b/docs/root/configuration/well_known_dynamic_metadata.rst index dd11866a42a0..73215617e46d 100644 --- a/docs/root/configuration/well_known_dynamic_metadata.rst +++ b/docs/root/configuration/well_known_dynamic_metadata.rst @@ -17,3 +17,4 @@ The following Envoy filters emit dynamic metadata that other filters can leverag * :ref:`MySQL Proxy Filter ` * :ref:`Role Based Access Control (RBAC) Filter ` * :ref:`Role Based Access Control (RBAC) Network Filter ` +* :ref:`ZooKeeper Proxy Filter ` diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index ec3b38d8353b..b307ddc5d1bd 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -71,6 +71,8 @@ Version history * upstream: add cluster factory to allow creating and registering :ref:`custom cluster type`. * tracing: added :ref:`verbose ` to support logging annotations on spans. * upstream: added support for host weighting and :ref:`locality weighting ` in the :ref:`ring hash load balancer `, and added a :ref:`maximum_ring_size` config parameter to strictly bound the ring size. +* zookeeper: added a ZooKeeper proxy filter that parses ZooKeeper messages (requests/responses/events). + Refer to ::ref:`ZooKeeper proxy` for more details. * upstream: added configuration option to select any host when the fallback policy fails. * upstream: stopped incrementing upstream_rq_total for HTTP/1 conn pool when request is circuit broken. diff --git a/source/common/common/enum_to_int.h b/source/common/common/enum_to_int.h index a9c77b59419e..ec613ef67df9 100644 --- a/source/common/common/enum_to_int.h +++ b/source/common/common/enum_to_int.h @@ -6,5 +6,10 @@ namespace Envoy { /** * Convert an int based enum to an int. */ -template uint32_t enumToInt(T val) { return static_cast(val); } +template constexpr uint32_t enumToInt(T val) { return static_cast(val); } + +/** + * Convert an int based enum to a signed int. + */ +template constexpr int32_t enumToSignedInt(T val) { return static_cast(val); } } // namespace Envoy diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index bd3793cc9d16..f973d63fad0a 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -77,6 +77,7 @@ EXTENSIONS = { "envoy.filters.network.tcp_proxy": "//source/extensions/filters/network/tcp_proxy:config", "envoy.filters.network.thrift_proxy": "//source/extensions/filters/network/thrift_proxy:config", "envoy.filters.network.sni_cluster": "//source/extensions/filters/network/sni_cluster:config", + "envoy.filters.network.zookeeper_proxy": "//source/extensions/filters/network/zookeeper_proxy:config", # # Resource monitors @@ -194,6 +195,7 @@ WINDOWS_EXTENSIONS = { "envoy.filters.network.tcp_proxy": "//source/extensions/filters/network/tcp_proxy:config", #"envoy.filters.network.thrift_proxy": "//source/extensions/filters/network/thrift_proxy:config", #"envoy.filters.network.sni_cluster": "//source/extensions/filters/network/sni_cluster:config", + #"envoy.filters.network.zookeeper_proxy": "//source/extensions/filters/network/zookeeper_proxy:config", # # Stat sinks diff --git a/source/extensions/filters/network/well_known_names.h b/source/extensions/filters/network/well_known_names.h index f540aab0d36f..a1d435f4e7b2 100644 --- a/source/extensions/filters/network/well_known_names.h +++ b/source/extensions/filters/network/well_known_names.h @@ -38,6 +38,8 @@ class NetworkFilterNameValues { const std::string Rbac = "envoy.filters.network.rbac"; // SNI Cluster filter const std::string SniCluster = "envoy.filters.network.sni_cluster"; + // ZooKeeper proxy filter + const std::string ZooKeeperProxy = "envoy.filters.network.zookeeper_proxy"; // Converts names from v1 to v2 const Config::V1Converter v1_converter_; diff --git a/source/extensions/filters/network/zookeeper_proxy/BUILD b/source/extensions/filters/network/zookeeper_proxy/BUILD new file mode 100644 index 000000000000..4fae6bda7267 --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/BUILD @@ -0,0 +1,47 @@ +licenses(["notice"]) # Apache 2 + +# ZooKeeper proxy L7 network filter. +# Public docs: docs/root/configuration/network_filters/zookeeper_proxy_filter.rst + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_library( + name = "proxy_lib", + srcs = [ + "zookeeper_decoder.cc", + "zookeeper_filter.cc", + "zookeeper_utils.cc", + ], + hdrs = [ + "zookeeper_decoder.h", + "zookeeper_filter.h", + "zookeeper_utils.h", + ], + deps = [ + "//include/envoy/network:filter_interface", + "//include/envoy/server:filter_config_interface", + "//include/envoy/stats:stats_interface", + "//include/envoy/stats:stats_macros", + "//source/common/config:filter_json_lib", + "//source/common/network:filter_lib", + "//source/extensions/filters/network:well_known_names", + ], +) + +envoy_cc_library( + name = "config", + srcs = ["zookeeper_config.cc"], + hdrs = ["zookeeper_config.h"], + deps = [ + ":proxy_lib", + "//source/extensions/filters/network:well_known_names", + "//source/extensions/filters/network/common:factory_base_lib", + "@envoy_api//envoy/config/filter/network/zookeeper_proxy/v1alpha1:zookeeper_proxy_cc", + ], +) diff --git a/source/extensions/filters/network/zookeeper_proxy/zookeeper_config.cc b/source/extensions/filters/network/zookeeper_proxy/zookeeper_config.cc new file mode 100644 index 000000000000..7a2bda7a7bcb --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/zookeeper_config.cc @@ -0,0 +1,47 @@ +#include "extensions/filters/network/zookeeper_proxy/zookeeper_config.h" + +#include + +#include "envoy/config/filter/network/zookeeper_proxy/v1alpha1/zookeeper_proxy.pb.validate.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "common/common/logger.h" + +#include "extensions/filters/network/zookeeper_proxy/zookeeper_filter.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +/** + * Config registration for the ZooKeeper proxy filter. @see NamedNetworkFilterConfigFactory. + */ +Network::FilterFactoryCb +NetworkFilters::ZooKeeperProxy::ZooKeeperConfigFactory::createFilterFactoryFromProtoTyped( + const envoy::config::filter::network::zookeeper_proxy::v1alpha1::ZooKeeperProxy& proto_config, + Server::Configuration::FactoryContext& context) { + + ASSERT(!proto_config.stat_prefix().empty()); + + const std::string stat_prefix = fmt::format("{}.zookeeper.", proto_config.stat_prefix()); + const uint32_t max_packet_bytes = + PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config, max_packet_bytes, 1024 * 1024); + + ZooKeeperFilterConfigSharedPtr filter_config( + std::make_shared(stat_prefix, max_packet_bytes, context.scope())); + return [filter_config](Network::FilterManager& filter_manager) -> void { + filter_manager.addFilter(std::make_shared(filter_config)); + }; +} + +/** + * Static registration for the ZooKeeper proxy filter. @see RegisterFactory. + */ +REGISTER_FACTORY(ZooKeeperConfigFactory, Server::Configuration::NamedNetworkFilterConfigFactory); + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/zookeeper_proxy/zookeeper_config.h b/source/extensions/filters/network/zookeeper_proxy/zookeeper_config.h new file mode 100644 index 000000000000..2dc1f86ba332 --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/zookeeper_config.h @@ -0,0 +1,33 @@ +#pragma once + +#include "envoy/config/filter/network/zookeeper_proxy/v1alpha1/zookeeper_proxy.pb.h" +#include "envoy/config/filter/network/zookeeper_proxy/v1alpha1/zookeeper_proxy.pb.validate.h" + +#include "extensions/filters/network/common/factory_base.h" +#include "extensions/filters/network/well_known_names.h" +#include "extensions/filters/network/zookeeper_proxy/zookeeper_filter.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +/** + * Config registration for the ZooKeeper proxy filter. + */ +class ZooKeeperConfigFactory + : public Common::FactoryBase< + envoy::config::filter::network::zookeeper_proxy::v1alpha1::ZooKeeperProxy> { +public: + ZooKeeperConfigFactory() : FactoryBase(NetworkFilterNames::get().ZooKeeperProxy) {} + +private: + Network::FilterFactoryCb createFilterFactoryFromProtoTyped( + const envoy::config::filter::network::zookeeper_proxy::v1alpha1::ZooKeeperProxy& proto_config, + Server::Configuration::FactoryContext& context) override; +}; + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/zookeeper_proxy/zookeeper_decoder.cc b/source/extensions/filters/network/zookeeper_proxy/zookeeper_decoder.cc new file mode 100644 index 000000000000..dddd22a0ef63 --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/zookeeper_decoder.cc @@ -0,0 +1,415 @@ +#include "extensions/filters/network/zookeeper_proxy/zookeeper_decoder.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +constexpr uint32_t BOOL_LENGTH = 1; +constexpr uint32_t INT_LENGTH = 4; +constexpr uint32_t LONG_LENGTH = 8; +constexpr uint32_t XID_LENGTH = 4; +constexpr uint32_t OPCODE_LENGTH = 4; +constexpr uint32_t ZXID_LENGTH = 8; +constexpr uint32_t TIMEOUT_LENGTH = 4; +constexpr uint32_t SESSION_LENGTH = 8; +constexpr uint32_t MULTI_HEADER_LENGTH = 9; + +const char* createFlagsToString(CreateFlags flags) { + switch (flags) { + case CreateFlags::PERSISTENT: + return "persistent"; + case CreateFlags::PERSISTENT_SEQUENTIAL: + return "persistent_sequential"; + case CreateFlags::EPHEMERAL: + return "ephemeral"; + case CreateFlags::EPHEMERAL_SEQUENTIAL: + return "ephemeral_sequential"; + case CreateFlags::CONTAINER: + return "container"; + case CreateFlags::PERSISTENT_WITH_TTL: + return "persistent_with_ttl"; + case CreateFlags::PERSISTENT_SEQUENTIAL_WITH_TTL: + return "persistent_sequential_with_ttl"; + } + + return "unknown"; +} + +void DecoderImpl::decode(Buffer::Instance& data, uint64_t& offset) { + ENVOY_LOG(trace, "zookeeper_proxy: decoding {} bytes at offset {}", data.length(), offset); + + // Reset the helper's cursor, to ensure the current message stays within the + // allowed max length, even when it's different than the declared length + // by the message. + // + // Note: we need to keep two cursors — offset and helper_'s internal one — because + // a buffer may contain multiple messages, so offset is global and helper_'s + // internal cursor is reset for each individual message. + helper_.reset(); + + // Check message length. + const int32_t len = helper_.peekInt32(data, offset); + ensureMinLength(len, INT_LENGTH + XID_LENGTH); + ensureMaxLength(len); + + // Control requests, with XIDs <= 0. + // + // These are meant to control the state of a session: + // connect, keep-alive, authenticate and set initial watches. + // + // Note: setWatches is a command historically used to set watches + // right after connecting, typically used when roaming from one + // ZooKeeper server to the next. Thus, the special xid. + // However, some client implementations might expose setWatches + // as a regular data request, so we support that as well. + const int32_t xid = helper_.peekInt32(data, offset); + switch (static_cast(xid)) { + case XidCodes::CONNECT_XID: + parseConnect(data, offset, len); + return; + case XidCodes::PING_XID: + offset += OPCODE_LENGTH; + callbacks_.onPing(); + return; + case XidCodes::AUTH_XID: + parseAuthRequest(data, offset, len); + return; + case XidCodes::SET_WATCHES_XID: + offset += OPCODE_LENGTH; + parseSetWatchesRequest(data, offset, len); + return; + default: + // WATCH_XID is generated by the server, so that and everything + // else can be ignored here. + break; + } + + // Data requests, with XIDs > 0. + // + // These are meant to happen after a successful control request, except + // for two cases: auth requests can happen at any time and ping requests + // must happen every 1/3 of the negotiated session timeout, to keep + // the session alive. + const int32_t opcode = helper_.peekInt32(data, offset); + switch (static_cast(opcode)) { + case OpCodes::GETDATA: + parseGetDataRequest(data, offset, len); + break; + case OpCodes::CREATE: + case OpCodes::CREATE2: + case OpCodes::CREATECONTAINER: + case OpCodes::CREATETTL: + parseCreateRequest(data, offset, len, static_cast(opcode)); + break; + case OpCodes::SETDATA: + parseSetRequest(data, offset, len); + break; + case OpCodes::GETCHILDREN: + parseGetChildrenRequest(data, offset, len, false); + break; + case OpCodes::GETCHILDREN2: + parseGetChildrenRequest(data, offset, len, true); + break; + case OpCodes::DELETE: + parseDeleteRequest(data, offset, len); + break; + case OpCodes::EXISTS: + parseExistsRequest(data, offset, len); + break; + case OpCodes::GETACL: + parseGetAclRequest(data, offset, len); + break; + case OpCodes::SETACL: + parseSetAclRequest(data, offset, len); + break; + case OpCodes::SYNC: + callbacks_.onSyncRequest(pathOnlyRequest(data, offset, len)); + break; + case OpCodes::CHECK: + parseCheckRequest(data, offset, len); + break; + case OpCodes::MULTI: + parseMultiRequest(data, offset, len); + break; + case OpCodes::RECONFIG: + parseReconfigRequest(data, offset, len); + break; + case OpCodes::SETWATCHES: + parseSetWatchesRequest(data, offset, len); + break; + case OpCodes::CHECKWATCHES: + parseXWatchesRequest(data, offset, len, OpCodes::CHECKWATCHES); + break; + case OpCodes::REMOVEWATCHES: + parseXWatchesRequest(data, offset, len, OpCodes::REMOVEWATCHES); + break; + case OpCodes::GETEPHEMERALS: + callbacks_.onGetEphemeralsRequest(pathOnlyRequest(data, offset, len)); + break; + case OpCodes::GETALLCHILDRENNUMBER: + callbacks_.onGetAllChildrenNumberRequest(pathOnlyRequest(data, offset, len)); + break; + case OpCodes::CLOSE: + callbacks_.onCloseRequest(); + break; + default: + throw EnvoyException(fmt::format("Unknown opcode: {}", opcode)); + } +} + +void DecoderImpl::ensureMinLength(const int32_t len, const int32_t minlen) const { + if (len < minlen) { + throw EnvoyException("Packet is too small"); + } +} + +void DecoderImpl::ensureMaxLength(const int32_t len) const { + if (static_cast(len) > max_packet_bytes_) { + throw EnvoyException("Packet is too big"); + } +} + +void DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH); + + // Skip zxid, timeout, and session id. + offset += ZXID_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH; + + // Skip password. + skipString(data, offset); + + // Read readonly flag, if it's there. + bool readonly{}; + if (data.length() >= offset + 1) { + readonly = helper_.peekBool(data, offset); + } + + callbacks_.onConnect(readonly); +} + +void DecoderImpl::parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + INT_LENGTH + INT_LENGTH); + + // Skip opcode + type. + offset += OPCODE_LENGTH + INT_LENGTH; + const std::string scheme = helper_.peekString(data, offset); + // Skip credential. + skipString(data, offset); + + callbacks_.onAuthRequest(scheme); +} + +void DecoderImpl::parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); + + const std::string path = helper_.peekString(data, offset); + const bool watch = helper_.peekBool(data, offset); + + callbacks_.onGetDataRequest(path, watch); +} + +void DecoderImpl::skipAcls(Buffer::Instance& data, uint64_t& offset) { + const int32_t count = helper_.peekInt32(data, offset); + + for (int i = 0; i < count; ++i) { + // Perms. + helper_.peekInt32(data, offset); + // Skip scheme. + skipString(data, offset); + // Skip cred. + skipString(data, offset); + } +} + +void DecoderImpl::parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + OpCodes opcode) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); + + const std::string path = helper_.peekString(data, offset); + + // Skip data. + skipString(data, offset); + skipAcls(data, offset); + + const CreateFlags flags = static_cast(helper_.peekInt32(data, offset)); + callbacks_.onCreateRequest(path, flags, opcode); +} + +void DecoderImpl::parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); + + const std::string path = helper_.peekString(data, offset); + // Skip data. + skipString(data, offset); + // Ignore version. + helper_.peekInt32(data, offset); + + callbacks_.onSetRequest(path); +} + +void DecoderImpl::parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + const bool two) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); + + const std::string path = helper_.peekString(data, offset); + const bool watch = helper_.peekBool(data, offset); + + callbacks_.onGetChildrenRequest(path, watch, two); +} + +void DecoderImpl::parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); + + const std::string path = helper_.peekString(data, offset); + const int32_t version = helper_.peekInt32(data, offset); + + callbacks_.onDeleteRequest(path, version); +} + +void DecoderImpl::parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH + BOOL_LENGTH); + + const std::string path = helper_.peekString(data, offset); + const bool watch = helper_.peekBool(data, offset); + + callbacks_.onExistsRequest(path, watch); +} + +void DecoderImpl::parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); + + const std::string path = helper_.peekString(data, offset); + + callbacks_.onGetAclRequest(path); +} + +void DecoderImpl::parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); + + const std::string path = helper_.peekString(data, offset); + skipAcls(data, offset); + const int32_t version = helper_.peekInt32(data, offset); + + callbacks_.onSetAclRequest(path, version); +} + +std::string DecoderImpl::pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + INT_LENGTH); + return helper_.peekString(data, offset); +} + +void DecoderImpl::parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, (2 * INT_LENGTH)); + + const std::string path = helper_.peekString(data, offset); + const int32_t version = helper_.peekInt32(data, offset); + + callbacks_.onCheckRequest(path, version); +} + +void DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + // Treat empty transactions as a decoding error, there should be at least 1 header. + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + MULTI_HEADER_LENGTH); + + while (true) { + const int32_t opcode = helper_.peekInt32(data, offset); + const bool done = helper_.peekBool(data, offset); + // Ignore error field. + helper_.peekInt32(data, offset); + + if (done) { + break; + } + + switch (static_cast(opcode)) { + case OpCodes::CREATE: + parseCreateRequest(data, offset, len, OpCodes::CREATE); + break; + case OpCodes::SETDATA: + parseSetRequest(data, offset, len); + break; + case OpCodes::CHECK: + parseCheckRequest(data, offset, len); + break; + default: + throw EnvoyException(fmt::format("Unknown opcode within a transaction: {}", opcode)); + } + } + + callbacks_.onMultiRequest(); +} + +void DecoderImpl::parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH) + LONG_LENGTH); + + // Skip joining. + skipString(data, offset); + // Skip leaving. + skipString(data, offset); + // Skip new members. + skipString(data, offset); + // Read config id. + helper_.peekInt64(data, offset); + + callbacks_.onReconfigRequest(); +} + +void DecoderImpl::parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (3 * INT_LENGTH)); + + // Data watches. + skipStrings(data, offset); + // Exist watches. + skipStrings(data, offset); + // Child watches. + skipStrings(data, offset); + + callbacks_.onSetWatchesRequest(); +} + +void DecoderImpl::parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, + OpCodes opcode) { + ensureMinLength(len, XID_LENGTH + OPCODE_LENGTH + (2 * INT_LENGTH)); + + const std::string path = helper_.peekString(data, offset); + const int32_t type = helper_.peekInt32(data, offset); + + if (opcode == OpCodes::CHECKWATCHES) { + callbacks_.onCheckWatchesRequest(path, type); + } else { + callbacks_.onRemoveWatchesRequest(path, type); + } +} + +void DecoderImpl::skipString(Buffer::Instance& data, uint64_t& offset) { + const int32_t slen = helper_.peekInt32(data, offset); + helper_.skip(slen, offset); +} + +void DecoderImpl::skipStrings(Buffer::Instance& data, uint64_t& offset) { + const int32_t count = helper_.peekInt32(data, offset); + + for (int i = 0; i < count; ++i) { + skipString(data, offset); + } +} + +void DecoderImpl::onData(Buffer::Instance& data) { + uint64_t offset = 0; + try { + while (offset < data.length()) { + const uint64_t current = offset; + decode(data, offset); + callbacks_.onRequestBytes(offset - current); + } + } catch (const EnvoyException& e) { + ENVOY_LOG(debug, "zookeeper_proxy: decoding exception {}", e.what()); + callbacks_.onDecodeError(); + } +} + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/zookeeper_proxy/zookeeper_decoder.h b/source/extensions/filters/network/zookeeper_proxy/zookeeper_decoder.h new file mode 100644 index 000000000000..62144ef91006 --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/zookeeper_decoder.h @@ -0,0 +1,150 @@ +#pragma once +#include + +#include "envoy/common/platform.h" + +#include "common/buffer/buffer_impl.h" +#include "common/common/logger.h" + +#include "extensions/filters/network/zookeeper_proxy/zookeeper_utils.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +enum class XidCodes { + CONNECT_XID = 0, + WATCH_XID = -1, + PING_XID = -2, + AUTH_XID = -4, + SET_WATCHES_XID = -8 +}; + +enum class OpCodes { + CONNECT = 0, + CREATE = 1, + DELETE = 2, + EXISTS = 3, + GETDATA = 4, + SETDATA = 5, + GETACL = 6, + SETACL = 7, + GETCHILDREN = 8, + SYNC = 9, + PING = 11, + GETCHILDREN2 = 12, + CHECK = 13, + MULTI = 14, + CREATE2 = 15, + RECONFIG = 16, + CHECKWATCHES = 17, + REMOVEWATCHES = 18, + CREATECONTAINER = 19, + CREATETTL = 21, + CLOSE = -11, + SETAUTH = 100, + SETWATCHES = 101, + GETEPHEMERALS = 103, + GETALLCHILDRENNUMBER = 104 +}; + +enum class WatcherType { CHILDREN = 1, DATA = 2, ANY = 3 }; + +enum class CreateFlags { + PERSISTENT, + PERSISTENT_SEQUENTIAL, + EPHEMERAL, + EPHEMERAL_SEQUENTIAL, + CONTAINER, + PERSISTENT_WITH_TTL, + PERSISTENT_SEQUENTIAL_WITH_TTL +}; + +const char* createFlagsToString(CreateFlags flags); + +/** + * General callbacks for dispatching decoded ZooKeeper messages to a sink. + */ +class DecoderCallbacks { +public: + virtual ~DecoderCallbacks() {} + + virtual void onDecodeError() PURE; + virtual void onRequestBytes(uint64_t bytes) PURE; + virtual void onConnect(bool readonly) PURE; + virtual void onPing() PURE; + virtual void onAuthRequest(const std::string& scheme) PURE; + virtual void onGetDataRequest(const std::string& path, bool watch) PURE; + virtual void onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) PURE; + virtual void onSetRequest(const std::string& path) PURE; + virtual void onGetChildrenRequest(const std::string& path, bool watch, bool v2) PURE; + virtual void onGetEphemeralsRequest(const std::string& path) PURE; + virtual void onGetAllChildrenNumberRequest(const std::string& path) PURE; + virtual void onDeleteRequest(const std::string& path, int32_t version) PURE; + virtual void onExistsRequest(const std::string& path, bool watch) PURE; + virtual void onGetAclRequest(const std::string& path) PURE; + virtual void onSetAclRequest(const std::string& path, int32_t version) PURE; + virtual void onSyncRequest(const std::string& path) PURE; + virtual void onCheckRequest(const std::string& path, int32_t version) PURE; + virtual void onMultiRequest() PURE; + virtual void onReconfigRequest() PURE; + virtual void onSetWatchesRequest() PURE; + virtual void onCheckWatchesRequest(const std::string& path, int32_t type) PURE; + virtual void onRemoveWatchesRequest(const std::string& path, int32_t type) PURE; + virtual void onCloseRequest() PURE; +}; + +/** + * ZooKeeper message decoder. + */ +class Decoder { +public: + virtual ~Decoder() {} + + virtual void onData(Buffer::Instance& data) PURE; +}; + +typedef std::unique_ptr DecoderPtr; + +class DecoderImpl : public Decoder, Logger::Loggable { +public: + explicit DecoderImpl(DecoderCallbacks& callbacks, uint32_t max_packet_bytes) + : callbacks_(callbacks), max_packet_bytes_(max_packet_bytes), helper_(max_packet_bytes) {} + + // ZooKeeperProxy::Decoder + void onData(Buffer::Instance& data) override; + +private: + void decode(Buffer::Instance& data, uint64_t& offset); + void parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseCreateRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, OpCodes opcode); + void skipAcls(Buffer::Instance& data, uint64_t& offset); + void parseSetRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseGetChildrenRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, bool two); + void parseDeleteRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseExistsRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseGetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseSetAclRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseCheckRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseMultiRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseReconfigRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseSetWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + void parseXWatchesRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len, OpCodes opcode); + void skipString(Buffer::Instance& data, uint64_t& offset); + void skipStrings(Buffer::Instance& data, uint64_t& offset); + void ensureMinLength(int32_t len, int32_t minlen) const; + void ensureMaxLength(int32_t len) const; + std::string pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len); + + DecoderCallbacks& callbacks_; + const uint32_t max_packet_bytes_; + BufferHelper helper_; +}; + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/zookeeper_proxy/zookeeper_filter.cc b/source/extensions/filters/network/zookeeper_proxy/zookeeper_filter.cc new file mode 100644 index 000000000000..ac78aad9c7b7 --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/zookeeper_filter.cc @@ -0,0 +1,239 @@ +#include "extensions/filters/network/zookeeper_proxy/zookeeper_filter.h" + +#include "common/buffer/buffer_impl.h" +#include "common/common/assert.h" +#include "common/common/enum_to_int.h" +#include "common/common/fmt.h" +#include "common/common/logger.h" + +#include "extensions/filters/network/well_known_names.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +ZooKeeperFilterConfig::ZooKeeperFilterConfig(const std::string& stat_prefix, + const uint32_t max_packet_bytes, Stats::Scope& scope) + : scope_(scope), max_packet_bytes_(max_packet_bytes), stat_prefix_(stat_prefix), + stats_(generateStats(stat_prefix, scope)) {} + +ZooKeeperFilter::ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config) + : config_(std::move(config)) {} + +void ZooKeeperFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { + read_callbacks_ = &callbacks; +} + +Network::FilterStatus ZooKeeperFilter::onData(Buffer::Instance& data, bool) { + doDecode(data); + return Network::FilterStatus::Continue; +} + +Network::FilterStatus ZooKeeperFilter::onWrite(Buffer::Instance&, bool) { + return Network::FilterStatus::Continue; +} + +Network::FilterStatus ZooKeeperFilter::onNewConnection() { return Network::FilterStatus::Continue; } + +void ZooKeeperFilter::doDecode(Buffer::Instance& buffer) { + clearDynamicMetadata(); + + if (!decoder_) { + decoder_ = createDecoder(*this); + } + + decoder_->onData(buffer); +} + +DecoderPtr ZooKeeperFilter::createDecoder(DecoderCallbacks& callbacks) { + return std::make_unique(callbacks, config_->maxPacketBytes()); +} + +void ZooKeeperFilter::setDynamicMetadata(const std::string& key, const std::string& value) { + setDynamicMetadata({{key, value}}); +} + +void ZooKeeperFilter::clearDynamicMetadata() { + envoy::api::v2::core::Metadata& dynamic_metadata = + read_callbacks_->connection().streamInfo().dynamicMetadata(); + auto& metadata = + (*dynamic_metadata.mutable_filter_metadata())[NetworkFilterNames::get().ZooKeeperProxy]; + metadata.mutable_fields()->clear(); +} + +void ZooKeeperFilter::setDynamicMetadata( + const std::vector>& data) { + envoy::api::v2::core::Metadata& dynamic_metadata = + read_callbacks_->connection().streamInfo().dynamicMetadata(); + ProtobufWkt::Struct metadata( + (*dynamic_metadata.mutable_filter_metadata())[NetworkFilterNames::get().ZooKeeperProxy]); + auto& fields = *metadata.mutable_fields(); + + for (const auto& pair : data) { + auto val = ProtobufWkt::Value(); + val.set_string_value(pair.second); + fields.insert({pair.first, val}); + } + + read_callbacks_->connection().streamInfo().setDynamicMetadata( + NetworkFilterNames::get().ZooKeeperProxy, metadata); +} + +void ZooKeeperFilter::onConnect(const bool readonly) { + if (readonly) { + config_->stats_.connect_readonly_rq_.inc(); + setDynamicMetadata("opname", "connect_readonly"); + } else { + config_->stats_.connect_rq_.inc(); + setDynamicMetadata("opname", "connect"); + } +} + +void ZooKeeperFilter::onDecodeError() { + config_->stats_.decoder_error_.inc(); + setDynamicMetadata("opname", "error"); +} + +void ZooKeeperFilter::onRequestBytes(const uint64_t bytes) { + config_->stats_.request_bytes_.add(bytes); + setDynamicMetadata("bytes", std::to_string(bytes)); +} + +void ZooKeeperFilter::onPing() { + config_->stats_.ping_rq_.inc(); + setDynamicMetadata("opname", "ping"); +} + +void ZooKeeperFilter::onAuthRequest(const std::string& scheme) { + config_->scope_.counter(fmt::format("{}.auth.{}_rq", config_->stat_prefix_, scheme)).inc(); + setDynamicMetadata("opname", "auth"); +} + +void ZooKeeperFilter::onGetDataRequest(const std::string& path, const bool watch) { + config_->stats_.getdata_rq_.inc(); + setDynamicMetadata({{"opname", "getdata"}, {"path", path}, {"watch", watch ? "true" : "false"}}); +} + +void ZooKeeperFilter::onCreateRequest(const std::string& path, const CreateFlags flags, + const OpCodes opcode) { + std::string opname; + + switch (opcode) { + case OpCodes::CREATE: + opname = "create"; + config_->stats_.create_rq_.inc(); + break; + case OpCodes::CREATE2: + opname = "create2"; + config_->stats_.create2_rq_.inc(); + break; + case OpCodes::CREATECONTAINER: + opname = "createcontainer"; + config_->stats_.createcontainer_rq_.inc(); + break; + case OpCodes::CREATETTL: + opname = "createttl"; + config_->stats_.createttl_rq_.inc(); + break; + default: + throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode))); + break; + } + + setDynamicMetadata( + {{"opname", opname}, {"path", path}, {"create_type", createFlagsToString(flags)}}); +} + +void ZooKeeperFilter::onSetRequest(const std::string& path) { + config_->stats_.setdata_rq_.inc(); + setDynamicMetadata({{"opname", "setdata"}, {"path", path}}); +} + +void ZooKeeperFilter::onGetChildrenRequest(const std::string& path, const bool watch, + const bool v2) { + std::string opname = "getchildren"; + + if (v2) { + config_->stats_.getchildren2_rq_.inc(); + opname = "getchildren2"; + } else { + config_->stats_.getchildren_rq_.inc(); + } + + setDynamicMetadata({{"opname", opname}, {"path", path}, {"watch", watch ? "true" : "false"}}); +} + +void ZooKeeperFilter::onDeleteRequest(const std::string& path, const int32_t version) { + config_->stats_.remove_rq_.inc(); + setDynamicMetadata({{"opname", "remove"}, {"path", path}, {"version", std::to_string(version)}}); +} + +void ZooKeeperFilter::onExistsRequest(const std::string& path, const bool watch) { + config_->stats_.exists_rq_.inc(); + setDynamicMetadata({{"opname", "exists"}, {"path", path}, {"watch", watch ? "true" : "false"}}); +} + +void ZooKeeperFilter::onGetAclRequest(const std::string& path) { + config_->stats_.getacl_rq_.inc(); + setDynamicMetadata({{"opname", "getacl"}, {"path", path}}); +} + +void ZooKeeperFilter::onSetAclRequest(const std::string& path, const int32_t version) { + config_->stats_.setacl_rq_.inc(); + setDynamicMetadata({{"opname", "setacl"}, {"path", path}, {"version", std::to_string(version)}}); +} + +void ZooKeeperFilter::onSyncRequest(const std::string& path) { + config_->stats_.sync_rq_.inc(); + setDynamicMetadata({{"opname", "sync"}, {"path", path}}); +} + +void ZooKeeperFilter::onCheckRequest(const std::string&, const int32_t) { + config_->stats_.check_rq_.inc(); +} + +void ZooKeeperFilter::onCheckWatchesRequest(const std::string& path, const int32_t) { + config_->stats_.checkwatches_rq_.inc(); + setDynamicMetadata({{"opname", "checkwatches"}, {"path", path}}); +} + +void ZooKeeperFilter::onRemoveWatchesRequest(const std::string& path, const int32_t) { + config_->stats_.removewatches_rq_.inc(); + setDynamicMetadata({{"opname", "removewatches"}, {"path", path}}); +} + +void ZooKeeperFilter::onMultiRequest() { + config_->stats_.multi_rq_.inc(); + setDynamicMetadata("opname", "multi"); +} + +void ZooKeeperFilter::onReconfigRequest() { + config_->stats_.reconfig_rq_.inc(); + setDynamicMetadata("opname", "reconfig"); +} + +void ZooKeeperFilter::onSetWatchesRequest() { + config_->stats_.setwatches_rq_.inc(); + setDynamicMetadata("opname", "setwatches"); +} + +void ZooKeeperFilter::onGetEphemeralsRequest(const std::string& path) { + config_->stats_.getephemerals_rq_.inc(); + setDynamicMetadata({{"opname", "getephemerals"}, {"path", path}}); +} + +void ZooKeeperFilter::onGetAllChildrenNumberRequest(const std::string& path) { + config_->stats_.getallchildrennumber_rq_.inc(); + setDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", path}}); +} + +void ZooKeeperFilter::onCloseRequest() { + config_->stats_.close_rq_.inc(); + setDynamicMetadata("opname", "close"); +} + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/zookeeper_proxy/zookeeper_filter.h b/source/extensions/filters/network/zookeeper_proxy/zookeeper_filter.h new file mode 100644 index 000000000000..20cdfec0a8f4 --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/zookeeper_filter.h @@ -0,0 +1,141 @@ +#pragma once + +#include "envoy/access_log/access_log.h" +#include "envoy/network/connection.h" +#include "envoy/network/filter.h" +#include "envoy/stats/scope.h" +#include "envoy/stats/stats.h" +#include "envoy/stats/stats_macros.h" + +#include "common/common/logger.h" + +#include "extensions/filters/network/zookeeper_proxy/zookeeper_decoder.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +/** + * All ZooKeeper proxy stats. @see stats_macros.h + */ +// clang-format off +#define ALL_ZOOKEEPER_PROXY_STATS(COUNTER) \ + COUNTER(decoder_error) \ + COUNTER(request_bytes) \ + COUNTER(connect_rq) \ + COUNTER(connect_readonly_rq) \ + COUNTER(getdata_rq) \ + COUNTER(create_rq) \ + COUNTER(create2_rq) \ + COUNTER(createcontainer_rq) \ + COUNTER(createttl_rq) \ + COUNTER(setdata_rq) \ + COUNTER(getchildren_rq) \ + COUNTER(getchildren2_rq) \ + COUNTER(getephemerals_rq) \ + COUNTER(getallchildrennumber_rq) \ + COUNTER(remove_rq) \ + COUNTER(exists_rq) \ + COUNTER(getacl_rq) \ + COUNTER(setacl_rq) \ + COUNTER(sync_rq) \ + COUNTER(ping_rq) \ + COUNTER(multi_rq) \ + COUNTER(reconfig_rq) \ + COUNTER(close_rq) \ + COUNTER(setauth_rq) \ + COUNTER(setwatches_rq) \ + COUNTER(checkwatches_rq) \ + COUNTER(removewatches_rq) \ + COUNTER(check_rq) \ +// clang-format on + +/** + * Struct definition for all ZooKeeper proxy stats. @see stats_macros.h + */ +struct ZooKeeperProxyStats { + ALL_ZOOKEEPER_PROXY_STATS(GENERATE_COUNTER_STRUCT) +}; + +/** + * Configuration for the ZooKeeper proxy filter. + */ +class ZooKeeperFilterConfig { +public: + ZooKeeperFilterConfig(const std::string &stat_prefix, uint32_t max_packet_bytes, Stats::Scope& scope); + + const ZooKeeperProxyStats& stats() { return stats_; } + uint32_t maxPacketBytes() const { return max_packet_bytes_; } + + Stats::Scope& scope_; + const uint32_t max_packet_bytes_; + const std::string stat_prefix_; + ZooKeeperProxyStats stats_; + +private: + ZooKeeperProxyStats generateStats(const std::string& prefix, + Stats::Scope& scope) { + return ZooKeeperProxyStats{ + ALL_ZOOKEEPER_PROXY_STATS(POOL_COUNTER_PREFIX(scope, prefix))}; + } +}; + +using ZooKeeperFilterConfigSharedPtr = std::shared_ptr; + +/** + * Implementation of ZooKeeper proxy filter. + */ +class ZooKeeperFilter : public Network::Filter, DecoderCallbacks, Logger::Loggable { +public: + explicit ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config); + + // Network::ReadFilter + Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; + Network::FilterStatus onNewConnection() override; + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; + + // Network::WriteFilter + Network::FilterStatus onWrite(Buffer::Instance& data, bool end_stream) override; + + // ZooKeeperProxy::DecoderCallback + void onDecodeError() override; + void onRequestBytes(uint64_t bytes) override; + void onConnect(bool readonly) override; + void onPing() override; + void onAuthRequest(const std::string& scheme) override; + void onGetDataRequest(const std::string& path, bool watch) override; + void onCreateRequest(const std::string& path, CreateFlags flags, OpCodes opcode) override; + void onSetRequest(const std::string& path) override; + void onGetChildrenRequest(const std::string& path, bool watch, bool v2) override; + void onDeleteRequest(const std::string& path, int32_t version) override; + void onExistsRequest(const std::string& path, bool watch) override; + void onGetAclRequest(const std::string& path) override; + void onSetAclRequest(const std::string& path, int32_t version) override; + void onSyncRequest(const std::string& path) override; + void onCheckRequest(const std::string& path, int32_t version) override; + void onMultiRequest() override; + void onReconfigRequest() override; + void onSetWatchesRequest() override; + void onCheckWatchesRequest(const std::string& path, int32_t type) override; + void onRemoveWatchesRequest(const std::string& path, int32_t type) override; + void onGetEphemeralsRequest(const std::string& path) override; + void onGetAllChildrenNumberRequest(const std::string& path) override; + void onCloseRequest() override; + + void doDecode(Buffer::Instance& buffer); + DecoderPtr createDecoder(DecoderCallbacks& callbacks); + void setDynamicMetadata(const std::string& key, const std::string& value); + void setDynamicMetadata(const std::vector>& data); + void clearDynamicMetadata(); + +private: + Network::ReadFilterCallbacks* read_callbacks_{}; + ZooKeeperFilterConfigSharedPtr config_; + std::unique_ptr decoder_; +}; + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/zookeeper_proxy/zookeeper_utils.cc b/source/extensions/filters/network/zookeeper_proxy/zookeeper_utils.cc new file mode 100644 index 000000000000..1a4ad1c7af4d --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/zookeeper_utils.cc @@ -0,0 +1,71 @@ +#include "extensions/filters/network/zookeeper_proxy/zookeeper_utils.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +int32_t BufferHelper::peekInt32(Buffer::Instance& buffer, uint64_t& offset) { + ensureMaxLen(sizeof(int32_t)); + + int32_t val = buffer.peekBEInt(offset); + offset += sizeof(int32_t); + return val; +} + +int64_t BufferHelper::peekInt64(Buffer::Instance& buffer, uint64_t& offset) { + ensureMaxLen(sizeof(int64_t)); + + int64_t val = buffer.peekBEInt(offset); + offset += sizeof(int64_t); + return val; +} + +bool BufferHelper::peekBool(Buffer::Instance& buffer, uint64_t& offset) { + ensureMaxLen(1); + + const char byte = buffer.peekInt(offset); + const bool val = static_cast(byte); + offset += 1; + return val; +} + +std::string BufferHelper::peekString(Buffer::Instance& buffer, uint64_t& offset) { + std::string val; + uint32_t len = peekInt32(buffer, offset); + + if (len == 0) { + return val; + } + + if (buffer.length() < (offset + len)) { + throw EnvoyException("peekString: buffer is smaller than string length"); + } + + ensureMaxLen(len); + + std::unique_ptr data(new char[len]); + buffer.copyOut(offset, len, data.get()); + val.assign(data.get(), len); + offset += len; + + return val; +} + +void BufferHelper::skip(const uint32_t len, uint64_t& offset) { + offset += len; + current_ += len; +} + +void BufferHelper::ensureMaxLen(const uint32_t size) { + current_ += size; + + if (current_ > max_len_) { + throw EnvoyException("read beyond max length"); + } +} + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/zookeeper_proxy/zookeeper_utils.h b/source/extensions/filters/network/zookeeper_proxy/zookeeper_utils.h new file mode 100644 index 000000000000..559ef0f63093 --- /dev/null +++ b/source/extensions/filters/network/zookeeper_proxy/zookeeper_utils.h @@ -0,0 +1,45 @@ +#pragma once +#include + +#include "envoy/common/platform.h" + +#include "common/buffer/buffer_impl.h" +#include "common/common/byte_order.h" +#include "common/common/logger.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +/** + * Helper for extracting ZooKeeper data from a buffer. + * + * If at any point a peek is tried beyond max_len, an EnvoyException + * will be thrown. This is important to protect Envoy against malformed + * requests (e.g.: when the declared and actual length don't match). + * + * Note: ZooKeeper's protocol uses network byte ordering (big-endian). + */ +class BufferHelper : public Logger::Loggable { +public: + BufferHelper(uint32_t max_len) : max_len_(max_len) {} + + int32_t peekInt32(Buffer::Instance& buffer, uint64_t& offset); + int64_t peekInt64(Buffer::Instance& buffer, uint64_t& offset); + std::string peekString(Buffer::Instance& buffer, uint64_t& offset); + bool peekBool(Buffer::Instance& buffer, uint64_t& offset); + void skip(uint32_t len, uint64_t& offset); + void reset() { current_ = 0; } + +private: + void ensureMaxLen(uint32_t size); + + uint32_t max_len_; + uint32_t current_{}; +}; + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/network/zookeeper_proxy/BUILD b/test/extensions/filters/network/zookeeper_proxy/BUILD new file mode 100644 index 000000000000..81af4151cf11 --- /dev/null +++ b/test/extensions/filters/network/zookeeper_proxy/BUILD @@ -0,0 +1,27 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test_library", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_mock", + "envoy_extension_cc_test", + "envoy_extension_cc_test_library", +) + +envoy_package() + +envoy_extension_cc_test( + name = "zookeeper_filter_test", + srcs = [ + "zookeeper_filter_test.cc", + ], + extension_name = "envoy.filters.network.zookeeper_proxy", + deps = [ + "//source/extensions/filters/network/zookeeper_proxy:config", + "//test/mocks/network:network_mocks", + ], +) diff --git a/test/extensions/filters/network/zookeeper_proxy/zookeeper_filter_test.cc b/test/extensions/filters/network/zookeeper_proxy/zookeeper_filter_test.cc new file mode 100644 index 000000000000..182ea1bba570 --- /dev/null +++ b/test/extensions/filters/network/zookeeper_proxy/zookeeper_filter_test.cc @@ -0,0 +1,874 @@ +#include "common/buffer/buffer_impl.h" + +#include "extensions/filters/network/zookeeper_proxy/zookeeper_decoder.h" +#include "extensions/filters/network/zookeeper_proxy/zookeeper_filter.h" + +#include "test/mocks/network/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::NiceMock; + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ZooKeeperProxy { + +bool protoMapEq(const ProtobufWkt::Struct& obj, const std::map& rhs) { + EXPECT_TRUE(rhs.size() > 0); + for (auto const& entry : rhs) { + EXPECT_EQ(obj.fields().at(entry.first).string_value(), entry.second); + } + return true; +} + +MATCHER_P(MapEq, rhs, "") { return protoMapEq(arg, rhs); } + +class ZooKeeperFilterTest : public testing::Test { +public: + ZooKeeperFilterTest() { ENVOY_LOG_MISC(info, "test"); } + + void initialize() { + config_ = std::make_shared(stat_prefix_, 1048576, scope_); + filter_ = std::make_unique(config_); + filter_->initializeReadFilterCallbacks(filter_callbacks_); + } + + Buffer::OwnedImpl encodeConnect(const bool readonly = false, const uint64_t zxid = 100, + const uint32_t session_timeout = 10, + const uint32_t session_id = 200, + const std::string& passwd = "") const { + Buffer::OwnedImpl buffer; + const uint32_t message_size = readonly ? 28 + passwd.length() + 1 : 28 + passwd.length(); + + buffer.writeBEInt(message_size); + buffer.writeBEInt(0); // Protocol version. + buffer.writeBEInt(zxid); + buffer.writeBEInt(session_timeout); + buffer.writeBEInt(session_id); + addString(buffer, passwd); + + if (readonly) { + const char readonly_flag = 0b1; + buffer.add(std::string(1, readonly_flag)); + } + + return buffer; + } + + Buffer::OwnedImpl encodeBadMessage() const { + Buffer::OwnedImpl buffer; + + // Bad length. + buffer.writeBEInt(1); + // Trailing int. + buffer.writeBEInt(3); + + return buffer; + } + + Buffer::OwnedImpl encodeTooBigMessage() const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(1048577); + + return buffer; + } + + Buffer::OwnedImpl encodeBiggerThanLengthMessage() const { + Buffer::OwnedImpl buffer; + + // Craft a delete request with a path that's longer than + // the declared message length. + buffer.writeBEInt(50); + buffer.writeBEInt(1000); + // Opcode. + buffer.writeBEInt(enumToSignedInt(OpCodes::DELETE)); + // Path. + addString(buffer, std::string(2 * 1024 * 1024, '*')); + // Version. + buffer.writeBEInt(-1); + + return buffer; + } + + Buffer::OwnedImpl encodePing() const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(8); + buffer.writeBEInt(enumToSignedInt(XidCodes::PING_XID)); + buffer.writeBEInt(enumToInt(OpCodes::PING)); + + return buffer; + } + + Buffer::OwnedImpl encodeUnknownOpcode() const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(8); + buffer.writeBEInt(1000); + buffer.writeBEInt(200); + + return buffer; + } + + Buffer::OwnedImpl encodeCloseRequest() const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(8); + buffer.writeBEInt(1000); + buffer.writeBEInt(enumToSignedInt(OpCodes::CLOSE)); + + return buffer; + } + + Buffer::OwnedImpl encodeAuth(const std::string& scheme) const { + const std::string credential = "p@sswd"; + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(28 + scheme.length() + credential.length()); + buffer.writeBEInt(enumToSignedInt(XidCodes::AUTH_XID)); + buffer.writeBEInt(enumToSignedInt(OpCodes::SETAUTH)); + // Type. + buffer.writeBEInt(0); + addString(buffer, scheme); + addString(buffer, credential); + + return buffer; + } + + Buffer::OwnedImpl + encodePathWatch(const std::string& path, const bool watch, + const int32_t opcode = enumToSignedInt(OpCodes::GETDATA)) const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(13 + path.length()); + buffer.writeBEInt(1000); + // Opcode. + buffer.writeBEInt(opcode); + // Path. + addString(buffer, path); + // Watch. + const char watch_flag = watch ? 0b1 : 0b0; + buffer.add(std::string(1, watch_flag)); + + return buffer; + } + + Buffer::OwnedImpl encodePathVersion(const std::string& path, const int32_t version, + const int32_t opcode = enumToSignedInt(OpCodes::GETDATA), + const bool txn = false) const { + Buffer::OwnedImpl buffer; + + if (!txn) { + buffer.writeBEInt(16 + path.length()); + buffer.writeBEInt(1000); + buffer.writeBEInt(opcode); + } + + // Path. + addString(buffer, path); + // Version + buffer.writeBEInt(version); + + return buffer; + } + + Buffer::OwnedImpl encodePath(const std::string& path, const int32_t opcode) const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(8 + path.length()); + buffer.writeBEInt(1000); + // Opcode. + buffer.writeBEInt(opcode); + // Path. + addString(buffer, path); + + return buffer; + } + + Buffer::OwnedImpl encodePathLongerThanBuffer(const std::string& path, + const int32_t opcode) const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(8 + path.length()); + buffer.writeBEInt(1000); + buffer.writeBEInt(opcode); + buffer.writeBEInt(path.length() * 2); + buffer.add(path); + + return buffer; + } + + Buffer::OwnedImpl + encodeCreateRequest(const std::string& path, const std::string& data, const CreateFlags flags, + const bool txn = false, + const int32_t opcode = enumToSignedInt(OpCodes::CREATE)) const { + Buffer::OwnedImpl buffer; + + if (!txn) { + buffer.writeBEInt(24 + path.length() + data.length()); + buffer.writeBEInt(1000); + buffer.writeBEInt(opcode); + } + + // Path. + addString(buffer, path); + // Data. + addString(buffer, data); + // Acls. + buffer.writeBEInt(0); + // Flags. + buffer.writeBEInt(static_cast(flags)); + + return buffer; + } + + Buffer::OwnedImpl encodeSetRequest(const std::string& path, const std::string& data, + const int32_t version, const bool txn = false) const { + Buffer::OwnedImpl buffer; + + if (!txn) { + buffer.writeBEInt(20 + path.length() + data.length()); + buffer.writeBEInt(1000); + buffer.writeBEInt(enumToSignedInt(OpCodes::SETDATA)); + } + + // Path. + addString(buffer, path); + // Data. + addString(buffer, data); + // Version. + buffer.writeBEInt(version); + + return buffer; + } + + Buffer::OwnedImpl encodeDeleteRequest(const std::string& path, const int32_t version) const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(16 + path.length()); + buffer.writeBEInt(1000); + // Opcode. + buffer.writeBEInt(enumToSignedInt(OpCodes::DELETE)); + // Path. + addString(buffer, path); + // Version. + buffer.writeBEInt(version); + + return buffer; + } + + Buffer::OwnedImpl encodeSetAclRequest(const std::string& path, const std::string& scheme, + const std::string& credential, + const int32_t version) const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(32 + path.length() + scheme.length() + credential.length()); + buffer.writeBEInt(1000); + // Opcode. + buffer.writeBEInt(enumToSignedInt(OpCodes::SETACL)); + // Path. + addString(buffer, path); + + // Acls. + buffer.writeBEInt(1); + // Type. + buffer.writeBEInt(0); + // Scheme. + addString(buffer, scheme); + // Credential. + addString(buffer, credential); + + // Version. + buffer.writeBEInt(version); + + return buffer; + } + + Buffer::OwnedImpl encodeReconfigRequest(const std::string& joining, const std::string& leaving, + const std::string& new_members, int64_t config_id) const { + Buffer::OwnedImpl buffer; + + buffer.writeBEInt(28 + joining.length() + leaving.length() + new_members.length()); + buffer.writeBEInt(1000); + buffer.writeBEInt(enumToSignedInt(OpCodes::RECONFIG)); + addString(buffer, joining); + addString(buffer, leaving); + addString(buffer, new_members); + buffer.writeBEInt(config_id); + + return buffer; + } + + Buffer::OwnedImpl encodeSetWatchesRequest(const std::vector& dataw, + const std::vector& existw, + const std::vector& childw, + int32_t xid = 1000) const { + Buffer::OwnedImpl buffer; + Buffer::OwnedImpl watches_buffer; + + addStrings(watches_buffer, dataw); + addStrings(watches_buffer, existw); + addStrings(watches_buffer, childw); + + buffer.writeBEInt(8 + watches_buffer.length()); + buffer.writeBEInt(xid); + buffer.writeBEInt(enumToSignedInt(OpCodes::SETWATCHES)); + buffer.add(watches_buffer); + + return buffer; + } + + Buffer::OwnedImpl + encodeMultiRequest(const std::vector>& ops) const { + Buffer::OwnedImpl buffer; + Buffer::OwnedImpl requests; + + for (const auto& op_pair : ops) { + // Header. + requests.writeBEInt(op_pair.first); + requests.add(std::string(1, 0b0)); + requests.writeBEInt(-1); + + // Payload. + requests.add(op_pair.second); + } + + // Done header. + requests.writeBEInt(-1); + requests.add(std::string(1, 0b1)); + requests.writeBEInt(-1); + + // Multi prefix. + buffer.writeBEInt(8 + requests.length()); + buffer.writeBEInt(1000); + buffer.writeBEInt(enumToSignedInt(OpCodes::MULTI)); + + // Requests. + buffer.add(requests); + + return buffer; + } + + void addString(Buffer::OwnedImpl& buffer, const std::string& str) const { + buffer.writeBEInt(str.length()); + buffer.add(str); + } + + void addStrings(Buffer::OwnedImpl& buffer, const std::vector& watches) const { + buffer.writeBEInt(watches.size()); + + for (const auto& watch : watches) { + addString(buffer, watch); + } + } + + void expectSetDynamicMetadata(const std::map& expected) { + EXPECT_CALL(filter_callbacks_.connection_, streamInfo()) + .WillRepeatedly(ReturnRef(stream_info_)); + EXPECT_CALL(stream_info_, + setDynamicMetadata("envoy.filters.network.zookeeper_proxy", MapEq(expected))); + } + + void expectSetDynamicMetadata(const std::map& first, + const std::map& second) { + EXPECT_CALL(filter_callbacks_.connection_, streamInfo()) + .WillRepeatedly(ReturnRef(stream_info_)); + EXPECT_CALL(stream_info_, setDynamicMetadata(_, _)) + .WillOnce(Invoke([first](const std::string& key, const ProtobufWkt::Struct& obj) -> void { + EXPECT_STREQ(key.c_str(), "envoy.filters.network.zookeeper_proxy"); + protoMapEq(obj, first); + })) + .WillOnce(Invoke([second](const std::string& key, const ProtobufWkt::Struct& obj) -> void { + EXPECT_STREQ(key.c_str(), "envoy.filters.network.zookeeper_proxy"); + protoMapEq(obj, second); + })); + } + + void testCreate(CreateFlags flags, const OpCodes opcode = OpCodes::CREATE) { + initialize(); + Buffer::OwnedImpl data = + encodeCreateRequest("/foo", "bar", flags, false, enumToSignedInt(opcode)); + std::string opname = "create"; + + switch (opcode) { + case OpCodes::CREATECONTAINER: + opname = "createcontainer"; + break; + case OpCodes::CREATETTL: + opname = "createttl"; + break; + default: + break; + } + + expectSetDynamicMetadata( + {{"opname", opname}, {"path", "/foo"}, {"create_type", createFlagsToString(flags)}}, + {{"bytes", "35"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + + switch (opcode) { + case OpCodes::CREATE: + EXPECT_EQ(1UL, config_->stats().create_rq_.value()); + break; + case OpCodes::CREATECONTAINER: + EXPECT_EQ(1UL, config_->stats().createcontainer_rq_.value()); + break; + case OpCodes::CREATETTL: + EXPECT_EQ(1UL, config_->stats().createttl_rq_.value()); + break; + default: + break; + } + + EXPECT_EQ(35UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); + } + + ZooKeeperFilterConfigSharedPtr config_; + std::unique_ptr filter_; + Stats::IsolatedStoreImpl scope_; + std::string stat_prefix_{"test.zookeeper"}; + NiceMock filter_callbacks_; + NiceMock stream_info_; +}; + +TEST_F(ZooKeeperFilterTest, Connect) { + initialize(); + + Buffer::OwnedImpl data = encodeConnect(); + + expectSetDynamicMetadata({{"opname", "connect"}}, {{"bytes", "32"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().connect_rq_.value()); + EXPECT_EQ(32UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, ConnectReadonly) { + initialize(); + + Buffer::OwnedImpl data = encodeConnect(true); + + expectSetDynamicMetadata({{"opname", "connect_readonly"}}, {{"bytes", "33"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(0UL, config_->stats().connect_rq_.value()); + EXPECT_EQ(1UL, config_->stats().connect_readonly_rq_.value()); + EXPECT_EQ(33UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, Fallback) { + initialize(); + + Buffer::OwnedImpl data = encodeBadMessage(); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(0UL, config_->stats().connect_rq_.value()); + EXPECT_EQ(0UL, config_->stats().connect_readonly_rq_.value()); + EXPECT_EQ(1UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, PacketTooBig) { + initialize(); + + Buffer::OwnedImpl data = encodeTooBigMessage(); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, PacketBiggerThanLength) { + initialize(); + + Buffer::OwnedImpl data = encodeBiggerThanLengthMessage(); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, UnknownOpcode) { + initialize(); + + Buffer::OwnedImpl data = encodeUnknownOpcode(); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, BufferSmallerThanStringLength) { + initialize(); + + Buffer::OwnedImpl data = encodePathLongerThanBuffer("/foo", enumToSignedInt(OpCodes::SYNC)); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, PingRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePing(); + + expectSetDynamicMetadata({{"opname", "ping"}}, {{"bytes", "12"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().ping_rq_.value()); + EXPECT_EQ(12UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, AuthRequest) { + initialize(); + + Buffer::OwnedImpl data = encodeAuth("digest"); + + expectSetDynamicMetadata({{"opname", "auth"}}, {{"bytes", "36"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(scope_.counter("test.zookeeper.auth.digest_rq").value(), 1); + EXPECT_EQ(36UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, GetDataRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePathWatch("/foo", true); + + expectSetDynamicMetadata({{"opname", "getdata"}, {"path", "/foo"}, {"watch", "true"}}, + {{"bytes", "21"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(21UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(1UL, config_->stats().getdata_rq_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, GetDataRequestEmptyPath) { + initialize(); + + // It's valid to see an empty string as the path, which gets treated as / + // by the server. + Buffer::OwnedImpl data = encodePathWatch("", true); + + expectSetDynamicMetadata({{"opname", "getdata"}, {"path", ""}, {"watch", "true"}}, + {{"bytes", "17"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(17UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(1UL, config_->stats().getdata_rq_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, CreateRequestPersistent) { testCreate(CreateFlags::PERSISTENT); } + +TEST_F(ZooKeeperFilterTest, CreateRequestPersistentSequential) { + testCreate(CreateFlags::PERSISTENT_SEQUENTIAL); +} + +TEST_F(ZooKeeperFilterTest, CreateRequestEphemeral) { testCreate(CreateFlags::EPHEMERAL); } + +TEST_F(ZooKeeperFilterTest, CreateRequestEphemeralSequential) { + testCreate(CreateFlags::EPHEMERAL_SEQUENTIAL); +} + +TEST_F(ZooKeeperFilterTest, CreateRequestContainer) { + testCreate(CreateFlags::CONTAINER, OpCodes::CREATECONTAINER); +} + +TEST_F(ZooKeeperFilterTest, CreateRequestTTL) { + testCreate(CreateFlags::PERSISTENT_WITH_TTL, OpCodes::CREATETTL); +} + +TEST_F(ZooKeeperFilterTest, CreateRequestTTLSequential) { + testCreate(CreateFlags::PERSISTENT_SEQUENTIAL_WITH_TTL); +} + +TEST_F(ZooKeeperFilterTest, CreateRequest2) { + initialize(); + + Buffer::OwnedImpl data = encodeCreateRequest("/foo", "bar", CreateFlags::PERSISTENT, false, + enumToSignedInt(OpCodes::CREATE2)); + + expectSetDynamicMetadata({{"opname", "create2"}, {"path", "/foo"}, {"create_type", "persistent"}}, + {{"bytes", "35"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().create2_rq_.value()); + EXPECT_EQ(35UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, SetRequest) { + initialize(); + + Buffer::OwnedImpl data = encodeSetRequest("/foo", "bar", -1); + + expectSetDynamicMetadata({{"opname", "setdata"}, {"path", "/foo"}}, {{"bytes", "31"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().setdata_rq_.value()); + EXPECT_EQ(31UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, GetChildrenRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePathWatch("/foo", false, enumToSignedInt(OpCodes::GETCHILDREN)); + + expectSetDynamicMetadata({{"opname", "getchildren"}, {"path", "/foo"}, {"watch", "false"}}, + {{"bytes", "21"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().getchildren_rq_.value()); + EXPECT_EQ(21UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, GetChildrenRequest2) { + initialize(); + + Buffer::OwnedImpl data = encodePathWatch("/foo", false, enumToSignedInt(OpCodes::GETCHILDREN2)); + + expectSetDynamicMetadata({{"opname", "getchildren2"}, {"path", "/foo"}, {"watch", "false"}}, + {{"bytes", "21"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().getchildren2_rq_.value()); + EXPECT_EQ(21UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, DeleteRequest) { + initialize(); + + Buffer::OwnedImpl data = encodeDeleteRequest("/foo", -1); + + expectSetDynamicMetadata({{"opname", "remove"}, {"path", "/foo"}, {"version", "-1"}}, + {{"bytes", "24"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().remove_rq_.value()); + EXPECT_EQ(24UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, ExistsRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePathWatch("/foo", false, enumToSignedInt(OpCodes::EXISTS)); + + expectSetDynamicMetadata({{"opname", "exists"}, {"path", "/foo"}, {"watch", "false"}}, + {{"bytes", "21"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().exists_rq_.value()); + EXPECT_EQ(21UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, GetAclRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePath("/foo", enumToSignedInt(OpCodes::GETACL)); + + expectSetDynamicMetadata({{"opname", "getacl"}, {"path", "/foo"}}, {{"bytes", "20"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().getacl_rq_.value()); + EXPECT_EQ(20UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, SetAclRequest) { + initialize(); + + Buffer::OwnedImpl data = encodeSetAclRequest("/foo", "digest", "passwd", -1); + + expectSetDynamicMetadata({{"opname", "setacl"}, {"path", "/foo"}, {"version", "-1"}}, + {{"bytes", "52"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().setacl_rq_.value()); + EXPECT_EQ(52UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, SyncRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePath("/foo", enumToSignedInt(OpCodes::SYNC)); + + expectSetDynamicMetadata({{"opname", "sync"}, {"path", "/foo"}}, {{"bytes", "20"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().sync_rq_.value()); + EXPECT_EQ(20UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, GetEphemeralsRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePath("/foo", enumToSignedInt(OpCodes::GETEPHEMERALS)); + + expectSetDynamicMetadata({{"opname", "getephemerals"}, {"path", "/foo"}}, {{"bytes", "20"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().getephemerals_rq_.value()); + EXPECT_EQ(20UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, GetAllChildrenNumberRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePath("/foo", enumToSignedInt(OpCodes::GETALLCHILDRENNUMBER)); + + expectSetDynamicMetadata({{"opname", "getallchildrennumber"}, {"path", "/foo"}}, + {{"bytes", "20"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().getallchildrennumber_rq_.value()); + EXPECT_EQ(20UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, CheckRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePathVersion("/foo", 100, enumToSignedInt(OpCodes::CHECK)); + + expectSetDynamicMetadata({{"bytes", "24"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().check_rq_.value()); + EXPECT_EQ(24UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, MultiRequest) { + initialize(); + + Buffer::OwnedImpl create1 = encodeCreateRequest("/foo", "1", CreateFlags::PERSISTENT, true); + Buffer::OwnedImpl create2 = encodeCreateRequest("/bar", "1", CreateFlags::PERSISTENT, true); + Buffer::OwnedImpl check1 = encodePathVersion("/foo", 100, enumToSignedInt(OpCodes::CHECK), true); + Buffer::OwnedImpl set1 = encodeSetRequest("/bar", "2", -1, true); + + std::vector> ops; + ops.push_back(std::make_pair(enumToSignedInt(OpCodes::CREATE), std::move(create1))); + ops.push_back(std::make_pair(enumToSignedInt(OpCodes::CREATE), std::move(create2))); + ops.push_back(std::make_pair(enumToSignedInt(OpCodes::CHECK), std::move(check1))); + ops.push_back(std::make_pair(enumToSignedInt(OpCodes::SETDATA), std::move(set1))); + + Buffer::OwnedImpl data = encodeMultiRequest(ops); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().multi_rq_.value()); + EXPECT_EQ(128UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(2UL, config_->stats().create_rq_.value()); + EXPECT_EQ(1UL, config_->stats().setdata_rq_.value()); + EXPECT_EQ(1UL, config_->stats().check_rq_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, ReconfigRequest) { + initialize(); + + Buffer::OwnedImpl data = encodeReconfigRequest("s1", "s2", "s3", 1000); + + expectSetDynamicMetadata({{"opname", "reconfig"}}, {{"bytes", "38"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().reconfig_rq_.value()); + EXPECT_EQ(38UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, SetWatchesRequestControlXid) { + initialize(); + + const std::vector dataw = {"/foo", "/bar"}; + const std::vector existw = {"/foo1", "/bar1"}; + const std::vector childw = {"/foo2", "/bar2"}; + + Buffer::OwnedImpl data = + encodeSetWatchesRequest(dataw, existw, childw, enumToSignedInt(XidCodes::SET_WATCHES_XID)); + + expectSetDynamicMetadata({{"opname", "setwatches"}}, {{"bytes", "76"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().setwatches_rq_.value()); + EXPECT_EQ(76UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, SetWatchesRequest) { + initialize(); + + const std::vector dataw = {"/foo", "/bar"}; + const std::vector existw = {"/foo1", "/bar1"}; + const std::vector childw = {"/foo2", "/bar2"}; + + Buffer::OwnedImpl data = encodeSetWatchesRequest(dataw, existw, childw); + + expectSetDynamicMetadata({{"opname", "setwatches"}}, {{"bytes", "76"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().setwatches_rq_.value()); + EXPECT_EQ(76UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, CheckWatchesRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePathVersion("/foo", enumToSignedInt(WatcherType::CHILDREN), + enumToSignedInt(OpCodes::CHECKWATCHES)); + + expectSetDynamicMetadata({{"opname", "checkwatches"}, {"path", "/foo"}}, {{"bytes", "24"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().checkwatches_rq_.value()); + EXPECT_EQ(24UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, RemoveWatchesRequest) { + initialize(); + + Buffer::OwnedImpl data = encodePathVersion("/foo", enumToSignedInt(WatcherType::DATA), + enumToSignedInt(OpCodes::REMOVEWATCHES)); + + expectSetDynamicMetadata({{"opname", "removewatches"}, {"path", "/foo"}}, {{"bytes", "24"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().removewatches_rq_.value()); + EXPECT_EQ(24UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +TEST_F(ZooKeeperFilterTest, CloseRequest) { + initialize(); + + Buffer::OwnedImpl data = encodeCloseRequest(); + + expectSetDynamicMetadata({{"opname", "close"}}, {{"bytes", "12"}}); + + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false)); + EXPECT_EQ(1UL, config_->stats().close_rq_.value()); + EXPECT_EQ(12UL, config_->stats().request_bytes_.value()); + EXPECT_EQ(0UL, config_->stats().decoder_error_.value()); +} + +} // namespace ZooKeeperProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/tools/spelling_dictionary.txt b/tools/spelling_dictionary.txt index 92bdf69c29f2..b94e12951480 100644 --- a/tools/spelling_dictionary.txt +++ b/tools/spelling_dictionary.txt @@ -275,6 +275,7 @@ abcd absl accessor accessors +acls addr agg alloc @@ -762,9 +763,11 @@ xDS xeon xform xhtml +xid xxhash xxs xyz zag zig zlib +zxid