Skip to content

Commit

Permalink
feat(kv): Index Authorizations by User ID (#16818)
Browse files Browse the repository at this point in the history
* feat(kv): add user id index on authorizations

* chore(auths): test FindAuthorizations both with and without a populated index

* chore(kv): cleanup index skipping flag in auths service

* fix(kv): bad flag around auth by user index population

* fix(kv): auth by user index lookup use correct buckets

* chore(kv): ensure indexer is called as expected when auth user index missing

* chore(kv): add benchmarks around authorization lookup
  • Loading branch information
GeorgeMac authored Feb 11, 2020
1 parent 9561d0a commit df36fe9
Show file tree
Hide file tree
Showing 7 changed files with 488 additions and 56 deletions.
13 changes: 9 additions & 4 deletions http/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/influxdata/httprouter"
Expand Down Expand Up @@ -857,12 +858,12 @@ func TestService_handleDeleteAuthorization(t *testing.T) {

func initAuthorizationService(f platformtesting.AuthorizationFields, t *testing.T) (platform.AuthorizationService, string, func()) {
t.Helper()
if t.Name() == "TestAuthorizationService_FindAuthorizations/find_authorization_by_token" {
if strings.Contains(t.Name(), "find_authorization_by_token") {
/*
TODO(goller): need a secure way to communicate get
authorization by token string via headers or something
*/
t.Skip("TestAuthorizationService_FindAuthorizations/find_authorization_by_token skipped because user tokens cannot be queried")
t.Skipf("%s skipped because user tokens cannot be queried", t.Name())
}

if t.Name() == "TestAuthorizationService_CreateAuthorization/providing_a_non_existing_user_is_invalid" {
Expand Down Expand Up @@ -957,11 +958,15 @@ func TestAuthorizationService_FindAuthorizationByToken(t *testing.T) {
}

func TestAuthorizationService_FindAuthorizations(t *testing.T) {
platformtesting.FindAuthorizations(initAuthorizationService, t)
// with pre-populated index
platformtesting.FindAuthorizations(initAuthorizationService, t, true)
// without pre-populated index
platformtesting.FindAuthorizations(initAuthorizationService, t, false)
}

func TestAuthorizationService_DeleteAuthorization(t *testing.T) {
platformtesting.DeleteAuthorization(initAuthorizationService, t)
// without pre-populated index
platformtesting.DeleteAuthorization(initAuthorizationService, t, true)
}

func TestAuthorizationService_UpdateAuthorization(t *testing.T) {
Expand Down
237 changes: 211 additions & 26 deletions kv/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kv
import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/buger/jsonparser"
Expand All @@ -11,11 +12,16 @@ import (
)

var (
authBucket = []byte("authorizationsv1")
authIndex = []byte("authorizationindexv1")
)
authBucket = []byte("authorizationsv1")
authIndex = []byte("authorizationindexv1")
authByUserIndex = []byte("authorizationbyuserindexv1")

_ influxdb.AuthorizationService = (*Service)(nil)

var _ influxdb.AuthorizationService = (*Service)(nil)
// ErrMissingUserFromFilter is returned when a lookup by user is performed
// but neither a user ID or user resource is provided
ErrMissingUserFromFilter = errors.New("no user parameter in filter")
)

func (s *Service) initializeAuths(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(authBucket); err != nil {
Expand All @@ -24,6 +30,9 @@ func (s *Service) initializeAuths(ctx context.Context, tx Tx) error {
if _, err := authIndexBucket(tx); err != nil {
return err
}
if _, err := authByUserIndexBucket(tx); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -129,6 +138,80 @@ func (s *Service) findAuthorizationByToken(ctx context.Context, tx Tx, n string)
return s.findAuthorizationByID(ctx, tx, id)
}

func (s *Service) findAuthorizationsByUser(ctx context.Context, tx Tx, filter influxdb.AuthorizationFilter) (auths []*influxdb.Authorization, userID *influxdb.ID, err error) {
userID = filter.UserID
if userID == nil && filter.User != nil {
user, err := s.findUserByName(ctx, tx, *filter.User)
if err != nil {
return nil, nil, err
}

userID = &user.ID
}

if userID == nil {
err = ErrMissingUserFromFilter
return
}

var (
prefix = authByUserIndexPrefix(*userID)
filterFn = filterAuthorizationsFn(filter)
wrapInternal = func(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Err: err,
}
}
)

bkt, err := tx.Bucket(authBucket)
if err != nil {
return nil, nil, err
}

idx, err := authByUserIndexBucket(tx)
if err != nil {
return nil, nil, err
}

// index scan
cursor, err := idx.ForwardCursor(prefix, WithCursorPrefix(prefix))
if err != nil {
return nil, nil, wrapInternal(err)
}

for k, v := cursor.Next(); k != nil && v != nil; k, v = cursor.Next() {
v, err := bkt.Get(v)
if err != nil {
return nil, nil, err
}

// preallocate Permissions to reduce multiple slice re-allocations
a := &influxdb.Authorization{
Permissions: make([]influxdb.Permission, 64),
}

if err := decodeAuthorization(v, a); err != nil {
return nil, nil, err
}

if filterFn(a) {
auths = append(auths, a)
}
}

if err := cursor.Err(); err != nil {
return nil, nil, wrapInternal(err)
}

if err := cursor.Close(); err != nil {
return nil, nil, wrapInternal(err)
}

return
}

func authorizationsPredicateFn(f influxdb.AuthorizationFilter) CursorPredicateFunc {
// if any errors occur reading the JSON data, the predicate will always return true
// to ensure the value is included and handled higher up.
Expand Down Expand Up @@ -243,14 +326,42 @@ func (s *Service) FindAuthorizations(ctx context.Context, filter influxdb.Author
return []*influxdb.Authorization{a}, 1, nil
}

as := []*influxdb.Authorization{}
err := s.kv.View(ctx, func(tx Tx) error {
auths, err := s.findAuthorizations(ctx, tx, filter)
if err != nil {
var (
auths []*influxdb.Authorization
err error
findOptions []findOption
)

if filter.UserID != nil || filter.User != nil {
var userID *influxdb.ID

// attempt index lookup
if err := s.kv.View(ctx, func(tx Tx) error {
auths, userID, err = s.findAuthorizationsByUser(ctx, tx, filter)
return err
}); err != nil {
return nil, 0, &influxdb.Error{
Err: err,
}
}
as = auths
return nil

if len(auths) > 0 {
return auths, len(auths), nil
}

// when found using full keyspace scan then publish authorization
// to indexer
findOptions = append(findOptions, withVisitFunc(func(a *influxdb.Authorization) {
id, _ := a.ID.Encode()
s.indexer.AddToIndex(authByUserIndex, map[string][]byte{
authByUserIndexKey(*userID, a.ID): id,
})
}))
}

err = s.kv.View(ctx, func(tx Tx) error {
auths, err = s.findAuthorizations(ctx, tx, filter, findOptions...)
return err
})

if err != nil {
Expand All @@ -259,10 +370,34 @@ func (s *Service) FindAuthorizations(ctx context.Context, filter influxdb.Author
}
}

return as, len(as), nil
return auths, len(auths), nil
}

func (s *Service) findAuthorizations(ctx context.Context, tx Tx, f influxdb.AuthorizationFilter) ([]*influxdb.Authorization, error) {
type findConfig struct {
visit func(*influxdb.Authorization)
}

func newFindConfig(opts ...findOption) findConfig {
config := findConfig{
visit: func(*influxdb.Authorization) {},
}

for _, opt := range opts {
opt(&config)
}

return config
}

type findOption func(*findConfig)

func withVisitFunc(fn func(*influxdb.Authorization)) findOption {
return func(c *findConfig) {
c.visit = fn
}
}

func (s *Service) findAuthorizations(ctx context.Context, tx Tx, f influxdb.AuthorizationFilter, opts ...findOption) ([]*influxdb.Authorization, error) {
// If the users name was provided, look up user by ID first
if f.User != nil {
u, err := s.findUserByName(ctx, tx, *f.User)
Expand All @@ -280,20 +415,23 @@ func (s *Service) findAuthorizations(ctx context.Context, tx Tx, f influxdb.Auth
f.OrgID = &o.ID
}

var as []*influxdb.Authorization
pred := authorizationsPredicateFn(f)
filterFn := filterAuthorizationsFn(f)
err := s.forEachAuthorization(ctx, tx, pred, func(a *influxdb.Authorization) bool {
if filterFn(a) {
as = append(as, a)
}
return true
})
if err != nil {
return nil, err
}
var (
conf = newFindConfig(opts...)
as []*influxdb.Authorization
pred = authorizationsPredicateFn(f)
filterFn = filterAuthorizationsFn(f)
err = s.forEachAuthorization(ctx, tx, pred, func(a *influxdb.Authorization) bool {
if filterFn(a) {
// visit using find config visit func
conf.visit(a)
// append to resulting slice
as = append(as, a)
}
return true
})
)

return as, nil
return as, err
}

// CreateAuthorization creates a influxdb authorization and sets b.ID, and b.UserID if not provided.
Expand Down Expand Up @@ -345,6 +483,12 @@ func (s *Service) createAuthorization(ctx context.Context, tx Tx, a *influxdb.Au
return nil
}

type authSkipIndexOnPutContextKey struct{}

func authDoSkipIndexOnPut(ctx context.Context) bool {
return ctx.Value(authSkipIndexOnPutContextKey{}) != nil
}

// PutAuthorization will put a authorization without setting an ID.
func (s *Service) PutAuthorization(ctx context.Context, a *influxdb.Authorization) error {
return s.kv.Update(ctx, func(tx Tx) error {
Expand Down Expand Up @@ -396,6 +540,23 @@ func (s *Service) putAuthorization(ctx context.Context, tx Tx, a *influxdb.Autho
}
}

// this should only be configurable via test package
// it is used to test behavior with empty caches
if !authDoSkipIndexOnPut(ctx) {
idx, err := authByUserIndexBucket(tx)
if err != nil {
return err
}

fk := authByUserIndexKey(a.UserID, a.ID)
if err := idx.Put([]byte(fk), encodedID); err != nil {
return &influxdb.Error{
Code: influxdb.EInternal,
Err: err,
}
}
}

b, err := tx.Bucket(authBucket)
if err != nil {
return err
Expand All @@ -414,6 +575,16 @@ func authIndexKey(n string) []byte {
return []byte(n)
}

func authByUserIndexKey(userID, authID influxdb.ID) string {
id, _ := authID.Encode()
return string(append(authByUserIndexPrefix(userID), id...))
}

func authByUserIndexPrefix(userID influxdb.ID) []byte {
id, _ := userID.Encode()
return append(id, '/')
}

func decodeAuthorization(b []byte, a *influxdb.Authorization) error {
if err := json.Unmarshal(b, a); err != nil {
return err
Expand Down Expand Up @@ -481,6 +652,11 @@ func (s *Service) deleteAuthorization(ctx context.Context, tx Tx, id influxdb.ID
Err: err,
}
}

if err := idx.Delete([]byte(authByUserIndexKey(a.UserID, id))); err != nil {
return err
}

encodedID, err := id.Encode()
if err != nil {
return &influxdb.Error{
Expand Down Expand Up @@ -536,7 +712,16 @@ func (s *Service) updateAuthorization(ctx context.Context, tx Tx, id influxdb.ID
}

func authIndexBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket([]byte(authIndex))
b, err := tx.Bucket(authIndex)
if err != nil {
return nil, UnexpectedAuthIndexError(err)
}

return b, nil
}

func authByUserIndexBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(authByUserIndex)
if err != nil {
return nil, UnexpectedAuthIndexError(err)
}
Expand Down
Loading

0 comments on commit df36fe9

Please sign in to comment.