Skip to content

Commit

Permalink
chore(kv): refactor kv.Entity to provide interface for PK and unique …
Browse files Browse the repository at this point in the history
…keys
  • Loading branch information
jsteenb2 authored and alexpaxton committed Jan 9, 2020
1 parent 5560623 commit 7fe5cf9
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 176 deletions.
46 changes: 22 additions & 24 deletions kv/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,26 @@ func newCheckStore() *IndexStore {
const resource = "check"

var decEndpointEntFn DecodeBucketValFn = func(key, val []byte) ([]byte, interface{}, error) {
edp, err := check.UnmarshalJSON(val)
return key, edp, err
ch, err := check.UnmarshalJSON(val)
return key, ch, err
}

var decValToEntFn ConvertValToEntFn = func(_ []byte, v interface{}) (Entity, error) {
edp, ok := v.(influxdb.Check)
ch, ok := v.(influxdb.Check)
if err := errUnexpectedDecodeVal(ok); err != nil {
return Entity{}, err
}
return Entity{
ID: edp.GetID(),
Name: edp.GetName(),
OrgID: edp.GetOrgID(),
Body: edp,
PK: EncID(ch.GetID()),
UniqueKey: Encode(EncID(ch.GetOrgID()), EncString(ch.GetName())),
Body: ch,
}, nil
}

return &IndexStore{
Resource: resource,
EntStore: NewStoreBase(resource, []byte("checksv1"), EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, []byte("checkindexv1"), true),
IndexStore: NewOrgNameKeyStore(resource, []byte("checkindexv1"), false),
}
}

Expand All @@ -61,7 +60,7 @@ func (s *Service) FindCheckByID(ctx context.Context, id influxdb.ID) (influxdb.C
}

func (s *Service) findCheckByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.Check, error) {
chkVal, err := s.checkStore.FindEnt(ctx, tx, Entity{ID: id})
chkVal, err := s.checkStore.FindEnt(ctx, tx, Entity{PK: EncID(id)})
if err != nil {
return nil, err
}
Expand All @@ -73,8 +72,7 @@ func (s *Service) findCheckByName(ctx context.Context, tx Tx, orgID influxdb.ID,
defer span.Finish()

chVal, err := s.checkStore.FindEnt(ctx, tx, Entity{
OrgID: orgID,
Name: name,
UniqueKey: Encode(EncID(orgID), EncString(name)),
})
if IsNotFound(err) {
return nil, &influxdb.Error{
Expand Down Expand Up @@ -118,7 +116,7 @@ func (s *Service) FindCheck(ctx context.Context, filter influxdb.CheckFilter) (i

var prefix []byte
if filter.OrgID != nil {
ent := Entity{OrgID: *filter.OrgID}
ent := Entity{UniqueKey: EncID(*filter.OrgID)}
prefix, _ = s.checkStore.IndexStore.EntKey(ctx, ent)
}
filterFn := filterChecksFn(nil, filter)
Expand Down Expand Up @@ -203,11 +201,11 @@ func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, o

var prefix []byte
if filter.OrgID != nil {
ent := Entity{OrgID: *filter.OrgID}
encs := []EncodeFn{EncID(*filter.OrgID)}
if filter.Name != nil {
ent.Name = *filter.Name
encs = append(encs, EncString(*filter.Name))
}
prefix, _ = s.checkStore.IndexStore.EntKey(ctx, ent)
prefix, _ = s.checkStore.IndexStore.EntKey(ctx, Entity{UniqueKey: Encode(encs...)})
}

var opt influxdb.FindOptions
Expand Down Expand Up @@ -338,10 +336,9 @@ func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error {

func (s *Service) putCheck(ctx context.Context, tx Tx, c influxdb.Check) error {
return s.checkStore.Put(ctx, tx, Entity{
ID: c.GetID(),
Name: c.GetName(),
OrgID: c.GetOrgID(),
Body: c,
PK: EncID(c.GetID()),
UniqueKey: Encode(EncID(c.GetOrgID()), EncString(c.GetName())),
Body: c,
})
}

Expand Down Expand Up @@ -400,8 +397,7 @@ func (s *Service) updateCheck(ctx context.Context, tx Tx, id influxdb.ID, chk in
}

ent := Entity{
OrgID: current.GetOrgID(),
Name: current.GetName(),
UniqueKey: Encode(EncID(current.GetOrgID()), EncString(current.GetName())),
}
if err := s.checkStore.IndexStore.DeleteEnt(ctx, tx, ent); err != nil {
return nil, err
Expand Down Expand Up @@ -468,8 +464,7 @@ func (s *Service) patchCheck(ctx context.Context, tx Tx, id influxdb.ID, upd inf
}

ent := Entity{
OrgID: c.GetOrgID(),
Name: c.GetName(),
UniqueKey: Encode(EncID(c.GetOrgID()), EncString(c.GetName())),
}
if err := s.checkStore.IndexStore.DeleteEnt(ctx, tx, ent); err != nil {
return nil, err
Expand Down Expand Up @@ -513,7 +508,10 @@ func (s *Service) DeleteCheck(ctx context.Context, id influxdb.ID) error {
}

return s.kv.Update(ctx, func(tx Tx) error {
if err := s.checkStore.DeleteEnt(ctx, tx, Entity{ID: id}); err != nil {
err := s.checkStore.DeleteEnt(ctx, tx, Entity{
PK: EncID(id),
})
if err != nil {
return err
}

Expand Down
49 changes: 49 additions & 0 deletions kv/encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package kv

import (
"errors"
"strings"

"github.com/influxdata/influxdb"
)

// EncodeFn returns an encoding when called. Closures are your friend here.
type EncodeFn func() ([]byte, error)

// Encode concatenates a list of encodings together.
func Encode(encodings ...EncodeFn) EncodeFn {
return func() ([]byte, error) {
var key []byte
for _, enc := range encodings {
part, err := enc()
if err != nil {
return key, err
}
key = append(key, part...)
}
return key, nil
}
}

// EncString encodes a string.
func EncString(str string) EncodeFn {
return func() ([]byte, error) {
return []byte(str), nil
}
}

// EncStringCaseInsensitive encodes a string and makes it case insensitive by lower casing
// everything.
func EncStringCaseInsensitive(str string) EncodeFn {
return EncString(strings.ToLower(str))
}

// EncID encodes an influx ID.
func EncID(id influxdb.ID) EncodeFn {
return func() ([]byte, error) {
if id == 0 {
return nil, errors.New("no ID was provided")
}
return id.Encode()
}
}
50 changes: 21 additions & 29 deletions kv/notification_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@ func newEndpointStore() *IndexStore {
return Entity{}, err
}
return Entity{
ID: edp.GetID(),
Name: edp.GetName(),
OrgID: edp.GetOrgID(),
Body: edp,
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}, nil
}

return &IndexStore{
Resource: resource,
EntStore: NewStoreBase(resource, []byte("notificationEndpointv1"), EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, []byte("notificationEndpointIndexv1"), true),
IndexStore: NewOrgNameKeyStore(resource, []byte("notificationEndpointIndexv1"), false),
}
}

Expand Down Expand Up @@ -85,10 +84,9 @@ func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp inf
}

ent := Entity{
ID: edp.GetID(),
Name: edp.GetName(),
OrgID: edp.GetOrgID(),
Body: edp,
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent); err != nil {
return err
Expand All @@ -108,8 +106,7 @@ func (s *Service) findNotificationEndpointByName(ctx context.Context, tx Tx, org
defer span.Finish()

body, err := s.endpointStore.FindEnt(ctx, tx, Entity{
OrgID: orgID,
Name: name,
UniqueKey: Encode(EncID(orgID), EncString(name)),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -147,8 +144,7 @@ func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id infl
}

err = s.endpointStore.IndexStore.DeleteEnt(ctx, tx, Entity{
OrgID: edp.GetOrgID(),
Name: curName,
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(curName)),
})
if err != nil {
return nil, err
Expand All @@ -166,10 +162,9 @@ func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id infl
}

ent := Entity{
ID: edp.GetID(),
Name: edp.GetName(),
OrgID: edp.GetOrgID(),
Body: edp,
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent); err != nil {
return nil, err
Expand Down Expand Up @@ -210,8 +205,7 @@ func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influ
}

err = s.endpointStore.IndexStore.DeleteEnt(ctx, tx, Entity{
OrgID: edp.GetOrgID(),
Name: edp.GetName(),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
})
if err != nil {
return nil, err
Expand All @@ -236,10 +230,9 @@ func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influ
// TODO(jsteenb2): every above here moves into service layer

ent := Entity{
ID: edp.GetID(),
Name: edp.GetName(),
OrgID: edp.GetOrgID(),
Body: edp,
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent); err != nil {
return nil, err
Expand All @@ -258,10 +251,9 @@ func (s *Service) PutNotificationEndpoint(ctx context.Context, edp influxdb.Noti

return s.kv.Update(ctx, func(tx Tx) (err error) {
ent := Entity{
ID: edp.GetID(),
Name: edp.GetName(),
OrgID: edp.GetOrgID(),
Body: edp,
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
return s.endpointStore.Put(ctx, tx, ent)
})
Expand All @@ -283,7 +275,7 @@ func (s *Service) FindNotificationEndpointByID(ctx context.Context, id influxdb.
}

func (s *Service) findNotificationEndpointByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
decodedEnt, err := s.endpointStore.FindEnt(ctx, tx, Entity{ID: id})
decodedEnt, err := s.endpointStore.FindEnt(ctx, tx, Entity{PK: EncID(id)})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -386,7 +378,7 @@ func (s *Service) deleteNotificationEndpoint(ctx context.Context, tx Tx, id infl
return nil, 0, err
}

if err := s.endpointStore.DeleteEnt(ctx, tx, Entity{ID: id}); err != nil {
if err := s.endpointStore.DeleteEnt(ctx, tx, Entity{PK: EncID(id)}); err != nil {
return nil, 0, err
}

Expand Down
Loading

0 comments on commit 7fe5cf9

Please sign in to comment.