Skip to content

Commit

Permalink
fix: prevent xdsIR updates from overwriting RateLimit configs from ot…
Browse files Browse the repository at this point in the history
…her xdsIR (#3771)

* fix: prevent xdsIR updates from overwriting RateLimit configs from other xdsIR

Signed-off-by: Kensei Nakada <[email protected]>

* fix: handle deletion events appropriately

Signed-off-by: Kensei Nakada <[email protected]>

* test: add a unit test for subscribeAndTranslate

Signed-off-by: Kensei Nakada <[email protected]>

* chore: sort import order

Signed-off-by: Kensei Nakada <[email protected]>

---------

Signed-off-by: Kensei Nakada <[email protected]>
Co-authored-by: zirain <[email protected]>
  • Loading branch information
sanposhiho and zirain authored Jul 13, 2024
1 parent efb25d2 commit 762eb42
Show file tree
Hide file tree
Showing 2 changed files with 262 additions and 5 deletions.
23 changes: 18 additions & 5 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strconv"

discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
cachetype "github.com/envoyproxy/go-control-plane/pkg/cache/types"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
Expand Down Expand Up @@ -109,17 +110,27 @@ func (r *Runner) serveXdsConfigServer(ctx context.Context) {
}
}

func buildXDSResourceFromCache(rateLimitConfigsCache map[string][]cachetype.Resource) types.XdsResources {
xdsResourcesToUpdate := types.XdsResources{}
for _, xdsR := range rateLimitConfigsCache {
xdsResourcesToUpdate[resourcev3.RateLimitConfigType] = append(xdsResourcesToUpdate[resourcev3.RateLimitConfigType], xdsR...)
}

return xdsResourcesToUpdate
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// rateLimitConfigsCache is a cache of the rate limit config, which is keyed by the xdsIR key.
rateLimitConfigsCache := map[string][]cachetype.Resource{}

// Subscribe to resources.
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentGlobalRateLimitRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds], errChan chan error) {
r.Logger.Info("received a notification")

if update.Delete {
if err := r.addNewSnapshot(ctx, nil); err != nil {
r.Logger.Error(err, "failed to update the config snapshot")
errChan <- err
}
delete(rateLimitConfigsCache, update.Key)
r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache))
} else {
// Translate to ratelimit xDS Config.
rvt, err := r.translate(update.Value)
Expand All @@ -130,7 +141,9 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {

// Update ratelimit xDS config cache.
if rvt != nil {
r.updateSnapshot(ctx, rvt.XdsResources)
// Build XdsResources to use for the snapshot update from the cache.
rateLimitConfigsCache[update.Key] = rvt.XdsResources[resourcev3.RateLimitConfigType]
r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache))
}
}
},
Expand Down
244 changes: 244 additions & 0 deletions internal/globalratelimit/runner/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package runner

import (
"context"
"fmt"
"testing"
"time"

cachetypes "github.com/envoyproxy/go-control-plane/pkg/cache/types"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
rlsconfv3 "github.com/envoyproxy/go-control-plane/ratelimit/config/ratelimit/v3"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/message"
)

func Test_subscribeAndTranslate(t *testing.T) {
t.Parallel()

testxds := func(gwName string) *ir.Xds {
return &ir.Xds{
HTTP: []*ir.HTTPListener{
{
CoreListenerDetails: ir.CoreListenerDetails{
Name: fmt.Sprintf("default/%s/listener-0", gwName),
},
Routes: []*ir.HTTPRoute{
{
Name: "route-0",
Traffic: &ir.TrafficFeatures{
RateLimit: &ir.RateLimit{
Global: &ir.GlobalRateLimit{
Rules: []*ir.RateLimitRule{
{
HeaderMatches: []*ir.StringMatch{
{
Name: "x-user-id",
Distinct: true,
},
},
Limit: ir.RateLimitValue{
Requests: 100,
Unit: ir.RateLimitUnit(egv1a1.RateLimitUnitMinute),
},
},
{
HeaderMatches: []*ir.StringMatch{
{
Name: "x-another-user-id",
Distinct: true,
},
},
Limit: ir.RateLimitValue{
Requests: 10,
Unit: ir.RateLimitUnit(egv1a1.RateLimitUnitSecond),
},
},
},
},
},
},
},
{
Name: "route-1",
Traffic: &ir.TrafficFeatures{
RateLimit: &ir.RateLimit{
Global: &ir.GlobalRateLimit{
Rules: []*ir.RateLimitRule{
{
HeaderMatches: []*ir.StringMatch{
{
Name: "x-user-id",
Distinct: true,
},
},
Limit: ir.RateLimitValue{
Requests: 100,
Unit: ir.RateLimitUnit(egv1a1.RateLimitUnitMinute),
},
},
},
},
},
},
},
},
},
},
}
}

testRateLimitConfig := func(gwName string) *rlsconfv3.RateLimitConfig {
return &rlsconfv3.RateLimitConfig{
Name: fmt.Sprintf("default/%s/listener-0", gwName),
Domain: fmt.Sprintf("default/%s/listener-0", gwName),
Descriptors: []*rlsconfv3.RateLimitDescriptor{
{
Key: "route-0",
Value: "route-0",
Descriptors: []*rlsconfv3.RateLimitDescriptor{
{
Key: "rule-0-match-0",
RateLimit: &rlsconfv3.RateLimitPolicy{
Unit: rlsconfv3.RateLimitUnit_MINUTE,
RequestsPerUnit: 100,
},
},
{
Key: "rule-1-match-0",
RateLimit: &rlsconfv3.RateLimitPolicy{
Unit: rlsconfv3.RateLimitUnit_SECOND,
RequestsPerUnit: 10,
},
},
},
},
{
Key: "route-1",
Value: "route-1",
Descriptors: []*rlsconfv3.RateLimitDescriptor{
{
Key: "rule-0-match-0",
RateLimit: &rlsconfv3.RateLimitPolicy{
Unit: rlsconfv3.RateLimitUnit_MINUTE,
RequestsPerUnit: 100,
},
},
},
},
},
}
}

testCases := []struct {
name string
// xdsIRs contains a list of xds updates that the runner will receive.
xdsIRs []message.Update[string, *ir.Xds]
wantRateLimitConfigs map[string]cachetypes.Resource
}{
{
name: "one xds is added",
xdsIRs: []message.Update[string, *ir.Xds]{
{
Key: "gw0",
Value: testxds("gw0"),
},
},
wantRateLimitConfigs: map[string]cachetypes.Resource{
"default/gw0/listener-0": testRateLimitConfig("gw0"),
},
},
{
name: "two xds are added",
xdsIRs: []message.Update[string, *ir.Xds]{
{
Key: "gw0",
Value: testxds("gw0"),
},
{
Key: "gw1",
Value: testxds("gw1"),
},
},
wantRateLimitConfigs: map[string]cachetypes.Resource{
"default/gw0/listener-0": testRateLimitConfig("gw0"),
"default/gw1/listener-0": testRateLimitConfig("gw1"),
},
},
{
name: "one xds is deleted",
xdsIRs: []message.Update[string, *ir.Xds]{
{
Key: "gw0",
Value: testxds("gw0"),
},
{
Key: "gw1",
Value: testxds("gw1"),
},
{
Key: "gw0",
Delete: true,
},
},
wantRateLimitConfigs: map[string]cachetypes.Resource{
"default/gw1/listener-0": testRateLimitConfig("gw1"),
},
},
}

for _, tt := range testCases {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
xdsIR := new(message.XdsIR)
defer xdsIR.Close()
cfg, err := config.New()
require.NoError(t, err)

r := New(&Config{
Server: *cfg,
XdsIR: xdsIR,
cache: cachev3.NewSnapshotCache(false, cachev3.IDHash{}, nil),
})

go r.subscribeAndTranslate(ctx)

for _, xds := range tt.xdsIRs {
if xds.Delete {
xdsIR.Delete(xds.Key)
continue
}
xdsIR.Store(xds.Key, xds.Value)
}

diff := ""
if !assert.Eventually(t, func() bool {
rs, err := r.cache.GetSnapshot(ratelimit.InfraName)
require.NoError(t, err)

diff = cmp.Diff(tt.wantRateLimitConfigs, rs.GetResources(resourcev3.RateLimitConfigType), cmpopts.IgnoreUnexported(rlsconfv3.RateLimitConfig{}, rlsconfv3.RateLimitDescriptor{}, rlsconfv3.RateLimitPolicy{}))
return diff == ""
}, time.Second*1, time.Millisecond*20) {
t.Fatalf("snapshot mismatch (-want +got):\n%s", diff)
}
})
}
}

0 comments on commit 762eb42

Please sign in to comment.