From 762eb4291738e99a849d5fe69e62ff11d64182b2 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Sat, 13 Jul 2024 11:48:26 +0900 Subject: [PATCH] fix: prevent xdsIR updates from overwriting RateLimit configs from other xdsIR (#3771) * fix: prevent xdsIR updates from overwriting RateLimit configs from other xdsIR Signed-off-by: Kensei Nakada * fix: handle deletion events appropriately Signed-off-by: Kensei Nakada * test: add a unit test for subscribeAndTranslate Signed-off-by: Kensei Nakada * chore: sort import order Signed-off-by: Kensei Nakada --------- Signed-off-by: Kensei Nakada Co-authored-by: zirain --- internal/globalratelimit/runner/runner.go | 23 +- .../globalratelimit/runner/runner_test.go | 244 ++++++++++++++++++ 2 files changed, 262 insertions(+), 5 deletions(-) create mode 100644 internal/globalratelimit/runner/runner_test.go diff --git a/internal/globalratelimit/runner/runner.go b/internal/globalratelimit/runner/runner.go index d4c1d3fd50f..0a2d987c182 100644 --- a/internal/globalratelimit/runner/runner.go +++ b/internal/globalratelimit/runner/runner.go @@ -17,6 +17,7 @@ import ( "strconv" discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + cachetype "github.com/envoyproxy/go-control-plane/pkg/cache/types" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" @@ -109,17 +110,27 @@ func (r *Runner) serveXdsConfigServer(ctx context.Context) { } } +func buildXDSResourceFromCache(rateLimitConfigsCache map[string][]cachetype.Resource) types.XdsResources { + xdsResourcesToUpdate := types.XdsResources{} + for _, xdsR := range rateLimitConfigsCache { + xdsResourcesToUpdate[resourcev3.RateLimitConfigType] = append(xdsResourcesToUpdate[resourcev3.RateLimitConfigType], xdsR...) + } + + return xdsResourcesToUpdate +} + func (r *Runner) subscribeAndTranslate(ctx context.Context) { + // rateLimitConfigsCache is a cache of the rate limit config, which is keyed by the xdsIR key. + rateLimitConfigsCache := map[string][]cachetype.Resource{} + // Subscribe to resources. message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentGlobalRateLimitRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx), func(update message.Update[string, *ir.Xds], errChan chan error) { r.Logger.Info("received a notification") if update.Delete { - if err := r.addNewSnapshot(ctx, nil); err != nil { - r.Logger.Error(err, "failed to update the config snapshot") - errChan <- err - } + delete(rateLimitConfigsCache, update.Key) + r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache)) } else { // Translate to ratelimit xDS Config. rvt, err := r.translate(update.Value) @@ -130,7 +141,9 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { // Update ratelimit xDS config cache. if rvt != nil { - r.updateSnapshot(ctx, rvt.XdsResources) + // Build XdsResources to use for the snapshot update from the cache. + rateLimitConfigsCache[update.Key] = rvt.XdsResources[resourcev3.RateLimitConfigType] + r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache)) } } }, diff --git a/internal/globalratelimit/runner/runner_test.go b/internal/globalratelimit/runner/runner_test.go new file mode 100644 index 00000000000..e25f714792b --- /dev/null +++ b/internal/globalratelimit/runner/runner_test.go @@ -0,0 +1,244 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package runner + +import ( + "context" + "fmt" + "testing" + "time" + + cachetypes "github.com/envoyproxy/go-control-plane/pkg/cache/types" + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + rlsconfv3 "github.com/envoyproxy/go-control-plane/ratelimit/config/ratelimit/v3" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/internal/envoygateway/config" + "github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit" + "github.com/envoyproxy/gateway/internal/ir" + "github.com/envoyproxy/gateway/internal/message" +) + +func Test_subscribeAndTranslate(t *testing.T) { + t.Parallel() + + testxds := func(gwName string) *ir.Xds { + return &ir.Xds{ + HTTP: []*ir.HTTPListener{ + { + CoreListenerDetails: ir.CoreListenerDetails{ + Name: fmt.Sprintf("default/%s/listener-0", gwName), + }, + Routes: []*ir.HTTPRoute{ + { + Name: "route-0", + Traffic: &ir.TrafficFeatures{ + RateLimit: &ir.RateLimit{ + Global: &ir.GlobalRateLimit{ + Rules: []*ir.RateLimitRule{ + { + HeaderMatches: []*ir.StringMatch{ + { + Name: "x-user-id", + Distinct: true, + }, + }, + Limit: ir.RateLimitValue{ + Requests: 100, + Unit: ir.RateLimitUnit(egv1a1.RateLimitUnitMinute), + }, + }, + { + HeaderMatches: []*ir.StringMatch{ + { + Name: "x-another-user-id", + Distinct: true, + }, + }, + Limit: ir.RateLimitValue{ + Requests: 10, + Unit: ir.RateLimitUnit(egv1a1.RateLimitUnitSecond), + }, + }, + }, + }, + }, + }, + }, + { + Name: "route-1", + Traffic: &ir.TrafficFeatures{ + RateLimit: &ir.RateLimit{ + Global: &ir.GlobalRateLimit{ + Rules: []*ir.RateLimitRule{ + { + HeaderMatches: []*ir.StringMatch{ + { + Name: "x-user-id", + Distinct: true, + }, + }, + Limit: ir.RateLimitValue{ + Requests: 100, + Unit: ir.RateLimitUnit(egv1a1.RateLimitUnitMinute), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + } + + testRateLimitConfig := func(gwName string) *rlsconfv3.RateLimitConfig { + return &rlsconfv3.RateLimitConfig{ + Name: fmt.Sprintf("default/%s/listener-0", gwName), + Domain: fmt.Sprintf("default/%s/listener-0", gwName), + Descriptors: []*rlsconfv3.RateLimitDescriptor{ + { + Key: "route-0", + Value: "route-0", + Descriptors: []*rlsconfv3.RateLimitDescriptor{ + { + Key: "rule-0-match-0", + RateLimit: &rlsconfv3.RateLimitPolicy{ + Unit: rlsconfv3.RateLimitUnit_MINUTE, + RequestsPerUnit: 100, + }, + }, + { + Key: "rule-1-match-0", + RateLimit: &rlsconfv3.RateLimitPolicy{ + Unit: rlsconfv3.RateLimitUnit_SECOND, + RequestsPerUnit: 10, + }, + }, + }, + }, + { + Key: "route-1", + Value: "route-1", + Descriptors: []*rlsconfv3.RateLimitDescriptor{ + { + Key: "rule-0-match-0", + RateLimit: &rlsconfv3.RateLimitPolicy{ + Unit: rlsconfv3.RateLimitUnit_MINUTE, + RequestsPerUnit: 100, + }, + }, + }, + }, + }, + } + } + + testCases := []struct { + name string + // xdsIRs contains a list of xds updates that the runner will receive. + xdsIRs []message.Update[string, *ir.Xds] + wantRateLimitConfigs map[string]cachetypes.Resource + }{ + { + name: "one xds is added", + xdsIRs: []message.Update[string, *ir.Xds]{ + { + Key: "gw0", + Value: testxds("gw0"), + }, + }, + wantRateLimitConfigs: map[string]cachetypes.Resource{ + "default/gw0/listener-0": testRateLimitConfig("gw0"), + }, + }, + { + name: "two xds are added", + xdsIRs: []message.Update[string, *ir.Xds]{ + { + Key: "gw0", + Value: testxds("gw0"), + }, + { + Key: "gw1", + Value: testxds("gw1"), + }, + }, + wantRateLimitConfigs: map[string]cachetypes.Resource{ + "default/gw0/listener-0": testRateLimitConfig("gw0"), + "default/gw1/listener-0": testRateLimitConfig("gw1"), + }, + }, + { + name: "one xds is deleted", + xdsIRs: []message.Update[string, *ir.Xds]{ + { + Key: "gw0", + Value: testxds("gw0"), + }, + { + Key: "gw1", + Value: testxds("gw1"), + }, + { + Key: "gw0", + Delete: true, + }, + }, + wantRateLimitConfigs: map[string]cachetypes.Resource{ + "default/gw1/listener-0": testRateLimitConfig("gw1"), + }, + }, + } + + for _, tt := range testCases { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + xdsIR := new(message.XdsIR) + defer xdsIR.Close() + cfg, err := config.New() + require.NoError(t, err) + + r := New(&Config{ + Server: *cfg, + XdsIR: xdsIR, + cache: cachev3.NewSnapshotCache(false, cachev3.IDHash{}, nil), + }) + + go r.subscribeAndTranslate(ctx) + + for _, xds := range tt.xdsIRs { + if xds.Delete { + xdsIR.Delete(xds.Key) + continue + } + xdsIR.Store(xds.Key, xds.Value) + } + + diff := "" + if !assert.Eventually(t, func() bool { + rs, err := r.cache.GetSnapshot(ratelimit.InfraName) + require.NoError(t, err) + + diff = cmp.Diff(tt.wantRateLimitConfigs, rs.GetResources(resourcev3.RateLimitConfigType), cmpopts.IgnoreUnexported(rlsconfv3.RateLimitConfig{}, rlsconfv3.RateLimitDescriptor{}, rlsconfv3.RateLimitPolicy{})) + return diff == "" + }, time.Second*1, time.Millisecond*20) { + t.Fatalf("snapshot mismatch (-want +got):\n%s", diff) + } + }) + } +}