Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature : Listener Filter Chain Discovery Service #23096

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ proto_library(
"//envoy/service/event_reporting/v3:pkg",
"//envoy/service/ext_proc/v3:pkg",
"//envoy/service/extension/v3:pkg",
"//envoy/service/filter_chain/v3:pkg",
"//envoy/service/health/v3:pkg",
"//envoy/service/listener/v3:pkg",
"//envoy/service/load_stats/v3:pkg",
Expand Down
11 changes: 10 additions & 1 deletion api/envoy/config/listener/v3/listener.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import "udpa/annotations/security.proto";
import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";
import "validate/validate.proto";
import "envoy/config/core/v3/config_source.proto";

option java_package = "io.envoyproxy.envoy.config.listener.v3";
option java_outer_classname = "ListenerProto";
Expand Down Expand Up @@ -54,7 +55,7 @@ message ListenerCollection {
repeated xds.core.v3.CollectionEntry entries = 1;
}

// [#next-free-field: 34]
// [#next-free-field: 35]
message Listener {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.Listener";

Expand Down Expand Up @@ -148,6 +149,9 @@ message Listener {
// :ref:`FAQ entry <faq_how_to_setup_sni>`.
repeated FilterChain filter_chains = 3;

// Listener FCDS: Listener Filter Chain Discovery Service config block. filter_chains and fcds are mutually exclusive
ListenerFcds fcds_cfg = 34;

// :ref:`Matcher API <arch_overview_matching_listener>` resolving the filter chain name from the
// network properties. This matcher is used as a replacement for the filter chain match condition
// :ref:`filter_chain_match
Expand Down Expand Up @@ -373,3 +377,8 @@ message Listener {
// :ref:`global_downstream_max_connections <config_overload_manager_limiting_connections>`.
bool ignore_global_conn_limit = 31;
}

message ListenerFcds{
string name = 1 [(validate.rules).string.min_len = 1];
envoy.config.core.v3.ConfigSource config_source = 2 [(validate.rules).message.required = true];
}
15 changes: 15 additions & 0 deletions api/envoy/service/filter_chain/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
has_services = True,
deps = [
"//envoy/annotations:pkg",
"//envoy/api/v2:pkg",
"//envoy/service/discovery/v3:pkg",
"@com_github_cncf_udpa//udpa/annotations:pkg",
],
)
46 changes: 46 additions & 0 deletions api/envoy/service/filter_chain/fcds.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
syntax = "proto3";

package envoy.service.filter_chain.v3;

import "envoy/service/discovery/v3/discovery.proto";

import "google/api/annotations.proto";

import "envoy/annotations/resource.proto";
import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";

option java_package = "io.envoyproxy.envoy.service.filter_chain.v3";
option java_outer_classname = "FcdsProto";
option java_multiple_files = true;
option java_generic_services = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;

// [#protodoc-title: FCDS]

// The resource_names field in DiscoveryRequest specifies a route configuration.
// This allows an Envoy configuration with multiple HTTP listeners (and
// associated HTTP connection manager filters) to use different route
// configurations. Each listener will bind its HTTP connection manager filter to
// a route table via this identifier.
service ListenerFcdsDiscoveryService {
option (envoy.annotations.resource).type = "envoy.config.litener.v3.FilterChain";

rpc StreamFilterChains(stream discovery.v3.DiscoveryRequest)
returns (stream discovery.v3.DiscoveryResponse) {
}

rpc DeltaFilterChains(stream discovery.v3.DeltaDiscoveryRequest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet implemented in this PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @adisuissa Delta is not implemented in this PR. I am working on it and will submit using a subsequent PR. Hope that works.

returns (stream discovery.v3.DeltaDiscoveryResponse) {
}

rpc FetchFilterChains(discovery.v3.DiscoveryRequest) returns (discovery.v3.DiscoveryResponse) {
option (google.api.http).post = "/v3/discovery:filterchains";
option (google.api.http).body = "*";
}
}

// [#not-implemented-hide:] Not configuration. Workaround c++ protobuf issue with importing
// services: https://github.com/google/protobuf/issues/4221 and protoxform to upgrade the file.
message FcdsDummy {
}
1 change: 1 addition & 0 deletions api/versioning/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ proto_library(
"//envoy/service/rate_limit_quota/v3:pkg",
"//envoy/service/ratelimit/v3:pkg",
"//envoy/service/route/v3:pkg",
"//envoy/service/filter_chain/v3:pkg",
"//envoy/service/runtime/v3:pkg",
"//envoy/service/secret/v3:pkg",
"//envoy/service/status/v3:pkg",
Expand Down
6 changes: 6 additions & 0 deletions envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ class ListenerManager {
* @return TRUE if the worker has started or FALSE if not.
*/
virtual bool isWorkerStarted() PURE;

/*
* Listener FCDS: This method is invoke to drain removed or modified filter chains' connections
*/
virtual void startDrainingSequenceForListenerFcds(std::string draining_listener,
std::list<Network::DrainableFilterChainSharedPtr> filter_chains) PURE;
};

// overload operator| to allow ListenerManager::listeners(ListenerState) to be called using a
Expand Down
11 changes: 9 additions & 2 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,8 @@ envoy_cc_library(

envoy_cc_library(
name = "filter_chain_manager_lib",
srcs = ["filter_chain_manager_impl.cc"],
hdrs = ["filter_chain_manager_impl.h"],
srcs = ["filter_chain_manager_impl.cc", "fcds_impl.cc"],
hdrs = ["filter_chain_manager_impl.h", "fcds_impl.h"],
deps = [
":filter_chain_factory_context_callback",
"//envoy/config:typed_metadata_interface",
Expand All @@ -527,7 +527,13 @@ envoy_cc_library(
"//source/common/network/matching:data_impl_lib",
"//source/common/network/matching:inputs_lib",
"//source/server:configuration_lib",
"//source/common/config:api_version_lib",
"//source/common/common:callback_impl_lib",
"//source/common/config:subscription_base_interface",
"//source/common/protobuf:utility_lib",
"@envoy_api//envoy/config/listener/v3:pkg_cc_proto",
"@envoy_api//envoy/service/filter_chain/v3:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)

Expand Down Expand Up @@ -579,6 +585,7 @@ envoy_cc_library(
":guarddog_lib",
":listener_hooks_lib",
":listener_manager_lib",
":filter_chain_manager_lib",
":ssl_context_manager_lib",
":worker_lib",
"//envoy/event:dispatcher_interface",
Expand Down
211 changes: 211 additions & 0 deletions source/server/fcds_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>

#include "envoy/admin/v3/config_dump.pb.h"
#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
#include "envoy/service/discovery/v3/discovery.pb.h"

#include "source/common/common/assert.h"
#include "source/common/common/fmt.h"
#include "source/common/config/api_version.h"
#include "source/common/config/utility.h"
#include "source/common/http/header_map_impl.h"
#include "source/common/protobuf/utility.h"
#include "source/server/fcds_impl.h"
#include "source/server/filter_chain_manager_impl.h"

namespace Envoy {
namespace Server {

FcdsApi::FcdsApi(const envoy::config::listener::v3::Fcds& fcds,
Configuration::FactoryContext& parent_context, Init::Manager& init_manager,
FilterChainManagerImpl& filter_chain_manager,
FilterChainFactoryBuilder* fc_builder)
: Envoy::Config::SubscriptionBase<envoy::config::listener::v3::FilterChain>(
parent_context.messageValidationContext().dynamicValidationVisitor(), "name"),
fcds_name_(fcds.fcds_name()),
parent_init_target_(fmt::format("FcdsConfigSubscription parent_init_target {}", fcds_name_),
[this]() { local_init_manager_.initialize(local_init_watcher_); }),
local_init_watcher_(fmt::format("Fcds local_init_watcher {}", fcds.fcds_name()),
[this]() { parent_init_target_.ready(); }),
local_init_target_(fmt::format("FcdsConfigSubscription local_init_target {}", fcds_name_),
[this]() { fcds_subscription_->start({fcds_name_}); }),
local_init_manager_(fmt::format("FCDS local_init_manager {}", fcds_name_)),
init_manager_(init_manager), cluster_manager_(parent_context.clusterManager()),
scope_(parent_context.listenerScope().createScope("")),
filter_chain_manager_(filter_chain_manager), fc_builder_(fc_builder)
{

const auto resource_name = getResourceName();

fcds_subscription_ = cluster_manager_.subscriptionFactory().subscriptionFromConfigSource(
fcds.config_source(), Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_,
{});
}

FcdsApi::~FcdsApi() {
// If we get destroyed during initialization, make sure we signal that we "initialized".
local_init_target_.ready();
}

void FcdsApi::initialize() {
local_init_manager_.add(local_init_target_);
init_manager_.add(parent_init_target_);
}

void FcdsApi::onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& resources,
const std::string& version_info) {

if (!validateUpdateSize(resources.size())) {
return;
}

// Construct the list of filter chains to be removed
std::unordered_set<std::string> fc_to_remove;
auto fc_name_to_resource_map = filter_chain_manager_.getFcdsResources();
auto itr = fc_name_to_resource_map.begin();
while (itr != fc_name_to_resource_map.end()) {
fc_to_remove.insert(itr->first);
++itr;
}

// Remove the list of FCs in the incoming FCDS config from the above list
for (const auto& filter_chain : resources) {
fc_to_remove.erase(filter_chain.get().name());
}

Protobuf::RepeatedPtrField<std::string> to_be_deleted_fc_list;
for (const auto& filter_chain : fc_to_remove) {
*to_be_deleted_fc_list.Add() = filter_chain;
}

// Delete the old fcs and add the new fcs
onConfigUpdate(resources, to_be_deleted_fc_list, version_info);
}

void FcdsApi::onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string&) {

// Get all the existing filter chains
auto& fc_name_to_resource_map = filter_chain_manager_.getFcdsResources();
auto& fc_msg_to_ptr_map = filter_chain_manager_.filterChainsByMessage();
auto& fc_name_to_hash_map = filter_chain_manager_.getFcHashes();

// Delete and drain the filter chains which are missing in this config update
std::vector<envoy::config::listener::v3::FilterChain> fc_to_del_vector;

for (const std::string& fc_name : removed_resources) {
ENVOY_LOG(debug, "fcds: removing filter chain from FCM: fcds name = {} resource={}", fcds_name_,
fc_name);
auto itr = fc_name_to_resource_map.find(fc_name);
if (itr != fc_name_to_resource_map.end()) {
const envoy::config::listener::v3::FilterChain& fc = itr->second;
// Add this fc to the to-be-deleted list
fc_to_del_vector.push_back(fc);

// Add this fc to the to-be-drained list
filter_chain_manager_.addFcToFcdsDrainingList(fc_msg_to_ptr_map.find(fc)->second);
}
}

if (fc_to_del_vector.size()) {
filter_chain_manager_.removeFilterChains(fc_to_del_vector);
}

// Add/Update the filter chains which are part of the new update
std::vector<const envoy::config::listener::v3::FilterChain*> fc_to_add_vector;

for (const auto& fcds_resource : added_resources) {
envoy::config::listener::v3::FilterChain fc_config;
fc_config = dynamic_cast<const envoy::config::listener::v3::FilterChain&>(
fcds_resource.get().resource());

// Ignore filter chains without a name
if (fc_config.name().size() == 0) {
ENVOY_LOG(debug,
"fcds: skipping this filter chain config update due to missing filter chain name");
continue;
}

fc_to_add_vector.clear();
fc_to_del_vector.clear();
std::string fc_name = fc_config.name();
ENVOY_LOG(debug, "fcds: processing config update for fc={}", fc_name);

// Updating an existing filter chain
if (fc_name_to_hash_map.find(fc_config.name()) != fc_name_to_hash_map.end()) {
// Ignore the config update if the old and new configs are same
auto hash = MessageUtil::hash(fc_config);
auto itr1 = fc_name_to_hash_map.find(fc_name);
if (hash == itr1->second) {
ENVOY_LOG(debug, "fcds: new and old filter chain config are same. Ignoring update for fc={}",
fc_name);
continue;
}

ENVOY_LOG(
debug,
"fcds: update for an existing filter chain, performing deletion and addition for fc={}",
fc_name);

auto itr2 = fc_name_to_resource_map.find(fc_name);
if (itr2 != fc_name_to_resource_map.end()) {
const envoy::config::listener::v3::FilterChain& fc = itr2->second;
filter_chain_manager_.addFcToFcdsDrainingList(fc_msg_to_ptr_map.find(fc)->second);

// Add the new filter chain
fc_to_add_vector.push_back(&fc_config);
// Delete the old filter chain
fc_to_del_vector.push_back(fc);
}

} else {
//Incoming filter chain is brand new
ENVOY_LOG(debug, "fcds: adding a brand new filter chain: fcds name = {} resource_name={}",
fcds_name_, fc_config.name());

// Add this filter chain
fc_to_add_vector.push_back(&fc_config);
}

// Finally invoke the deletion call
if (fc_to_del_vector.size()) {
filter_chain_manager_.removeFilterChains(fc_to_del_vector);
}

// Finally invoke the addition call
filter_chain_manager_.addFilterChains(fc_to_add_vector, nullptr, *fc_builder_,
filter_chain_manager_, true);
}

filter_chain_manager_.startDrainingSequenceForListenerFcds();

local_init_target_.ready();
}

void FcdsApi::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
local_init_target_.ready();
}

bool FcdsApi::validateUpdateSize(int num_resources) {
if (num_resources == 0) {
ENVOY_LOG(debug, "Missing Filter Chain Configuration for {} in onConfigUpdate()", fcds_name_);
local_init_target_.ready();
return false;
}

ENVOY_LOG(debug, "fcds: recieved {} filter chain for fcds {}", num_resources, fcds_name_);
return true;
}

} // namespace Server
} // namespace Envoy

Loading