Skip to content

Commit

Permalink
feat: add cache to Koanf.validatePipelineConfig (#1042)
Browse files Browse the repository at this point in the history
  • Loading branch information
David-Wobrock authored Jan 30, 2023
1 parent 3cd0550 commit e7fb605
Showing 1 changed file with 39 additions and 10 deletions.
49 changes: 39 additions & 10 deletions driver/configuration/provider_koanf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package configuration
import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"net/url"
Expand All @@ -14,6 +15,8 @@ import (
"testing"
"time"

"github.com/dgraph-io/ristretto"

"github.com/google/uuid"
"github.com/knadh/koanf"
"github.com/pkg/errors"
Expand All @@ -40,11 +43,7 @@ type (
l *logrusx.Logger
ctx context.Context

enabledMutex sync.RWMutex
enabledCache map[uint64]bool

configMutex sync.RWMutex
configCache map[uint64]json.RawMessage
configValidationCache *ristretto.Cache

subscriptions subscriptions
}
Expand All @@ -60,13 +59,24 @@ type (
var _ Provider = new(KoanfProvider)

func NewKoanfProvider(ctx context.Context, flags *pflag.FlagSet, l *logrusx.Logger, opts ...configx.OptionModifier) (kp *KoanfProvider, err error) {
maxItems := int64(5000)
cache, _ := ristretto.NewCache(&ristretto.Config{
NumCounters: maxItems * 10,
MaxCost: maxItems,
BufferItems: 64,
Metrics: false,
IgnoreInternalCost: true,
Cost: func(value interface{}) int64 {
return 1
},
})

kp = &KoanfProvider{
ctx: ctx,
l: l,

enabledCache: make(map[uint64]bool),
configCache: make(map[uint64]json.RawMessage),
subscriptions: subscriptions{data: make(map[SubscriptionID]callback)},
configValidationCache: cache,
subscriptions: subscriptions{data: make(map[SubscriptionID]callback)},
}
kp.source, err = configx.New(
ctx,
Expand Down Expand Up @@ -283,6 +293,20 @@ func (v *KoanfProvider) pipelineIsEnabled(prefix, id string) bool {
return v.source.Bool(fmt.Sprintf("%s.%s.enabled", prefix, id))
}

func (v *KoanfProvider) hashPipelineConfig(prefix, id string, marshalled []byte) string {
slices := [][]byte{
[]byte(prefix),
[]byte(id),
marshalled,
}

var hashSlices []byte
for _, s := range slices {
hashSlices = append(hashSlices, s...)
}
return fmt.Sprintf("%x", sha256.Sum256(hashSlices))
}

func (v *KoanfProvider) PipelineConfig(prefix, id string, override json.RawMessage, dest interface{}) error {
if dest == nil {
return nil
Expand All @@ -302,8 +326,13 @@ func (v *KoanfProvider) PipelineConfig(prefix, id string, override json.RawMessa
return errors.WithStack(err)
}

if err = v.validatePipelineConfig(prefix, id, marshalled); err != nil {
return errors.WithStack(err)
hash := v.hashPipelineConfig(prefix, id, marshalled)
item, found := v.configValidationCache.Get(hash)
if !found || !item.(bool) {
if err = v.validatePipelineConfig(prefix, id, marshalled); err != nil {
return errors.WithStack(err)
}
v.configValidationCache.Set(hash, true, 0)
}

if err := json.NewDecoder(bytes.NewBuffer(marshalled)).Decode(dest); err != nil {
Expand Down

0 comments on commit e7fb605

Please sign in to comment.