Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filters: install network filters on upstream connections #173 #6173

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/envoy/api/v2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ api_proto_library_internal(
":eds",
"//envoy/api/v2/auth:cert",
"//envoy/api/v2/cluster:circuit_breaker",
"//envoy/api/v2/cluster:filter",
"//envoy/api/v2/cluster:outlier_detection",
"//envoy/api/v2/core:address",
"//envoy/api/v2/core:base",
Expand Down
6 changes: 6 additions & 0 deletions api/envoy/api/v2/cds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import "envoy/api/v2/discovery.proto";
import "envoy/api/v2/core/health_check.proto";
import "envoy/api/v2/core/protocol.proto";
import "envoy/api/v2/cluster/circuit_breaker.proto";
import "envoy/api/v2/cluster/filter.proto";
import "envoy/api/v2/cluster/outlier_detection.proto";
import "envoy/api/v2/eds.proto";
import "envoy/type/percent.proto";
Expand Down Expand Up @@ -574,6 +575,11 @@ message Cluster {
// If this flag is not set to true, Envoy will wait until the hosts fail active health
// checking before removing it from the cluster.
bool drain_connections_on_host_removal = 32;

// An optional list of network filters that make up the filter chain for
// outgoing connections made by the cluster. Order matters as the filters are
// processed sequentially as connection events happen.
repeated cluster.Filter filters = 38 [(gogoproto.nullable) = false];
}

// An extensible structure containing the address Envoy should bind to when
Expand Down
23 changes: 23 additions & 0 deletions api/envoy/api/v2/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,26 @@ api_go_proto_library(
name = "outlier_detection",
proto = ":outlier_detection",
)

api_proto_library_internal(
name = "filter",
srcs = ["filter.proto"],
visibility = [
"//envoy/api/v2:__pkg__",
],
deps = [
"//envoy/api/v2/auth:cert",
"//envoy/api/v2/core:address",
"//envoy/api/v2/core:base",
],
)

api_go_proto_library(
name = "filter",
proto = ":filter",
deps = [
"//envoy/api/v2/auth:cert_go_proto",
"//envoy/api/v2/core:address_go_proto",
"//envoy/api/v2/core:base_go_proto",
],
)
34 changes: 34 additions & 0 deletions api/envoy/api/v2/cluster/filter.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
syntax = "proto3";

package envoy.api.v2.cluster;

option java_outer_classname = "FilterProto";
option java_multiple_files = true;
option java_package = "io.envoyproxy.envoy.api.v2.cluster";
option go_package = "cluster";
option csharp_namespace = "Envoy.Api.V2.ClusterNS";

import "envoy/api/v2/core/address.proto";
import "envoy/api/v2/auth/cert.proto";
import "envoy/api/v2/core/base.proto";

import "google/protobuf/any.proto";
import "google/protobuf/struct.proto";

import "validate/validate.proto";
import "gogoproto/gogo.proto";

option (gogoproto.equal_all) = true;

// [#protodoc-title: Upstream filters]
message Filter {
// The name of the upstream filter to instantiate. The name must match a supported filter.
string name = 1 [(validate.rules).string.min_bytes = 1];

// Filter specific configuration which depends on the filter being
// instantiated. See the supported filters for further documentation.
oneof config_type {
google.protobuf.Struct config = 2 [deprecated = true];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for this new feature we can have only typed_config

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, but I need more guidance. This pattern is used by most every .proto file under api/v2. Is there an example I can follow?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at #6118 which is adding a new extension and includes only typed config.

google.protobuf.Any typed_config = 3;
}
}
5 changes: 5 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,11 @@ class ClusterInfo {
*/
virtual absl::optional<std::string> eds_service_name() const PURE;

/**
* Create network filters on a new upstream connection.
*/
virtual void createNetworkFilterChain(Network::Connection& connection) const PURE;

protected:
/**
* Invoked by extensionProtocolOptionsTyped.
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ envoy_cc_library(
"//include/envoy/local_info:local_info_interface",
"//include/envoy/network:dns_interface",
"//include/envoy/runtime:runtime_interface",
"//include/envoy/server:filter_config_interface",
"//include/envoy/server:transport_socket_config_interface",
"//include/envoy/ssl:context_manager_interface",
"//include/envoy/thread_local:thread_local_interface",
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/health_discovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ ProdClusterInfoFactory::createClusterInfo(const CreateClusterInfoParams& params)

return std::make_unique<ClusterInfoImpl>(params.cluster_, params.bind_config_, params.runtime_,
std::move(socket_factory), std::move(scope),
params.added_via_api_);
params.added_via_api_, factory_context);
}

void HdsCluster::startHealthchecks(AccessLog::AccessLogManager& access_log_manager,
Expand Down
98 changes: 91 additions & 7 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& clu
cluster.transportSocketFactory().createTransportSocket(transport_socket_options),
connection_options);
connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());
cluster.createNetworkFilterChain(*connection);
return connection;
}

Expand Down Expand Up @@ -509,11 +510,63 @@ ClusterLoadReportStats ClusterInfoImpl::generateLoadReportStats(Stats::Scope& sc
return {ALL_CLUSTER_LOAD_REPORT_STATS(POOL_COUNTER(scope))};
}

ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config,
const envoy::api::v2::core::BindConfig& bind_config,
Runtime::Loader& runtime,
Network::TransportSocketFactoryPtr&& socket_factory,
Stats::ScopePtr&& stats_scope, bool added_via_api)
// Implements the FactoryContext interface required by network filters.
class ClusterInfoImpl::FactoryContextImpl : public Server::Configuration::FactoryContext {
public:
// Create from a TransportSocketFactoryContext using parent stats_scope and runtime
// other contexts taken from TransportSocketFactoryContext.
FactoryContextImpl(Stats::Scope& stats_scope, Envoy::Runtime::Loader& runtime,
Server::Configuration::TransportSocketFactoryContext& c)
: admin_(c.admin()), stats_scope_(stats_scope), cluster_manager_(c.clusterManager()),
local_info_(c.localInfo()), dispatcher_(c.dispatcher()), random_(c.random()),
runtime_(runtime), singleton_manager_(c.singletonManager()), tls_(c.threadLocal()),
init_manager_(c.initManager()), api_(c.api()) {}

// TODO(aconway) some contexts are not obviously available in the upstream
// code, and will throw NOT_IMPLEMENTED. These should be implemented by
// someone with a better understanding of the lifecycle and role of each context.
AccessLog::AccessLogManager& accessLogManager() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Upstream::ClusterManager& clusterManager() override { return cluster_manager_; }
Event::Dispatcher& dispatcher() override { return dispatcher_; }
const Network::DrainDecision& drainDecision() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
bool healthCheckFailed() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Tracing::HttpTracer& httpTracer() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Init::Manager& initManager() override { return *init_manager_; }
const LocalInfo::LocalInfo& localInfo() const override { return local_info_; }
Envoy::Runtime::RandomGenerator& random() override { return random_; }
virtual Envoy::Runtime::Loader& runtime() override { return runtime_; }
Stats::Scope& scope() override { return stats_scope_; }
Singleton::Manager& singletonManager() override { return singleton_manager_; }
ThreadLocal::SlotAllocator& threadLocal() override { return tls_; }
Server::Admin& admin() override { return admin_; }
Stats::Scope& listenerScope() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
const envoy::api::v2::core::Metadata& listenerMetadata() const override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
TimeSource& timeSource() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Server::OverloadManager& overloadManager() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Http::Context& httpContext() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
Api::Api& api() override { return api_; }

private:
Server::Admin& admin_;
Stats::Scope& stats_scope_;
Upstream::ClusterManager& cluster_manager_;
const LocalInfo::LocalInfo& local_info_;
Event::Dispatcher& dispatcher_;
Envoy::Runtime::RandomGenerator& random_;
Envoy::Runtime::Loader& runtime_;
Singleton::Manager& singleton_manager_;
ThreadLocal::SlotAllocator& tls_;
Init::Manager* init_manager_{};
Api::Api& api_;
};

ClusterInfoImpl::ClusterInfoImpl(
const envoy::api::v2::Cluster& config, const envoy::api::v2::core::BindConfig& bind_config,
Runtime::Loader& runtime, Network::TransportSocketFactoryPtr&& socket_factory,
Stats::ScopePtr&& stats_scope, bool added_via_api,
Server::Configuration::TransportSocketFactoryContext& factory_context)
: runtime_(runtime), name_(config.name()), type_(config.type()),
max_requests_per_connection_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_requests_per_connection, 0)),
Expand All @@ -537,7 +590,9 @@ ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config,
metadata_(config.metadata()), typed_metadata_(config.metadata()),
common_lb_config_(config.common_lb_config()),
cluster_socket_options_(parseClusterSocketOptions(config, bind_config)),
drain_connections_on_host_removal_(config.drain_connections_on_host_removal()) {
drain_connections_on_host_removal_(config.drain_connections_on_host_removal()),
factory_context_(
std::make_unique<FactoryContextImpl>(*stats_scope_, runtime, factory_context)) {
switch (config.lb_policy()) {
case envoy::api::v2::Cluster::ROUND_ROBIN:
lb_type_ = LoadBalancerType::RoundRobin;
Expand Down Expand Up @@ -595,6 +650,29 @@ ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config,
// https://github.com/lyft/protoc-gen-validate/issues/97 resolved. This just provides early
// validation of sanity of fields that we should catch at config ingestion.
DurationUtil::durationToMilliseconds(common_lb_config_.update_merge_window());

// Create upstream filter factories
auto filters = config.filters();
for (ssize_t i = 0; i < filters.size(); i++) {
const auto& proto_config = filters[i];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to run a sanity check of disallowed filters? Like tcp proxy/redis?

const ProtobufTypes::String name = proto_config.name();
const Json::ObjectSharedPtr filter_config =
MessageUtil::getJsonObjectFromMessage(proto_config.config());
ENVOY_LOG(debug, "filter #{} name: {} config: {}", i, name, filter_config->asJsonString());
// Now see if there is a factory that will accept the config.
auto& factory =
Config::Utility::getAndCheckFactory<Server::Configuration::NamedNetworkFilterConfigFactory>(
name);
Network::FilterFactoryCb callback;
if (filter_config->getBoolean("deprecated_v1", false)) {
callback =
factory.createFilterFactory(*filter_config->getObject("value", true), *factory_context_);
} else {
auto message = Config::Utility::translateToFactoryConfig(proto_config, factory);
callback = factory.createFilterFactoryFromProto(*message, *factory_context_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need this. deprecated_v1 was only for listener filters (mostly tcp) that needed old functionality. In your case, its all new..

}
filter_factories_.push_back(callback);
}
}

ProtocolOptionsConfigConstSharedPtr
Expand Down Expand Up @@ -640,6 +718,12 @@ Network::TransportSocketFactoryPtr createTransportSocketFactory(
return config_factory.createTransportSocketFactory(*message, factory_context);
}

void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const {
for (const auto& factory : filter_factories_) {
factory(connection);
}
}

ClusterSharedPtr ClusterImplBase::create(
const envoy::api::v2::Cluster& cluster, ClusterManager& cm, Stats::Store& stats,
ThreadLocal::Instance& tls, Network::DnsResolverSharedPtr dns_resolver,
Expand Down Expand Up @@ -736,7 +820,7 @@ ClusterImplBase::ClusterImplBase(
auto socket_factory = createTransportSocketFactory(cluster, factory_context);
info_ = std::make_unique<ClusterInfoImpl>(cluster, factory_context.clusterManager().bindConfig(),
runtime, std::move(socket_factory),
std::move(stats_scope), added_via_api);
std::move(stats_scope), added_via_api, factory_context);
// Create the default (empty) priority set before registering callbacks to
// avoid getting an update the first time it is accessed.
priority_set_.getOrCreateHostSet(0);
Expand Down
13 changes: 11 additions & 2 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include "envoy/event/timer.h"
#include "envoy/local_info/local_info.h"
#include "envoy/network/dns.h"
#include "envoy/network/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/secret/secret_manager.h"
#include "envoy/server/filter_config.h"
#include "envoy/server/transport_socket_config.h"
#include "envoy/ssl/context_manager.h"
#include "envoy/stats/scope.h"
Expand Down Expand Up @@ -472,12 +474,13 @@ class PrioritySetImpl : public PrioritySet {
/**
* Implementation of ClusterInfo that reads from JSON.
*/
class ClusterInfoImpl : public ClusterInfo {
class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable<Logger::Id::upstream> {
public:
ClusterInfoImpl(const envoy::api::v2::Cluster& config,
const envoy::api::v2::core::BindConfig& bind_config, Runtime::Loader& runtime,
Network::TransportSocketFactoryPtr&& socket_factory,
Stats::ScopePtr&& stats_scope, bool added_via_api);
Stats::ScopePtr&& stats_scope, bool added_via_api,
Server::Configuration::TransportSocketFactoryContext&);

static ClusterStats generateStats(Stats::Scope& scope);
static ClusterLoadReportStats generateLoadReportStats(Stats::Scope& scope);
Expand Down Expand Up @@ -539,6 +542,8 @@ class ClusterInfoImpl : public ClusterInfo {

absl::optional<std::string> eds_service_name() const override { return eds_service_name_; }

void createNetworkFilterChain(Network::Connection&) const;

private:
struct ResourceManagers {
ResourceManagers(const envoy::api::v2::Cluster& config, Runtime::Loader& runtime,
Expand Down Expand Up @@ -582,6 +587,10 @@ class ClusterInfoImpl : public ClusterInfo {
const Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options_;
const bool drain_connections_on_host_removal_;
absl::optional<std::string> eds_service_name_;

class FactoryContextImpl;
std::unique_ptr<Server::Configuration::FactoryContext> factory_context_;
std::vector<Network::FilterFactoryCb> filter_factories_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does filter get called onData?

};

/**
Expand Down
2 changes: 2 additions & 0 deletions test/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ envoy_cc_test(
"//source/common/network:utility_lib",
"//source/common/stats:stats_lib",
"//source/common/upstream:cluster_manager_lib",
"//source/extensions/filters/network/echo",
"//source/extensions/filters/network/echo:config",
"//source/extensions/transport_sockets/raw_buffer:config",
"//source/extensions/transport_sockets/tls:context_lib",
"//test/mocks/access_log:access_log_mocks",
Expand Down
29 changes: 29 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2418,6 +2418,35 @@ TEST_F(ClusterManagerImplTest, MergedUpdatesDestroyedOnUpdate) {
EXPECT_EQ(0, factory_.stats_.gauge("cluster_manager.warming_clusters").value());
}

// Verify that configured upstream filters are added to client connections.
TEST_F(ClusterManagerImplTest, AddUpstreamFilters) {
const std::string yaml = R"EOF(
static_resources:
clusters:
- name: cluster_1
connect_timeout: 0.250s
lb_policy: ROUND_ROBIN
type: STATIC
hosts:
- socket_address:
address: "127.0.0.1"
port_value: 11001
filters:
- name: envoy.echo
)EOF";

create(parseBootstrapFromV2Yaml(yaml));
Network::MockClientConnection* connection = new NiceMock<Network::MockClientConnection>();
EXPECT_CALL(*connection, addReadFilter(_)).Times(1); // echo is a read filter.
EXPECT_CALL(*connection, addWriteFilter(_)).Times(0);
EXPECT_CALL(*connection, addFilter(_)).Times(0);
EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_, _, _, _))
.WillOnce(Return(connection));
auto conn_data = cluster_manager_->tcpConnForCluster("cluster_1", nullptr, nullptr);
EXPECT_EQ(connection, conn_data.connection_.get());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add a test here to check if filter gets invoked when data passes through?

factory_.tls_.shutdownThread();
}

class ClusterManagerInitHelperTest : public testing::Test {
public:
MOCK_METHOD1(onClusterInit, void(Cluster& cluster));
Expand Down
1 change: 1 addition & 0 deletions test/mocks/upstream/cluster_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class MockClusterInfo : public ClusterInfo {
MOCK_CONST_METHOD0(clusterSocketOptions, const Network::ConnectionSocket::OptionsSharedPtr&());
MOCK_CONST_METHOD0(drainConnectionsOnHostRemoval, bool());
MOCK_CONST_METHOD0(eds_service_name, absl::optional<std::string>());
MOCK_CONST_METHOD1(createNetworkFilterChain, void(Network::Connection&));

std::string name_{"fake_cluster"};
absl::optional<std::string> eds_service_name_;
Expand Down