Skip to content

Commit

Permalink
fix: eliminate the entire cache of rate limit configs
Browse files Browse the repository at this point in the history
Signed-off-by: sanposhiho <[email protected]>
  • Loading branch information
sanposhiho committed Jul 13, 2024
1 parent 74281cf commit a9f0a19
Showing 1 changed file with 61 additions and 12 deletions.
73 changes: 61 additions & 12 deletions internal/globalratelimit/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"math"
"net"
"os"
"strconv"

"fortio.org/sets"
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
cachetype "github.com/envoyproxy/go-control-plane/pkg/cache/types"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
Expand Down Expand Up @@ -120,17 +122,21 @@ func buildXDSResourceFromCache(rateLimitConfigsCache map[string][]cachetype.Reso
}

func (r *Runner) subscribeAndTranslate(ctx context.Context) {
// rateLimitConfigsCache is a cache of the rate limit config, which is keyed by the xdsIR key.
rateLimitConfigsCache := map[string][]cachetype.Resource{}
// which key has generated which RateLimitConfig.
keyToRateLimitCfg := map[string]sets.Set[string]{}

// Subscribe to resources.
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentGlobalRateLimitRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx),
func(update message.Update[string, *ir.Xds], errChan chan error) {
r.Logger.Info("received a notification")

if update.Delete {
delete(rateLimitConfigsCache, update.Key)
r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache))
var err error
keyToRateLimitCfg, err = r.updateSnapshot(ctx, update.Key, nil, keyToRateLimitCfg)
if err != nil {
r.Logger.Error(err, "failed to update snapshot")
errChan <- err
}
} else {
// Translate to ratelimit xDS Config.
rvt, err := r.translate(update.Value)
Expand All @@ -141,9 +147,11 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {

// Update ratelimit xDS config cache.
if rvt != nil {
// Build XdsResources to use for the snapshot update from the cache.
rateLimitConfigsCache[update.Key] = rvt.XdsResources[resourcev3.RateLimitConfigType]
r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache))
keyToRateLimitCfg, err = r.updateSnapshot(ctx, update.Key, rvt.XdsResources, keyToRateLimitCfg)
if err != nil {
r.Logger.Error(err, "failed to update snapshot")
errChan <- err
}
}
}
},
Expand All @@ -166,15 +174,56 @@ func (r *Runner) translate(xdsIR *ir.Xds) (*types.ResourceVersionTable, error) {
return resourceVT, nil
}

func (r *Runner) updateSnapshot(ctx context.Context, resource types.XdsResources) {
func (r *Runner) updateSnapshot(ctx context.Context, key string, resource types.XdsResources, keyToRateLimitCfg map[string]sets.Set[string]) (map[string]sets.Set[string], error) {
if r.cache == nil {
r.Logger.Error(nil, "failed to init the snapshot cache")
return
return keyToRateLimitCfg, errors.New("empty cache in the runner, most likely the runner somehow failed to init the snapshot cache")
}

resourceToUpdate := types.XdsResources{}
previousCfgKeys, ok := keyToRateLimitCfg[key]
if !ok || previousCfgKeys.Len() != 0 {
previousCfgKeys = sets.New[string]()
}

// We have to merge the new resource with the existing ones.

rs, err := r.cache.GetSnapshot(ratelimit.InfraName)
var configs map[string]cachetype.Resource
// If err is non-nil, most likely we haven't set any snapshot yet.
if err == nil {
configs = rs.GetResources(resourcev3.RateLimitConfigType)
}

for k := range configs {
if previousCfgKeys.Has(key) {
// This config is generated by this key (i.e., gateway).
continue
}

// This config is not generated by this key and not related to this update.
// Just keep them as they are.
resourceToUpdate[resourcev3.RateLimitConfigType] = append(resourceToUpdate[resourcev3.RateLimitConfigType], configs[k])
}

newKeyCache := sets.New[string]()
if resource != nil {
for _, rlc := range resource[resourcev3.RateLimitConfigType] {
resourceToUpdate[resourcev3.RateLimitConfigType] = append(resourceToUpdate[resourcev3.RateLimitConfigType], rlc)

if withName, ok := rlc.(cachetype.ResourceWithName); ok {
newKeyCache.Add(withName.GetName())
} else {
r.Logger.Error(nil, "RateLimitConfig does not have a name", "key", key)
}
}
}
keyToRateLimitCfg[key] = newKeyCache

if err := r.addNewSnapshot(ctx, resource); err != nil {
r.Logger.Error(err, "failed to update the snapshot cache")
if err := r.addNewSnapshot(ctx, resourceToUpdate); err != nil {
return keyToRateLimitCfg, fmt.Errorf("failed to add a new snapshot: %w", err)
}

return keyToRateLimitCfg, nil
}

func (r *Runner) addNewSnapshot(ctx context.Context, resource types.XdsResources) error {
Expand Down

0 comments on commit a9f0a19

Please sign in to comment.