Skip to content

Commit

Permalink
fix(kv): add legacy variable orgs index back
Browse files Browse the repository at this point in the history
issue here is that the unique by name index for variables was implemented
and has the same functionality about it that this orgs index has. The duplicative
orgs index was nuked. The migration to hydrate the org/name index never
happened. This is a stop gap until that migration is in place.
  • Loading branch information
jsteenb2 committed Dec 31, 2019
1 parent a926700 commit 4fb855f
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 25 deletions.
4 changes: 4 additions & 0 deletions kv/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (s *Service) Initialize(ctx context.Context) error {
return err
}

if err := s.initializeVariablesOrgIndex(tx); err != nil {
return err
}

if err := s.initializeChecks(ctx, tx); err != nil {
return err
}
Expand Down
171 changes: 146 additions & 25 deletions kv/variable.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kv

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -9,6 +10,78 @@ import (
"github.com/influxdata/influxdb"
)

// TODO: eradicate this with migration strategy
var variableOrgsIndex = []byte("variableorgsv1")

func (s *Service) initializeVariablesOrgIndex(tx Tx) error {
if _, err := tx.Bucket(variableOrgsIndex); err != nil {
return err
}
return nil
}

func decodeVariableOrgsIndexKey(indexKey []byte) (orgID influxdb.ID, variableID influxdb.ID, err error) {
if len(indexKey) != 2*influxdb.IDLength {
return 0, 0, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "malformed variable orgs index key (please report this error)",
}
}

if err := (&orgID).Decode(indexKey[:influxdb.IDLength]); err != nil {
return 0, 0, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "bad org id",
Err: influxdb.ErrInvalidID,
}
}

if err := (&variableID).Decode(indexKey[influxdb.IDLength:]); err != nil {
return 0, 0, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "bad variable id",
Err: influxdb.ErrInvalidID,
}
}

return orgID, variableID, nil
}

func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID influxdb.ID) ([]*influxdb.Variable, error) {
idx, err := tx.Bucket(variableOrgsIndex)
if err != nil {
return nil, err
}

// TODO(leodido): support find options
cur, err := idx.Cursor()
if err != nil {
return nil, err
}

prefix, err := orgID.Encode()
if err != nil {
return nil, err
}

variables := []*influxdb.Variable{}
for k, _ := cur.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = cur.Next() {
_, id, err := decodeVariableOrgsIndexKey(k)
if err != nil {
return nil, err
}

m, err := s.findVariableByID(ctx, tx, id)
if err != nil {
return nil, err
}

variables = append(variables, m)
}

return variables, nil
}

func newVariableUniqueByNameStore() *IndexStore {
const resource = "variable"

Expand Down Expand Up @@ -37,31 +110,6 @@ func newVariableUniqueByNameStore() *IndexStore {
}
}

func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID influxdb.ID) ([]*influxdb.Variable, error) {
prefix, err := orgID.Encode()
if err != nil {
return nil, err
}

variables := []*influxdb.Variable{}
err = s.variableStore.Find(ctx, tx, FindOpts{
Prefix: prefix,
CaptureFn: func(key []byte, decodedVal interface{}) error {
v, ok := decodedVal.(*influxdb.Variable)
if err := errUnexpectedDecodeVal(ok); err != nil {
return err
}
variables = append(variables, v)
return nil
},
})
if err != nil {
return nil, err
}

return variables, nil
}

func (s *Service) findVariables(ctx context.Context, tx Tx, filter influxdb.VariableFilter) ([]*influxdb.Variable, error) {
if filter.OrganizationID != nil {
return s.findOrganizationVariables(ctx, tx, *filter.OrganizationID)
Expand Down Expand Up @@ -176,6 +224,7 @@ func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) erro
now := s.Now()
v.CreatedAt = now
v.UpdatedAt = now

return s.putVariable(ctx, tx, v)
})
}
Expand All @@ -200,6 +249,10 @@ func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) err
}

func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable) error {
if err := s.putVariableOrgsIndex(tx, v); err != nil {
return err
}

ent := Entity{
ID: v.ID,
Name: v.Name,
Expand Down Expand Up @@ -263,6 +316,74 @@ func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *in
// DeleteVariable removes a single variable from the store by its ID
func (s *Service) DeleteVariable(ctx context.Context, id influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
v, err := s.findVariableByID(ctx, tx, id)
if err != nil {
return err
}

if err := s.removeVariableOrgsIndex(tx, v); err != nil {
return err
}
return s.variableStore.DeleteEnt(ctx, tx, Entity{ID: id})
})
}

func encodeVariableOrgsIndex(variable *influxdb.Variable) ([]byte, error) {
oID, err := variable.OrganizationID.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
Msg: "bad organization id",
}
}

mID, err := variable.ID.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
Msg: "bad variable id",
}
}

key := make([]byte, 0, influxdb.IDLength*2)
key = append(key, oID...)
key = append(key, mID...)

return key, nil
}

func (s *Service) putVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error {
key, err := encodeVariableOrgsIndex(variable)
if err != nil {
return err
}

idx, err := tx.Bucket(variableOrgsIndex)
if err != nil {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}

if err := idx.Put(key, nil); err != nil {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}

return nil
}

func (s *Service) removeVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error {
key, err := encodeVariableOrgsIndex(variable)
if err != nil {
return err
}

idx, err := tx.Bucket(variableOrgsIndex)
if err != nil {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}

if err := idx.Delete(key); err != nil {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}

return nil
}

0 comments on commit 4fb855f

Please sign in to comment.