From 7fe5cf9ff19d43d747e89087785590692aa212c4 Mon Sep 17 00:00:00 2001 From: Johnny Steenbergen Date: Tue, 31 Dec 2019 16:28:55 -0800 Subject: [PATCH] chore(kv): refactor kv.Entity to provide interface for PK and unique keys --- kv/check.go | 46 ++++++++-------- kv/encode.go | 49 ++++++++++++++++ kv/notification_endpoint.go | 50 +++++++---------- kv/store_base.go | 95 +++++++++++++------------------- kv/store_base_test.go | 29 +++++----- kv/store_index.go | 29 +++++----- kv/store_index_test.go | 9 +-- kv/variable.go | 34 +++++------- testing/notification_endpoint.go | 12 +--- 9 files changed, 177 insertions(+), 176 deletions(-) create mode 100644 kv/encode.go diff --git a/kv/check.go b/kv/check.go index 8c7092866f0..972b1967d73 100644 --- a/kv/check.go +++ b/kv/check.go @@ -15,27 +15,26 @@ func newCheckStore() *IndexStore { const resource = "check" var decEndpointEntFn DecodeBucketValFn = func(key, val []byte) ([]byte, interface{}, error) { - edp, err := check.UnmarshalJSON(val) - return key, edp, err + ch, err := check.UnmarshalJSON(val) + return key, ch, err } var decValToEntFn ConvertValToEntFn = func(_ []byte, v interface{}) (Entity, error) { - edp, ok := v.(influxdb.Check) + ch, ok := v.(influxdb.Check) if err := errUnexpectedDecodeVal(ok); err != nil { return Entity{}, err } return Entity{ - ID: edp.GetID(), - Name: edp.GetName(), - OrgID: edp.GetOrgID(), - Body: edp, + PK: EncID(ch.GetID()), + UniqueKey: Encode(EncID(ch.GetOrgID()), EncString(ch.GetName())), + Body: ch, }, nil } return &IndexStore{ Resource: resource, EntStore: NewStoreBase(resource, []byte("checksv1"), EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn), - IndexStore: NewOrgNameKeyStore(resource, []byte("checkindexv1"), true), + IndexStore: NewOrgNameKeyStore(resource, []byte("checkindexv1"), false), } } @@ -61,7 +60,7 @@ func (s *Service) FindCheckByID(ctx context.Context, id influxdb.ID) (influxdb.C } func (s *Service) findCheckByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.Check, error) { - chkVal, err := s.checkStore.FindEnt(ctx, tx, Entity{ID: id}) + chkVal, err := s.checkStore.FindEnt(ctx, tx, Entity{PK: EncID(id)}) if err != nil { return nil, err } @@ -73,8 +72,7 @@ func (s *Service) findCheckByName(ctx context.Context, tx Tx, orgID influxdb.ID, defer span.Finish() chVal, err := s.checkStore.FindEnt(ctx, tx, Entity{ - OrgID: orgID, - Name: name, + UniqueKey: Encode(EncID(orgID), EncString(name)), }) if IsNotFound(err) { return nil, &influxdb.Error{ @@ -118,7 +116,7 @@ func (s *Service) FindCheck(ctx context.Context, filter influxdb.CheckFilter) (i var prefix []byte if filter.OrgID != nil { - ent := Entity{OrgID: *filter.OrgID} + ent := Entity{UniqueKey: EncID(*filter.OrgID)} prefix, _ = s.checkStore.IndexStore.EntKey(ctx, ent) } filterFn := filterChecksFn(nil, filter) @@ -203,11 +201,11 @@ func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, o var prefix []byte if filter.OrgID != nil { - ent := Entity{OrgID: *filter.OrgID} + encs := []EncodeFn{EncID(*filter.OrgID)} if filter.Name != nil { - ent.Name = *filter.Name + encs = append(encs, EncString(*filter.Name)) } - prefix, _ = s.checkStore.IndexStore.EntKey(ctx, ent) + prefix, _ = s.checkStore.IndexStore.EntKey(ctx, Entity{UniqueKey: Encode(encs...)}) } var opt influxdb.FindOptions @@ -338,10 +336,9 @@ func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error { func (s *Service) putCheck(ctx context.Context, tx Tx, c influxdb.Check) error { return s.checkStore.Put(ctx, tx, Entity{ - ID: c.GetID(), - Name: c.GetName(), - OrgID: c.GetOrgID(), - Body: c, + PK: EncID(c.GetID()), + UniqueKey: Encode(EncID(c.GetOrgID()), EncString(c.GetName())), + Body: c, }) } @@ -400,8 +397,7 @@ func (s *Service) updateCheck(ctx context.Context, tx Tx, id influxdb.ID, chk in } ent := Entity{ - OrgID: current.GetOrgID(), - Name: current.GetName(), + UniqueKey: Encode(EncID(current.GetOrgID()), EncString(current.GetName())), } if err := s.checkStore.IndexStore.DeleteEnt(ctx, tx, ent); err != nil { return nil, err @@ -468,8 +464,7 @@ func (s *Service) patchCheck(ctx context.Context, tx Tx, id influxdb.ID, upd inf } ent := Entity{ - OrgID: c.GetOrgID(), - Name: c.GetName(), + UniqueKey: Encode(EncID(c.GetOrgID()), EncString(c.GetName())), } if err := s.checkStore.IndexStore.DeleteEnt(ctx, tx, ent); err != nil { return nil, err @@ -513,7 +508,10 @@ func (s *Service) DeleteCheck(ctx context.Context, id influxdb.ID) error { } return s.kv.Update(ctx, func(tx Tx) error { - if err := s.checkStore.DeleteEnt(ctx, tx, Entity{ID: id}); err != nil { + err := s.checkStore.DeleteEnt(ctx, tx, Entity{ + PK: EncID(id), + }) + if err != nil { return err } diff --git a/kv/encode.go b/kv/encode.go new file mode 100644 index 00000000000..d39d5bd80b4 --- /dev/null +++ b/kv/encode.go @@ -0,0 +1,49 @@ +package kv + +import ( + "errors" + "strings" + + "github.com/influxdata/influxdb" +) + +// EncodeFn returns an encoding when called. Closures are your friend here. +type EncodeFn func() ([]byte, error) + +// Encode concatenates a list of encodings together. +func Encode(encodings ...EncodeFn) EncodeFn { + return func() ([]byte, error) { + var key []byte + for _, enc := range encodings { + part, err := enc() + if err != nil { + return key, err + } + key = append(key, part...) + } + return key, nil + } +} + +// EncString encodes a string. +func EncString(str string) EncodeFn { + return func() ([]byte, error) { + return []byte(str), nil + } +} + +// EncStringCaseInsensitive encodes a string and makes it case insensitive by lower casing +// everything. +func EncStringCaseInsensitive(str string) EncodeFn { + return EncString(strings.ToLower(str)) +} + +// EncID encodes an influx ID. +func EncID(id influxdb.ID) EncodeFn { + return func() ([]byte, error) { + if id == 0 { + return nil, errors.New("no ID was provided") + } + return id.Encode() + } +} diff --git a/kv/notification_endpoint.go b/kv/notification_endpoint.go index fa2dbe68e6b..2699a93b49e 100644 --- a/kv/notification_endpoint.go +++ b/kv/notification_endpoint.go @@ -33,17 +33,16 @@ func newEndpointStore() *IndexStore { return Entity{}, err } return Entity{ - ID: edp.GetID(), - Name: edp.GetName(), - OrgID: edp.GetOrgID(), - Body: edp, + PK: EncID(edp.GetID()), + UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())), + Body: edp, }, nil } return &IndexStore{ Resource: resource, EntStore: NewStoreBase(resource, []byte("notificationEndpointv1"), EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn), - IndexStore: NewOrgNameKeyStore(resource, []byte("notificationEndpointIndexv1"), true), + IndexStore: NewOrgNameKeyStore(resource, []byte("notificationEndpointIndexv1"), false), } } @@ -85,10 +84,9 @@ func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp inf } ent := Entity{ - ID: edp.GetID(), - Name: edp.GetName(), - OrgID: edp.GetOrgID(), - Body: edp, + PK: EncID(edp.GetID()), + UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())), + Body: edp, } if err := s.endpointStore.Put(ctx, tx, ent); err != nil { return err @@ -108,8 +106,7 @@ func (s *Service) findNotificationEndpointByName(ctx context.Context, tx Tx, org defer span.Finish() body, err := s.endpointStore.FindEnt(ctx, tx, Entity{ - OrgID: orgID, - Name: name, + UniqueKey: Encode(EncID(orgID), EncString(name)), }) if err != nil { return nil, err @@ -147,8 +144,7 @@ func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id infl } err = s.endpointStore.IndexStore.DeleteEnt(ctx, tx, Entity{ - OrgID: edp.GetOrgID(), - Name: curName, + UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(curName)), }) if err != nil { return nil, err @@ -166,10 +162,9 @@ func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id infl } ent := Entity{ - ID: edp.GetID(), - Name: edp.GetName(), - OrgID: edp.GetOrgID(), - Body: edp, + PK: EncID(edp.GetID()), + UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())), + Body: edp, } if err := s.endpointStore.Put(ctx, tx, ent); err != nil { return nil, err @@ -210,8 +205,7 @@ func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influ } err = s.endpointStore.IndexStore.DeleteEnt(ctx, tx, Entity{ - OrgID: edp.GetOrgID(), - Name: edp.GetName(), + UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())), }) if err != nil { return nil, err @@ -236,10 +230,9 @@ func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influ // TODO(jsteenb2): every above here moves into service layer ent := Entity{ - ID: edp.GetID(), - Name: edp.GetName(), - OrgID: edp.GetOrgID(), - Body: edp, + PK: EncID(edp.GetID()), + UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())), + Body: edp, } if err := s.endpointStore.Put(ctx, tx, ent); err != nil { return nil, err @@ -258,10 +251,9 @@ func (s *Service) PutNotificationEndpoint(ctx context.Context, edp influxdb.Noti return s.kv.Update(ctx, func(tx Tx) (err error) { ent := Entity{ - ID: edp.GetID(), - Name: edp.GetName(), - OrgID: edp.GetOrgID(), - Body: edp, + PK: EncID(edp.GetID()), + UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())), + Body: edp, } return s.endpointStore.Put(ctx, tx, ent) }) @@ -283,7 +275,7 @@ func (s *Service) FindNotificationEndpointByID(ctx context.Context, id influxdb. } func (s *Service) findNotificationEndpointByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.NotificationEndpoint, error) { - decodedEnt, err := s.endpointStore.FindEnt(ctx, tx, Entity{ID: id}) + decodedEnt, err := s.endpointStore.FindEnt(ctx, tx, Entity{PK: EncID(id)}) if err != nil { return nil, err } @@ -386,7 +378,7 @@ func (s *Service) deleteNotificationEndpoint(ctx context.Context, tx Tx, id infl return nil, 0, err } - if err := s.endpointStore.DeleteEnt(ctx, tx, Entity{ID: id}); err != nil { + if err := s.endpointStore.DeleteEnt(ctx, tx, Entity{PK: EncID(id)}); err != nil { return nil, 0, err } diff --git a/kv/store_base.go b/kv/store_base.go index 563abb84ede..058977cd284 100644 --- a/kv/store_base.go +++ b/kv/store_base.go @@ -6,17 +6,16 @@ import ( "encoding/json" "errors" "fmt" - "strings" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kit/tracing" ) type Entity struct { - ID influxdb.ID - Name string - OrgID influxdb.ID - Body interface{} + PK EncodeFn + UniqueKey EncodeFn + + Body interface{} } // EncodeEntFn encodes the entity. This is used both for the key and vals in the store base. @@ -24,16 +23,20 @@ type EncodeEntFn func(ent Entity) ([]byte, string, error) // EncIDKey encodes an entity into a key that represents the encoded ID provided. func EncIDKey(ent Entity) ([]byte, string, error) { - id, err := ent.ID.Encode() - return id, "ID", err + if ent.PK == nil { + return nil, "ID", errors.New("no ID provided") + } + key, err := ent.PK() + return key, "ID", err } -// EncOrgIDNameKey encodes an organization id and name key. -func EncOrgIDNameKey(caseSensitive bool) func(ent Entity) ([]byte, string, error) { - return func(ent Entity) ([]byte, string, error) { - key, err := indexByOrgNameKey(ent.OrgID, ent.Name, caseSensitive) - return key, "organization ID and name", err +// EncUniqKey encodes the unique key. +func EncUniqKey(ent Entity) ([]byte, string, error) { + if ent.UniqueKey == nil { + return nil, "Unique Key", errors.New("no unique key provided") } + key, err := ent.UniqueKey() + return key, "Unique Key", err } // EncBodyJSON JSON encodes the entity body and returns the raw bytes and indicates @@ -74,7 +77,7 @@ func NewOrgNameKeyStore(resource string, bktName []byte, caseSensitive bool) *St return Entity{}, err } - ent := Entity{ID: id} + ent := Entity{PK: Encode(EncID(id))} if len(k) == 0 { return ent, nil } @@ -83,12 +86,15 @@ func NewOrgNameKeyStore(resource string, bktName []byte, caseSensitive bool) *St if err != nil { return Entity{}, err } - ent.OrgID = orgID - ent.Name = name + nameEnc := EncString(name) + if !caseSensitive { + nameEnc = EncStringCaseInsensitive(name) + } + ent.UniqueKey = Encode(EncID(orgID), nameEnc) return ent, nil } - return NewStoreBase(resource, bktName, EncOrgIDNameKey(caseSensitive), EncIDKey, DecIndexID, decValToEntFn) + return NewStoreBase(resource, bktName, EncUniqKey, EncIDKey, DecIndexID, decValToEntFn) } // StoreBase is the base behavior for accessing buckets in kv. It provides mechanisms that can @@ -97,21 +103,21 @@ type StoreBase struct { Resource string BktName []byte - EncodeEntKeyFn EncodeEntFn - EncodeEntBodyFn EncodeEntFn - DecodeEntFn DecodeBucketValFn - DecodeToEntFn ConvertValToEntFn + EncodeEntKeyFn EncodeEntFn + EncodeEntBodyFn EncodeEntFn + DecodeEntFn DecodeBucketValFn + ConvertValToEntFn ConvertValToEntFn } // NewStoreBase creates a new store base. func NewStoreBase(resource string, bktName []byte, encKeyFn, encBodyFn EncodeEntFn, decFn DecodeBucketValFn, decToEntFn ConvertValToEntFn) *StoreBase { return &StoreBase{ - Resource: resource, - BktName: bktName, - EncodeEntKeyFn: encKeyFn, - EncodeEntBodyFn: encBodyFn, - DecodeEntFn: decFn, - DecodeToEntFn: decToEntFn, + Resource: resource, + BktName: bktName, + EncodeEntKeyFn: encKeyFn, + EncodeEntBodyFn: encBodyFn, + DecodeEntFn: decFn, + ConvertValToEntFn: decToEntFn, } } @@ -410,6 +416,13 @@ func (s *StoreBase) encodeEnt(ctx context.Context, ent Entity, fn EncodeEntFn) ( span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() + if fn == nil { + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("no key was provided for %s", s.Resource), + } + } + encoded, field, err := fn(ent) if err != nil { return encoded, &influxdb.Error{ @@ -535,36 +548,6 @@ func (i *iterator) seek(ctx context.Context) { } -func indexByOrgNameKey(orgID influxdb.ID, name string, caseSensitive bool) ([]byte, error) { - orgIDEncoded, err := orgID.Encode() - if err != nil { - return nil, &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: fmt.Sprintf("invalid org ID provided: %q", orgID.String()), - Err: err, - } - } - k := make([]byte, influxdb.IDLength+len(name)) - copy(k, orgIDEncoded) - - if name == "" { - // purposefully returning a partial key here b/c it allows for - // a key of just the orgID to be used. An Error can be ignored - // and then the key useful to the caller. It is used in the Index - // Store when needing to lookup by orgID and hte name isn't provided. - return k, &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "name must be provided", - } - } - - if !caseSensitive { - name = strings.ToLower(name) - } - copy(k[influxdb.IDLength:], name) - return k, nil -} - func errUnexpectedDecodeVal(ok bool) error { if ok { return nil diff --git a/kv/store_base_test.go b/kv/store_base_test.go index dd6e0a0a61c..d3aad5b9f33 100644 --- a/kv/store_base_test.go +++ b/kv/store_base_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestStore(t *testing.T) { +func TestStoreBase(t *testing.T) { newStoreBase := func(t *testing.T, bktSuffix string, encKeyFn, encBodyFn kv.EncodeEntFn, decFn kv.DecodeBucketValFn, decToEntFn kv.ConvertValToEntFn) (*kv.StoreBase, func(), kv.Store) { t.Helper() @@ -79,10 +79,9 @@ func testPutBase(t *testing.T, kvStore kv.Store, base storeBase, bktName []byte) update(t, kvStore, func(tx kv.Tx) error { return base.Put(context.TODO(), tx, kv.Entity{ - ID: expected.ID, - Name: expected.Name, - OrgID: expected.OrgID, - Body: expected, + PK: kv.EncID(expected.ID), + UniqueKey: kv.Encode(kv.EncID(expected.OrgID), kv.EncString(expected.Name)), + Body: expected, }) }) @@ -101,11 +100,11 @@ func testDeleteEntBase(t *testing.T, kvStore kv.Store, base storeBase) kv.Entity seedEnts(t, kvStore, base, expected) update(t, kvStore, func(tx kv.Tx) error { - return base.DeleteEnt(context.TODO(), tx, kv.Entity{ID: expected.ID}) + return base.DeleteEnt(context.TODO(), tx, kv.Entity{PK: expected.PK}) }) err := kvStore.View(context.TODO(), func(tx kv.Tx) error { - _, err := base.FindEnt(context.TODO(), tx, kv.Entity{ID: expected.ID}) + _, err := base.FindEnt(context.TODO(), tx, kv.Entity{PK: expected.PK}) return err }) isNotFoundErr(t, err) @@ -195,7 +194,7 @@ func testFindEnt(t *testing.T, kvStore kv.Store, base storeBase) kv.Entity { var actual interface{} view(t, kvStore, func(tx kv.Tx) error { - f, err := base.FindEnt(context.TODO(), tx, kv.Entity{ID: expected.ID}) + f, err := base.FindEnt(context.TODO(), tx, kv.Entity{PK: expected.PK}) actual = f return err }) @@ -360,20 +359,18 @@ func decFooEntFn(k []byte, v interface{}) (kv.Entity, error) { return kv.Entity{}, fmt.Errorf("invalid entry: %#v", v) } return kv.Entity{ - ID: f.ID, - Name: f.Name, - OrgID: f.OrgID, - Body: f, + PK: kv.EncID(f.ID), + UniqueKey: kv.Encode(kv.EncID(f.OrgID), kv.EncString(f.Name)), + Body: f, }, nil } func newFooEnt(id, orgID influxdb.ID, name string) kv.Entity { f := foo{ID: id, Name: name, OrgID: orgID} return kv.Entity{ - ID: f.ID, - Name: f.Name, - OrgID: f.OrgID, - Body: f, + PK: kv.EncID(f.ID), + UniqueKey: kv.Encode(kv.EncID(f.OrgID), kv.EncString(f.Name)), + Body: f, } } diff --git a/kv/store_index.go b/kv/store_index.go index 65248a458ab..0c58710ac34 100644 --- a/kv/store_index.go +++ b/kv/store_index.go @@ -2,7 +2,6 @@ package kv import ( "context" - "fmt" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kit/tracing" @@ -41,7 +40,7 @@ func (s *IndexStore) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error { defer span.Finish() deleteIndexedRelationFn := func(k []byte, v interface{}) error { - ent, err := s.EntStore.DecodeToEntFn(k, v) + ent, err := s.EntStore.ConvertValToEntFn(k, v) if err != nil { return err } @@ -65,7 +64,7 @@ func (s *IndexStore) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error { return err } - decodedEnt, err := s.EntStore.DecodeToEntFn(nil, existing) + decodedEnt, err := s.EntStore.ConvertValToEntFn(nil, existing) if err != nil { return err } @@ -111,6 +110,7 @@ func (s *IndexStore) indexFilterStream(ctx context.Context, tx Tx, entFilterFn F v interface{} err error }, func([]byte, interface{}) bool) { + kvStream := make(chan struct { k []byte v interface{} @@ -126,11 +126,7 @@ func (s *IndexStore) indexFilterStream(ctx context.Context, tx Tx, entFilterFn F send := func(key []byte, v interface{}, err error) bool { select { case <-ctx.Done(): - case kvStream <- kve{ - k: key, - v: v, - err: err, - }: + case kvStream <- kve{k: key, v: v, err: err}: } return true } @@ -141,7 +137,7 @@ func (s *IndexStore) indexFilterStream(ctx context.Context, tx Tx, entFilterFn F close(kvStream) } }() - ent, err := s.IndexStore.DecodeToEntFn(key, indexVal) + ent, err := s.IndexStore.ConvertValToEntFn(key, indexVal) if err != nil { return send(nil, nil, err) } @@ -175,14 +171,17 @@ func (s *IndexStore) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{ span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - if ent.ID == 0 && ent.OrgID == 0 && ent.Name == "" { - return nil, &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: fmt.Sprintf("no key was provided for %s", s.Resource), + _, err := s.EntStore.EntKey(ctx, ent) + if err != nil { + if _, idxErr := s.IndexStore.EntKey(ctx, ent); idxErr != nil { + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "no key was provided for " + s.Resource, + } } } - if ent.ID == 0 { + if err != nil { return s.findByIndex(ctx, tx, ent) } return s.EntStore.FindEnt(ctx, tx, ent) @@ -213,7 +212,7 @@ func (s *IndexStore) findByIndex(ctx context.Context, tx Tx, ent Entity) (interf return nil, err } - indexEnt, err := s.IndexStore.DecodeToEntFn(indexKey, idxEncodedID) + indexEnt, err := s.IndexStore.ConvertValToEntFn(indexKey, idxEncodedID) if err != nil { return nil, err } diff --git a/kv/store_index_test.go b/kv/store_index_test.go index 89184f2c76f..f2183d362f2 100644 --- a/kv/store_index_test.go +++ b/kv/store_index_test.go @@ -38,8 +38,7 @@ func TestIndexStore(t *testing.T) { expected := testPutBase(t, inmem, indexStore, indexStore.EntStore.BktName) key, err := indexStore.IndexStore.EntKey(context.TODO(), kv.Entity{ - OrgID: expected.OrgID, - Name: expected.Name, + UniqueKey: kv.Encode(kv.EncID(expected.OrgID), kv.EncString(expected.Name)), }) require.NoError(t, err) @@ -55,8 +54,7 @@ func TestIndexStore(t *testing.T) { err := inmem.View(context.TODO(), func(tx kv.Tx) error { _, err := indexStore.IndexStore.FindEnt(context.TODO(), tx, kv.Entity{ - OrgID: expected.OrgID, - Name: expected.Name, + UniqueKey: expected.UniqueKey, }) return err }) @@ -109,8 +107,7 @@ func TestIndexStore(t *testing.T) { var actual interface{} view(t, kvStore, func(tx kv.Tx) error { f, err := base.FindEnt(context.TODO(), tx, kv.Entity{ - OrgID: expected.OrgID, - Name: expected.Name, + UniqueKey: expected.UniqueKey, }) actual = f return err diff --git a/kv/variable.go b/kv/variable.go index ba0d846dc85..f4cff7bb103 100644 --- a/kv/variable.go +++ b/kv/variable.go @@ -96,17 +96,16 @@ func newVariableStore() *IndexStore { return Entity{}, err } return Entity{ - ID: v.ID, - Name: v.Name, - OrgID: v.OrganizationID, - Body: v, + PK: EncID(v.ID), + UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)), + Body: v, }, nil } return &IndexStore{ Resource: resource, EntStore: NewStoreBase(resource, []byte("variablesv1"), EncIDKey, EncBodyJSON, decodeVarEntFn, decValToEntFn), - IndexStore: NewOrgNameKeyStore(resource, []byte("variablesindexv1"), false), + IndexStore: NewOrgNameKeyStore(resource, []byte("variablesindexv1"), true), } } @@ -195,7 +194,7 @@ func (s *Service) FindVariableByID(ctx context.Context, id influxdb.ID) (*influx } func (s *Service) findVariableByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Variable, error) { - body, err := s.variableStore.FindEnt(ctx, tx, Entity{ID: id}) + body, err := s.variableStore.FindEnt(ctx, tx, Entity{PK: EncID(id)}) if err != nil { return nil, err } @@ -217,8 +216,7 @@ func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) erro v.Name = strings.TrimSpace(v.Name) // TODO: move to service layer _, err := s.variableStore.FindEnt(ctx, tx, Entity{ - OrgID: v.OrganizationID, - Name: v.Name, + UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)), }) if err == nil { return &influxdb.Error{ @@ -240,9 +238,8 @@ func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) erro func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) error { return s.kv.Update(ctx, func(tx Tx) error { _, err := s.variableStore.FindEnt(ctx, tx, Entity{ - ID: v.ID, - OrgID: v.OrganizationID, - Name: v.Name, + PK: EncID(v.ID), + UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)), }) if err == nil { return &influxdb.Error{ @@ -261,10 +258,9 @@ func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable) } ent := Entity{ - ID: v.ID, - Name: v.Name, - OrgID: v.OrganizationID, - Body: v, + PK: EncID(v.ID), + UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)), + Body: v, } return s.variableStore.Put(ctx, tx, ent) } @@ -286,8 +282,7 @@ func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *in update.Name = strings.ToLower(strings.TrimSpace(update.Name)) vbytes, err := s.variableStore.FindEnt(ctx, tx, Entity{ - OrgID: v.OrganizationID, - Name: update.Name, + UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(update.Name)), }) if err == nil { existingVar, ok := vbytes.(*influxdb.Variable) @@ -303,8 +298,7 @@ func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *in } err = s.variableStore.IndexStore.DeleteEnt(ctx, tx, Entity{ - OrgID: v.OrganizationID, - Name: v.Name, + UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(update.Name)), }) if err != nil { return err @@ -331,7 +325,7 @@ func (s *Service) DeleteVariable(ctx context.Context, id influxdb.ID) error { if err := s.removeVariableOrgsIndex(tx, v); err != nil { return err } - return s.variableStore.DeleteEnt(ctx, tx, Entity{ID: id}) + return s.variableStore.DeleteEnt(ctx, tx, Entity{PK: EncID(id)}) }) } diff --git a/testing/notification_endpoint.go b/testing/notification_endpoint.go index 4cd6685d454..6c0212edbf6 100644 --- a/testing/notification_endpoint.go +++ b/testing/notification_endpoint.go @@ -423,15 +423,7 @@ func FindNotificationEndpointByID( ctx := context.Background() edp, err := s.FindNotificationEndpointByID(ctx, tt.args.id) - if err != nil { - if tt.wants.err == nil { - require.NoError(t, err) - } - iErr, ok := err.(*influxdb.Error) - require.True(t, ok) - assert.Equal(t, tt.wants.err.Code, iErr.Code) - assert.Truef(t, strings.HasPrefix(iErr.Error(), tt.wants.err.Error()), "got err: %s", err.Error()) - } + influxErrsEqual(t, tt.wants.err, err) if diff := cmp.Diff(edp, tt.wants.notificationEndpoint, notificationEndpointCmpOptions...); diff != "" { t.Errorf("notification endpoint is different -got/+want\ndiff %s", diff) } @@ -2175,5 +2167,5 @@ func influxErrsEqual(t *testing.T, expected *influxdb.Error, actual error) { iErr, ok := actual.(*influxdb.Error) require.True(t, ok) assert.Equal(t, expected.Code, iErr.Code) - assert.Truef(t, strings.HasPrefix(iErr.Error(), expected.Error()), "got err: %s", actual.Error()) + assert.Truef(t, strings.HasPrefix(iErr.Error(), expected.Error()), "expected: %s got err: %s", expected.Error(), actual.Error()) }