Skip to content

Commit

Permalink
fix: fixes store merge when selector is used (#1322)
Browse files Browse the repository at this point in the history
## This PR

This change address
open-feature/open-feature-operator#664.

**Background**

We allow using multiple flag source CRDs when using flagd-proxy with
OFO. Internally, this converts to flagd using gRPC syncs with scopes,
where scopes specify the CRD names that need to source flags from.

**Bug**

We had two bugs, first gRPC resyncs never contained `scope`. This is why
we observed `unable to build sync from URI for target` message.
Secondly, we triggered resyncs only considering source equility. This is
not valid with flagd-proxy as we always go through the proxy for any
CRD.

**Fix**

Fix here adds scope to gRPC resyncs and considers both source and
selector equality when triggering a resync

**How to validate the fix**

[Use this
image](https://hub.docker.com/repository/docker/kavindudodanduwa/flagd/general)
with OFO sidecar image override

```helm upgrade --install openfeature openfeature/open-feature-operator --set sidecarConfiguration.image.tag=1,sidecarConfiguration.image.repository=kavindudodanduwa/flagd```

Signed-off-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
Kavindu-Dodan authored Jun 6, 2024
1 parent 20bcb78 commit ed5025d
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 69 deletions.
6 changes: 3 additions & 3 deletions core/pkg/evaluator/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ func (je *JSON) SetState(payload sync.DataSync) (map[string]interface{}, bool, e

switch payload.Type {
case sync.ALL:
events, reSync = je.store.Merge(je.Logger, payload.Source, newFlags.Flags)
events, reSync = je.store.Merge(je.Logger, payload.Source, payload.Selector, newFlags.Flags)
case sync.ADD:
events = je.store.Add(je.Logger, payload.Source, newFlags.Flags)
events = je.store.Add(je.Logger, payload.Source, payload.Selector, newFlags.Flags)
case sync.UPDATE:
events = je.store.Update(je.Logger, payload.Source, newFlags.Flags)
events = je.store.Update(je.Logger, payload.Source, payload.Selector, newFlags.Flags)
case sync.DELETE:
events = je.store.DeleteFlags(je.Logger, payload.Source, newFlags.Flags)
default:
Expand Down
5 changes: 4 additions & 1 deletion core/pkg/evaluator/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ func TestState_Evaluator(t *testing.T) {
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"selector":"",
"targeting": {
"if": [
{
Expand Down Expand Up @@ -965,6 +966,7 @@ func TestState_Evaluator(t *testing.T) {
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"selector":"",
"targeting": {
"if": [
{
Expand Down Expand Up @@ -1023,7 +1025,6 @@ func TestState_Evaluator(t *testing.T) {
},
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"targeting": {
"if": [
{
Expand Down Expand Up @@ -1077,6 +1078,7 @@ func TestState_Evaluator(t *testing.T) {
"defaultVariant": "recursive",
"state": "ENABLED",
"source":"",
"selector":"",
"targeting": {
"if": [
{
Expand All @@ -1095,6 +1097,7 @@ func TestState_Evaluator(t *testing.T) {
},
"defaultVariant": "off",
"source":"",
"selector":"",
"targeting": {
"if": [
{
Expand Down
1 change: 1 addition & 0 deletions core/pkg/model/flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Flag struct {
Variants map[string]any `json:"variants"`
Targeting json.RawMessage `json:"targeting,omitempty"`
Source string `json:"source"`
Selector string `json:"selector"`
}

type Evaluators struct {
Expand Down
13 changes: 10 additions & 3 deletions core/pkg/store/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func (f *Flags) GetAll(_ context.Context) map[string]model.Flag {
}

// Add new flags from source.
func (f *Flags) Add(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
func (f *Flags) Add(logger *logger.Logger, source string, selector string, flags map[string]model.Flag,
) map[string]interface{} {
notifications := map[string]interface{}{}

for k, newFlag := range flags {
Expand All @@ -127,14 +128,16 @@ func (f *Flags) Add(logger *logger.Logger, source string, flags map[string]model

// Store the new version of the flag
newFlag.Source = source
newFlag.Selector = selector
f.Set(k, newFlag)
}

return notifications
}

// Update existing flags from source.
func (f *Flags) Update(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
func (f *Flags) Update(logger *logger.Logger, source string, selector string, flags map[string]model.Flag,
) map[string]interface{} {
notifications := map[string]interface{}{}

for k, flag := range flags {
Expand Down Expand Up @@ -165,6 +168,7 @@ func (f *Flags) Update(logger *logger.Logger, source string, flags map[string]mo
}

flag.Source = source
flag.Selector = selector
f.Set(k, flag)
}

Expand Down Expand Up @@ -228,16 +232,18 @@ func (f *Flags) DeleteFlags(logger *logger.Logger, source string, flags map[stri
}

// Merge provided flags from source with currently stored flags.
// nolint: funlen
func (f *Flags) Merge(
logger *logger.Logger,
source string,
selector string,
flags map[string]model.Flag,
) (map[string]interface{}, bool) {
notifications := map[string]interface{}{}
resyncRequired := false
f.mx.Lock()
for k, v := range f.Flags {
if v.Source == source {
if v.Source == source && v.Selector == selector {
if _, ok := flags[k]; !ok {
// flag has been deleted
delete(f.Flags, k)
Expand All @@ -259,6 +265,7 @@ func (f *Flags) Merge(
f.mx.Unlock()
for k, newFlag := range flags {
newFlag.Source = source
newFlag.Selector = selector
storedFlag, ok := f.Get(context.Background(), k)
if ok {
if !f.hasPriority(storedFlag.Source, source) {
Expand Down
116 changes: 55 additions & 61 deletions core/pkg/store/flags_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"reflect"
"testing"

"github.com/open-feature/flagd/core/pkg/logger"
Expand Down Expand Up @@ -73,52 +74,38 @@ func TestHasPriority(t *testing.T) {
func TestMergeFlags(t *testing.T) {
t.Parallel()
tests := []struct {
name string
current *Flags
new map[string]model.Flag
newSource string
want *Flags
wantNotifs map[string]interface{}
wantResync bool
name string
current *Flags
new map[string]model.Flag
newSource string
newSelector string
want *Flags
wantNotifs map[string]interface{}
wantResync bool
}{
{
name: "both nil",
current: &Flags{
Flags: nil,
},
name: "both nil",
current: &Flags{Flags: nil},
new: nil,
want: &Flags{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{},
},
{
name: "both empty flags",
current: &Flags{
Flags: map[string]model.Flag{},
},
new: map[string]model.Flag{},
want: &Flags{Flags: map[string]model.Flag{}},
want: &Flags{Flags: nil},
wantNotifs: map[string]interface{}{},
},
{
name: "empty current",
current: &Flags{
Flags: nil,
},
name: "both empty flags",
current: &Flags{Flags: map[string]model.Flag{}},
new: map[string]model.Flag{},
want: &Flags{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{},
},
{
name: "empty new",
current: &Flags{
Flags: map[string]model.Flag{},
},
name: "empty new",
current: &Flags{Flags: map[string]model.Flag{}},
new: nil,
want: &Flags{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{},
},
{
name: "extra fields on each",
name: "merging with new source",
current: &Flags{
Flags: map[string]model.Flag{
"waka": {
Expand All @@ -143,15 +130,14 @@ func TestMergeFlags(t *testing.T) {
Source: "2",
},
}},
wantNotifs: map[string]interface{}{
"paka": map[string]interface{}{"type": "write", "source": "2"},
},
wantNotifs: map[string]interface{}{"paka": map[string]interface{}{"type": "write", "source": "2"}},
},
{
name: "override",
current: &Flags{
Flags: map[string]model.Flag{"waka": {DefaultVariant: "off"}},
},
name: "override by new update",
current: &Flags{Flags: map[string]model.Flag{
"waka": {DefaultVariant: "off"},
"paka": {DefaultVariant: "off"},
}},
new: map[string]model.Flag{
"waka": {DefaultVariant: "on"},
"paka": {DefaultVariant: "on"},
Expand All @@ -162,11 +148,11 @@ func TestMergeFlags(t *testing.T) {
}},
wantNotifs: map[string]interface{}{
"waka": map[string]interface{}{"type": "update", "source": ""},
"paka": map[string]interface{}{"type": "write", "source": ""},
"paka": map[string]interface{}{"type": "update", "source": ""},
},
},
{
name: "identical",
name: "identical update so empty notifications",
current: &Flags{
Flags: map[string]model.Flag{"hello": {DefaultVariant: "off"}},
},
Expand All @@ -179,20 +165,26 @@ func TestMergeFlags(t *testing.T) {
wantNotifs: map[string]interface{}{},
},
{
name: "deleted flag",
current: &Flags{
Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A"}},
},
new: map[string]model.Flag{},
newSource: "A",
want: &Flags{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{
"hello": map[string]interface{}{"type": "delete", "source": "A"},
},
name: "deleted flag & trigger resync for same source",
current: &Flags{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A"}}},
new: map[string]model.Flag{},
newSource: "A",
want: &Flags{Flags: map[string]model.Flag{}},
wantNotifs: map[string]interface{}{"hello": map[string]interface{}{"type": "delete", "source": "A"}},
wantResync: true,
},
{
name: "no merge priority",
name: "no deleted & no resync for same source but different selector",
current: &Flags{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}},
new: map[string]model.Flag{},
newSource: "A",
newSelector: "Y",
want: &Flags{Flags: map[string]model.Flag{"hello": {DefaultVariant: "off", Source: "A", Selector: "X"}}},
wantResync: false,
wantNotifs: map[string]interface{}{},
},
{
name: "no merge due to low priority",
current: &Flags{
FlagSources: []string{
"B",
Expand All @@ -205,9 +197,7 @@ func TestMergeFlags(t *testing.T) {
},
},
},
new: map[string]model.Flag{
"hello": {DefaultVariant: "off"},
},
new: map[string]model.Flag{"hello": {DefaultVariant: "off"}},
newSource: "B",
want: &Flags{
FlagSources: []string{
Expand All @@ -229,8 +219,9 @@ func TestMergeFlags(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
gotNotifs, resyncRequired := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.new)
require.Equal(t, tt.want, tt.want)
gotNotifs, resyncRequired := tt.current.Merge(logger.NewLogger(nil, false), tt.newSource, tt.newSelector, tt.new)

require.True(t, reflect.DeepEqual(tt.want, tt.current))
require.Equal(t, tt.wantNotifs, gotNotifs)
require.Equal(t, tt.wantResync, resyncRequired)
})
Expand All @@ -243,8 +234,9 @@ func TestFlags_Add(t *testing.T) {
mockOverrideSource := "source-2"

type request struct {
source string
flags map[string]model.Flag
source string
selector string
flags map[string]model.Flag
}

tests := []struct {
Expand Down Expand Up @@ -321,7 +313,7 @@ func TestFlags_Add(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.Add(mockLogger, tt.addRequest.source, tt.addRequest.flags)
messages := tt.storedState.Add(mockLogger, tt.addRequest.source, tt.addRequest.selector, tt.addRequest.flags)

require.Equal(t, tt.storedState, tt.expectedState)

Expand All @@ -339,8 +331,9 @@ func TestFlags_Update(t *testing.T) {
mockOverrideSource := "source-2"

type request struct {
source string
flags map[string]model.Flag
source string
selector string
flags map[string]model.Flag
}

tests := []struct {
Expand Down Expand Up @@ -437,7 +430,8 @@ func TestFlags_Update(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source, tt.UpdateRequest.flags)
messages := tt.storedState.Update(mockLogger, tt.UpdateRequest.source,
tt.UpdateRequest.selector, tt.UpdateRequest.flags)

require.Equal(t, tt.storedState, tt.expectedState)

Expand Down
3 changes: 2 additions & 1 deletion core/pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (g *Sync) Init(ctx context.Context) error {
}

func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{ProviderId: g.ProviderID, Selector: g.Selector})
if err != nil {
err = fmt.Errorf("error fetching all flags: %w", err)
g.Logger.Error(err.Error())
Expand Down Expand Up @@ -181,6 +181,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient,
dataSync <- sync.DataSync{
FlagData: data.FlagConfiguration,
Source: g.URI,
Selector: g.Selector,
Type: sync.ALL,
}

Expand Down
1 change: 1 addition & 0 deletions core/pkg/sync/isync.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ISync interface {
type DataSync struct {
FlagData string
Source string
Selector string
Type
}

Expand Down

0 comments on commit ed5025d

Please sign in to comment.