Skip to content

Commit

Permalink
Implement named entity visibility (flyteorg#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Apr 7, 2020
1 parent e7d14f9 commit a7c35ce
Show file tree
Hide file tree
Showing 30 changed files with 307 additions and 305 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/jmespath/go-jmespath v0.3.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/lib/pq v1.3.0
github.com/lyft/flyteidl v0.17.24
github.com/lyft/flyteidl v0.17.27
github.com/lyft/flytepropeller v0.2.13
github.com/lyft/flytestdlib v0.3.2
github.com/magiconair/properties v1.8.1
Expand Down
53 changes: 11 additions & 42 deletions flyteadmin/go.sum

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions flyteadmin/pkg/common/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
type Entity = string

const (
Execution = "e"
LaunchPlan = "l"
NodeExecution = "ne"
NodeExecutionEvent = "nee"
Task = "t"
TaskExecution = "te"
Workflow = "w"
Execution = "e"
LaunchPlan = "l"
NodeExecution = "ne"
NodeExecutionEvent = "nee"
Task = "t"
TaskExecution = "te"
Workflow = "w"
NamedEntityMetadata = "nem"
)

// ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters
Expand Down
63 changes: 51 additions & 12 deletions flyteadmin/pkg/common/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"context"
"fmt"

"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flytestdlib/logger"
"google.golang.org/grpc/codes"
)

Expand Down Expand Up @@ -132,6 +131,8 @@ func GetInvalidRepeatedValueFilterErr(expression FilterExpression) error {
type InlineFilter interface {
// Returns the entity for which this filter should be applied.
GetEntity() Entity
// Returns the column filtered on.
GetField() string
// Generates fields necessary to add a filter to a gorm database query.
GetGormQueryExpr() (GormQueryExpr, error)
// Generates fields necessary to add a filter on a gorm database join query.
Expand All @@ -153,7 +154,11 @@ func (f *inlineFilterImpl) GetEntity() Entity {
return f.entity
}

func (f *inlineFilterImpl) GetGormQueryExpr() (GormQueryExpr, error) {
func (f *inlineFilterImpl) GetField() string {
return f.field
}

func (f *inlineFilterImpl) getGormQueryExpr(formattedField string) (GormQueryExpr, error) {

// ValueIn is special because it uses repeating values.
if f.function == ValueIn {
Expand All @@ -168,54 +173,58 @@ func (f *inlineFilterImpl) GetGormQueryExpr() (GormQueryExpr, error) {
case Contains:
return GormQueryExpr{
// WHERE field LIKE %value%
Query: fmt.Sprintf(containsQuery, f.field),
Query: fmt.Sprintf(containsQuery, formattedField),
// args renders to something like: "%value%"
Args: fmt.Sprintf(containsArgs, f.value),
}, nil
case GreaterThan:
return GormQueryExpr{
// WHERE field > value
Query: fmt.Sprintf(greaterThanQuery, f.field),
Query: fmt.Sprintf(greaterThanQuery, formattedField),
Args: f.value,
}, nil
case GreaterThanOrEqual:
return GormQueryExpr{
// WHERE field >= value
Query: fmt.Sprintf(greaterThanOrEqualQuery, f.field),
Query: fmt.Sprintf(greaterThanOrEqualQuery, formattedField),
Args: f.value,
}, nil
case LessThan:
return GormQueryExpr{
// WHERE field < value
Query: fmt.Sprintf(lessThanQuery, f.field),
Query: fmt.Sprintf(lessThanQuery, formattedField),
Args: f.value,
}, nil
case LessThanOrEqual:
return GormQueryExpr{
// WHERE field <= value
Query: fmt.Sprintf(lessThanOrEqualQuery, f.field),
Query: fmt.Sprintf(lessThanOrEqualQuery, formattedField),
Args: f.value,
}, nil
case Equal:
return GormQueryExpr{
// WHERE field = value
Query: fmt.Sprintf(equalQuery, f.field),
Query: fmt.Sprintf(equalQuery, formattedField),
Args: f.value,
}, nil
case NotEqual:
return GormQueryExpr{
// WHERE field <> value
Query: fmt.Sprintf(notEqualQuery, f.field),
Query: fmt.Sprintf(notEqualQuery, formattedField),
Args: f.value,
}, nil
}
logger.Debugf(context.Background(), "can't create gorm query expr for %s", getFilterExpressionName(f.function))
return GormQueryExpr{}, GetUnsupportedFilterExpressionErr(f.function)
}

func (f *inlineFilterImpl) GetGormQueryExpr() (GormQueryExpr, error) {
return f.getGormQueryExpr(f.field)
}

func (f *inlineFilterImpl) GetGormJoinTableQueryExpr(tableName string) (GormQueryExpr, error) {
f.field = fmt.Sprintf(joinArgsFormat, tableName, f.field)
return f.GetGormQueryExpr()
formattedField := fmt.Sprintf(joinArgsFormat, tableName, f.field)
return f.getGormQueryExpr(formattedField)
}

func customizeField(field string, entity Entity) string {
Expand Down Expand Up @@ -284,3 +293,33 @@ func NewMapFilter(filter map[string]interface{}) MapFilter {
filter: filter,
}
}

const queryWithDefaultFmt = "COALESCE(%s, %v)"

type withDefaultValueFilter struct {
inlineFilterImpl
defaultValue interface{}
}

func (f *withDefaultValueFilter) GetGormQueryExpr() (GormQueryExpr, error) {
formattedField := fmt.Sprintf(queryWithDefaultFmt, f.GetField(), f.defaultValue)
return f.getGormQueryExpr(formattedField)
}

func (f *withDefaultValueFilter) GetGormJoinTableQueryExpr(tableName string) (GormQueryExpr, error) {
formattedField := fmt.Sprintf(queryWithDefaultFmt, fmt.Sprintf(joinArgsFormat, tableName, f.GetField()), f.defaultValue)
return f.getGormQueryExpr(formattedField)
}

func NewWithDefaultValueFilter(defaultValue interface{}, filter InlineFilter) (InlineFilter, error) {
inlineFilter, ok := filter.(*inlineFilterImpl)
if !ok {
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to create default value filter for [%s] because the system encountered an unknown filter type",
filter.GetField())
}
return &withDefaultValueFilter{
inlineFilterImpl: *inlineFilter,
defaultValue: defaultValue,
}, nil
}
19 changes: 19 additions & 0 deletions flyteadmin/pkg/common/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,22 @@ func TestMapFilter(t *testing.T) {
}
assert.EqualValues(t, mapFilterValue, NewMapFilter(mapFilterValue).GetFilter())
}

func TestWithDefaultValueFilter(t *testing.T) {
filter, err := NewSingleValueFilter(NamedEntityMetadata, Equal, "state", 1)
assert.NoError(t, err)

filterWithDefaultValue, err := NewWithDefaultValueFilter(0, filter)
assert.NoError(t, err)

queryExpression, err := filterWithDefaultValue.GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "COALESCE(state, 0) = ?", queryExpression.Query)
assert.Equal(t, 1, queryExpression.Args)

queryExpression, err = filterWithDefaultValue.GetGormJoinTableQueryExpr(
"named_entity_metadata")
assert.NoError(t, err)
assert.Equal(t, "COALESCE(named_entity_metadata.state, 0) = ?", queryExpression.Query)
assert.Equal(t, 1, queryExpression.Args)
}
37 changes: 36 additions & 1 deletion flyteadmin/pkg/manager/impl/named_entity_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package impl
import (
"context"
"strconv"
"strings"

"github.com/lyft/flytestdlib/contextutils"

Expand All @@ -22,6 +23,8 @@ import (
"github.com/lyft/flytestdlib/promutils"
)

const state = "state"

type NamedEntityMetrics struct {
Scope promutils.Scope
}
Expand Down Expand Up @@ -65,6 +68,31 @@ func (m *NamedEntityManager) GetNamedEntity(ctx context.Context, request admin.N
return util.GetNamedEntity(ctx, m.db, request.ResourceType, *request.Id)
}

func (m *NamedEntityManager) updateQueryFilters(identityFilters []common.InlineFilter, requestFilters string) (
[]common.InlineFilter, error) {
if len(requestFilters) == 0 {
return identityFilters, nil
}
additionalFilters, err := util.ParseFilters(requestFilters, common.NamedEntityMetadata)
if err != nil {
return nil, err
}
var finalizedFilters = identityFilters
for _, filter := range additionalFilters {
if strings.Contains(filter.GetField(), state) {
filterWithDefaultValue, err := common.NewWithDefaultValueFilter(
strconv.Itoa(int(admin.NamedEntityState_NAMED_ENTITY_ACTIVE)), filter)
if err != nil {
return nil, err
}
finalizedFilters = append(finalizedFilters, filterWithDefaultValue)
} else {
finalizedFilters = append(finalizedFilters, filter)
}
}
return finalizedFilters, nil
}

func (m *NamedEntityManager) ListNamedEntities(ctx context.Context, request admin.NamedEntityListRequest) (
*admin.NamedEntityList, error) {
if err := validation.ValidateNamedEntityListRequest(request); err != nil {
Expand All @@ -73,13 +101,20 @@ func (m *NamedEntityManager) ListNamedEntities(ctx context.Context, request admi
}
ctx = contextutils.WithProjectDomain(ctx, request.Project, request.Domain)

filters, err := util.GetDbFilters(util.FilterSpec{
identifierFilters, err := util.GetDbFilters(util.FilterSpec{
Project: request.Project,
Domain: request.Domain,
}, common.ResourceTypeToEntity[request.ResourceType])
if err != nil {
return nil, err
}
// HACK: In order to filter by state (if requested) - we need to amend the filter to use COALESCE
// e.g. eq(state, 1) becomes 'WHERE (COALESCE(state, 0) = '1')' since not every NamedEntity necessarily
// has an entry, and therefore the default state value '0' (active), should be assumed.
filters, err := m.updateQueryFilters(identifierFilters, request.Filters)
if err != nil {
return nil, err
}
var sortParameter common.SortParameter
if request.SortBy != nil {
sortParameter, err = common.NewSortParameter(*request.SortBy)
Expand Down
22 changes: 22 additions & 0 deletions flyteadmin/pkg/manager/impl/named_entity_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/lyft/flyteadmin/pkg/common"

"github.com/lyft/flyteadmin/pkg/manager/impl/testutils"
"github.com/lyft/flyteadmin/pkg/repositories"
"github.com/lyft/flyteadmin/pkg/repositories/interfaces"
Expand Down Expand Up @@ -84,6 +86,26 @@ func TestNamedEntityManager_Get_BadRequest(t *testing.T) {
assert.Nil(t, response)
}

func TestNamedEntityManager_UpdateQueryFilters(t *testing.T) {
identityFilter, err := common.NewSingleValueFilter(common.NamedEntityMetadata, common.Equal, "project", "proj")
assert.NoError(t, err)

repository := getMockRepositoryForNETest()
manager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())
updatedFilters, err := manager.(*NamedEntityManager).updateQueryFilters([]common.InlineFilter{
identityFilter,
}, "eq(state, 0)")
assert.NoError(t, err)
assert.Len(t, updatedFilters, 2)

assert.Equal(t, "project", updatedFilters[0].GetField())
assert.Equal(t, "state", updatedFilters[1].GetField())
queryExp, err := updatedFilters[1].GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "COALESCE(state, 0) = ?", queryExp.Query)
assert.Equal(t, "0", queryExp.Args)
}

func TestNamedEntityManager_Update(t *testing.T) {
repository := getMockRepositoryForNETest()
manager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())
Expand Down
13 changes: 7 additions & 6 deletions flyteadmin/pkg/manager/impl/util/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ const filterFieldEntityPrefixFmt = "%s."
const secondsFormat = "%vs"

var filterFieldEntityPrefix = map[string]common.Entity{
"task": common.Task,
"workflow": common.Workflow,
"launch_plan": common.LaunchPlan,
"execution": common.Execution,
"node_execution": common.NodeExecution,
"task_execution": common.TaskExecution,
"task": common.Task,
"workflow": common.Workflow,
"launch_plan": common.LaunchPlan,
"execution": common.Execution,
"node_execution": common.NodeExecution,
"task_execution": common.TaskExecution,
"named_entity_metadata": common.NamedEntityMetadata,
}

func parseField(field string, primaryEntity common.Entity) (common.Entity, string) {
Expand Down
10 changes: 10 additions & 0 deletions flyteadmin/pkg/manager/impl/validation/named_entity_validator.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package validation

import (
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/manager/impl/shared"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"google.golang.org/grpc/codes"
)

func ValidateNamedEntityGetRequest(request admin.NamedEntityGetRequest) error {
Expand All @@ -25,6 +28,13 @@ func ValidateNamedEntityUpdateRequest(request admin.NamedEntityUpdateRequest) er
if request.Metadata == nil {
return shared.GetMissingArgumentError(shared.Metadata)
}

// Anything but the default state is only permitted for workflow resources.
if request.Metadata.State != admin.NamedEntityState_NAMED_ENTITY_ACTIVE &&
request.ResourceType != core.ResourceType_WORKFLOW {
return errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"Only workflow name entities can have their state updated")
}
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package validation
import (
"testing"

"github.com/lyft/flyteadmin/pkg/errors"
"google.golang.org/grpc/codes"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -105,6 +108,28 @@ func TestValidateNamedEntityUpdateRequest(t *testing.T) {
Name: "name",
},
}))
assert.Equal(t, codes.InvalidArgument, ValidateNamedEntityUpdateRequest(admin.NamedEntityUpdateRequest{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Id: &admin.NamedEntityIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
Metadata: &admin.NamedEntityMetadata{
State: admin.NamedEntityState_NAMED_ENTITY_ARCHIVED,
},
}).(errors.FlyteAdminError).Code())
assert.Nil(t, ValidateNamedEntityUpdateRequest(admin.NamedEntityUpdateRequest{
ResourceType: core.ResourceType_WORKFLOW,
Id: &admin.NamedEntityIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
Metadata: &admin.NamedEntityMetadata{
State: admin.NamedEntityState_NAMED_ENTITY_ARCHIVED,
},
}))
}

func TestValidateNamedEntityListRequest(t *testing.T) {
Expand Down
Loading

0 comments on commit a7c35ce

Please sign in to comment.