Skip to content

Commit

Permalink
feat(gateway): add listener connection limits (#4755)
Browse files Browse the repository at this point in the history
* feat(gateway): set listener connection limits
* test(gateway): update golden files after change
* test(gateway): add test for connection limit and include more in golden output
* docs(generated): update with MeshGateway change
* docs(gateway): add field comment
* test(e2e): add MeshGateway connection limit e2e test
* feat(gateway): validate resources.connectionLimit for collapsibility
* refactor(gateway): function and variable names
* test(e2e): fix gateway resources test
* test(e2e): increase resources timeout
* test(e2e): test multiple connections both with and without limit

Signed-off-by: Mike Beaumont <[email protected]>
  • Loading branch information
michaelbeaumont authored Aug 8, 2022
1 parent 7787822 commit 5a5b18d
Show file tree
Hide file tree
Showing 82 changed files with 1,696 additions and 729 deletions.
191 changes: 135 additions & 56 deletions api/mesh/v1alpha1/gateway.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions api/mesh/v1alpha1/gateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ message MeshGateway {
}

message Listener {
message Resources { uint32 connection_limit = 1; }

enum Protocol {
NONE = 0;
TCP = 1;
Expand Down Expand Up @@ -121,6 +123,9 @@ message MeshGateway {
// CrossMesh enables traffic to flow to this listener only from other
// meshes.
bool crossMesh = 6;

// Resources is used to specify listener-specific resource settings.
Resources resources = 7;
}

// Conf defines the desired state of MeshGateway.
Expand Down
8 changes: 7 additions & 1 deletion docs/generated/resources/policy_meshgateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,11 @@
- `crossMesh` (optional)

CrossMesh enables traffic to flow to this listener only from other
meshes.
meshes.

- `resources` (optional)

Resources is used to specify listener-specific resource settings.

- `connectionLimit` (optional)

46 changes: 37 additions & 9 deletions pkg/core/resources/apis/mesh/gateway_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,52 @@ func (g *MeshGatewayResource) Validate() error {
return err.OrNil()
}

func validateListenerCompatibility(path validators.PathBuilder, listeners []*mesh_proto.MeshGateway_Listener) validators.ValidationError {
type resourceLimits struct {
connectionLimits map[uint32]struct{}
listeners []int
}

func validateListenerCollapsibility(path validators.PathBuilder, listeners []*mesh_proto.MeshGateway_Listener) validators.ValidationError {
protocolsForPort := map[uint32]map[string][]int{}
hostnamesForPort := map[uint32]map[string][]int{}
limitedListenersForPort := map[uint32]resourceLimits{}

for i, ep := range listeners {
protocols, ok := protocolsForPort[ep.GetPort()]
for i, listener := range listeners {
protocols, ok := protocolsForPort[listener.GetPort()]
if !ok {
protocols = map[string][]int{}
}

hostnames, ok := hostnamesForPort[ep.GetPort()]
hostnames, ok := hostnamesForPort[listener.GetPort()]
if !ok {
hostnames = map[string][]int{}
}

protocols[ep.GetProtocol().String()] = append(protocols[ep.GetProtocol().String()], i)
limitedListeners, ok := limitedListenersForPort[listener.GetPort()]
if !ok {
limitedListeners = resourceLimits{
connectionLimits: map[uint32]struct{}{},
}
}

protocols[listener.GetProtocol().String()] = append(protocols[listener.GetProtocol().String()], i)

// An empty hostname is the same as "*", i.e. matches all hosts.
hostname := ep.GetHostname()
hostname := listener.GetHostname()
if hostname == "" {
hostname = mesh_proto.WildcardHostname
}

hostnames[hostname] = append(hostnames[hostname], i)

hostnamesForPort[ep.GetPort()] = hostnames
protocolsForPort[ep.GetPort()] = protocols
if l := listener.GetResources().GetConnectionLimit(); l != 0 {
limitedListeners.listeners = append(limitedListeners.listeners, i)
limitedListeners.connectionLimits[l] = struct{}{}
}

hostnamesForPort[listener.GetPort()] = hostnames
protocolsForPort[listener.GetPort()] = protocols
limitedListenersForPort[listener.GetPort()] = limitedListeners
}

err := validators.ValidationError{}
Expand Down Expand Up @@ -104,6 +123,15 @@ func validateListenerCompatibility(path validators.PathBuilder, listeners []*mes
}
}

for _, listeners := range limitedListenersForPort {
if len(listeners.connectionLimits) <= 1 {
continue
}
for _, index := range listeners.listeners {
err.AddViolationAt(path.Index(index).Field("resources").Field("connectionLimit"), "conflicting values for this port")
}
}

return err
}

Expand Down Expand Up @@ -198,7 +226,7 @@ func validateMeshGatewayConf(path validators.PathBuilder, conf *mesh_proto.MeshG
}))
}

err.Add(validateListenerCompatibility(path, conf.GetListeners()))
err.Add(validateListenerCollapsibility(path, conf.GetListeners()))

return err
}
35 changes: 34 additions & 1 deletion pkg/core/resources/apis/mesh/gateway_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,29 @@ conf:
tags:
name: http`,
),
Entry("listeners with connectionLimits", `
type: MeshGateway
name: gateway
mesh: default
selectors:
- match:
kuma.io/service: gateway
tags:
product: edge
conf:
listeners:
- protocol: HTTP
hostname: one.com
port: 99
resources:
connectionLimit: 2
- protocol: HTTP
hostname: two.com
port: 99
resources:
connectionLimit: 2
`,
),
)

DescribeErrorCases(
Expand Down Expand Up @@ -398,7 +421,7 @@ conf:
protocol: TCP
`),

ErrorCases("hostname and protocol conflict",
ErrorCases("hostname, protocol and resource conflict",
[]validators.Violation{{
Field: "conf.listeners[0]",
Message: "protocol conflicts with other listeners on this port",
Expand All @@ -411,6 +434,12 @@ conf:
}, {
Field: "conf.listeners[1]",
Message: "multiple listeners for hostname on this port",
}, {
Field: "conf.listeners[0].resources.connectionLimit",
Message: "conflicting values for this port",
}, {
Field: "conf.listeners[1].resources.connectionLimit",
Message: "conflicting values for this port",
}}, `
type: MeshGateway
name: gateway
Expand All @@ -423,11 +452,15 @@ conf:
- hostname: www-1.example.com
port: 443
protocol: TCP
resources:
connectionLimit: 2
- hostname: www-1.example.com
port: 443
protocol: HTTPS
tls:
mode: PASSTHROUGH
resources:
connectionLimit: 1
`),
)
})
43 changes: 37 additions & 6 deletions pkg/plugins/runtime/gateway/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"
"strings"

envoy_service_runtime_v3 "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/match"
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/merge"
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/route"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_listeners "github.com/kumahq/kuma/pkg/xds/envoy/listeners"
envoy_names "github.com/kumahq/kuma/pkg/xds/envoy/names"
Expand Down Expand Up @@ -60,6 +62,7 @@ type GatewayListener struct {
// CrossMesh is important because for generation we need to treat such a
// listener as if we have HTTPS with the Mesh cert for this Dataplane
CrossMesh bool
Resources *mesh_proto.MeshGateway_Listener_Resources // TODO verify these don't conflict when merging
}

// GatewayListenerInfo holds everything needed to generate resources for a
Expand Down Expand Up @@ -197,6 +200,8 @@ func (g Generator) Generate(ctx xds_context.Context, proxy *core_xds.Proxy) (*co
return nil, errors.Wrap(err, "error generating listener info from Proxy")
}

var limits []RuntimeResoureLimitListener

for _, info := range listenerInfos {
// This is checked by the gateway validator
if !SupportsProtocol(info.Listener.Protocol) {
Expand All @@ -209,25 +214,50 @@ func (g Generator) Generate(ctx xds_context.Context, proxy *core_xds.Proxy) (*co
}
resources.AddSet(cdsResources)

ldsResources, err := g.generateLDS(ctx, info, info.HostInfos)
ldsResources, limit, err := g.generateLDS(ctx, info, info.HostInfos)
if err != nil {
return nil, err
}
resources.AddSet(ldsResources)

if limit != nil {
limits = append(limits, *limit)
}

rdsResources, err := g.generateRDS(ctx, info, info.HostInfos)
if err != nil {
return nil, err
}
resources.AddSet(rdsResources)
}

resources.Add(g.generateRTDS(limits))

return resources, nil
}

func (g Generator) generateLDS(ctx xds_context.Context, info GatewayListenerInfo, hostInfos []GatewayHostInfo) (*core_xds.ResourceSet, error) {
func (g Generator) generateRTDS(limits []RuntimeResoureLimitListener) *core_xds.Resource {
layer := map[string]interface{}{}
for _, limit := range limits {
layer[fmt.Sprintf("envoy.resource_limits.listener.%s.connection_limit", limit.Name)] = limit.ConnectionLimit
}

res := &core_xds.Resource{
Name: "gateway.listeners",
Origin: OriginGateway,
Resource: &envoy_service_runtime_v3.Runtime{
Name: "gateway.listeners",
Layer: util_proto.MustStruct(layer),
},
}

return res
}

func (g Generator) generateLDS(ctx xds_context.Context, info GatewayListenerInfo, hostInfos []GatewayHostInfo) (*core_xds.ResourceSet, *RuntimeResoureLimitListener, error) {
resources := core_xds.NewResourceSet()
listenerBuilder := GenerateListener(info)

listenerBuilder, limit := GenerateListener(info)

var gatewayHosts []GatewayHost
for _, hostInfo := range hostInfos {
Expand All @@ -240,7 +270,7 @@ func (g Generator) generateLDS(ctx xds_context.Context, info GatewayListenerInfo
}
res, filterChainBuilders, err := g.FilterChainGenerators.FilterChainGenerators[protocol].Generate(ctx, info, gatewayHosts)
if err != nil {
return nil, err
return nil, limit, err
}
resources.AddSet(res)

Expand All @@ -250,11 +280,11 @@ func (g Generator) generateLDS(ctx xds_context.Context, info GatewayListenerInfo

res, err = BuildResourceSet(listenerBuilder)
if err != nil {
return nil, errors.Wrapf(err, "failed to build listener resource")
return nil, limit, errors.Wrapf(err, "failed to build listener resource")
}
resources.AddSet(res)

return resources, nil
return resources, limit, nil
}

func (g Generator) generateCDS(ctx xds_context.Context, info GatewayListenerInfo, hostInfos []GatewayHostInfo) (*core_xds.ResourceSet, error) {
Expand Down Expand Up @@ -323,6 +353,7 @@ func MakeGatewayListener(
listeners[0].GetPort(),
),
CrossMesh: listeners[0].CrossMesh,
Resources: listeners[0].GetResources(),
}

// Hostnames must be unique to a listener to remove ambiguity
Expand Down
24 changes: 20 additions & 4 deletions pkg/plugins/runtime/gateway/listener_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ func SupportsProtocol(p mesh_proto.MeshGateway_Listener_Protocol) bool {
}
}

func GenerateListener(info GatewayListenerInfo) *envoy_listeners.ListenerBuilder {
type RuntimeResoureLimitListener struct {
Name string
ConnectionLimit uint32
}

func GenerateListener(info GatewayListenerInfo) (*envoy_listeners.ListenerBuilder, *RuntimeResoureLimitListener) {
// TODO(jpeach) what we really need to do here is to
// generate a HTTP filter chain for each
// host on the same HTTPConnectionManager. Each HTTP filter
Expand All @@ -48,18 +53,29 @@ func GenerateListener(info GatewayListenerInfo) *envoy_listeners.ListenerBuilder
"protocol", protocol,
)

// TODO(jpeach) if proxy protocol is enabled, add the proxy protocol listener filter.
name := envoy_names.GetGatewayListenerName(info.Gateway.Meta.GetName(), protocol.String(), port)

var limits *RuntimeResoureLimitListener
if resources := info.Listener.Resources; resources != nil {
if resources.ConnectionLimit > 0 {
limits = &RuntimeResoureLimitListener{
Name: name,
ConnectionLimit: resources.ConnectionLimit,
}
}
}

// TODO(jpeach) if proxy protocol is enabled, add the proxy protocol listener filter.
return envoy_listeners.NewListenerBuilder(info.Proxy.APIVersion).
Configure(
envoy_listeners.InboundListener(
envoy_names.GetGatewayListenerName(info.Gateway.Meta.GetName(), protocol.String(), port),
name,
address, port, core_xds.SocketAddressProtocolTCP),
// Limit default buffering for edge connections.
envoy_listeners.ConnectionBufferLimit(DefaultConnectionBuffer),
// Roughly balance incoming connections.
envoy_listeners.EnableReusePort(true),
// Always sniff for TLS.
envoy_listeners.TLSInspector(),
)
), limits
}
20 changes: 18 additions & 2 deletions pkg/plugins/runtime/gateway/listener_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gateway_test
import (
"path"

envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/ghodss/yaml"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -84,7 +83,7 @@ data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBM3ZWM1cvNX
snap, err := Do(gateway)
Expect(err).To(Succeed())

out, err := yaml.Marshal(MakeProtoResource(snap.Resources[envoy_types.Listener]))
out, err := yaml.Marshal(MakeProtoSnapshot(snap))
Expect(err).To(Succeed())

Expect(out).To(matchers.MatchGoldenYAML(path.Join("testdata", golden)))
Expand Down Expand Up @@ -218,5 +217,22 @@ conf:
tags:
name: example.com
`),

Entry("should add connection limits",
"connection-limited-listener.yaml", `
type: MeshGateway
mesh: default
name: default-gateway
selectors:
- match:
kuma.io/service: gateway-default
conf:
listeners:
- port: 443
protocol: TCP
hostname: bar.example.com
resources:
connectionLimit: 10000
`),
)
})
Loading

0 comments on commit 5a5b18d

Please sign in to comment.