From 4fb855feff145d6d0af2b7cb4feb4c45404797cd Mon Sep 17 00:00:00 2001 From: Johnny Steenbergen Date: Tue, 31 Dec 2019 10:28:26 -0800 Subject: [PATCH] fix(kv): add legacy variable orgs index back issue here is that the unique by name index for variables was implemented and has the same functionality about it that this orgs index has. The duplicative orgs index was nuked. The migration to hydrate the org/name index never happened. This is a stop gap until that migration is in place. --- kv/service.go | 4 ++ kv/variable.go | 171 +++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 150 insertions(+), 25 deletions(-) diff --git a/kv/service.go b/kv/service.go index 4f8ca8770ce..f2c576da083 100644 --- a/kv/service.go +++ b/kv/service.go @@ -147,6 +147,10 @@ func (s *Service) Initialize(ctx context.Context) error { return err } + if err := s.initializeVariablesOrgIndex(tx); err != nil { + return err + } + if err := s.initializeChecks(ctx, tx); err != nil { return err } diff --git a/kv/variable.go b/kv/variable.go index c81f46cd4a0..1589c6f24f7 100644 --- a/kv/variable.go +++ b/kv/variable.go @@ -1,6 +1,7 @@ package kv import ( + "bytes" "context" "encoding/json" "fmt" @@ -9,6 +10,78 @@ import ( "github.com/influxdata/influxdb" ) +// TODO: eradicate this with migration strategy +var variableOrgsIndex = []byte("variableorgsv1") + +func (s *Service) initializeVariablesOrgIndex(tx Tx) error { + if _, err := tx.Bucket(variableOrgsIndex); err != nil { + return err + } + return nil +} + +func decodeVariableOrgsIndexKey(indexKey []byte) (orgID influxdb.ID, variableID influxdb.ID, err error) { + if len(indexKey) != 2*influxdb.IDLength { + return 0, 0, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "malformed variable orgs index key (please report this error)", + } + } + + if err := (&orgID).Decode(indexKey[:influxdb.IDLength]); err != nil { + return 0, 0, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "bad org id", + Err: influxdb.ErrInvalidID, + } + } + + if err := (&variableID).Decode(indexKey[influxdb.IDLength:]); err != nil { + return 0, 0, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "bad variable id", + Err: influxdb.ErrInvalidID, + } + } + + return orgID, variableID, nil +} + +func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID influxdb.ID) ([]*influxdb.Variable, error) { + idx, err := tx.Bucket(variableOrgsIndex) + if err != nil { + return nil, err + } + + // TODO(leodido): support find options + cur, err := idx.Cursor() + if err != nil { + return nil, err + } + + prefix, err := orgID.Encode() + if err != nil { + return nil, err + } + + variables := []*influxdb.Variable{} + for k, _ := cur.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = cur.Next() { + _, id, err := decodeVariableOrgsIndexKey(k) + if err != nil { + return nil, err + } + + m, err := s.findVariableByID(ctx, tx, id) + if err != nil { + return nil, err + } + + variables = append(variables, m) + } + + return variables, nil +} + func newVariableUniqueByNameStore() *IndexStore { const resource = "variable" @@ -37,31 +110,6 @@ func newVariableUniqueByNameStore() *IndexStore { } } -func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID influxdb.ID) ([]*influxdb.Variable, error) { - prefix, err := orgID.Encode() - if err != nil { - return nil, err - } - - variables := []*influxdb.Variable{} - err = s.variableStore.Find(ctx, tx, FindOpts{ - Prefix: prefix, - CaptureFn: func(key []byte, decodedVal interface{}) error { - v, ok := decodedVal.(*influxdb.Variable) - if err := errUnexpectedDecodeVal(ok); err != nil { - return err - } - variables = append(variables, v) - return nil - }, - }) - if err != nil { - return nil, err - } - - return variables, nil -} - func (s *Service) findVariables(ctx context.Context, tx Tx, filter influxdb.VariableFilter) ([]*influxdb.Variable, error) { if filter.OrganizationID != nil { return s.findOrganizationVariables(ctx, tx, *filter.OrganizationID) @@ -176,6 +224,7 @@ func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) erro now := s.Now() v.CreatedAt = now v.UpdatedAt = now + return s.putVariable(ctx, tx, v) }) } @@ -200,6 +249,10 @@ func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) err } func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable) error { + if err := s.putVariableOrgsIndex(tx, v); err != nil { + return err + } + ent := Entity{ ID: v.ID, Name: v.Name, @@ -263,6 +316,74 @@ func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *in // DeleteVariable removes a single variable from the store by its ID func (s *Service) DeleteVariable(ctx context.Context, id influxdb.ID) error { return s.kv.Update(ctx, func(tx Tx) error { + v, err := s.findVariableByID(ctx, tx, id) + if err != nil { + return err + } + + if err := s.removeVariableOrgsIndex(tx, v); err != nil { + return err + } return s.variableStore.DeleteEnt(ctx, tx, Entity{ID: id}) }) } + +func encodeVariableOrgsIndex(variable *influxdb.Variable) ([]byte, error) { + oID, err := variable.OrganizationID.Encode() + if err != nil { + return nil, &influxdb.Error{ + Err: err, + Msg: "bad organization id", + } + } + + mID, err := variable.ID.Encode() + if err != nil { + return nil, &influxdb.Error{ + Err: err, + Msg: "bad variable id", + } + } + + key := make([]byte, 0, influxdb.IDLength*2) + key = append(key, oID...) + key = append(key, mID...) + + return key, nil +} + +func (s *Service) putVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error { + key, err := encodeVariableOrgsIndex(variable) + if err != nil { + return err + } + + idx, err := tx.Bucket(variableOrgsIndex) + if err != nil { + return &influxdb.Error{Code: influxdb.EInternal, Err: err} + } + + if err := idx.Put(key, nil); err != nil { + return &influxdb.Error{Code: influxdb.EInternal, Err: err} + } + + return nil +} + +func (s *Service) removeVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error { + key, err := encodeVariableOrgsIndex(variable) + if err != nil { + return err + } + + idx, err := tx.Bucket(variableOrgsIndex) + if err != nil { + return &influxdb.Error{Code: influxdb.EInternal, Err: err} + } + + if err := idx.Delete(key); err != nil { + return &influxdb.Error{Code: influxdb.EInternal, Err: err} + } + + return nil +}