Skip to content

Commit

Permalink
Revert "SVR-468: Batch provision execution namespaces and support del…
Browse files Browse the repository at this point in the history
…ete" (#272)

Reverts unionai/flyte#246
  • Loading branch information
katrogan authored May 14, 2024
1 parent ae82652 commit 2fada83
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 358 deletions.
266 changes: 42 additions & 224 deletions flyteadmin/pkg/clusterresource/controller.go

Large diffs are not rendered by default.

16 changes: 7 additions & 9 deletions flyteadmin/pkg/clusterresource/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/md5" // #nosec
"io/ioutil"
"os"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -30,23 +29,22 @@ const domain = "domain-bar"
var testScope = mockScope.NewTestScope()

func TestTemplateAlreadyApplied(t *testing.T) {
ctx := context.TODO()
const namespace = "namespace"
const fileName = "fileName"
testController := controller{
metrics: newMetrics(testScope),
}
checksum1 := md5.Sum([]byte("template1")) // #nosec
checksum2 := md5.Sum([]byte("template2")) // #nosec
assert.False(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum1))
assert.False(t, testController.templateAlreadyApplied(namespace, fileName, checksum1))

testController.appliedNamespaceTemplateChecksums = sync.Map{}
testController.setTemplateChecksum(ctx, namespace, fileName, checksum1)
assert.True(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum1))
assert.False(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum2))
testController.appliedTemplates = make(map[string]TemplateChecksums)
testController.setTemplateChecksum(namespace, fileName, checksum1)
assert.True(t, testController.templateAlreadyApplied(namespace, fileName, checksum1))
assert.False(t, testController.templateAlreadyApplied(namespace, fileName, checksum2))

testController.setTemplateChecksum(ctx, namespace, fileName, checksum2)
assert.True(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum2))
testController.setTemplateChecksum(namespace, fileName, checksum2)
assert.True(t, testController.templateAlreadyApplied(namespace, fileName, checksum2))
}

func TestPopulateTemplateValues(t *testing.T) {
Expand Down
31 changes: 8 additions & 23 deletions flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,22 @@ func (p serviceAdminProvider) GetClusterResourceAttributes(ctx context.Context,
return nil, NewMissingEntityError("cluster resource attributes")
}

// We want both active and system generated projects
var activeProjectsFilter = fmt.Sprintf("ne(state,%d)", admin.Project_ARCHIVED)

var archivedProjectsFilter = fmt.Sprintf("eq(state,%d)", admin.Project_ARCHIVED)

var descUpdatedAtSortParam = admin.Sort{
var descCreatedAtSortParam = admin.Sort{
Direction: admin.Sort_DESCENDING,
Key: "updated_at",
Key: "created_at",
}

var descCreatedAtSortDBParam, _ = common.NewSortParameter(&descUpdatedAtSortParam, models.ProjectColumns)
var descCreatedAtSortDBParam, _ = common.NewSortParameter(&descCreatedAtSortParam, models.ProjectColumns)

func (p serviceAdminProvider) getProjects(ctx context.Context, useActiveProjectsFilter bool) (*admin.Projects, error) {
func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
projects := make([]*admin.Project, 0)
listReq := &admin.ProjectListRequest{
Limit: 100,
// Prefer to sync projects most newly updated to ensure their resources get modified first when other resources exist.
SortBy: &descUpdatedAtSortParam,
}
if useActiveProjectsFilter {
listReq.Filters = activeProjectsFilter
} else {
listReq.Filters = archivedProjectsFilter
Limit: 100,
Filters: activeProjectsFilter,
// Prefer to sync projects most newly created to ensure their resources get created first when other resources exist.
SortBy: &descCreatedAtSortParam,
}

// Iterate through all pages of projects
Expand All @@ -75,14 +68,6 @@ func (p serviceAdminProvider) getProjects(ctx context.Context, useActiveProjects
}, nil
}

func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
return p.getProjects(ctx, getActiveProjects)
}

func (p serviceAdminProvider) GetArchivedProjects(ctx context.Context) (*admin.Projects, error) {
return p.getProjects(ctx, getArchivedProjects)
}

func NewAdminServiceDataProvider(
adminClient service.AdminServiceClient) interfaces.FlyteAdminDataProvider {
return &serviceAdminProvider{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func TestServiceGetProjects(t *testing.T) {
t.Run("happy case", func(t *testing.T) {
mockAdmin := mocks.AdminServiceClient{}
mockAdmin.OnListProjectsMatch(ctx, mock.MatchedBy(func(req *admin.ProjectListRequest) bool {
res := req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "updated_at"
return res
return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "created_at"
})).Return(&admin.Projects{
Projects: []*admin.Project{
{
Expand All @@ -111,7 +110,7 @@ func TestServiceGetProjects(t *testing.T) {
t.Run("admin error", func(t *testing.T) {
mockAdmin := mocks.AdminServiceClient{}
mockAdmin.OnListProjectsMatch(ctx, mock.MatchedBy(func(req *admin.ProjectListRequest) bool {
return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "updated_at"
return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "created_at"
})).Return(nil, errFoo)
provider := serviceAdminProvider{
adminClient: &mockAdmin,
Expand Down
29 changes: 4 additions & 25 deletions flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
)

const (
stateColumn = "state"
)

// Implementation of an interfaces.FlyteAdminDataProvider which fetches data directly from the provided database connection.
type dbAdminProvider struct {
db repositoryInterfaces.Repository
Expand Down Expand Up @@ -51,20 +47,11 @@ func (p dbAdminProvider) getDomains() []*admin.Domain {
return domains
}

func (p dbAdminProvider) getProjects(ctx context.Context, useActiveProjectsFilter bool) (projectsList *admin.Projects, err error) {
var filter common.InlineFilter
if useActiveProjectsFilter {
filter, err = common.NewSingleValueFilter(common.Project, common.NotEqual, stateColumn, int32(admin.Project_ARCHIVED))
if err != nil {
return nil, err
}
} else {
filter, err = common.NewSingleValueFilter(common.Project, common.Equal, stateColumn, int32(admin.Project_ARCHIVED))
if err != nil {
return nil, err
}
func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
filter, err := common.NewSingleValueFilter(common.Project, common.NotEqual, "state", int32(admin.Project_ARCHIVED))
if err != nil {
return nil, err
}

projectModels, err := p.db.ProjectRepo().List(ctx, repositoryInterfaces.ListResourceInput{
SortParameter: descCreatedAtSortDBParam,
InlineFilters: []common.InlineFilter{filter},
Expand All @@ -78,14 +65,6 @@ func (p dbAdminProvider) getProjects(ctx context.Context, useActiveProjectsFilte
}, nil
}

func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
return p.getProjects(ctx, getActiveProjects)
}

func (p dbAdminProvider) GetArchivedProjects(ctx context.Context) (*admin.Projects, error) {
return p.getProjects(ctx, getArchivedProjects)
}

func NewDatabaseAdminDataProvider(db repositoryInterfaces.Repository, config runtimeInterfaces.Configuration, resourceManager managerInterfaces.ResourceInterface) interfaces.FlyteAdminDataProvider {
return &dbAdminProvider{
db: db,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestGetProjects(t *testing.T) {
mockRepo.(*repoMocks.MockRepository).ProjectRepoIface = &repoMocks.MockProjectRepo{
ListProjectsFunction: func(ctx context.Context, input repoInterfaces.ListResourceInput) ([]models.Project, error) {
assert.Len(t, input.InlineFilters, 1)
assert.Equal(t, input.SortParameter.GetGormOrderExpr(), "updated_at desc")
assert.Equal(t, input.SortParameter.GetGormOrderExpr(), "created_at desc")
return []models.Project{
{
Identifier: "flytesnacks",
Expand Down
5 changes: 0 additions & 5 deletions flyteadmin/pkg/clusterresource/impl/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
)

const (
getActiveProjects = true
getArchivedProjects = false
)

func NewMissingEntityError(entity string) error {
return errors.NewFlyteAdminErrorf(codes.NotFound, "Failed to find [%s]", entity)
}
1 change: 0 additions & 1 deletion flyteadmin/pkg/clusterresource/interfaces/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ import (
type FlyteAdminDataProvider interface {
GetClusterResourceAttributes(ctx context.Context, org, project, domain string) (*admin.ClusterResourceAttributes, error)
GetProjects(ctx context.Context) (*admin.Projects, error)
GetArchivedProjects(ctx context.Context) (*admin.Projects, error)
}
41 changes: 0 additions & 41 deletions flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteadmin/pkg/clusterresource/sync_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ type ResourceSyncStats struct {
Updated int
AlreadyThere int
Errored int
Deleted int
}

// Add adds the values of the other ResourceSyncStats to this one
Expand All @@ -15,5 +14,4 @@ func (m *ResourceSyncStats) Add(other ResourceSyncStats) {
m.Updated += other.Updated
m.AlreadyThere += other.AlreadyThere
m.Errored += other.Errored
m.Deleted += other.Deleted
}
7 changes: 0 additions & 7 deletions flyteadmin/pkg/runtime/cluster_resource_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ var clusterResourceConfig = config.MustRegisterSection(clusterResourceKey, &inte
Duration: time.Minute,
},
CustomData: make(map[interfaces.DomainName]interfaces.TemplateData),
UnionProjectSyncConfig: interfaces.UnionProjectSyncConfig{
BatchSize: 10,
},
})

// Implementation of an interfaces.ClusterResourceConfiguration
Expand All @@ -44,10 +41,6 @@ func (p *ClusterResourceConfigurationProvider) IsStandaloneDeployment() bool {
return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).StandaloneDeployment
}

func (p *ClusterResourceConfigurationProvider) GetUnionProjectSyncConfig() interfaces.UnionProjectSyncConfig {
return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).UnionProjectSyncConfig
}

func NewClusterResourceConfigurationProvider() interfaces.ClusterResourceConfiguration {
return &ClusterResourceConfigurationProvider{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ type DomainName = string

type TemplateData = map[string]DataSource

type UnionProjectSyncConfig struct {
CleanupNamespace bool `json:"cleanupNamespace" pflag:", Whether to clean up resources associated with archived projects"`
BatchSize int `json:"batchSize" pflag:", How many projects to process in parallel (use 0 for serial processing)"`
}

type ClusterResourceConfig struct {
TemplatePath string `json:"templatePath"`
// TemplateData maps template keys e.g. my_super_secret_password to a data source
Expand All @@ -47,9 +42,8 @@ type ClusterResourceConfig struct {
foo:
value: "baz"
*/
CustomData map[DomainName]TemplateData `json:"customData"`
StandaloneDeployment bool `json:"standaloneDeployment" pflag:", Whether the cluster resource sync is running in a standalone deployment and should call flyteadmin service endpoints"`
UnionProjectSyncConfig UnionProjectSyncConfig `json:"unionProjectSyncConfig"`
CustomData map[DomainName]TemplateData `json:"customData"`
StandaloneDeployment bool `json:"standaloneDeployment" pflag:", Whether the cluster resource sync is running in a standalone deployment and should call flyteadmin service endpoints"`
}

type ClusterResourceConfiguration interface {
Expand All @@ -58,5 +52,4 @@ type ClusterResourceConfiguration interface {
GetRefreshInterval() time.Duration
GetCustomTemplateData() map[DomainName]TemplateData
IsStandaloneDeployment() bool
GetUnionProjectSyncConfig() UnionProjectSyncConfig
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ func (c MockClusterResourceConfiguration) IsStandaloneDeployment() bool {
return c.StandaloneDeployment
}

func (c MockClusterResourceConfiguration) GetArchiveProjectConfig() interfaces.UnionProjectSyncConfig {
return interfaces.UnionProjectSyncConfig{}
}

func (c MockClusterResourceConfiguration) GetUnionProjectSyncConfig() interfaces.UnionProjectSyncConfig {
return interfaces.UnionProjectSyncConfig{}
}

func NewMockClusterResourceConfiguration() interfaces.ClusterResourceConfiguration {
return &MockClusterResourceConfiguration{}
}

0 comments on commit 2fada83

Please sign in to comment.