Skip to content

Commit

Permalink
Upstream filters: add filter chains to upstream connections (envoypro…
Browse files Browse the repository at this point in the history
…xy#173)

(rebased on v1.9.0)

This is an initial pull request for review, it is not yet or ready to merge.

It is working for https://github.com/alanconway/envoy-amqp but has (at least)
the following issues:

1. The upstream uses a dummy NOT_IMPLEMENTED FactoryContext. FactoryContext has
   listener-specific methods, I'm not sure if/how they can be implemented by a
   Cluster.

2. The "Filter" configuration is defined in listener.proto. I made cds.proto and
   upstream_impl.cc depend on listener.proto and added server:configuration_lib
   to upstream_includes.  Probably the Filter definition and related bits should
   be moved to core.

3. Need automated unit and integration tests. The code works with the AMQP filters
   but needs independent tests in the envoy repo.

There are TODO(aconway) comments at the code in question.

I would appreciate help and/or direction on how to resolve these and any other
issues that come up during review.

Signed-off-by: Alan Conway <[email protected]>
  • Loading branch information
alanconway committed Jan 30, 2019
1 parent 37bfd8a commit b1bbec2
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 1 deletion.
1 change: 1 addition & 0 deletions api/envoy/api/v2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ api_proto_library_internal(
"//envoy/api/v2/core:health_check",
"//envoy/api/v2/core:protocol",
"//envoy/api/v2/endpoint",
"//envoy/api/v2/listener", # TODO(alanconway): for Filter definition
"//envoy/type:percent",
],
)
Expand Down
6 changes: 6 additions & 0 deletions api/envoy/api/v2/cds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import "envoy/api/v2/core/config_source.proto";
import "envoy/api/v2/discovery.proto";
import "envoy/api/v2/core/health_check.proto";
import "envoy/api/v2/core/protocol.proto";
import "envoy/api/v2/listener/listener.proto"; // TODO(alanconway): for Filter definition
import "envoy/api/v2/cluster/circuit_breaker.proto";
import "envoy/api/v2/cluster/outlier_detection.proto";
import "envoy/api/v2/eds.proto";
Expand Down Expand Up @@ -546,6 +547,11 @@ message Cluster {
// If this flag is not set to true, Envoy will wait until the hosts fail active health
// checking before removing it from the cluster.
bool drain_connections_on_host_removal = 32;

// An optional list of network filters that make up the filter chain for
// outgoing connections made by the cluster. Order matters as the filters are
// processed sequentially as connection events happen.
repeated listener.Filter filters = 38 [(gogoproto.nullable) = false];
}

// An extensible structure containing the address Envoy should bind to when
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ class ClusterInfo {
*/
virtual bool drainConnectionsOnHostRemoval() const PURE;

/**
* Create network filters on a new upstream connection.
*/
virtual void createNetworkFilters(Network::Connection& connection) const PURE;

protected:
/**
* Invoked by extensionProtocolOptionsTyped.
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ envoy_cc_library(
"//include/envoy/local_info:local_info_interface",
"//include/envoy/network:dns_interface",
"//include/envoy/runtime:runtime_interface",
"//include/envoy/server:filter_config_interface",
"//include/envoy/server:transport_socket_config_interface",
"//include/envoy/ssl:context_manager_interface",
"//include/envoy/thread_local:thread_local_interface",
Expand All @@ -401,6 +402,7 @@ envoy_cc_library(
"//source/common/config:well_known_names",
"//source/common/stats:isolated_store_lib",
"//source/common/stats:stats_lib",
"//source/server:configuration_lib", # TODO(alanconway): bad dependency
"//source/server:init_manager_lib",
"//source/server:transport_socket_config_lib",
"@envoy_api//envoy/api/v2/core:base_cc",
Expand Down
54 changes: 54 additions & 0 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "common/upstream/logical_dns_cluster.h"
#include "common/upstream/original_dst_cluster.h"

#include "server/configuration_impl.h" // TODO(alanconway): bad dependency
#include "server/transport_socket_config_impl.h"

#include "extensions/transport_sockets/well_known_names.h"
Expand Down Expand Up @@ -183,6 +184,7 @@ HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& clu
cluster.transportSocketFactory().createTransportSocket(transport_socket_options),
connection_options);
connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());
cluster.createNetworkFilters(*connection);
return connection;
}

Expand Down Expand Up @@ -357,6 +359,31 @@ ClusterLoadReportStats ClusterInfoImpl::generateLoadReportStats(Stats::Scope& sc
return {ALL_CLUSTER_LOAD_REPORT_STATS(POOL_COUNTER(scope))};
}

// TODO(alanconway): dummy factory context, how do we implement this? Some of
// the methods are listener specific, should they throw, return null values, return
// values derived from the Cluster or be moved/removed?
class ClusterInfoImpl::FactoryContextImpl : public Server::Configuration::FactoryContext {
virtual AccessLog::AccessLogManager& accessLogManager() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Upstream::ClusterManager& clusterManager() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Event::Dispatcher& dispatcher() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual const Network::DrainDecision& drainDecision() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual bool healthCheckFailed() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Tracing::HttpTracer& httpTracer() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Init::Manager& initManager() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual const LocalInfo::LocalInfo& localInfo() const PURE;
virtual Envoy::Runtime::RandomGenerator& random() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Envoy::Runtime::Loader& runtime() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Stats::Scope& scope() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Singleton::Manager& singletonManager() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual ThreadLocal::SlotAllocator& threadLocal() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Server::Admin& admin() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Stats::Scope& listenerScope() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual const envoy::api::v2::core::Metadata& listenerMetadata() const PURE;
virtual TimeSource& timeSource() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Server::OverloadManager& overloadManager() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual Http::Context& httpContext() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
};

ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config,
const envoy::api::v2::core::BindConfig& bind_config,
Runtime::Loader& runtime,
Expand Down Expand Up @@ -431,6 +458,29 @@ ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config,
// https://github.com/lyft/protoc-gen-validate/issues/97 resolved. This just provides early
// validation of sanity of fields that we should catch at config ingestion.
DurationUtil::durationToMilliseconds(common_lb_config_.update_merge_window());

// Create upstream filter factories
auto filters = config.filters();
for (ssize_t i = 0; i < filters.size(); i++) {
const auto& proto_config = filters[i];
const ProtobufTypes::String name = proto_config.name();
const Json::ObjectSharedPtr filter_config =
MessageUtil::getJsonObjectFromMessage(proto_config.config());
ENVOY_LOG(debug, "filter #{} name: {} config: {}", i, name, filter_config->asJsonString());
// Now see if there is a factory that will accept the config.
auto& factory =
Config::Utility::getAndCheckFactory<Server::Configuration::NamedNetworkFilterConfigFactory>(
name);
Network::FilterFactoryCb callback;
if (filter_config->getBoolean("deprecated_v1", false)) {
callback =
factory.createFilterFactory(*filter_config->getObject("value", true), *factory_context_);
} else {
auto message = Config::Utility::translateToFactoryConfig(proto_config, factory);
callback = factory.createFilterFactoryFromProto(*message, *factory_context_);
}
filter_factories_.push_back(callback);
}
}

ProtocolOptionsConfigConstSharedPtr
Expand Down Expand Up @@ -476,6 +526,10 @@ Network::TransportSocketFactoryPtr createTransportSocketFactory(
return config_factory.createTransportSocketFactory(*message, factory_context);
}

void ClusterInfoImpl::createNetworkFilters(Network::Connection& connection) const {
Server::Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories_);
}

ClusterSharedPtr ClusterImplBase::create(
const envoy::api::v2::Cluster& cluster, ClusterManager& cm, Stats::Store& stats,
ThreadLocal::Instance& tls, Network::DnsResolverSharedPtr dns_resolver,
Expand Down
10 changes: 9 additions & 1 deletion source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include "envoy/event/timer.h"
#include "envoy/local_info/local_info.h"
#include "envoy/network/dns.h"
#include "envoy/network/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/secret/secret_manager.h"
#include "envoy/server/filter_config.h"
#include "envoy/server/transport_socket_config.h"
#include "envoy/ssl/context_manager.h"
#include "envoy/stats/scope.h"
Expand Down Expand Up @@ -386,7 +388,7 @@ class PrioritySetImpl : public PrioritySet {
/**
* Implementation of ClusterInfo that reads from JSON.
*/
class ClusterInfoImpl : public ClusterInfo {
class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable<Logger::Id::upstream> {
public:
ClusterInfoImpl(const envoy::api::v2::Cluster& config,
const envoy::api::v2::core::BindConfig& bind_config, Runtime::Loader& runtime,
Expand Down Expand Up @@ -451,6 +453,8 @@ class ClusterInfoImpl : public ClusterInfo {

bool drainConnectionsOnHostRemoval() const override { return drain_connections_on_host_removal_; }

void createNetworkFilters(Network::Connection&) const;

private:
struct ResourceManagers {
ResourceManagers(const envoy::api::v2::Cluster& config, Runtime::Loader& runtime,
Expand Down Expand Up @@ -493,6 +497,10 @@ class ClusterInfoImpl : public ClusterInfo {
const envoy::api::v2::Cluster::CommonLbConfig common_lb_config_;
const Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options_;
const bool drain_connections_on_host_removal_;

class FactoryContextImpl;
std::unique_ptr<Server::Configuration::FactoryContext> factory_context_;
std::vector<Network::FilterFactoryCb> filter_factories_;
};

/**
Expand Down

0 comments on commit b1bbec2

Please sign in to comment.