Skip to content

Commit

Permalink
redis: upstream client draining and temp host connection limit and cl… (
Browse files Browse the repository at this point in the history
#7104)

Signed-off-by: Mitch Sukalski <[email protected]>
  • Loading branch information
msukalski authored and mattklein123 committed Jul 13, 2019
1 parent 12ef480 commit 80a4ed7
Show file tree
Hide file tree
Showing 20 changed files with 570 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ option go_package = "v2";
import "envoy/api/v2/core/base.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";

import "validate/validate.proto";
import "gogoproto/gogo.proto";
Expand Down Expand Up @@ -82,6 +83,13 @@ message RedisProxy {
// If `max_buffer_size_before_flush` is set, but `buffer_flush_timeout` is not, the latter
// defaults to 3ms.
google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true];

// `max_upstream_unknown_connections` controls how many upstream connections to unknown hosts
// can be created at any given time by any given worker thread (see `enable_redirection` for
// more details). If the host is unknown and a connection cannot be created due to enforcing
// this limit, then redirection will fail and the original redirection error will be passed
// downstream unchanged. This limit defaults to 100.
google.protobuf.UInt32Value max_upstream_unknown_connections = 6;
}

// Network settings for the connection pool to the upstream clusters.
Expand Down
9 changes: 9 additions & 0 deletions docs/root/intro/arch_overview/other_protocols/redis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ following information:
For topology configuration details, see the Redis Cluster
:ref:`v2 API reference <envoy_api_msg_config.cluster.redis.RedisClusterConfig>`.

Every Redis cluster has its own extra statistics tree rooted at *cluster.<name>.redis_cluster.* with the following statistics:

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

max_upstream_unknown_connections_reached, Counter, Total number of times that an upstream connection to an unknown host was not created after redirection because the connection pool's max_upstream_unknown_connections limit had been reached
upstream_cx_drained, Counter, Total number of upstream connections drained of active requests before being closed

Supported commands
------------------

Expand Down
1 change: 1 addition & 0 deletions source/extensions/clusters/redis/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
bool enableRedirection() const override { return false; }
uint32_t maxBufferSizeBeforeFlush() const override { return 0; }
std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
// This Config implementation always reports a limit of zero upstream connections to unknown hosts.
uint32_t maxUpstreamUnknownConnections() const override { return 0; }

// Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks
void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/network/common/redis/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ envoy_cc_library(
"//source/common/network:filter_lib",
"//source/common/protobuf:utility_lib",
"//source/common/upstream:load_balancer_lib",
"//source/common/upstream:upstream_lib",
"@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc",
],
)
Expand Down
23 changes: 23 additions & 0 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class Client : public Event::DeferredDeletable {
*/
virtual void addConnectionCallbacks(Network::ConnectionCallbacks& callbacks) PURE;

/**
* Called to determine if the client has pending requests.
* @return bool true if the client is processing requests or false if it is currently idle.
*/
virtual bool active() PURE;

/**
* Closes the underlying network connection.
*/
Expand Down Expand Up @@ -134,6 +140,23 @@ class Config {
* @return timeout for batching commands for a single upstream host.
*/
virtual std::chrono::milliseconds bufferFlushTimeoutInMs() const PURE;

/**
* @return the maximum number of upstream connections to unknown hosts when enableRedirection() is
* true.
*
* This value acts as an upper bound on the number of servers in a cluster if only a subset
* of the cluster's servers are known via configuration (cluster size - number of servers in
* cluster known to cluster manager <= maxUpstreamUnknownConnections() for proper operation).
* Redirection errors are processed if enableRedirection() is true, and a new upstream connection
* to a previously unknown server will be made as a result of redirection if the number of unknown
* server connections is currently less than maxUpstreamUnknownConnections(). If a connection
* cannot be made, then the original redirection error will be passed through unchanged to the
* downstream client. If a cluster is using the Redis cluster protocol (RedisCluster), then the
* cluster logic will periodically discover all of the servers in the cluster; this should
* minimize the need for a large maxUpstreamUnknownConnections() value.
*/
virtual uint32_t maxUpstreamUnknownConnections() const PURE;
};

/**
Expand Down
15 changes: 7 additions & 8 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ ConfigImpl::ConfigImpl(
config.max_buffer_size_before_flush()), // This is a scalar, so default is zero.
buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(
config, buffer_flush_timeout,
3)) // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used
// as the buffer is flushed on each request immediately.
{}
3)), // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used
// as the buffer is flushed on each request immediately.
max_upstream_unknown_connections_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)) {}

ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
Expand Down Expand Up @@ -124,14 +125,12 @@ void ClientImpl::putOutlierEvent(Upstream::Outlier::Result result) {
void ClientImpl::onEvent(Network::ConnectionEvent event) {
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {

Upstream::reportUpstreamCxDestroy(host_, event);
if (!pending_requests_.empty()) {
host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc();
Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
if (event == Network::ConnectionEvent::RemoteClose) {
putOutlierEvent(Upstream::Outlier::Result::LOCAL_ORIGIN_CONNECT_FAILED);
host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc();
}
if (event == Network::ConnectionEvent::LocalClose) {
host_->cluster().stats().upstream_cx_destroy_local_with_active_rq_.inc();
}
}

Expand Down
6 changes: 6 additions & 0 deletions source/extensions/filters/network/common/redis/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "common/protobuf/utility.h"
#include "common/singleton/const_singleton.h"
#include "common/upstream/load_balancer_impl.h"
#include "common/upstream/upstream_impl.h"

#include "extensions/filters/network/common/redis/client.h"

Expand Down Expand Up @@ -45,13 +46,17 @@ class ConfigImpl : public Config {
std::chrono::milliseconds bufferFlushTimeoutInMs() const override {
return buffer_flush_timeout_;
}
// Returns the stored limit on upstream connections to unknown hosts. The value is fixed at
// construction: the configured max_upstream_unknown_connections, or 100 when it is not set.
uint32_t maxUpstreamUnknownConnections() const override {
return max_upstream_unknown_connections_;
}

private:
const std::chrono::milliseconds op_timeout_;
const bool enable_hashtagging_;
const bool enable_redirection_;
const uint32_t max_buffer_size_before_flush_;
const std::chrono::milliseconds buffer_flush_timeout_;
const uint32_t max_upstream_unknown_connections_;
};

class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks {
Expand All @@ -68,6 +73,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
}
void close() override;
PoolRequest* makeRequest(const RespValue& request, PoolCallbacks& callbacks) override;
// The client counts as active for as long as pending_requests_ is non-empty.
bool active() override { return !pending_requests_.empty(); }
void flushBufferAndResetTimer();

private:
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/network/redis_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ envoy_cc_library(
deps = [
":config_interface",
":conn_pool_interface",
"//include/envoy/stats:stats_macros",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/buffer:buffer_lib",
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/network/redis_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP

Upstreams upstreams;
for (auto& cluster : unique_clusters) {
Stats::ScopePtr stats_scope =
context.scope().createScope(fmt::format("cluster.{}.redis_cluster", cluster));
upstreams.emplace(cluster, std::make_shared<ConnPool::InstanceImpl>(
cluster, context.clusterManager(),
Common::Redis::Client::ClientFactoryImpl::instance_,
context.threadLocal(), proto_config.settings(), context.api(),
context.scope().symbolTable()));
std::move(stats_scope)));
}

auto router =
Expand Down
Loading

0 comments on commit 80a4ed7

Please sign in to comment.