From 77d5b4eaeec7d96dd0b6763b99d2570e33829cd1 Mon Sep 17 00:00:00 2001 From: David Wobrock Date: Sat, 28 Jan 2023 16:53:25 +0100 Subject: [PATCH] feat: add cache to Koanf.validatePipelineConfig --- driver/configuration/provider_koanf.go | 49 ++++++++++++++++++++------ 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/driver/configuration/provider_koanf.go b/driver/configuration/provider_koanf.go index 0b972aa688..844d626460 100644 --- a/driver/configuration/provider_koanf.go +++ b/driver/configuration/provider_koanf.go @@ -6,6 +6,7 @@ package configuration import ( "bytes" "context" + "crypto/sha256" "encoding/json" "fmt" "net/url" @@ -14,6 +15,8 @@ import ( "testing" "time" + "github.com/dgraph-io/ristretto" + "github.com/google/uuid" "github.com/knadh/koanf" "github.com/pkg/errors" @@ -40,11 +43,7 @@ type ( l *logrusx.Logger ctx context.Context - enabledMutex sync.RWMutex - enabledCache map[uint64]bool - - configMutex sync.RWMutex - configCache map[uint64]json.RawMessage + configValidationCache *ristretto.Cache subscriptions subscriptions } @@ -60,13 +59,24 @@ type ( var _ Provider = new(KoanfProvider) func NewKoanfProvider(ctx context.Context, flags *pflag.FlagSet, l *logrusx.Logger, opts ...configx.OptionModifier) (kp *KoanfProvider, err error) { + maxItems := int64(5000) + cache, _ := ristretto.NewCache(&ristretto.Config{ + NumCounters: maxItems * 10, + MaxCost: maxItems, + BufferItems: 64, + Metrics: false, + IgnoreInternalCost: true, + Cost: func(value interface{}) int64 { + return 1 + }, + }) + kp = &KoanfProvider{ ctx: ctx, l: l, - enabledCache: make(map[uint64]bool), - configCache: make(map[uint64]json.RawMessage), - subscriptions: subscriptions{data: make(map[SubscriptionID]callback)}, + configValidationCache: cache, + subscriptions: subscriptions{data: make(map[SubscriptionID]callback)}, } kp.source, err = configx.New( ctx, @@ -283,6 +293,20 @@ func (v *KoanfProvider) pipelineIsEnabled(prefix, id string) bool { return v.source.Bool(fmt.Sprintf("%s.%s.enabled", prefix, id)) } +func (v *KoanfProvider) hashPipelineConfig(prefix, id string, marshalled []byte) string { + slices := [][]byte{ + []byte(prefix), + []byte(id), + marshalled, + } + + var hashSlices []byte + for _, s := range slices { + hashSlices = append(hashSlices, s...) + } + return fmt.Sprintf("%x", sha256.Sum256(hashSlices)) +} + func (v *KoanfProvider) PipelineConfig(prefix, id string, override json.RawMessage, dest interface{}) error { if dest == nil { return nil @@ -302,8 +326,13 @@ func (v *KoanfProvider) PipelineConfig(prefix, id string, override json.RawMessa return errors.WithStack(err) } - if err = v.validatePipelineConfig(prefix, id, marshalled); err != nil { - return errors.WithStack(err) + hash := v.hashPipelineConfig(prefix, id, marshalled) + item, found := v.configValidationCache.Get(hash) + if !found || !item.(bool) { + if err = v.validatePipelineConfig(prefix, id, marshalled); err != nil { + return errors.WithStack(err) + } + v.configValidationCache.Set(hash, true, 0) } if err := json.NewDecoder(bytes.NewBuffer(marshalled)).Decode(dest); err != nil {