fix: prevent xdsIR updates from overwriting RateLimit configs from other xdsIR #3771

Merged
23 changes: 18 additions & 5 deletions internal/globalratelimit/runner/runner.go
@@ -17,6 +17,7 @@
 	"strconv"
 
 	discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+	cachetype "github.com/envoyproxy/go-control-plane/pkg/cache/types"
 	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
 	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
 	serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
@@ -109,17 +110,27 @@
 	}
 }
 
+func buildXDSResourceFromCache(rateLimitConfigsCache map[string][]cachetype.Resource) types.XdsResources {
+	xdsResourcesToUpdate := types.XdsResources{}
+	for _, xdsR := range rateLimitConfigsCache {
+		xdsResourcesToUpdate[resourcev3.RateLimitConfigType] = append(xdsResourcesToUpdate[resourcev3.RateLimitConfigType], xdsR...)
+	}
+
+	return xdsResourcesToUpdate
+}
+
 func (r *Runner) subscribeAndTranslate(ctx context.Context) {
+	// rateLimitConfigsCache is a cache of the rate limit config, which is keyed by the xdsIR key.
+	rateLimitConfigsCache := map[string][]cachetype.Resource{}
Contributor:

is there a way to not maintain this cache here and instead create an API like rCache := r.getSnapshot(ctx)? this will allow us to reduce one copy in memory

sanposhiho (Contributor, Author), Jul 13, 2024:

Let me try; though depending on the complexity it would add, that does look like a better idea.

sanposhiho (Contributor, Author), Jul 13, 2024:

The change would be like this: sanposhiho@a9f0a19

So, either way, we have to keep a cache to track which update.Key generates which rate limit configurations. (That cache would be smaller than the current cache of entire configs, which is one good point, though.)
The cache is required because otherwise we wouldn't know which part of the snapshot we have to update. (A message.Update coming from watchable.Snapshot doesn't carry the previous state of the ir.)

So, I'm not very motivated to proceed that way; to me, it looks like only a small gain (less memory usage) would come with a certain downside (complexity).

arkodg (Contributor), Jul 13, 2024:

hey @sanposhiho thanks for writing up the quick diff. What I was suggesting was a little different: instead of holding the entire cache on the stack using rateLimitConfigsCache, which incurs a constant memory hit, can we retrieve the cache snapshot using an API like
https://github.com/envoyproxy/go-control-plane/blob/1da4500d00e270d803caefbe0c20e4d3d162e586/pkg/cache/v3/snapshot.go#L124
every time we get a new message, and then update it (add or delete a key based on the type of update)?

sanposhiho (Contributor, Author), Jul 13, 2024:

Sorry, what's the difference from my diff sanposhiho@a9f0a19? It already fetches the previous snapshot with GetSnapshot -> GetResources and tries to rebuild a new snapshot from there.

My point is that, either way, we need the keyToRateLimitCfg cache to know which ir.Key has generated which part of the snapshot, so that we can calculate a correct snapshot after an update.

Contributor:

I missed that 🙈
let's go ahead with your diff to keep the code complexity low, and we can revisit this if/when we hit a memory bottleneck


 	// Subscribe to resources.
 	message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentGlobalRateLimitRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx),
 		func(update message.Update[string, *ir.Xds], errChan chan error) {
 			r.Logger.Info("received a notification")
 
 			if update.Delete {
-				if err := r.addNewSnapshot(ctx, nil); err != nil {
-					r.Logger.Error(err, "failed to update the config snapshot")
-					errChan <- err
-				}
+				delete(rateLimitConfigsCache, update.Key)
+				r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache))
 			} else {
 				// Translate to ratelimit xDS Config.
 				rvt, err := r.translate(update.Value)
@@ -130,7 +141,9 @@
 
 				// Update ratelimit xDS config cache.
 				if rvt != nil {
-					r.updateSnapshot(ctx, rvt.XdsResources)
+					// Build XdsResources to use for the snapshot update from the cache.
+					rateLimitConfigsCache[update.Key] = rvt.XdsResources[resourcev3.RateLimitConfigType]
+					r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache))
 				}
 			}
 		},

[Codecov / codecov-patch warning: added lines #L132-L133 in internal/globalratelimit/runner/runner.go were not covered by tests.]
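The fix above keeps a per-xdsIR-key cache and rebuilds the full rate limit resource set on every update, so a snapshot pushed for one key can no longer drop the configs produced by another key. The core idea can be sketched independently of the go-control-plane types; perKeyCache and its method names here are illustrative, not the PR's actual code:

```go
package main

import (
	"fmt"
	"sort"
)

// perKeyCache mimics rateLimitConfigsCache in the PR: each xdsIR key owns
// its own slice of rate limit configs, so an update for one key can only
// replace that key's entry, never another gateway's configs.
type perKeyCache map[string][]string

// rebuild flattens the cache into the single resource list that would be
// pushed to the snapshot, mirroring buildXDSResourceFromCache in the diff.
func (c perKeyCache) rebuild() []string {
	var all []string
	for _, rs := range c {
		all = append(all, rs...)
	}
	sort.Strings(all) // deterministic order for this example only
	return all
}

func main() {
	cache := perKeyCache{}

	// Two gateways are translated independently; both survive each push.
	cache["gw0"] = []string{"cfg-gw0"}
	cache["gw1"] = []string{"cfg-gw1"}
	fmt.Println(cache.rebuild()) // [cfg-gw0 cfg-gw1]

	// A Delete update removes only that key, then the snapshot is rebuilt.
	delete(cache, "gw0")
	fmt.Println(cache.rebuild()) // [cfg-gw1]
}
```

Before the fix, the runner pushed rvt.XdsResources for the current key alone, which is equivalent to replacing the whole flattened list with one key's configs; rebuilding from the per-key map is what preserves the others.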
244 changes: 244 additions & 0 deletions internal/globalratelimit/runner/runner_test.go
@@ -0,0 +1,244 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package runner

import (
	"context"
	"fmt"
	"testing"
	"time"

	cachetypes "github.com/envoyproxy/go-control-plane/pkg/cache/types"
	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
	rlsconfv3 "github.com/envoyproxy/go-control-plane/ratelimit/config/ratelimit/v3"
	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
	"github.com/envoyproxy/gateway/internal/envoygateway/config"
	"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit"
	"github.com/envoyproxy/gateway/internal/ir"
	"github.com/envoyproxy/gateway/internal/message"
)

func Test_subscribeAndTranslate(t *testing.T) {
	t.Parallel()

	testxds := func(gwName string) *ir.Xds {
		return &ir.Xds{
			HTTP: []*ir.HTTPListener{
				{
					CoreListenerDetails: ir.CoreListenerDetails{
						Name: fmt.Sprintf("default/%s/listener-0", gwName),
					},
					Routes: []*ir.HTTPRoute{
						{
							Name: "route-0",
							Traffic: &ir.TrafficFeatures{
								RateLimit: &ir.RateLimit{
									Global: &ir.GlobalRateLimit{
										Rules: []*ir.RateLimitRule{
											{
												HeaderMatches: []*ir.StringMatch{
													{
														Name:     "x-user-id",
														Distinct: true,
													},
												},
												Limit: ir.RateLimitValue{
													Requests: 100,
													Unit:     ir.RateLimitUnit(egv1a1.RateLimitUnitMinute),
												},
											},
											{
												HeaderMatches: []*ir.StringMatch{
													{
														Name:     "x-another-user-id",
														Distinct: true,
													},
												},
												Limit: ir.RateLimitValue{
													Requests: 10,
													Unit:     ir.RateLimitUnit(egv1a1.RateLimitUnitSecond),
												},
											},
										},
									},
								},
							},
						},
						{
							Name: "route-1",
							Traffic: &ir.TrafficFeatures{
								RateLimit: &ir.RateLimit{
									Global: &ir.GlobalRateLimit{
										Rules: []*ir.RateLimitRule{
											{
												HeaderMatches: []*ir.StringMatch{
													{
														Name:     "x-user-id",
														Distinct: true,
													},
												},
												Limit: ir.RateLimitValue{
													Requests: 100,
													Unit:     ir.RateLimitUnit(egv1a1.RateLimitUnitMinute),
												},
											},
										},
									},
								},
							},
						},
					},
				},
			},
		}
	}

	testRateLimitConfig := func(gwName string) *rlsconfv3.RateLimitConfig {
		return &rlsconfv3.RateLimitConfig{
			Name:   fmt.Sprintf("default/%s/listener-0", gwName),
			Domain: fmt.Sprintf("default/%s/listener-0", gwName),
			Descriptors: []*rlsconfv3.RateLimitDescriptor{
				{
					Key:   "route-0",
					Value: "route-0",
					Descriptors: []*rlsconfv3.RateLimitDescriptor{
						{
							Key: "rule-0-match-0",
							RateLimit: &rlsconfv3.RateLimitPolicy{
								Unit:            rlsconfv3.RateLimitUnit_MINUTE,
								RequestsPerUnit: 100,
							},
						},
						{
							Key: "rule-1-match-0",
							RateLimit: &rlsconfv3.RateLimitPolicy{
								Unit:            rlsconfv3.RateLimitUnit_SECOND,
								RequestsPerUnit: 10,
							},
						},
					},
				},
				{
					Key:   "route-1",
					Value: "route-1",
					Descriptors: []*rlsconfv3.RateLimitDescriptor{
						{
							Key: "rule-0-match-0",
							RateLimit: &rlsconfv3.RateLimitPolicy{
								Unit:            rlsconfv3.RateLimitUnit_MINUTE,
								RequestsPerUnit: 100,
							},
						},
					},
				},
			},
		}
	}

	testCases := []struct {
		name string
		// xdsIRs contains a list of xds updates that the runner will receive.
		xdsIRs               []message.Update[string, *ir.Xds]
		wantRateLimitConfigs map[string]cachetypes.Resource
	}{
		{
			name: "one xds is added",
			xdsIRs: []message.Update[string, *ir.Xds]{
				{
					Key:   "gw0",
					Value: testxds("gw0"),
				},
			},
			wantRateLimitConfigs: map[string]cachetypes.Resource{
				"default/gw0/listener-0": testRateLimitConfig("gw0"),
			},
		},
		{
			name: "two xds are added",
			xdsIRs: []message.Update[string, *ir.Xds]{
				{
					Key:   "gw0",
					Value: testxds("gw0"),
				},
				{
					Key:   "gw1",
					Value: testxds("gw1"),
				},
			},
			wantRateLimitConfigs: map[string]cachetypes.Resource{
				"default/gw0/listener-0": testRateLimitConfig("gw0"),
				"default/gw1/listener-0": testRateLimitConfig("gw1"),
			},
		},
		{
			name: "one xds is deleted",
			xdsIRs: []message.Update[string, *ir.Xds]{
				{
					Key:   "gw0",
					Value: testxds("gw0"),
				},
				{
					Key:   "gw1",
					Value: testxds("gw1"),
				},
				{
					Key:    "gw0",
					Delete: true,
				},
			},
			wantRateLimitConfigs: map[string]cachetypes.Resource{
				"default/gw1/listener-0": testRateLimitConfig("gw1"),
			},
		},
	}

	for _, tt := range testCases {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			xdsIR := new(message.XdsIR)
			defer xdsIR.Close()
			cfg, err := config.New()
			require.NoError(t, err)

			r := New(&Config{
				Server: *cfg,
				XdsIR:  xdsIR,
				cache:  cachev3.NewSnapshotCache(false, cachev3.IDHash{}, nil),
			})

			go r.subscribeAndTranslate(ctx)

			for _, xds := range tt.xdsIRs {
				if xds.Delete {
					xdsIR.Delete(xds.Key)
					continue
				}
				xdsIR.Store(xds.Key, xds.Value)
			}

			diff := ""
			if !assert.Eventually(t, func() bool {
				rs, err := r.cache.GetSnapshot(ratelimit.InfraName)
				require.NoError(t, err)

				diff = cmp.Diff(tt.wantRateLimitConfigs, rs.GetResources(resourcev3.RateLimitConfigType), cmpopts.IgnoreUnexported(rlsconfv3.RateLimitConfig{}, rlsconfv3.RateLimitDescriptor{}, rlsconfv3.RateLimitPolicy{}))
				return diff == ""
			}, time.Second*1, time.Millisecond*20) {
				t.Fatalf("snapshot mismatch (-want +got):\n%s", diff)
			}
		})
	}
}