From cacc9a29b7571ff707b833b158ea79521355c024 Mon Sep 17 00:00:00 2001 From: Rakesh Datta Date: Tue, 13 Sep 2022 23:00:43 +0000 Subject: [PATCH 1/2] Feature : Filter Chain Discovery Service --- api/BUILD | 1 + api/envoy/config/listener/v3/listener.proto | 11 +- api/envoy/service/filter_chain/BUILD | 15 ++ api/envoy/service/filter_chain/fcds.proto | 46 ++++++ api/versioning/BUILD | 1 + source/server/BUILD | 11 +- source/server/fcds_impl.cc | 169 ++++++++++++++++++++ source/server/fcds_impl.h | 104 ++++++++++++ source/server/filter_chain_manager_impl.cc | 112 ++++++++++--- source/server/filter_chain_manager_impl.h | 45 ++++-- source/server/listener_impl.cc | 33 +++- source/server/listener_impl.h | 5 + source/server/proto_descriptors.cc | 3 + 13 files changed, 515 insertions(+), 41 deletions(-) create mode 100644 api/envoy/service/filter_chain/BUILD create mode 100644 api/envoy/service/filter_chain/fcds.proto create mode 100644 source/server/fcds_impl.cc create mode 100644 source/server/fcds_impl.h diff --git a/api/BUILD b/api/BUILD index 52c4178f6c38..e13dcd973a43 100644 --- a/api/BUILD +++ b/api/BUILD @@ -286,6 +286,7 @@ proto_library( "//envoy/service/event_reporting/v3:pkg", "//envoy/service/ext_proc/v3:pkg", "//envoy/service/extension/v3:pkg", + "//envoy/service/filter_chain/v3:pkg", "//envoy/service/health/v3:pkg", "//envoy/service/listener/v3:pkg", "//envoy/service/load_stats/v3:pkg", diff --git a/api/envoy/config/listener/v3/listener.proto b/api/envoy/config/listener/v3/listener.proto index b29047913e05..84fc37a117b0 100644 --- a/api/envoy/config/listener/v3/listener.proto +++ b/api/envoy/config/listener/v3/listener.proto @@ -23,6 +23,7 @@ import "udpa/annotations/security.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; import "validate/validate.proto"; +import "envoy/config/core/v3/config_source.proto"; option java_package = "io.envoyproxy.envoy.config.listener.v3"; option java_outer_classname = "ListenerProto"; @@ -44,7 +45,7 @@ message ListenerCollection { repeated xds.core.v3.CollectionEntry entries = 1; } -// [#next-free-field: 34] +// [#next-free-field: 35] message Listener { option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.Listener"; @@ -138,6 +139,9 @@ message Listener { // :ref:`FAQ entry `. repeated FilterChain filter_chains = 3; + // FCDS: Filter Chain Discovery Service config block. filter_chains and fcds are mutually exclusive + Fcds fcds = 34; + // :ref:`Matcher API ` resolving the filter chain name from the // network properties. This matcher is used as a replacement for the filter chain match condition // :ref:`filter_chain_match @@ -358,3 +362,8 @@ message Listener { // :ref:`global_downstream_max_connections `. bool ignore_global_conn_limit = 31; } + +message Fcds{ + string fcds_name = 1 [(validate.rules).string.min_len = 1]; + envoy.config.core.v3.ConfigSource config_source = 2 [(validate.rules).message.required = true]; +} diff --git a/api/envoy/service/filter_chain/BUILD b/api/envoy/service/filter_chain/BUILD new file mode 100644 index 000000000000..d3be4fae57fa --- /dev/null +++ b/api/envoy/service/filter_chain/BUILD @@ -0,0 +1,15 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + has_services = True, + deps = [ + "//envoy/annotations:pkg", + "//envoy/api/v2:pkg", + "//envoy/service/discovery/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/api/envoy/service/filter_chain/fcds.proto b/api/envoy/service/filter_chain/fcds.proto new file mode 100644 index 000000000000..b844a0f63ef5 --- /dev/null +++ b/api/envoy/service/filter_chain/fcds.proto @@ -0,0 +1,46 @@ +syntax = "proto3"; + +package envoy.service.filter_chain.v3; + +import "envoy/service/discovery/v3/discovery.proto"; + +import "google/api/annotations.proto"; + +import "envoy/annotations/resource.proto"; +import "udpa/annotations/status.proto"; +import "udpa/annotations/versioning.proto"; + +option java_package = "io.envoyproxy.envoy.service.filter_chain.v3"; +option java_outer_classname = "FcdsProto"; +option java_multiple_files = true; +option java_generic_services = true; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: FCDS] + +// The resource_names field in DiscoveryRequest specifies a route configuration. +// This allows an Envoy configuration with multiple HTTP listeners (and +// associated HTTP connection manager filters) to use different route +// configurations. Each listener will bind its HTTP connection manager filter to +// a route table via this identifier. +service FcdsDiscoveryService { + option (envoy.annotations.resource).type = "envoy.config.litener.v3.FilterChain"; + + rpc StreamFilterChains(stream discovery.v3.DiscoveryRequest) + returns (stream discovery.v3.DiscoveryResponse) { + } + + rpc DeltaFilterChains(stream discovery.v3.DeltaDiscoveryRequest) + returns (stream discovery.v3.DeltaDiscoveryResponse) { + } + + rpc FetchFilterChains(discovery.v3.DiscoveryRequest) returns (discovery.v3.DiscoveryResponse) { + option (google.api.http).post = "/v3/discovery:filterchains"; + option (google.api.http).body = "*"; + } +} + +// [#not-implemented-hide:] Not configuration. Workaround c++ protobuf issue with importing +// services: https://github.com/google/protobuf/issues/4221 and protoxform to upgrade the file. +message FcdsDummy { +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index 69f18bb825ad..0d61ea750d59 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -236,6 +236,7 @@ proto_library( "//envoy/service/rate_limit_quota/v3:pkg", "//envoy/service/ratelimit/v3:pkg", "//envoy/service/route/v3:pkg", + "//envoy/service/filter_chain/v3:pkg", "//envoy/service/runtime/v3:pkg", "//envoy/service/secret/v3:pkg", "//envoy/service/status/v3:pkg", diff --git a/source/server/BUILD b/source/server/BUILD index a1d7ac686fa0..602633cab5b0 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -509,8 +509,8 @@ envoy_cc_library( envoy_cc_library( name = "filter_chain_manager_lib", - srcs = ["filter_chain_manager_impl.cc"], - hdrs = ["filter_chain_manager_impl.h"], + srcs = ["filter_chain_manager_impl.cc", "fcds_impl.cc"], + hdrs = ["filter_chain_manager_impl.h", "fcds_impl.h"], deps = [ ":filter_chain_factory_context_callback", "//envoy/config:typed_metadata_interface", @@ -528,7 +528,13 @@ envoy_cc_library( "//source/common/network/matching:data_impl_lib", "//source/common/network/matching:inputs_lib", "//source/server:configuration_lib", + "//source/common/config:api_version_lib", + "//source/common/common:callback_impl_lib", + "//source/common/config:subscription_base_interface", + "//source/common/protobuf:utility_lib", "@envoy_api//envoy/config/listener/v3:pkg_cc_proto", + "@envoy_api//envoy/service/filter_chain/v3:pkg_cc_proto", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) @@ -580,6 +586,7 @@ envoy_cc_library( ":guarddog_lib", ":listener_hooks_lib", ":listener_manager_lib", + ":filter_chain_manager_lib", ":ssl_context_manager_lib", ":worker_lib", "//envoy/event:dispatcher_interface", diff --git a/source/server/fcds_impl.cc b/source/server/fcds_impl.cc new file mode 100644 index 000000000000..00e4c0aea6b2 --- /dev/null +++ b/source/server/fcds_impl.cc @@ -0,0 +1,169 @@ +#include +#include +#include +#include + +#include "envoy/admin/v3/config_dump.pb.h" +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/common/common/assert.h" +#include "source/common/common/fmt.h" +#include "source/common/config/api_version.h" +#include "source/common/config/utility.h" +#include "source/common/http/header_map_impl.h" +#include "source/common/protobuf/utility.h" +#include "source/server/fcds_impl.h" +#include "source/server/filter_chain_manager_impl.h" + +namespace Envoy { +namespace Server { + +FcdsApi::FcdsApi( + const envoy::config::listener::v3::Fcds& fcds, + Configuration::FactoryContext& parent_context, + Init::Manager& init_manager, + FilterChainManagerImpl& filter_chain_manager, + FilterChainFactoryBuilder* fc_builder) + : Envoy::Config::SubscriptionBase( + parent_context.messageValidationContext().dynamicValidationVisitor(), "name"), + fcds_name_(fcds.fcds_name()), + parent_init_target_(fmt::format("FcdsConfigSubscription parent_init_target {}", fcds_name_), + [this]() { local_init_manager_.initialize(local_init_watcher_); }), + local_init_watcher_(fmt::format("Fcds local_init_watcher {}", fcds.fcds_name()), + [this]() { parent_init_target_.ready(); }), + local_init_target_( + fmt::format("FcdsConfigSubscription local_init_target {}", fcds_name_), + [this]() { fcds_subscription_->start({fcds_name_}); }), + local_init_manager_(fmt::format("FCDS local_init_manager {}", fcds_name_)), + init_manager_(init_manager), + cluster_manager_(parent_context.clusterManager()), + scope_(parent_context.listenerScope().createScope("")), + filter_chain_manager_(filter_chain_manager), + fc_builder_(fc_builder) +{ + + const auto resource_name = getResourceName(); + + fcds_subscription_ = + cluster_manager_.subscriptionFactory().subscriptionFromConfigSource( + fcds.config_source(), Grpc::Common::typeUrl(resource_name), *scope_, *this, + resource_decoder_, {}); + +} + + +FcdsApi::~FcdsApi() +{ + // If we get destroyed during initialization, make sure we signal that we "initialized". + local_init_target_.ready(); +} + +void FcdsApi::initialize() +{ + local_init_manager_.add(local_init_target_); + init_manager_.add(parent_init_target_); +} + +void FcdsApi::onConfigUpdate( + const std::vector& resources, + const std::string& version_info) { + + if (!validateUpdateSize(resources.size())) { + return; + } + + // Construct the list of filter chains to be removed + std::unordered_set fc_to_remove; + auto fc_name_to_resource_map = filter_chain_manager_.getFcdsResources(); + auto itr = fc_name_to_resource_map.begin(); + while (itr != fc_name_to_resource_map.end()) { + fc_to_remove.insert(itr->first); + ++itr; + } + + // Remove the list of FCs in the incoming FCDS config from the above list + for (const auto& filter_chain : resources) + { + fc_to_remove.erase(filter_chain.get().name()); + } + + Protobuf::RepeatedPtrField to_be_deleted_fc_list; + for (const auto& filter_chain : fc_to_remove) { + *to_be_deleted_fc_list.Add() = filter_chain; + } + + // Delete the old fcs and add the new fcs + onConfigUpdate(resources, to_be_deleted_fc_list, version_info); +} + + +void FcdsApi::onConfigUpdate( + const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, const std::string&) { + + // Get all the existing filter chains + auto& fc_name_to_resource_map = filter_chain_manager_.getFcdsResources(); + + // Delete the filter chains which are missing in this config update + std::vector fc_to_del_vector; + + for (const std::string& fc_name : removed_resources) + { + ENVOY_LOG(debug, "fcds: removing filter chain from FCM: fcds name = {} resource={}", fcds_name_, fc_name); + auto itr = fc_name_to_resource_map.find(fc_name); + if (itr != fc_name_to_resource_map.end()) { + const envoy::config::listener::v3::FilterChain& fc = itr->second; + fc_to_del_vector.push_back(fc); + } + } + + if (fc_to_del_vector.size()) { + filter_chain_manager_.removeFilterChains(fc_to_del_vector); + } + + // Add/Update the filter chains which are part of the new update + std::vector fc_to_add_vector; + for (const auto& fcds_resource : added_resources) + { + envoy::config::listener::v3::FilterChain fc_config; + fc_config = dynamic_cast(fcds_resource.get().resource()); + + if (fc_config.name().size() == 0) { + ENVOY_LOG(debug, "fcds: skipping this filter chain config update due to missing filter chain name, resource_name={}", fc_config.name()); + continue; + } + fc_to_add_vector.clear(); + fc_to_add_vector.push_back(&fc_config); + + ENVOY_LOG(debug, "fcds: adding filter chain: fcds name = {} resource_name={}", fcds_name_, fc_config.name()); + filter_chain_manager_.addFilterChains(fc_to_add_vector, nullptr, *fc_builder_, filter_chain_manager_, true); + } + + local_init_target_.ready(); +} + +void FcdsApi::onConfigUpdateFailed( + Envoy::Config::ConfigUpdateFailureReason reason, const EnvoyException*) { + ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason); + // We need to allow server startup to continue, even if we have a bad + // config. + local_init_target_.ready(); +} + +bool FcdsApi::validateUpdateSize(int num_resources) { + if (num_resources == 0) { + ENVOY_LOG(debug, "Missing Filter Chain Configuration for {} in onConfigUpdate()", fcds_name_); + local_init_target_.ready(); + return false; + } + + ENVOY_LOG(debug, "fcds: recived {} filter chain for fcds {}", num_resources, fcds_name_); + return true; +} + +} // namespace Server +} // namespace Envoy + + diff --git a/source/server/fcds_impl.h b/source/server/fcds_impl.h new file mode 100644 index 000000000000..2e398cbbf1ff --- /dev/null +++ b/source/server/fcds_impl.h @@ -0,0 +1,104 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "envoy/admin/v3/config_dump.pb.h" +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/config/listener/v3/listener_components.pb.h" +#include "envoy/config/listener/v3/listener_components.pb.validate.h" +#include "envoy/config/subscription.h" +#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +#include "envoy/http/codes.h" +#include "envoy/local_info/local_info.h" +#include "envoy/server/admin.h" +#include "envoy/server/filter_config.h" +#include "envoy/service/discovery/v3/discovery.pb.h" +#include "envoy/singleton/instance.h" +#include "envoy/stats/scope.h" +#include "envoy/thread_local/thread_local.h" + +#include "source/common/common/callback_impl.h" +#include "source/common/common/cleanup.h" +#include "source/common/common/logger.h" +#include "source/common/init/manager_impl.h" +#include "source/common/init/target_impl.h" +#include "source/common/init/watcher_impl.h" +#include "source/common/protobuf/utility.h" +#include "source/common/config/subscription_base.h" +#include "absl/container/node_hash_map.h" +#include "absl/container/node_hash_set.h" + +namespace Envoy { +namespace Server { +class FilterChainManagerImpl; +class FilterChainFactoryBuilder; +/** + * A class that fetches the filter chain configuration dynamically using the FCDS API and updates them to + * FCDS config providers. + */ +class FcdsApi + : Envoy::Config::SubscriptionBase, + Logger::Loggable { + +public: + FcdsApi( + const envoy::config::listener::v3::Fcds& fcds, + Configuration::FactoryContext& parent_context, + Init::Manager& init_manager, + FilterChainManagerImpl& filter_chain_manager, + FilterChainFactoryBuilder* fc_builder); + + ~FcdsApi(); + std::string versionInfo() const { return system_version_info_; } + + bool validateUpdateSize(int num_resources); + const Init::Target& initTarget() { return parent_init_target_; } + void initialize(); +private: + const std::string fcds_name_; + Envoy::Config::SubscriptionPtr fcds_subscription_; + std::string system_version_info_; + + // Init target used to notify the parent init manager that the subscription [and its sub resource] + // is ready. + Init::SharedTargetImpl parent_init_target_; + // Init watcher on FCDS ready event. This watcher marks parent_init_target_ ready. + Init::WatcherImpl local_init_watcher_; + // Target which starts the FCDS subscription. + Init::TargetImpl local_init_target_; + Init::ManagerImpl local_init_manager_; + Init::Manager& init_manager_; + + Upstream::ClusterManager& cluster_manager_; + Stats::ScopePtr scope_; + Common::CallbackManager<> update_callback_manager_; + + FilterChainManagerImpl& filter_chain_manager_; + FilterChainFactoryBuilder* fc_builder_; + // Config::SubscriptionCallbacks + void onConfigUpdate(const std::vector& resources, + const std::string& version_info) override; + + void onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) override; + + void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, + const EnvoyException* e) override; + + ABSL_MUST_USE_RESULT Common::CallbackHandlePtr addUpdateCallback(std::function callback) + { + return update_callback_manager_.add(callback); + } +}; + +using FcdsApiPtr = std::shared_ptr; + +} // namespace Server +} // namespace Envoy + + diff --git a/source/server/filter_chain_manager_impl.cc b/source/server/filter_chain_manager_impl.cc index 6dde01909da4..5311acd5aa8b 100644 --- a/source/server/filter_chain_manager_impl.cc +++ b/source/server/filter_chain_manager_impl.cc @@ -205,12 +205,42 @@ bool FilterChainManagerImpl::isWildcardServerName(const std::string& name) { return absl::StartsWith(name, "*."); } +void FilterChainManagerImpl::removeFilterChains( + absl::Span filter_chain_span) +{ + int count = 0; + for (const auto& filter_chain : filter_chain_span) { + if (fc_contexts_.find(filter_chain) != fc_contexts_.end()) { + std::shared_ptr fc_to_delete = fc_contexts_[filter_chain]; + fc_to_delete.reset(); + + if (fc_to_delete == nullptr) { + // Clear the filter chain context map + auto itr1 = fc_contexts_.find(filter_chain); + fc_contexts_.erase(itr1); + + // Clear the filter chain name to filter chain obj map + auto itr2 = fc_name_to_obj_map_.find(filter_chain.name()); + fc_name_to_obj_map_.erase(itr2); + + ++count; + ENVOY_LOG(debug, "filter chain removed successfully; fc name: {}", filter_chain.name()); + } else { + ENVOY_LOG(debug, "filter chain could not be removed"); + continue; + } + } + } + ENVOY_LOG(debug, "new fc_contexts has {} filter chains, with {} jus deleted",fc_contexts_.size(), count); +} + void FilterChainManagerImpl::addFilterChains( const xds::type::matcher::v3::Matcher* filter_chain_matcher, absl::Span filter_chain_span, const envoy::config::listener::v3::FilterChain* default_filter_chain, FilterChainFactoryBuilder& filter_chain_factory_builder, - FilterChainFactoryContextCreator& context_creator) { + FilterChainFactoryContextCreator& context_creator, + bool is_fcds_config_update) { Cleanup cleanup([this]() { origin_ = absl::nullopt; }); absl::node_hash_map @@ -241,11 +271,19 @@ void FilterChainManagerImpl::addFilterChains( // Reuse created filter chain if possible. // FilterChainManager maintains the lifetime of FilterChainFactoryContext // ListenerImpl maintains the dependencies of FilterChainFactoryContext - auto filter_chain_impl = findExistingFilterChain(*filter_chain); + bool is_valid_fcds_update = false; + auto filter_chain_impl = findExistingFilterChain(*filter_chain, is_fcds_config_update); if (filter_chain_impl == nullptr) { filter_chain_impl = filter_chain_factory_builder.buildFilterChain(*filter_chain, context_creator); ++new_filter_chain_size; + } else if (is_fcds_config_update) { + is_valid_fcds_update = true; + } + + bool fcds_valid_update = false; + if (is_fcds_update && filter_chain_impl) { + fcds_valid_update = true; } // If using the matcher, require usage of "name" field and skip building the index. @@ -302,10 +340,12 @@ void FilterChainManagerImpl::addFilterChains( server_names, filter_chain_match.transport_protocol(), filter_chain_match.application_protocols(), direct_source_ips, filter_chain_match.source_type(), source_ips, filter_chain_match.source_ports(), - filter_chain_impl); + filter_chain_impl, + is_valid_fcds_update); } fc_contexts_[*filter_chain] = filter_chain_impl; + fc_name_to_obj_map_[filter_chain->name()] = *filter_chain; } convertIPsToTries(); copyOrRebuildDefaultFilterChain(default_filter_chain, filter_chain_factory_builder, @@ -363,7 +403,8 @@ void FilterChainManagerImpl::addFilterChainForDestinationPorts( const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain) { + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update) { if (destination_ports_map.find(destination_port) == destination_ports_map.end()) { destination_ports_map[destination_port] = std::make_pair(DestinationIPsMap{}, nullptr); @@ -382,7 +423,8 @@ void FilterChainManagerImpl::addFilterChainForDestinationIPs( const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain) { + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update) { if (destination_ips.empty()) { addFilterChainForServerNames(destination_ips_map[EMPTY_STRING], server_names, transport_protocol, application_protocols, direct_source_ips, @@ -404,7 +446,8 @@ void FilterChainManagerImpl::addFilterChainForServerNames( const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain) { + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update) { if (server_names_map_ptr == nullptr) { server_names_map_ptr = std::make_shared(); } @@ -437,7 +480,8 @@ void FilterChainManagerImpl::addFilterChainForApplicationProtocols( const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain) { + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update) { if (application_protocols.empty()) { addFilterChainForDirectSourceIPs(application_protocols_map[EMPTY_STRING].first, direct_source_ips, source_type, source_ips, source_ports, @@ -456,7 +500,8 @@ void FilterChainManagerImpl::addFilterChainForDirectSourceIPs( const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain) { + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update) { if (direct_source_ips.empty()) { addFilterChainForSourceTypes(direct_source_ips_map[EMPTY_STRING], source_type, source_ips, source_ports, filter_chain); @@ -473,7 +518,8 @@ void FilterChainManagerImpl::addFilterChainForSourceTypes( const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain) { + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update) { if (source_types_array_ptr == nullptr) { source_types_array_ptr = std::make_shared(); } @@ -493,7 +539,8 @@ void FilterChainManagerImpl::addFilterChainForSourceTypes( void FilterChainManagerImpl::addFilterChainForSourceIPs( SourceIPsMap& source_ips_map, const std::string& source_ip, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain) { + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update) { if (source_ports.empty()) { addFilterChainForSourcePorts(source_ips_map[source_ip], 0, filter_chain); } else { @@ -505,13 +552,14 @@ void FilterChainManagerImpl::addFilterChainForSourceIPs( void FilterChainManagerImpl::addFilterChainForSourcePorts( SourcePortsMapSharedPtr& source_ports_map_ptr, uint32_t source_port, - const Network::FilterChainSharedPtr& filter_chain) { + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update) { if (source_ports_map_ptr == nullptr) { source_ports_map_ptr = std::make_shared(); } auto& source_ports_map = *source_ports_map_ptr; - if (!source_ports_map.try_emplace(source_port, filter_chain).second) { + if (!source_ports_map.try_emplace(source_port, filter_chain).second && !is_valid_fcds_update) { // If we got here and found already configured branch, then it means that this FilterChainMatch // is a duplicate, and that there is some overlap in the repeated fields with already processed // FilterChainMatches. @@ -823,17 +871,34 @@ void FilterChainManagerImpl::convertIPsToTries() { } Network::DrainableFilterChainSharedPtr FilterChainManagerImpl::findExistingFilterChain( - const envoy::config::listener::v3::FilterChain& filter_chain_message) { + const envoy::config::listener::v3::FilterChain& filter_chain_message, + bool is_fcds_config_update) { // Origin filter chain manager could be empty if the current is the ancestor. - const auto* origin = getOriginFilterChainManager(); - if (origin == nullptr) { - return nullptr; + try { + const auto* origin = getOriginFilterChainManager(); + if (origin == nullptr) { + return nullptr; + } + auto iter = origin->fc_contexts_.find(filter_chain_message); + if (iter != origin->fc_contexts_.end()) { + // copy the context to this filter chain manager. + fc_contexts_.emplace(filter_chain_message, iter->second); + return iter->second; + } } - auto iter = origin->fc_contexts_.find(filter_chain_message); - if (iter != origin->fc_contexts_.end()) { - // copy the context to this filter chain manager. - fc_contexts_.emplace(filter_chain_message, iter->second); - return iter->second; + catch(const std::exception& e) + { + // FCDS: Filter Chains in the new config update has to be matched with the old config + // associated with the existing FCM. Once found this new filter chain obj is re-used + // instead of recreating + if (is_fcds_config_update){ + auto iter = this->fc_contexts_.find(filter_chain_message); + if (iter != this->fc_contexts_.end()) { + return iter->second; + } + } else { + throw; + } } return nullptr; } @@ -903,5 +968,10 @@ envoy::config::core::v3::TrafficDirection FactoryContextImpl::direction() const Network::DrainDecision& FactoryContextImpl::drainDecision() { return drain_decision_; } Stats::Scope& FactoryContextImpl::listenerScope() { return listener_scope_; } bool FactoryContextImpl::isQuicListener() const { return is_quic_; } + +FcdsApiPtr FilterChainManagerImpl::createFcdsApi(const envoy::config::listener::v3::Fcds& fcds_config, + FilterChainFactoryBuilder* fc_builder){ + return std::make_shared(fcds_config, parent_context_, init_manager_, *this, fc_builder); +}; } // namespace Server } // namespace Envoy diff --git a/source/server/filter_chain_manager_impl.h b/source/server/filter_chain_manager_impl.h index c9503eafd8e9..159b9c48a364 100644 --- a/source/server/filter_chain_manager_impl.h +++ b/source/server/filter_chain_manager_impl.h @@ -22,6 +22,7 @@ #include "source/server/filter_chain_factory_context_callback.h" #include "absl/container/flat_hash_map.h" +#include "source/server/fcds_impl.h" namespace Envoy { namespace Server { @@ -199,6 +200,8 @@ class FilterChainManagerImpl : public Network::FilterChainManager, using FcContextMap = absl::flat_hash_map; + using FcdsResourcesMap = + absl::flat_hash_map; FilterChainManagerImpl(const std::vector& addresses, Configuration::FactoryContext& factory_context, Init::Manager& init_manager) @@ -223,7 +226,8 @@ class FilterChainManagerImpl : public Network::FilterChainManager, absl::Span filter_chain_span, const envoy::config::listener::v3::FilterChain* default_filter_chain, FilterChainFactoryBuilder& filter_chain_factory_builder, - FilterChainFactoryContextCreator& context_creator); + FilterChainFactoryContextCreator& context_creator, + bool is_fcds_update = false); static bool isWildcardServerName(const std::string& name); @@ -237,7 +241,14 @@ class FilterChainManagerImpl : public Network::FilterChainManager, const Network::DrainableFilterChainSharedPtr& defaultFilterChain() const { return default_filter_chain_; } - + void removeFilterChains( + absl::Span filter_chain_span); + + const FcdsResourcesMap& getFcdsResources() {return fc_name_to_obj_map_;} + + void fcdsInit(); + FcdsApiPtr createFcdsApi(const envoy::config::listener::v3::Fcds& fcds_config, + FilterChainFactoryBuilder* fc_builder); private: void convertIPsToTries(); const Network::FilterChain* @@ -292,7 +303,8 @@ class FilterChainManagerImpl : public Network::FilterChainManager, const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain); + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update); void addFilterChainForDestinationIPs( DestinationIPsMap& destination_ips_map, const std::vector& destination_ips, const absl::Span server_names, const std::string& transport_protocol, @@ -301,7 +313,8 @@ class FilterChainManagerImpl : public Network::FilterChainManager, const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain); + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update); void addFilterChainForServerNames( ServerNamesMapSharedPtr& server_names_map_ptr, const absl::Span server_names, const std::string& transport_protocol, @@ -310,7 +323,8 @@ class FilterChainManagerImpl : public Network::FilterChainManager, const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain); + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update); void addFilterChainForApplicationProtocols( ApplicationProtocolsMap& application_protocol_map, const absl::Span application_protocols, @@ -318,25 +332,30 @@ class FilterChainManagerImpl : public Network::FilterChainManager, const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain); + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update); void addFilterChainForDirectSourceIPs( DirectSourceIPsMap& direct_source_ips_map, const std::vector& direct_source_ips, const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain); + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update); void addFilterChainForSourceTypes( SourceTypesArraySharedPtr& source_types_array_ptr, const envoy::config::listener::v3::FilterChainMatch::ConnectionSourceType source_type, const std::vector& source_ips, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain); + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update); void addFilterChainForSourceIPs(SourceIPsMap& source_ips_map, const std::string& source_ip, const absl::Span source_ports, - const Network::FilterChainSharedPtr& filter_chain); + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update); void addFilterChainForSourcePorts(SourcePortsMapSharedPtr& source_ports_map_ptr, uint32_t source_port, - const Network::FilterChainSharedPtr& filter_chain); + const Network::FilterChainSharedPtr& filter_chain, + bool is_valid_fcds_update); const Network::FilterChain* findFilterChainForDestinationIP(const DestinationIPsTrie& destination_ips_trie, @@ -364,7 +383,8 @@ class FilterChainManagerImpl : public Network::FilterChainManager, const FilterChainManagerImpl* getOriginFilterChainManager() { return origin_.value(); } // Duplicate the inherent factory context if any. Network::DrainableFilterChainSharedPtr - findExistingFilterChain(const envoy::config::listener::v3::FilterChain& filter_chain_message); + findExistingFilterChain(const envoy::config::listener::v3::FilterChain& filter_chain_message, + bool is_fcds_config_update = false); // Mapping from filter chain message to filter chain. This is used by LDS response handler to // detect the filter chains in the intersection of existing listener and new listener. @@ -395,6 +415,9 @@ class FilterChainManagerImpl : public Network::FilterChainManager, // Matcher selecting the filter chain name. Matcher::MatchTreePtr matcher_; + + // Hashmap to maintain fcds resource name (filter chain name) and filter chain config mapping + FcdsResourcesMap fc_name_to_obj_map_; }; } // namespace Server } // namespace Envoy diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index 641e4ed3aa38..a2894b746f35 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -689,7 +689,7 @@ void ListenerImpl::createListenerFilterFactories() { } void ListenerImpl::validateFilterChains() { - if (config_.filter_chains().empty() && !config_.has_default_filter_chain() && + if (is_filterchain_missing() && (socket_type_ == Network::Socket::Type::Stream || !udp_listener_config_->listener_factory_->isTransportConnectionless())) { // If we got here, this is a tcp listener or connection-oriented udp listener, so ensure there @@ -697,6 +697,9 @@ void ListenerImpl::validateFilterChains() { throw EnvoyException( fmt::format("error adding listener '{}': no filter chains specified", absl::StrJoin(addresses_, ",", Network::AddressStrFormatter()))); + } else if (!config_.filter_chains().empty() && config_.has_fcds()) { + throw EnvoyException(fmt::format("error adding listener '{}': filter chains and fcds config are mutually exclusive", + absl::StrJoin(addresses_, ",", Network::AddressStrFormatter()))); } else if (udp_listener_config_ != nullptr && !udp_listener_config_->listener_factory_->isTransportConnectionless()) { // Early fail if any filter chain doesn't have transport socket configured. @@ -722,11 +725,18 @@ void ListenerImpl::validateFilterChains() { void ListenerImpl::buildFilterChains() { transport_factory_context_->setInitManager(*dynamic_init_manager_); ListenerFilterChainFactoryBuilder builder(*this, *transport_factory_context_); - filter_chain_manager_->addFilterChains( - config_.has_filter_chain_matcher() ? &config_.filter_chain_matcher() : nullptr, - config_.filter_chains(), - config_.has_default_filter_chain() ? &config_.default_filter_chain() : nullptr, builder, - *filter_chain_manager_); + if(config_.has_fcds()) { + ENVOY_LOG(debug, "buildFilterChains: adding the FCDS config watcher"); + ListenerFilterChainFactoryBuilder* fcds_builder = new ListenerFilterChainFactoryBuilder(*this, *transport_factory_context_); + fcds_api_ = filter_chain_manager_.createFcdsApi(config_.fcds(), fcds_builder); + } else if (!config_.filter_chains().empty()) { + ENVOY_LOG(debug, "buildFilterChains: processing filter_chain config"); + filter_chain_manager_->addFilterChains( + config_.has_filter_chain_matcher() ? &config_.filter_chain_matcher() : nullptr, + config_.filter_chains(), + config_.has_default_filter_chain() ? &config_.default_filter_chain() : nullptr, builder, + *filter_chain_manager_); + } } void ListenerImpl::buildConnectionBalancer(const Network::Address::Instance& address) { @@ -951,6 +961,10 @@ void ListenerImpl::initialize() { // manager directly. dynamic_init_manager_->initialize(local_init_watcher_); } + + if(config_.has_fcds()) { + fcds_api_->initialize(); + } } ListenerImpl::~ListenerImpl() { @@ -1137,5 +1151,12 @@ bool ListenerMessageUtil::filterChainOnlyChange(const envoy::config::listener::v return differencer.Compare(lhs, rhs); } +bool ListenerImpl::is_filterchain_missing() { + if (config_.filter_chains().empty() && !config_.has_default_filter_chain() && !config_.has_fcds()) + return true; + + return false; +} + } // namespace Server } // namespace Envoy diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index 7ed29495567e..2a2d9114076c 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -442,6 +442,7 @@ class ListenerImpl final : public Network::ListenerConfig, void buildListenSocketOptions(); void createListenerFilterFactories(); void validateFilterChains(); + bool is_filterchain_missing(); void buildFilterChains(); void buildConnectionBalancer(const Network::Address::Instance& address); void buildSocketOptions(); @@ -515,6 +516,10 @@ class ListenerImpl final : public Network::ListenerConfig, // to access ListenerManagerImpl::factory_. friend class ListenerFilterChainFactoryBuilder; + + + // FCDS API object holds the fcds config class + FcdsApiPtr fcds_api_; }; } // namespace Server diff --git a/source/server/proto_descriptors.cc b/source/server/proto_descriptors.cc index e94492c531cc..6b942fe687ac 100644 --- a/source/server/proto_descriptors.cc +++ b/source/server/proto_descriptors.cc @@ -32,6 +32,9 @@ void validateProtoDescriptors() { "envoy.service.route.v3.RouteDiscoveryService.DeltaRoutes", "envoy.service.runtime.v3.RuntimeDiscoveryService.StreamRuntime", "envoy.service.runtime.v3.RuntimeDiscoveryService.FetchRuntime", + "envoy.service.filter_chain.v3.FcdsDiscoveryService.FetchFilterChains", + "envoy.service.filter_chain.v3.FcdsDiscoveryService.StreamFilterChains", + "envoy.service.filter_chain.v3.FcdsDiscoveryService.DeltaFilterChains", }; for (const auto& method : methods) { From 5bbb28461d57329b212ad8d3394d4e36d6b0f5b3 Mon Sep 17 00:00:00 2001 From: Rakesh Datta Date: Thu, 17 Nov 2022 12:49:50 -0800 Subject: [PATCH 2/2] Adding support for filter chain only draining during FCDS Signed-off-by: Rakesh Datta --- api/envoy/config/listener/v3/listener.proto | 8 +- api/envoy/service/filter_chain/fcds.proto | 2 +- envoy/server/listener_manager.h | 6 + source/server/fcds_impl.cc | 158 +++++++++++++------- source/server/fcds_impl.h | 3 +- source/server/filter_chain_manager_impl.cc | 62 ++++---- source/server/filter_chain_manager_impl.h | 25 +++- source/server/listener_manager_impl.cc | 68 +++++++++ source/server/listener_manager_impl.h | 6 +- 9 files changed, 245 insertions(+), 93 deletions(-) diff --git a/api/envoy/config/listener/v3/listener.proto b/api/envoy/config/listener/v3/listener.proto index 84fc37a117b0..0e0b13ea04a9 100644 --- a/api/envoy/config/listener/v3/listener.proto +++ b/api/envoy/config/listener/v3/listener.proto @@ -139,8 +139,8 @@ message Listener { // :ref:`FAQ entry `. repeated FilterChain filter_chains = 3; - // FCDS: Filter Chain Discovery Service config block. filter_chains and fcds are mutually exclusive - Fcds fcds = 34; + // Listener FCDS: Listener Filter Chain Discovery Service config block. filter_chains and fcds are mutually exclusive + ListenerFcds fcds_cfg = 34; // :ref:`Matcher API ` resolving the filter chain name from the // network properties. This matcher is used as a replacement for the filter chain match condition @@ -363,7 +363,7 @@ message Listener { bool ignore_global_conn_limit = 31; } -message Fcds{ - string fcds_name = 1 [(validate.rules).string.min_len = 1]; +message ListenerFcds{ + string name = 1 [(validate.rules).string.min_len = 1]; envoy.config.core.v3.ConfigSource config_source = 2 [(validate.rules).message.required = true]; } diff --git a/api/envoy/service/filter_chain/fcds.proto b/api/envoy/service/filter_chain/fcds.proto index b844a0f63ef5..2adabe5a672d 100644 --- a/api/envoy/service/filter_chain/fcds.proto +++ b/api/envoy/service/filter_chain/fcds.proto @@ -23,7 +23,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // associated HTTP connection manager filters) to use different route // configurations. Each listener will bind its HTTP connection manager filter to // a route table via this identifier. -service FcdsDiscoveryService { +service ListenerFcdsDiscoveryService { option (envoy.annotations.resource).type = "envoy.config.litener.v3.FilterChain"; rpc StreamFilterChains(stream discovery.v3.DiscoveryRequest) diff --git a/envoy/server/listener_manager.h b/envoy/server/listener_manager.h index b59c2a1ba7d8..c855e41a89ff 100644 --- a/envoy/server/listener_manager.h +++ b/envoy/server/listener_manager.h @@ -253,6 +253,12 @@ class ListenerManager { * @return TRUE if the worker has started or FALSE if not. */ virtual bool isWorkerStarted() PURE; + + /* + * Listener FCDS: This method is invoke to drain removed or modified filter chains' connections + */ + virtual void startDrainingSequenceForListenerFcds(std::string draining_listener, + std::list filter_chains) PURE; }; // overload operator| to allow ListenerManager::listeners(ListenerState) to be called using a diff --git a/source/server/fcds_impl.cc b/source/server/fcds_impl.cc index 00e4c0aea6b2..6278fd2c78af 100644 --- a/source/server/fcds_impl.cc +++ b/source/server/fcds_impl.cc @@ -20,55 +20,44 @@ namespace Envoy { namespace Server { -FcdsApi::FcdsApi( - const envoy::config::listener::v3::Fcds& fcds, - Configuration::FactoryContext& parent_context, - Init::Manager& init_manager, - FilterChainManagerImpl& filter_chain_manager, - FilterChainFactoryBuilder* fc_builder) +FcdsApi::FcdsApi(const envoy::config::listener::v3::Fcds& fcds, + Configuration::FactoryContext& parent_context, Init::Manager& init_manager, + FilterChainManagerImpl& filter_chain_manager, + FilterChainFactoryBuilder* fc_builder) : Envoy::Config::SubscriptionBase( - parent_context.messageValidationContext().dynamicValidationVisitor(), "name"), + parent_context.messageValidationContext().dynamicValidationVisitor(), "name"), fcds_name_(fcds.fcds_name()), parent_init_target_(fmt::format("FcdsConfigSubscription parent_init_target {}", fcds_name_), [this]() { local_init_manager_.initialize(local_init_watcher_); }), local_init_watcher_(fmt::format("Fcds local_init_watcher {}", fcds.fcds_name()), [this]() { parent_init_target_.ready(); }), - local_init_target_( - fmt::format("FcdsConfigSubscription local_init_target {}", fcds_name_), - [this]() { fcds_subscription_->start({fcds_name_}); }), + local_init_target_(fmt::format("FcdsConfigSubscription local_init_target {}", fcds_name_), + [this]() { fcds_subscription_->start({fcds_name_}); }), local_init_manager_(fmt::format("FCDS local_init_manager {}", fcds_name_)), - init_manager_(init_manager), - cluster_manager_(parent_context.clusterManager()), + init_manager_(init_manager), cluster_manager_(parent_context.clusterManager()), scope_(parent_context.listenerScope().createScope("")), - filter_chain_manager_(filter_chain_manager), - fc_builder_(fc_builder) + filter_chain_manager_(filter_chain_manager), fc_builder_(fc_builder) { const auto resource_name = getResourceName(); - - fcds_subscription_ = - cluster_manager_.subscriptionFactory().subscriptionFromConfigSource( - fcds.config_source(), Grpc::Common::typeUrl(resource_name), *scope_, *this, - resource_decoder_, {}); + fcds_subscription_ = cluster_manager_.subscriptionFactory().subscriptionFromConfigSource( + fcds.config_source(), Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, + {}); } - -FcdsApi::~FcdsApi() -{ +FcdsApi::~FcdsApi() { // If we get destroyed during initialization, make sure we signal that we "initialized". local_init_target_.ready(); } -void FcdsApi::initialize() -{ +void FcdsApi::initialize() { local_init_manager_.add(local_init_target_); init_manager_.add(parent_init_target_); } -void FcdsApi::onConfigUpdate( - const std::vector& resources, - const std::string& version_info) { +void FcdsApi::onConfigUpdate(const std::vector& resources, + const std::string& version_info) { if (!validateUpdateSize(resources.size())) { return; @@ -79,43 +68,47 @@ void FcdsApi::onConfigUpdate( auto fc_name_to_resource_map = filter_chain_manager_.getFcdsResources(); auto itr = fc_name_to_resource_map.begin(); while (itr != fc_name_to_resource_map.end()) { - fc_to_remove.insert(itr->first); - ++itr; + fc_to_remove.insert(itr->first); + ++itr; } - // Remove the list of FCs in the incoming FCDS config from the above list - for (const auto& filter_chain : resources) - { + // Remove the list of FCs in the incoming FCDS config from the above list + for (const auto& filter_chain : resources) { fc_to_remove.erase(filter_chain.get().name()); } Protobuf::RepeatedPtrField to_be_deleted_fc_list; for (const auto& filter_chain : fc_to_remove) { - *to_be_deleted_fc_list.Add() = filter_chain; + *to_be_deleted_fc_list.Add() = filter_chain; } // Delete the old fcs and add the new fcs onConfigUpdate(resources, to_be_deleted_fc_list, version_info); } +void FcdsApi::onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string&) { -void FcdsApi::onConfigUpdate( - const std::vector& added_resources, - const Protobuf::RepeatedPtrField& removed_resources, const std::string&) { - // Get all the existing filter chains auto& fc_name_to_resource_map = filter_chain_manager_.getFcdsResources(); - - // Delete the filter chains which are missing in this config update + auto& fc_msg_to_ptr_map = filter_chain_manager_.filterChainsByMessage(); + auto& fc_name_to_hash_map = filter_chain_manager_.getFcHashes(); + + // Delete and drain the filter chains which are missing in this config update std::vector fc_to_del_vector; - for (const std::string& fc_name : removed_resources) - { - ENVOY_LOG(debug, "fcds: removing filter chain from FCM: fcds name = {} resource={}", fcds_name_, fc_name); + for (const std::string& fc_name : removed_resources) { + ENVOY_LOG(debug, "fcds: removing filter chain from FCM: fcds name = {} resource={}", fcds_name_, + fc_name); auto itr = fc_name_to_resource_map.find(fc_name); if (itr != fc_name_to_resource_map.end()) { - const envoy::config::listener::v3::FilterChain& fc = itr->second; - fc_to_del_vector.push_back(fc); + const envoy::config::listener::v3::FilterChain& fc = itr->second; + // Add this fc to the to-be-deleted list + fc_to_del_vector.push_back(fc); + + // Add this fc to the to-be-drained list + filter_chain_manager_.addFcToFcdsDrainingList(fc_msg_to_ptr_map.find(fc)->second); } } @@ -125,27 +118,77 @@ void FcdsApi::onConfigUpdate( // Add/Update the filter chains which are part of the new update std::vector fc_to_add_vector; - for (const auto& fcds_resource : added_resources) - { + + for (const auto& fcds_resource : added_resources) { envoy::config::listener::v3::FilterChain fc_config; - fc_config = dynamic_cast(fcds_resource.get().resource()); - + fc_config = dynamic_cast( + fcds_resource.get().resource()); + + // Ignore filter chains without a name if (fc_config.name().size() == 0) { - ENVOY_LOG(debug, "fcds: skipping this filter chain config update due to missing filter chain name, resource_name={}", fc_config.name()); + ENVOY_LOG(debug, + "fcds: skipping this filter chain config update due to missing filter chain name"); continue; - } - fc_to_add_vector.clear(); - fc_to_add_vector.push_back(&fc_config); + } + + fc_to_add_vector.clear(); + fc_to_del_vector.clear(); + std::string fc_name = fc_config.name(); + ENVOY_LOG(debug, "fcds: processing config update for fc={}", fc_name); + + // Updating an existing filter chain + if (fc_name_to_hash_map.find(fc_config.name()) != fc_name_to_hash_map.end()) { + // Ignore the config update if the old and new configs are same + auto hash = MessageUtil::hash(fc_config); + auto itr1 = fc_name_to_hash_map.find(fc_name); + if (hash == itr1->second) { + ENVOY_LOG(debug, "fcds: new and old filter chain config are same. Ignoring update for fc={}", + fc_name); + continue; + } + + ENVOY_LOG( + debug, + "fcds: update for an existing filter chain, performing deletion and addition for fc={}", + fc_name); + + auto itr2 = fc_name_to_resource_map.find(fc_name); + if (itr2 != fc_name_to_resource_map.end()) { + const envoy::config::listener::v3::FilterChain& fc = itr2->second; + filter_chain_manager_.addFcToFcdsDrainingList(fc_msg_to_ptr_map.find(fc)->second); + + // Add the new filter chain + fc_to_add_vector.push_back(&fc_config); + // Delete the old filter chain + fc_to_del_vector.push_back(fc); + } + + } else { + //Incoming filter chain is brand new + ENVOY_LOG(debug, "fcds: adding a brand new filter chain: fcds name = {} resource_name={}", + fcds_name_, fc_config.name()); + + // Add this filter chain + fc_to_add_vector.push_back(&fc_config); + } - ENVOY_LOG(debug, "fcds: adding filter chain: fcds name = {} resource_name={}", fcds_name_, fc_config.name()); - filter_chain_manager_.addFilterChains(fc_to_add_vector, nullptr, *fc_builder_, filter_chain_manager_, true); + // Finally invoke the deletion call + if (fc_to_del_vector.size()) { + filter_chain_manager_.removeFilterChains(fc_to_del_vector); + } + + // Finally invoke the addition call + filter_chain_manager_.addFilterChains(fc_to_add_vector, nullptr, *fc_builder_, + filter_chain_manager_, true); } + filter_chain_manager_.startDrainingSequenceForListenerFcds(); + local_init_target_.ready(); } -void FcdsApi::onConfigUpdateFailed( - Envoy::Config::ConfigUpdateFailureReason reason, const EnvoyException*) { +void FcdsApi::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, + const EnvoyException*) { ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason); // We need to allow server startup to continue, even if we have a bad // config. @@ -159,11 +202,10 @@ bool FcdsApi::validateUpdateSize(int num_resources) { return false; } - ENVOY_LOG(debug, "fcds: recived {} filter chain for fcds {}", num_resources, fcds_name_); + ENVOY_LOG(debug, "fcds: recieved {} filter chain for fcds {}", num_resources, fcds_name_); return true; } } // namespace Server } // namespace Envoy - diff --git a/source/server/fcds_impl.h b/source/server/fcds_impl.h index 2e398cbbf1ff..41f9fed9344c 100644 --- a/source/server/fcds_impl.h +++ b/source/server/fcds_impl.h @@ -31,11 +31,13 @@ #include "source/common/config/subscription_base.h" #include "absl/container/node_hash_map.h" #include "absl/container/node_hash_set.h" +//#include "envoy/server/listener_manager.h" namespace Envoy { namespace Server { class FilterChainManagerImpl; class FilterChainFactoryBuilder; + /** * A class that fetches the filter chain configuration dynamically using the FCDS API and updates them to * FCDS config providers. @@ -79,7 +81,6 @@ class FcdsApi FilterChainManagerImpl& filter_chain_manager_; FilterChainFactoryBuilder* fc_builder_; - // Config::SubscriptionCallbacks void onConfigUpdate(const std::vector& resources, const std::string& version_info) override; diff --git a/source/server/filter_chain_manager_impl.cc b/source/server/filter_chain_manager_impl.cc index 5311acd5aa8b..f99f77418205 100644 --- a/source/server/filter_chain_manager_impl.cc +++ b/source/server/filter_chain_manager_impl.cc @@ -211,24 +211,15 @@ void FilterChainManagerImpl::removeFilterChains( int count = 0; for (const auto& filter_chain : filter_chain_span) { if (fc_contexts_.find(filter_chain) != fc_contexts_.end()) { - std::shared_ptr fc_to_delete = fc_contexts_[filter_chain]; - fc_to_delete.reset(); - - if (fc_to_delete == nullptr) { - // Clear the filter chain context map - auto itr1 = fc_contexts_.find(filter_chain); - fc_contexts_.erase(itr1); - // Clear the filter chain name to filter chain obj map auto itr2 = fc_name_to_obj_map_.find(filter_chain.name()); fc_name_to_obj_map_.erase(itr2); + auto itr1 = fc_name_to_hash_map_.find(filter_chain.name()); + fc_name_to_hash_map_.erase(itr1); + ++count; - ENVOY_LOG(debug, "filter chain removed successfully; fc name: {}", filter_chain.name()); - } else { - ENVOY_LOG(debug, "filter chain could not be removed"); - continue; - } + //ENVOY_LOG(debug, "RAKESH Filter chain context removed from master; fc name: {}", filter_chain.name()); } } ENVOY_LOG(debug, "new fc_contexts has {} filter chains, with {} jus deleted",fc_contexts_.size(), count); @@ -271,19 +262,11 @@ void FilterChainManagerImpl::addFilterChains( // Reuse created filter chain if possible. // FilterChainManager maintains the lifetime of FilterChainFactoryContext // ListenerImpl maintains the dependencies of FilterChainFactoryContext - bool is_valid_fcds_update = false; auto filter_chain_impl = findExistingFilterChain(*filter_chain, is_fcds_config_update); if (filter_chain_impl == nullptr) { filter_chain_impl = filter_chain_factory_builder.buildFilterChain(*filter_chain, context_creator); ++new_filter_chain_size; - } else if (is_fcds_config_update) { - is_valid_fcds_update = true; - } - - bool fcds_valid_update = false; - if (is_fcds_update && filter_chain_impl) { - fcds_valid_update = true; } // If using the matcher, require usage of "name" field and skip building the index. @@ -341,11 +324,12 @@ void FilterChainManagerImpl::addFilterChains( filter_chain_match.application_protocols(), direct_source_ips, filter_chain_match.source_type(), source_ips, filter_chain_match.source_ports(), filter_chain_impl, - is_valid_fcds_update); + is_fcds_confgig_update); } fc_contexts_[*filter_chain] = filter_chain_impl; fc_name_to_obj_map_[filter_chain->name()] = *filter_chain; + fc_name_to_hash_map_[filter_chain->name()] = MessageUtil::hash(*filter_chain); } convertIPsToTries(); copyOrRebuildDefaultFilterChain(default_filter_chain, filter_chain_factory_builder, @@ -558,8 +542,13 @@ void FilterChainManagerImpl::addFilterChainForSourcePorts( source_ports_map_ptr = std::make_shared(); } auto& source_ports_map = *source_ports_map_ptr; + auto itr = source_ports_map.find(source_port); + if ((itr != source_ports_map.end()) && + (is_valid_fcds_update)) { + source_ports_map.erase(itr); + } - if (!source_ports_map.try_emplace(source_port, filter_chain).second && !is_valid_fcds_update) { + if (!source_ports_map.try_emplace(source_port, filter_chain).second) { // If we got here and found already configured branch, then it means that this FilterChainMatch // is a duplicate, and that there is some overlap in the repeated fields with already processed // FilterChainMatches. @@ -894,6 +883,8 @@ Network::DrainableFilterChainSharedPtr FilterChainManagerImpl::findExistingFilte if (is_fcds_config_update){ auto iter = this->fc_contexts_.find(filter_chain_message); if (iter != this->fc_contexts_.end()) { + // FC is unmodified so no need to drain + this->removeFcFromFcdsDrainingList(iter->second); return iter->second; } } else { @@ -970,8 +961,29 @@ Stats::Scope& FactoryContextImpl::listenerScope() { return listener_scope_; } bool FactoryContextImpl::isQuicListener() const { return is_quic_; } FcdsApiPtr FilterChainManagerImpl::createFcdsApi(const envoy::config::listener::v3::Fcds& fcds_config, - FilterChainFactoryBuilder* fc_builder){ + FilterChainFactoryBuilder* fc_builder, + ListenerManager* listener_mgr, + std::string listener_name){ + listener_manager_ = listener_mgr; + listener_name_ = listener_name; return std::make_shared(fcds_config, parent_context_, init_manager_, *this, fc_builder); -}; +} +void FilterChainManagerImpl::addFcToFcdsDrainingList(Network::DrainableFilterChainSharedPtr fc) +{ + this->draining_fc_list_.push_back(fc); +} +void FilterChainManagerImpl::removeFcFromFcdsDrainingList(Network::DrainableFilterChainSharedPtr fc) +{ + this->draining_fc_list_.remove(fc); +} +void FilterChainManagerImpl::startDrainingSequenceForListenerFcds() +{ + if (this->draining_fc_list_.size()) { + ENVOY_LOG(debug, "FCDS: {} filter chains to be drained", this->draining_fc_list_.size()); + listener_manager_->startDrainingSequenceForListenerFcds(listener_name_, + this->draining_fc_list_); + this->draining_fc_list_.clear(); + } +} } // namespace Server } // namespace Envoy diff --git a/source/server/filter_chain_manager_impl.h b/source/server/filter_chain_manager_impl.h index 159b9c48a364..175385d7842d 100644 --- a/source/server/filter_chain_manager_impl.h +++ b/source/server/filter_chain_manager_impl.h @@ -202,6 +202,10 @@ class FilterChainManagerImpl : public Network::FilterChainManager, Network::DrainableFilterChainSharedPtr, MessageUtil, MessageUtil>; using FcdsResourcesMap = absl::flat_hash_map; + + using FcHashValueMap = + absl::flat_hash_map; + FilterChainManagerImpl(const std::vector& addresses, Configuration::FactoryContext& factory_context, Init::Manager& init_manager) @@ -243,12 +247,14 @@ class FilterChainManagerImpl : public Network::FilterChainManager, } void removeFilterChains( absl::Span filter_chain_span); - const FcdsResourcesMap& getFcdsResources() {return fc_name_to_obj_map_;} - + const FcHashValueMap& getFcHashes() {return fc_name_to_hash_map_;} void fcdsInit(); FcdsApiPtr createFcdsApi(const envoy::config::listener::v3::Fcds& fcds_config, - FilterChainFactoryBuilder* fc_builder); + FilterChainFactoryBuilder* fc_builder);a + void addFcToFcdsDrainingList(Network::DrainableFilterChainSharedPtr); + void removeFcFromFcdsDrainingList(Network::DrainableFilterChainSharedPtr); + void startDrainingSequenceForListenerFcds(); private: void convertIPsToTries(); const Network::FilterChain* @@ -293,6 +299,7 @@ class FilterChainManagerImpl : public Network::FilterChainManager, using DestinationIPsTriePtr = std::unique_ptr; using DestinationPortsMap = absl::flat_hash_map>; + using FilterChainsList = std::list; void addFilterChainForDestinationPorts( DestinationPortsMap& destination_ports_map, uint16_t destination_port, @@ -418,6 +425,18 @@ class FilterChainManagerImpl : public Network::FilterChainManager, // Hashmap to maintain fcds resource name (filter chain name) and filter chain config mapping FcdsResourcesMap fc_name_to_obj_map_; + + // Hashmap to maintain fcds resource name (filter chain name) and filter chain config hash mapping + FcHashValueMap fc_name_to_hash_map_; + + // List of filters to be drained during Listener FCDS + FilterChainsList draining_fc_list_; + + // Pointer back to the listener manager that is managing this FCM + ListenerManager* listener_manager_; + + // Listener that uses this FCM + std::string listener_name_; }; } // namespace Server } // namespace Envoy diff --git a/source/server/listener_manager_impl.cc b/source/server/listener_manager_impl.cc index 9fa6bfb74a20..6899a8e445ae 100644 --- a/source/server/listener_manager_impl.cc +++ b/source/server/listener_manager_impl.cc @@ -276,6 +276,11 @@ DrainingFilterChainsManager::DrainingFilterChainsManager(ListenerImplPtr&& drain : draining_listener_(std::move(draining_listener)), workers_pending_removal_(workers_pending_removal) {} +DrainingFilterChainsManager::DrainingFilterChainsManager(ListenerImpl& draining_listener, + uint64_t workers_pending_removal) + : draining_listener_(&draining_listener), + workers_pending_removal_(workers_pending_removal) {} + ListenerManagerImpl::ListenerManagerImpl(Instance& server, ListenerComponentFactory& listener_factory, WorkerFactory& worker_factory, @@ -726,6 +731,69 @@ void ListenerManagerImpl::inPlaceFilterChainUpdate(ListenerImpl& listener) { updateWarmingActiveGauges(); } +void ListenerManagerImpl::startDrainingSequenceForListenerFcds(std::string draining_listener, + std::list filter_chains) { + ASSERT(!active_listeners_.empty()); + + auto existing_active_listener = getListenerByName(active_listeners_, draining_listener); + + ASSERT(existing_active_listener != active_listeners_.end()); + ASSERT(*existing_active_listener != nullptr); + (*existing_active_listener)->debugLog("FCDS: execute listener filter chain update"); + + if (existing_active_listener != active_listeners_.end()) { + ENVOY_LOG(debug, "FCDS: filter chain draining request for {} filter chains, accepted for listener name={}", + filter_chains.size(), draining_listener); + stats_.listener_in_place_updated_.inc(); + } else { + ENVOY_LOG(debug, "FCDS: filter chain draining request rejected for listener name={}", + draining_listener); + return; + } + + ListenerImpl& current_listener = **existing_active_listener; + + std::list::iterator draining_group = + draining_filter_chains_manager_.emplace(draining_filter_chains_manager_.begin(), + current_listener, workers_.size()); + getListenerByName(active_listeners_, draining_listener); + + for (Network::DrainableFilterChainSharedPtr fc: filter_chains) { + fc->startDraining(); + draining_group->addFilterChainToDrain(*fc); + } + + auto filter_chain_size = draining_group->numDrainingFilterChains(); + stats_.total_filter_chains_draining_.add(filter_chain_size); + + // Start the drain sequence which completes when the listener's drain manager has completed + // draining at whatever the server configured drain times are. + draining_group->startDrainSequence( + server_.options().drainTime(), server_.dispatcher(), [this, draining_group]() -> void { + draining_group->getDrainingListener().debugLog( + absl::StrCat("removing draining filter chains from listener ", + draining_group->getDrainingListener().name())); + for (const auto& worker : workers_) { + // Once the drain time has completed via the drain manager's timer, we tell the workers + // to remove the filter chains. + worker->removeFilterChains( + draining_group->getDrainingListenerTag(), draining_group->getDrainingFilterChains(), + [this, draining_group]() -> void { + server_.dispatcher().post([this, draining_group]() -> void { + if (draining_group->decWorkersPendingRemoval() == 0) { + draining_group->getDrainingListener().debugLog( + absl::StrCat("draining filter chains from listener ", + draining_group->getDrainingListener().name(), " complete")); + stats_.total_filter_chains_draining_.sub( + draining_group->numDrainingFilterChains()); + } + }); + }); + } + }); + updateWarmingActiveGauges(); +} + void ListenerManagerImpl::drainFilterChains(ListenerImplPtr&& draining_listener, ListenerImpl& new_listener) { // First add the listener to the draining list. diff --git a/source/server/listener_manager_impl.h b/source/server/listener_manager_impl.h index 45d8b65de7dc..9d252d49fb36 100644 --- a/source/server/listener_manager_impl.h +++ b/source/server/listener_manager_impl.h @@ -145,6 +145,9 @@ class DrainingFilterChainsManager { public: DrainingFilterChainsManager(ListenerImplPtr&& draining_listener, uint64_t workers_pending_removal); + DrainingFilterChainsManager(ListenerImpl& draining_listener, + uint64_t workers_pending_removal); + uint64_t getDrainingListenerTag() const { return draining_listener_->listenerTag(); } const std::list& getDrainingFilterChains() const { return draining_filter_chains_; @@ -214,7 +217,8 @@ class ListenerManagerImpl : public ListenerManager, Logger::Loggable filter_chains) override; private: using ListenerList = std::list; /**