From a9f0a190ed7ecc939225598c0e5ce96f23307ce9 Mon Sep 17 00:00:00 2001
From: sanposhiho <44139130+sanposhiho@users.noreply.github.com>
Date: Sat, 13 Jul 2024 10:55:55 +0900
Subject: [PATCH] fix: eliminate the entire cache of rate limit configs

Signed-off-by: sanposhiho <44139130+sanposhiho@users.noreply.github.com>
---
 internal/globalratelimit/runner/runner.go | 82 ++++++++++++++++++++---
 1 file changed, 70 insertions(+), 12 deletions(-)

diff --git a/internal/globalratelimit/runner/runner.go b/internal/globalratelimit/runner/runner.go
index 0a2d987c182..9271ed31f3e 100644
--- a/internal/globalratelimit/runner/runner.go
+++ b/internal/globalratelimit/runner/runner.go
@@ -10,12 +10,14 @@ import (
 	"crypto/rand"
 	"crypto/tls"
 	"crypto/x509"
+	"errors"
 	"fmt"
 	"math"
 	"net"
 	"os"
 	"strconv"
 
+	"fortio.org/sets"
 	discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 	cachetype "github.com/envoyproxy/go-control-plane/pkg/cache/types"
 	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
@@ -120,8 +122,8 @@ func buildXDSResourceFromCache(rateLimitConfigsCache map[string][]cachetype.Reso
 }
 
 func (r *Runner) subscribeAndTranslate(ctx context.Context) {
-	// rateLimitConfigsCache is a cache of the rate limit config, which is keyed by the xdsIR key.
-	rateLimitConfigsCache := map[string][]cachetype.Resource{}
+	// which key has generated which RateLimitConfig.
+	keyToRateLimitCfg := map[string]sets.Set[string]{}
 
 	// Subscribe to resources.
 	message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentGlobalRateLimitRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx),
@@ -129,8 +131,12 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
 			r.Logger.Info("received a notification")
 
 			if update.Delete {
-				delete(rateLimitConfigsCache, update.Key)
-				r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache))
+				var err error
+				keyToRateLimitCfg, err = r.updateSnapshot(ctx, update.Key, nil, keyToRateLimitCfg)
+				if err != nil {
+					r.Logger.Error(err, "failed to update snapshot")
+					errChan <- err
+				}
 			} else {
 				// Translate to ratelimit xDS Config.
 				rvt, err := r.translate(update.Value)
@@ -141,9 +147,11 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
 
 				// Update ratelimit xDS config cache.
 				if rvt != nil {
-					// Build XdsResources to use for the snapshot update from the cache.
-					rateLimitConfigsCache[update.Key] = rvt.XdsResources[resourcev3.RateLimitConfigType]
-					r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache))
+					keyToRateLimitCfg, err = r.updateSnapshot(ctx, update.Key, rvt.XdsResources, keyToRateLimitCfg)
+					if err != nil {
+						r.Logger.Error(err, "failed to update snapshot")
+						errChan <- err
+					}
 				}
 			}
 		},
@@ -166,15 +174,65 @@ func (r *Runner) translate(xdsIR *ir.Xds) (*types.ResourceVersionTable, error) {
 	return resourceVT, nil
 }
 
-func (r *Runner) updateSnapshot(ctx context.Context, resource types.XdsResources) {
+// updateSnapshot rebuilds the RateLimitConfig snapshot after the xdsIR entry
+// identified by key has changed.
+//
+// Configs in the current snapshot whose names are recorded in
+// keyToRateLimitCfg[key] were generated by a previous update for the same key
+// and are dropped; all other configs are carried over unchanged. The configs in
+// resource (nil on deletion) are then appended and their names are recorded
+// under key. The updated keyToRateLimitCfg map is returned together with any
+// error from the snapshot cache.
+func (r *Runner) updateSnapshot(ctx context.Context, key string, resource types.XdsResources, keyToRateLimitCfg map[string]sets.Set[string]) (map[string]sets.Set[string], error) {
 	if r.cache == nil {
-		r.Logger.Error(nil, "failed to init the snapshot cache")
-		return
+		return keyToRateLimitCfg, errors.New("empty cache in the runner, most likely the runner somehow failed to init the snapshot cache")
+	}
+
+	resourceToUpdate := types.XdsResources{}
+	previousCfgKeys, ok := keyToRateLimitCfg[key]
+	if !ok {
+		previousCfgKeys = sets.New[string]()
+	}
+
+	// We have to merge the new resource with the existing ones.
+
+	rs, err := r.cache.GetSnapshot(ratelimit.InfraName)
+	var configs map[string]cachetype.Resource
+	// If err is non-nil, most likely we haven't set any snapshot yet.
+	if err == nil {
+		configs = rs.GetResources(resourcev3.RateLimitConfigType)
+	}
+
+	for k := range configs {
+		if previousCfgKeys.Has(k) {
+			// This config was generated by this key (i.e., gateway); drop the stale copy.
+			continue
+		}
+
+		// This config is not generated by this key and not related to this update.
+		// Just keep them as they are.
+		resourceToUpdate[resourcev3.RateLimitConfigType] = append(resourceToUpdate[resourcev3.RateLimitConfigType], configs[k])
+	}
+
+	newKeyCache := sets.New[string]()
+	if resource != nil {
+		for _, rlc := range resource[resourcev3.RateLimitConfigType] {
+			resourceToUpdate[resourcev3.RateLimitConfigType] = append(resourceToUpdate[resourcev3.RateLimitConfigType], rlc)
+
+			if withName, ok := rlc.(cachetype.ResourceWithName); ok {
+				newKeyCache.Add(withName.GetName())
+			} else {
+				r.Logger.Error(nil, "RateLimitConfig does not have a name", "key", key)
+			}
+		}
 	}
+	keyToRateLimitCfg[key] = newKeyCache
 
-	if err := r.addNewSnapshot(ctx, resource); err != nil {
-		r.Logger.Error(err, "failed to update the snapshot cache")
+	if err := r.addNewSnapshot(ctx, resourceToUpdate); err != nil {
+		return keyToRateLimitCfg, fmt.Errorf("failed to add a new snapshot: %w", err)
 	}
+
+	return keyToRateLimitCfg, nil
 }
 
 func (r *Runner) addNewSnapshot(ctx context.Context, resource types.XdsResources) error {