diff --git a/internal/ir/xds.go b/internal/ir/xds.go index 7afee1b39d5a..5c010858f62a 100644 --- a/internal/ir/xds.go +++ b/internal/ir/xds.go @@ -205,6 +205,9 @@ type HTTPRoute struct { Redirect *Redirect // Destinations associated with this matched route. Destinations []*RouteDestination + // RateLimit defines the more specific match conditions as well as limits for ratelimiting + // the requests on this route. + RateLimit *RateLimit } // Validate the fields within the HTTPRoute structure @@ -527,3 +530,44 @@ func (h UDPListener) Validate() error { } return errs } + +// RateLimit holds the rate limiting configuration. +// +k8s:deepcopy-gen=true +type RateLimit struct { + // Global rate limit settings. + Global *GlobalRateLimit +} + +// GlobalRateLimit holds the global rate limiting configuration. +// +k8s:deepcopy-gen=true +type GlobalRateLimit struct { + // Rules for rate limiting. + Rules []*RateLimitRule +} + +// RateLimitRule holds the match and limit configuration for ratelimiting. +// +k8s:deepcopy-gen=true +type RateLimitRule struct { + // HeaderMatches define the match conditions on the request headers for this route. + HeaderMatches []*StringMatch + // Limit holds the rate limit values. + Limit *RateLimitValue +} + +type RateLimitUnit string + +const ( + Second RateLimitUnit = "second" + Minute RateLimitUnit = "minute" + Hour RateLimitUnit = "hour" + Day RateLimitUnit = "day" +) + +// RateLimitValue holds the +// +k8s:deepcopy-gen=true +type RateLimitValue struct { + // Requests are the number of requests that need to be rate limited. + Requests uint32 + // Unit of rate limiting. + Unit RateLimitUnit +} diff --git a/internal/ir/zz_generated.deepcopy.go b/internal/ir/zz_generated.deepcopy.go index 6458241fdc0f..a48a4818a271 100644 --- a/internal/ir/zz_generated.deepcopy.go +++ b/internal/ir/zz_generated.deepcopy.go @@ -49,6 +49,32 @@ func (in *DirectResponse) DeepCopy() *DirectResponse { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GlobalRateLimit) DeepCopyInto(out *GlobalRateLimit) { + *out = *in + if in.Rules != nil { + in, out := &in.Rules, &out.Rules + *out = make([]*RateLimitRule, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(RateLimitRule) + (*in).DeepCopyInto(*out) + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GlobalRateLimit. +func (in *GlobalRateLimit) DeepCopy() *GlobalRateLimit { + if in == nil { + return nil + } + out := new(GlobalRateLimit) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HTTPListener) DeepCopyInto(out *HTTPListener) { *out = *in @@ -172,6 +198,11 @@ func (in *HTTPRoute) DeepCopyInto(out *HTTPRoute) { } } } + if in.RateLimit != nil { + in, out := &in.RateLimit, &out.RateLimit + *out = new(RateLimit) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPRoute. @@ -293,6 +324,72 @@ func (in *ProxyListener) DeepCopy() *ProxyListener { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimit) DeepCopyInto(out *RateLimit) { + *out = *in + if in.Global != nil { + in, out := &in.Global, &out.Global + *out = new(GlobalRateLimit) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimit. +func (in *RateLimit) DeepCopy() *RateLimit { + if in == nil { + return nil + } + out := new(RateLimit) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitRule) DeepCopyInto(out *RateLimitRule) { + *out = *in + if in.HeaderMatches != nil { + in, out := &in.HeaderMatches, &out.HeaderMatches + *out = make([]*StringMatch, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(StringMatch) + (*in).DeepCopyInto(*out) + } + } + } + if in.Limit != nil { + in, out := &in.Limit, &out.Limit + *out = new(RateLimitValue) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitRule. +func (in *RateLimitRule) DeepCopy() *RateLimitRule { + if in == nil { + return nil + } + out := new(RateLimitRule) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RateLimitValue) DeepCopyInto(out *RateLimitValue) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RateLimitValue. +func (in *RateLimitValue) DeepCopy() *RateLimitValue { + if in == nil { + return nil + } + out := new(RateLimitValue) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Redirect) DeepCopyInto(out *Redirect) { *out = *in diff --git a/internal/xds/translator/listener.go b/internal/xds/translator/listener.go index 7d63575ddf51..5ade82786e81 100644 --- a/internal/xds/translator/listener.go +++ b/internal/xds/translator/listener.go @@ -91,6 +91,11 @@ func addXdsHTTPFilterChain(xdsListener *listener.Listener, irListener *ir.HTTPLi }}, } + // TODO: Make this a generic interface for all API Gateway features. + if err := patchHCMWithRateLimit(mgr, irListener); err != nil { + return err + } + mgrAny, err := anypb.New(mgr) if err != nil { return err diff --git a/internal/xds/translator/ratelimit.go b/internal/xds/translator/ratelimit.go new file mode 100644 index 000000000000..1f6388327a90 --- /dev/null +++ b/internal/xds/translator/ratelimit.go @@ -0,0 +1,243 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package translator + +import ( + "strconv" + "time" + + cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + ratelimit "github.com/envoyproxy/go-control-plane/envoy/config/ratelimit/v3" + route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + ratelimitfilter "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ratelimit/v3" + hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" + wkt "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/wrapperspb" + + "github.com/envoyproxy/gateway/internal/ir" +) + +func patchHCMWithRateLimit(mgr *hcm.HttpConnectionManager, irListener *ir.HTTPListener) error { + // Return early if rate limits dont exist + if !isRateLimitPresent(irListener) { + return nil + } + + // Return early if filter already exists. + for _, httpFilter := range mgr.HttpFilters { + if httpFilter.Name == wkt.HTTPRateLimit { + return nil + } + } + + rateLimitFilter := buildRateLimitFilter(irListener) + // Make sure the router filter is the terminal filter in the chain + mgr.HttpFilters = append([]*hcm.HttpFilter{rateLimitFilter}, mgr.HttpFilters...) + return nil +} + +func isRateLimitPresent(irListener *ir.HTTPListener) bool { + // Return true if rate limit config exists. + for _, route := range irListener.Routes { + if route.RateLimit != nil && route.RateLimit.Global != nil { + return true + } + } + return false +} + +func buildRateLimitFilter(irListener *ir.HTTPListener) *hcm.HttpFilter { + rateLimitFilterProto := &ratelimitfilter.RateLimit{ + Domain: getRateLimitDomain(irListener), + RateLimitService: &ratelimit.RateLimitServiceConfig{ + GrpcService: &core.GrpcService{ + TargetSpecifier: &core.GrpcService_EnvoyGrpc_{ + EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ + ClusterName: getRateLimitServiceClusterName(), + }, + }, + }, + TransportApiVersion: core.ApiVersion_V3, + }, + } + + any, err := anypb.New(rateLimitFilterProto) + if err != nil { + return nil + } + + rateLimitFilter := &hcm.HttpFilter{ + Name: wkt.HTTPRateLimit, + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: any, + }, + } + return rateLimitFilter +} + +func PatchRouteWithRateLimit(xdsRouteAction *route.RouteAction, irRoute *ir.HTTPRoute) error { + // Return early if no rate limit config exists. + if irRoute.RateLimit == nil || irRoute.RateLimit.Global == nil { + return nil + } + + rateLimits := buildRouteRateLimits(irRoute.Name, irRoute.RateLimit.Global) + xdsRouteAction.RateLimits = rateLimits + return nil +} + +func buildRouteRateLimits(descriptorPrefix string, global *ir.GlobalRateLimit) []*route.RateLimit { + rateLimits := []*route.RateLimit{} + // Rules are ORed + for rIdx, rule := range global.Rules { + rlActions := []*route.RateLimit_Action{} + // Matches are ANDed + for mIdx, match := range rule.HeaderMatches { + if match.Exact == nil && match.Prefix == nil && match.SafeRegex == nil { + // Setup RequestHeader actions + descriptorKey := getRateLimitDescriptorKey(descriptorPrefix, rIdx, mIdx) + action := &route.RateLimit_Action{ + ActionSpecifier: &route.RateLimit_Action_RequestHeaders_{ + RequestHeaders: &route.RateLimit_Action_RequestHeaders{ + HeaderName: match.Name, + DescriptorKey: descriptorKey, + }, + }, + } + rlActions = append(rlActions, action) + } else { + // Setup HeaderValueMatch actions + descriptorVal := getRateLimitDescriptorValue(descriptorPrefix, rIdx, mIdx) + headerMatcher := buildHeaderMatcher(match) + action := &route.RateLimit_Action{ + ActionSpecifier: &route.RateLimit_Action_HeaderValueMatch_{ + HeaderValueMatch: &route.RateLimit_Action_HeaderValueMatch{ + DescriptorValue: descriptorVal, + ExpectMatch: &wrapperspb.BoolValue{ + Value: true, + }, + Headers: []*route.HeaderMatcher{headerMatcher}, + }, + }, + } + rlActions = append(rlActions, action) + } + } + + rateLimit := &route.RateLimit{Actions: rlActions} + rateLimits = append(rateLimits, rateLimit) + } + + return rateLimits +} + +func buildHeaderMatcher(match *ir.StringMatch) *route.HeaderMatcher { + var stringMatcher *matcher.StringMatcher + + if match.Exact != nil { + stringMatcher = &matcher.StringMatcher{ + MatchPattern: &matcher.StringMatcher_Exact{ + Exact: *match.Exact, + }, + } + } + if match.Prefix != nil { + stringMatcher = &matcher.StringMatcher{ + MatchPattern: &matcher.StringMatcher_Prefix{ + Prefix: *match.Prefix, + }, + } + } + if match.SafeRegex != nil { + stringMatcher = &matcher.StringMatcher{ + MatchPattern: &matcher.StringMatcher_SafeRegex{ + SafeRegex: &matcher.RegexMatcher{ + Regex: *match.SafeRegex, + EngineType: &matcher.RegexMatcher_GoogleRe2{ + GoogleRe2: &matcher.RegexMatcher_GoogleRE2{}, + }, + }, + }, + } + } + + return &route.HeaderMatcher{ + Name: match.Name, + HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ + StringMatch: stringMatcher, + }, + } +} + +func buildRateLimitServiceCluster(irListener *ir.HTTPListener) (*cluster.Cluster, error) { + // Return early if rate limits dont exist. + if !isRateLimitPresent(irListener) { + return nil, nil + } + + clusterName := getRateLimitServiceClusterName() + host, port := getRateLimitServiceGrpcHostPort() + rateLimitServerCluster := &cluster.Cluster{ + Name: clusterName, + ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STRICT_DNS}, + ConnectTimeout: durationpb.New(10 * time.Second), + LbPolicy: cluster.Cluster_RANDOM, + LoadAssignment: &endpoint.ClusterLoadAssignment{ + ClusterName: clusterName, + Endpoints: []*endpoint.LocalityLbEndpoints{ + { + LbEndpoints: []*endpoint.LbEndpoint{ + { + HostIdentifier: &endpoint.LbEndpoint_Endpoint{ + Endpoint: &endpoint.Endpoint{ + Address: &core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Address: host, + PortSpecifier: &core.SocketAddress_PortValue{PortValue: uint32(port)}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + Http2ProtocolOptions: &core.Http2ProtocolOptions{}, + DnsRefreshRate: durationpb.New(30 * time.Second), + RespectDnsTtl: true, + DnsLookupFamily: cluster.Cluster_V4_ONLY, + } + return rateLimitServerCluster, nil +} + +func getRateLimitDescriptorKey(prefix string, ruleIndex, matchIndex int) string { + return prefix + "-key-rule-" + strconv.Itoa(ruleIndex) + "-match-" + strconv.Itoa(matchIndex) +} + +func getRateLimitDescriptorValue(prefix string, ruleIndex, matchIndex int) string { + return prefix + "-value-rule-" + strconv.Itoa(ruleIndex) + "-match-" + strconv.Itoa(matchIndex) +} + +func getRateLimitServiceClusterName() string { + return "ratelimit_cluster" +} + +func getRateLimitDomain(irListener *ir.HTTPListener) string { + // Use IR listener name as domain + return irListener.Name +} + +func getRateLimitServiceGrpcHostPort() (string, int) { + return "TODO", 0 +} diff --git a/internal/xds/translator/route.go b/internal/xds/translator/route.go index 6ad00760bcf8..c7e7bece80f5 100644 --- a/internal/xds/translator/route.go +++ b/internal/xds/translator/route.go @@ -40,6 +40,11 @@ func buildXdsRoute(httpRoute *ir.HTTPRoute) (*route.Route, error) { } } + // TODO: convert this into a generic interface for API Gateway features + if err := PatchRouteWithRateLimit(ret.GetRoute(), httpRoute); err != nil { + return nil, err + } + return ret, nil } diff --git a/internal/xds/translator/translator.go b/internal/xds/translator/translator.go index eeaf4a192e1e..dd94178eae40 100644 --- a/internal/xds/translator/translator.go +++ b/internal/xds/translator/translator.go @@ -8,6 +8,7 @@ package translator import ( "errors" + cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -102,6 +103,19 @@ func Translate(ir *ir.Xds) (*types.ResourceVersionTable, error) { } xdsRouteCfg.VirtualHosts = append(xdsRouteCfg.VirtualHosts, vHost) + + // TODO: Make this into a generic interface for API Gateway features + // Check if a ratelimit cluster exists, if not, add it, if its needed. + if rlCluster := findXdsCluster(tCtx, getRateLimitServiceClusterName()); rlCluster == nil { + rlCluster, err := buildRateLimitServiceCluster(httpListener) + if err != nil { + return nil, multierror.Append(err, errors.New("error building ratelimit cluster")) + } + // Add cluster + if rlCluster != nil { + tCtx.AddXdsResource(resource.ClusterType, rlCluster) + } + } } for _, tcpListener := range ir.TCP { @@ -140,6 +154,7 @@ func Translate(ir *ir.Xds) (*types.ResourceVersionTable, error) { } tCtx.AddXdsResource(resource.ListenerType, xdsListener) } + return tCtx, nil } @@ -162,6 +177,22 @@ func findXdsListener(tCtx *types.ResourceVersionTable, address string, port uint return nil } +// findXdsCluster finds a xds cluster with the same name, and returns nil if there is no match. +func findXdsCluster(tCtx *types.ResourceVersionTable, name string) *cluster.Cluster { + if tCtx == nil || tCtx.XdsResources == nil || tCtx.XdsResources[resource.ClusterType] == nil { + return nil + } + + for _, r := range tCtx.XdsResources[resource.ClusterType] { + cluster := r.(*cluster.Cluster) + if cluster.Name == name { + return cluster + } + } + + return nil +} + // findXdsRouteConfig finds an xds route with the name and returns nil if there is no match. func findXdsRouteConfig(tCtx *types.ResourceVersionTable, name string) *route.RouteConfiguration { if tCtx == nil || tCtx.XdsResources == nil || tCtx.XdsResources[resource.RouteType] == nil {