From 2fada831ae853128edc5745e88ce1e30f06f897b Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 14 May 2024 12:40:01 -0700 Subject: [PATCH] Revert "SVR-468: Batch provision execution namespaces and support delete" (#272) Reverts unionai/flyte#246 --- flyteadmin/pkg/clusterresource/controller.go | 266 +++--------------- .../pkg/clusterresource/controller_test.go | 16 +- .../impl/admin_service_data_provider.go | 31 +- .../impl/admin_service_data_provider_test.go | 5 +- .../impl/db_admin_data_provider.go | 29 +- .../impl/db_admin_data_provider_test.go | 2 +- flyteadmin/pkg/clusterresource/impl/shared.go | 5 - .../pkg/clusterresource/interfaces/admin.go | 1 - .../mocks/flyte_admin_data_provider.go | 41 --- flyteadmin/pkg/clusterresource/sync_stats.go | 2 - .../pkg/runtime/cluster_resource_provider.go | 7 - .../cluster_resource_configuration.go | 11 +- .../mocks/mock_cluster_resource_provider.go | 8 - 13 files changed, 66 insertions(+), 358 deletions(-) diff --git a/flyteadmin/pkg/clusterresource/controller.go b/flyteadmin/pkg/clusterresource/controller.go index c5e3f8ef2ea..e0d1ba868fa 100644 --- a/flyteadmin/pkg/clusterresource/controller.go +++ b/flyteadmin/pkg/clusterresource/controller.go @@ -11,16 +11,11 @@ import ( "path/filepath" "runtime/debug" "strings" - "sync" "github.com/prometheus/client_golang/prometheus" - "github.com/samber/lo" - "golang.org/x/exp/maps" - "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" - k8_api_err "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,7 +33,6 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/restmapper" - "sigs.k8s.io/controller-runtime/pkg/client" impl2 "github.com/flyteorg/flyte/flyteadmin/pkg/clusterresource/impl" "github.com/flyteorg/flyte/flyteadmin/pkg/clusterresource/interfaces" @@ -68,9 +62,6 @@ const templateVariableFormat = "{{ %s }}" const replaceAllInstancesOfString = -1 const noChange = "{}" -var deleteImmediately = int64(0) -var deleteForeground = metav1.DeletePropagationForeground - // The clusterresource Controller manages applying desired templatized kubernetes resource files as resources // in the execution kubernetes cluster. type Controller interface { @@ -81,12 +72,9 @@ type Controller interface { type controllerMetrics struct { Scope promutils.Scope SyncStarted prometheus.Counter - SyncsCompleted prometheus.Counter KubernetesResourcesCreated prometheus.Counter KubernetesResourcesCreateErrors prometheus.Counter - KubernetesNamespaceDeleteErrors prometheus.Counter ResourcesAdded prometheus.Counter - NamespacesDeleted prometheus.Counter ResourceAddErrors prometheus.Counter TemplateReadErrors prometheus.Counter TemplateDecodeErrors prometheus.Counter @@ -108,25 +96,19 @@ type controller struct { metrics controllerMetrics lastAppliedTemplateDir string // Map of [namespace -> [templateFileName -> last modified time]] - appliedNamespaceTemplateChecksums sync.Map - adminDataProvider interfaces.FlyteAdminDataProvider - listTargets executionclusterIfaces.ListTargetsInterface + appliedTemplates NamespaceCache + adminDataProvider interfaces.FlyteAdminDataProvider + listTargets executionclusterIfaces.ListTargetsInterface } // templateAlreadyApplied checks if there is an applied template with the same checksum -func (c *controller) templateAlreadyApplied(ctx context.Context, namespace NamespaceName, templateFilename string, checksum [16]byte) bool { - namespaceTemplateChecksumsVal, ok := c.appliedNamespaceTemplateChecksums.Load(namespace) +func (c *controller) templateAlreadyApplied(namespace NamespaceName, templateFilename string, checksum [16]byte) bool { + namespacedAppliedTemplates, ok := c.appliedTemplates[namespace] if !ok { // There is no record of this namespace altogether. return false } - - namespacedAppliedTemplateChecksums, ok := namespaceTemplateChecksumsVal.(TemplateChecksums) - if !ok { - logger.Errorf(ctx, "unexpected type for namespace '%s' in appliedTemplates map: %t", namespace, namespaceTemplateChecksumsVal) - return false - } - appliedChecksum, ok := namespacedAppliedTemplateChecksums[templateFilename] + appliedChecksum, ok := namespacedAppliedTemplates[templateFilename] if !ok { // There is no record of this file having ever been applied. return false @@ -136,20 +118,11 @@ func (c *controller) templateAlreadyApplied(ctx context.Context, namespace Names } // setTemplateChecksum records the latest checksum for the template file -func (c *controller) setTemplateChecksum(ctx context.Context, namespace NamespaceName, templateFilename string, checksum [16]byte) { - namespaceTemplateChecksumsVal, ok := c.appliedNamespaceTemplateChecksums.Load(namespace) - if !ok { - c.appliedNamespaceTemplateChecksums.Store(namespace, TemplateChecksums{templateFilename: checksum}) - return - } - - namespacedAppliedTemplateChecksums, ok := namespaceTemplateChecksumsVal.(TemplateChecksums) - if !ok { - logger.Errorf(ctx, "unexpected type for namespace '%s' in appliedTemplates map: %t", namespace, namespaceTemplateChecksumsVal) - return +func (c *controller) setTemplateChecksum(namespace NamespaceName, templateFilename string, checksum [16]byte) { + if _, ok := c.appliedTemplates[namespace]; !ok { + c.appliedTemplates[namespace] = make(TemplateChecksums) } - namespacedAppliedTemplateChecksums[templateFilename] = checksum - c.appliedNamespaceTemplateChecksums.Store(namespace, namespacedAppliedTemplateChecksums) + c.appliedTemplates[namespace][templateFilename] = checksum } // Given a map of templatized variable names -> data source, this function produces an output that maps the same @@ -304,12 +277,11 @@ func prepareDynamicCreate(target executioncluster.ExecutionTarget, config string // 2. create the resource on the kubernetes cluster and cache successful outcomes func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain, namespace NamespaceName, templateValues, customTemplateValues templateValuesType) (ResourceSyncStats, error) { - logger.Debugf(ctx, "syncing namespace: '%s' for project '%s' and domain '%s' and org '%s'", namespace, project.Id, domain.Id, project.Org) templateDir := c.config.ClusterResourceConfiguration().GetTemplatePath() if c.lastAppliedTemplateDir != templateDir { // Invalidate all caches c.lastAppliedTemplateDir = templateDir - c.appliedNamespaceTemplateChecksums = sync.Map{} + c.appliedTemplates = make(NamespaceCache) } templateFiles, err := ioutil.ReadDir(templateDir) if err != nil { @@ -321,7 +293,6 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, collectedErrs := make([]error, 0) stats := ResourceSyncStats{} for _, templateFile := range templateFiles { - templateFile := templateFile templateFileName := templateFile.Name() if filepath.Ext(templateFileName) != ".yaml" { // nothing to do. @@ -338,7 +309,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, } checksum := md5.Sum([]byte(k8sManifest)) // #nosec - if c.templateAlreadyApplied(ctx, namespace, templateFileName, checksum) { + if c.templateAlreadyApplied(namespace, templateFileName, checksum) { // nothing to do. logger.Debugf(ctx, "syncing namespace [%s]: templateFile [%s] already applied, nothing to do.", namespace, templateFile.Name()) stats.AlreadyThere++ @@ -357,8 +328,8 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, continue } - logger.Debugf(ctx, "Attempting to create resource [%+v] in cluster [%v] for namespace [%s] and templateFileName: %+v", - dynamicObj.obj.GetKind(), target.ID, namespace, templateFileName) + logger.Debugf(ctx, "Attempting to create resource [%+v] in cluster [%v] for namespace [%s]", + dynamicObj.obj.GetKind(), target.ID, namespace) dr := getDynamicResourceInterface(dynamicObj.mapping, target.DynamicClient, namespace) _, err = dr.Create(ctx, dynamicObj.obj, metav1.CreateOptions{}) @@ -403,7 +374,6 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, logger.Infof(ctx, "Resource [%+v] in namespace [%s] is not modified", dynamicObj.obj.GetKind(), namespace) stats.AlreadyThere++ - c.setTemplateChecksum(ctx, namespace, templateFileName, checksum) continue } @@ -421,7 +391,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, stats.Updated++ logger.Debugf(ctx, "Successfully updated resource [%+v] in namespace [%s]", dynamicObj.obj.GetKind(), namespace) - c.setTemplateChecksum(ctx, namespace, templateFileName, checksum) + c.setTemplateChecksum(namespace, templateFileName, checksum) } else { // Some error other than AlreadyExists was raised when we tried to Create the k8s object. c.metrics.KubernetesResourcesCreateErrors.Inc() @@ -438,64 +408,15 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, logger.Debugf(ctx, "Created resource [%+v] for namespace [%s] in kubernetes", dynamicObj.obj.GetKind(), namespace) c.metrics.KubernetesResourcesCreated.Inc() - c.setTemplateChecksum(ctx, namespace, templateFileName, checksum) + c.setTemplateChecksum(namespace, templateFileName, checksum) } } } if len(collectedErrs) > 0 { return stats, errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs) } - return stats, nil -} -func (c *controller) deleteNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain, - namespace NamespaceName) (ResourceSyncStats, error) { - logger.Debugf(ctx, "attempting to delete namespace: '%s' for project '%s' and domain '%s' and org '%s'", - namespace, project.Id, domain.Id, project.Org) - collectedErrs := make([]error, 0) - _, mapHasVal := c.appliedNamespaceTemplateChecksums.Load(namespace) - if !mapHasVal { - logger.Debugf(ctx, "namespace: '%s' for project '%s' and domain '%s' and org '%s' is already recorded as deleted", - namespace, project.Id, domain.Id, project.Org) - return ResourceSyncStats{}, nil - } - for _, target := range c.listTargets.GetValidTargets() { - ns := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespace, - }, - } - err := target.Client.Delete(context.Background(), ns, &client.DeleteOptions{ - GracePeriodSeconds: &deleteImmediately, - PropagationPolicy: &deleteForeground, - }) - if err != nil { - if k8_api_err.IsNotFound(err) || k8_api_err.IsGone(err) { - logger.Debugf(ctx, "namespace: '%s' for project '%s' and domain '%s' and org '%s' is already deleted", - namespace, project.Id, domain.Id, project.Org) - continue - } - - c.metrics.KubernetesNamespaceDeleteErrors.Inc() - logger.Errorf(ctx, "failed to delete namespace [%s] for project [%s] and domain [%s] and org [%s] with err: %v", - namespace, project.Id, domain.Id, project.Org, err) - collectedErrs = append(collectedErrs, err) - continue - } - logger.Infof(ctx, "successfully deleted namespace [%s] for project [%s] and domain [%s] and org [%s]", namespace, project.Id, domain.Id, project.Org) - c.metrics.NamespacesDeleted.Inc() - } - - // Now clean up internal state that tracks this formerly active namespace - c.appliedNamespaceTemplateChecksums.Delete(namespace) - - if len(collectedErrs) > 0 { - return ResourceSyncStats{}, errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs) - } - - return ResourceSyncStats{ - Deleted: 1, - }, nil + return stats, nil } var metadataAccessor = meta.NewAccessor() @@ -640,113 +561,6 @@ func (c *controller) createPatch(gvk schema.GroupVersionKind, currentObj *unstru return patch, patchType, nil } -type syncNamespaceProcessor = func(ctx context.Context, project *admin.Project, domain *admin.Domain, - namespace NamespaceName) (ResourceSyncStats, error) - -// newSyncNamespacer returns a closure that can be used to sync a namespace with the given default template values and domain-specific template values -func (c *controller) newSyncNamespacer(templateValues templateValuesType, domainTemplateValues map[string]templateValuesType) syncNamespaceProcessor { - return func(ctx context.Context, project *admin.Project, domain *admin.Domain, - namespace NamespaceName) (ResourceSyncStats, error) { - logger.Debugf(ctx, "sync namespacer is processing namespace: '%s' for project '%s' and domain '%s' and org '%s'", - namespace, project.Id, domain.Id, project.Org) - customTemplateValues, err := c.getCustomTemplateValues( - ctx, project.Org, project.Id, domain.Id, domainTemplateValues[domain.Id]) - if err != nil { - logger.Errorf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err) - return ResourceSyncStats{}, err - } - - // syncNamespace actually mutates the templateValues, so we copy it for safe concurrent access. - templateValuesCopy := maps.Clone(templateValues) - newStats, err := c.syncNamespace(ctx, project, domain, namespace, templateValuesCopy, customTemplateValues) - if err != nil { - logger.Warningf(ctx, "Failed to create cluster resources for namespace [%s] with err: %v", namespace, err) - c.metrics.ResourceAddErrors.Inc() - return ResourceSyncStats{}, err - } - c.metrics.ResourcesAdded.Inc() - logger.Debugf(ctx, "successfully created kubernetes resources for '%s' for project '%s' and domain '%s' and org '%s'", - namespace, project.Id, domain.Id, project.Org) - return newStats, nil - } -} - -// processProjects accepts a generic project processor and applies it to all projects in the input list, -// optionally batching and parallelizing their processing for greater perf. -func (c *controller) processProjects(ctx context.Context, stats ResourceSyncStats, projects []*admin.Project, processor syncNamespaceProcessor) (ResourceSyncStats, error) { - if len(projects) == 0 { - // Nothing to do - return stats, nil - } - - if c.config.ClusterResourceConfiguration().GetUnionProjectSyncConfig().BatchSize == 0 { - logger.Debugf(ctx, "processing projects serially") - for _, project := range projects { - for _, domain := range project.Domains { - namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), project.Org, project.Id, domain.Name) - newStats, err := processor(ctx, project, domain, namespace) - if err != nil { - return stats, err - } - stats.Add(newStats) - logger.Infof(ctx, "Completed cluster resource creation loop for namespace [%s] with stats: [%+v]", namespace, newStats) - } - } - return stats, nil - } - - logger.Debugf(ctx, "processing projects in parallel with batch size: %d", c.config.ClusterResourceConfiguration().GetUnionProjectSyncConfig().BatchSize) - statsEntries := make([]ResourceSyncStats, len(projects)*len(projects[0].Domains)) - - batchSize := c.config.ClusterResourceConfiguration().GetUnionProjectSyncConfig().BatchSize - for i := 0; i < len(projects); i += batchSize { - logger.Debugf(ctx, "processing projects batch {%d...%d}", i, i+batchSize) - processProjectsGroup, processProjectsCtx := errgroup.WithContext(ctx) - for _, project := range projects[i:min(i+batchSize, len(projects))] { - var lastFormattedNamespace NamespaceName - for domainIdx, domain := range project.Domains { - // redefine variables for loop closure - i := i - org := project.Org - project := project - domain := domain - domainIdx := domainIdx - namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), org, project.Id, domain.Id) - if lastFormattedNamespace == namespace { - // This is a case where the namespace isn't customized based on project + domain + org, but rather project + org only - continue - } - lastFormattedNamespace = namespace - - processProjectsGroup.Go(func() error { - resourceStats, err := processor(processProjectsCtx, project, domain, namespace) - if err != nil { - logger.Warningf(ctx, "failed to create cluster resources for namespace [%s] with err: %v", namespace, err) - c.metrics.ResourceAddErrors.Inc() - // we don't error out here, in order to not block other namespaces from being synced - } else { - c.metrics.ResourcesAdded.Inc() - statsEntries[i+domainIdx] = resourceStats - logger.Debugf(ctx, "Completed cluster resource creation loop for namespace [%s] with stats: [%+v]", namespace, resourceStats) - } - - return nil - }) - } - } - if err := processProjectsGroup.Wait(); err != nil { - logger.Warningf(ctx, "failed to process projects batch {%d...%d} when syncing namespaces with: %v", - i, i+batchSize, err) - } - logger.Debugf(ctx, "processed projects batch {%d...%d} when syncing namespaces", i, i+batchSize) - } - statsSummary := lo.Reduce(statsEntries, func(existing ResourceSyncStats, item ResourceSyncStats, _ int) ResourceSyncStats { - existing.Add(item) - return existing - }, ResourceSyncStats{}) - return statsSummary, nil -} - func (c *controller) Sync(ctx context.Context) error { defer func() { if err := recover(); err != nil { @@ -757,7 +571,6 @@ func (c *controller) Sync(ctx context.Context) error { c.metrics.SyncStarted.Inc() logger.Debugf(ctx, "Running an invocation of ClusterResource Sync") - // First handle ensuring k8s resources for active projects projects, err := c.adminDataProvider.GetProjects(ctx) if err != nil { return err @@ -775,31 +588,38 @@ func (c *controller) Sync(ctx context.Context) error { } stats := ResourceSyncStats{} - syncNamespaceProcessorFunc := c.newSyncNamespacer(templateValues, domainTemplateValues) - stats, err = c.processProjects(ctx, stats, projects.GetProjects(), syncNamespaceProcessorFunc) - if err != nil { - logger.Warningf(ctx, "Failed to process projects when syncing namespaces: %v", err) - return err - } - // Second, handle deleting k8s namespaces for archived projects - archivedProjects, err := c.adminDataProvider.GetArchivedProjects(ctx) - if err != nil { - logger.Warningf(ctx, "failed to get archived projects: %v", err) - return err - } - stats, err = c.processProjects(ctx, stats, archivedProjects.GetProjects(), c.deleteNamespace) - if err != nil { - logger.Warningf(ctx, "Failed to process projects when deleting namespaces: %v", err) - return err + for _, project := range projects.Projects { + for _, domain := range project.Domains { + namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), project.Org, project.Id, domain.Name) + customTemplateValues, err := c.getCustomTemplateValues( + ctx, project.Org, project.Id, domain.Id, domainTemplateValues[domain.Id]) + if err != nil { + logger.Errorf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err) + errs = append(errs, err) + } + + newStats, err := c.syncNamespace(ctx, project, domain, namespace, templateValues, customTemplateValues) + if err != nil { + logger.Warningf(ctx, "Failed to create cluster resources for namespace [%s] with err: %v", namespace, err) + c.metrics.ResourceAddErrors.Inc() + errs = append(errs, err) + } else { + c.metrics.ResourcesAdded.Inc() + logger.Debugf(ctx, "Successfully created kubernetes resources for [%s]", namespace) + stats.Add(newStats) + } + + logger.Infof(ctx, "Completed cluster resource creation loop for namespace [%s] with stats: [%+v]", namespace, newStats) + } } + logger.Infof(ctx, "Completed cluster resource creation loop with stats: [%+v]", stats) if len(errs) > 0 { return errors.NewCollectedFlyteAdminError(codes.Internal, errs) } - c.metrics.SyncsCompleted.Inc() return nil } @@ -822,15 +642,12 @@ func newMetrics(scope promutils.Scope) controllerMetrics { Scope: scope, SyncStarted: scope.MustNewCounter("k8s_resource_syncs", "overall count of the number of invocations of the resource controller 'sync' method"), - SyncsCompleted: scope.MustNewCounter("sync_success", "overall count of successful invocations of the resource controller 'sync' method"), KubernetesResourcesCreated: scope.MustNewCounter("k8s_resources_created", "overall count of successfully created resources in kubernetes"), KubernetesResourcesCreateErrors: scope.MustNewCounter("k8s_resource_create_errors", "overall count of errors encountered attempting to create resources in kubernetes"), - KubernetesNamespaceDeleteErrors: scope.MustNewCounter("k8s_namespace_delete_errors", "overall count of errors encountered attempting to delete namespaces in kubernetes"), ResourcesAdded: scope.MustNewCounter("resources_added", "overall count of successfully added resources for namespaces"), - NamespacesDeleted: scope.MustNewCounter("namespaces_deleted", "overall count of successfully deleted namespaces"), ResourceAddErrors: scope.MustNewCounter("resource_add_errors", "overall count of errors encountered creating resources for namespaces"), TemplateReadErrors: scope.MustNewCounter("template_read_errors", @@ -856,6 +673,7 @@ func NewClusterResourceController(adminDataProvider interfaces.FlyteAdminDataPro listTargets: listTargets, poller: make(chan struct{}), metrics: newMetrics(scope), + appliedTemplates: make(map[string]TemplateChecksums), } } diff --git a/flyteadmin/pkg/clusterresource/controller_test.go b/flyteadmin/pkg/clusterresource/controller_test.go index 482f2c60b49..671ac7945ad 100644 --- a/flyteadmin/pkg/clusterresource/controller_test.go +++ b/flyteadmin/pkg/clusterresource/controller_test.go @@ -5,7 +5,6 @@ import ( "crypto/md5" // #nosec "io/ioutil" "os" - "sync" "testing" "github.com/stretchr/testify/assert" @@ -30,7 +29,6 @@ const domain = "domain-bar" var testScope = mockScope.NewTestScope() func TestTemplateAlreadyApplied(t *testing.T) { - ctx := context.TODO() const namespace = "namespace" const fileName = "fileName" testController := controller{ @@ -38,15 +36,15 @@ func TestTemplateAlreadyApplied(t *testing.T) { } checksum1 := md5.Sum([]byte("template1")) // #nosec checksum2 := md5.Sum([]byte("template2")) // #nosec - assert.False(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum1)) + assert.False(t, testController.templateAlreadyApplied(namespace, fileName, checksum1)) - testController.appliedNamespaceTemplateChecksums = sync.Map{} - testController.setTemplateChecksum(ctx, namespace, fileName, checksum1) - assert.True(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum1)) - assert.False(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum2)) + testController.appliedTemplates = make(map[string]TemplateChecksums) + testController.setTemplateChecksum(namespace, fileName, checksum1) + assert.True(t, testController.templateAlreadyApplied(namespace, fileName, checksum1)) + assert.False(t, testController.templateAlreadyApplied(namespace, fileName, checksum2)) - testController.setTemplateChecksum(ctx, namespace, fileName, checksum2) - assert.True(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum2)) + testController.setTemplateChecksum(namespace, fileName, checksum2) + assert.True(t, testController.templateAlreadyApplied(namespace, fileName, checksum2)) } func TestPopulateTemplateValues(t *testing.T) { diff --git a/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go b/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go index 68f34686fa2..bdb7716c12f 100644 --- a/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go +++ b/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go @@ -33,29 +33,22 @@ func (p serviceAdminProvider) GetClusterResourceAttributes(ctx context.Context, return nil, NewMissingEntityError("cluster resource attributes") } -// We want both active and system generated projects var activeProjectsFilter = fmt.Sprintf("ne(state,%d)", admin.Project_ARCHIVED) -var archivedProjectsFilter = fmt.Sprintf("eq(state,%d)", admin.Project_ARCHIVED) - -var descUpdatedAtSortParam = admin.Sort{ +var descCreatedAtSortParam = admin.Sort{ Direction: admin.Sort_DESCENDING, - Key: "updated_at", + Key: "created_at", } -var descCreatedAtSortDBParam, _ = common.NewSortParameter(&descUpdatedAtSortParam, models.ProjectColumns) +var descCreatedAtSortDBParam, _ = common.NewSortParameter(&descCreatedAtSortParam, models.ProjectColumns) -func (p serviceAdminProvider) getProjects(ctx context.Context, useActiveProjectsFilter bool) (*admin.Projects, error) { +func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) { projects := make([]*admin.Project, 0) listReq := &admin.ProjectListRequest{ - Limit: 100, - // Prefer to sync projects most newly updated to ensure their resources get modified first when other resources exist. - SortBy: &descUpdatedAtSortParam, - } - if useActiveProjectsFilter { - listReq.Filters = activeProjectsFilter - } else { - listReq.Filters = archivedProjectsFilter + Limit: 100, + Filters: activeProjectsFilter, + // Prefer to sync projects most newly created to ensure their resources get created first when other resources exist. + SortBy: &descCreatedAtSortParam, } // Iterate through all pages of projects @@ -75,14 +68,6 @@ func (p serviceAdminProvider) getProjects(ctx context.Context, useActiveProjects }, nil } -func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) { - return p.getProjects(ctx, getActiveProjects) -} - -func (p serviceAdminProvider) GetArchivedProjects(ctx context.Context) (*admin.Projects, error) { - return p.getProjects(ctx, getArchivedProjects) -} - func NewAdminServiceDataProvider( adminClient service.AdminServiceClient) interfaces.FlyteAdminDataProvider { return &serviceAdminProvider{ diff --git a/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider_test.go b/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider_test.go index 16f60613db3..176b037a7fe 100644 --- a/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider_test.go +++ b/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider_test.go @@ -89,8 +89,7 @@ func TestServiceGetProjects(t *testing.T) { t.Run("happy case", func(t *testing.T) { mockAdmin := mocks.AdminServiceClient{} mockAdmin.OnListProjectsMatch(ctx, mock.MatchedBy(func(req *admin.ProjectListRequest) bool { - res := req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "updated_at" - return res + return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "created_at" })).Return(&admin.Projects{ Projects: []*admin.Project{ { @@ -111,7 +110,7 @@ func TestServiceGetProjects(t *testing.T) { t.Run("admin error", func(t *testing.T) { mockAdmin := mocks.AdminServiceClient{} mockAdmin.OnListProjectsMatch(ctx, mock.MatchedBy(func(req *admin.ProjectListRequest) bool { - return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "updated_at" + return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "created_at" })).Return(nil, errFoo) provider := serviceAdminProvider{ adminClient: &mockAdmin, diff --git a/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go b/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go index 59dd27ba420..a42df64aec1 100644 --- a/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go +++ b/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go @@ -12,10 +12,6 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" ) -const ( - stateColumn = "state" -) - // Implementation of an interfaces.FlyteAdminDataProvider which fetches data directly from the provided database connection. type dbAdminProvider struct { db repositoryInterfaces.Repository @@ -51,20 +47,11 @@ func (p dbAdminProvider) getDomains() []*admin.Domain { return domains } -func (p dbAdminProvider) getProjects(ctx context.Context, useActiveProjectsFilter bool) (projectsList *admin.Projects, err error) { - var filter common.InlineFilter - if useActiveProjectsFilter { - filter, err = common.NewSingleValueFilter(common.Project, common.NotEqual, stateColumn, int32(admin.Project_ARCHIVED)) - if err != nil { - return nil, err - } - } else { - filter, err = common.NewSingleValueFilter(common.Project, common.Equal, stateColumn, int32(admin.Project_ARCHIVED)) - if err != nil { - return nil, err - } +func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) { + filter, err := common.NewSingleValueFilter(common.Project, common.NotEqual, "state", int32(admin.Project_ARCHIVED)) + if err != nil { + return nil, err } - projectModels, err := p.db.ProjectRepo().List(ctx, repositoryInterfaces.ListResourceInput{ SortParameter: descCreatedAtSortDBParam, InlineFilters: []common.InlineFilter{filter}, @@ -78,14 +65,6 @@ func (p dbAdminProvider) getProjects(ctx context.Context, useActiveProjectsFilte }, nil } -func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) { - return p.getProjects(ctx, getActiveProjects) -} - -func (p dbAdminProvider) GetArchivedProjects(ctx context.Context) (*admin.Projects, error) { - return p.getProjects(ctx, getArchivedProjects) -} - func NewDatabaseAdminDataProvider(db repositoryInterfaces.Repository, config runtimeInterfaces.Configuration, resourceManager managerInterfaces.ResourceInterface) interfaces.FlyteAdminDataProvider { return &dbAdminProvider{ db: db, diff --git a/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider_test.go b/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider_test.go index b8931e4d89f..9f62e0901e3 100644 --- a/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider_test.go +++ b/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider_test.go @@ -106,7 +106,7 @@ func TestGetProjects(t *testing.T) { mockRepo.(*repoMocks.MockRepository).ProjectRepoIface = &repoMocks.MockProjectRepo{ ListProjectsFunction: func(ctx context.Context, input repoInterfaces.ListResourceInput) ([]models.Project, error) { assert.Len(t, input.InlineFilters, 1) - assert.Equal(t, input.SortParameter.GetGormOrderExpr(), "updated_at desc") + assert.Equal(t, input.SortParameter.GetGormOrderExpr(), "created_at desc") return []models.Project{ { Identifier: "flytesnacks", diff --git a/flyteadmin/pkg/clusterresource/impl/shared.go b/flyteadmin/pkg/clusterresource/impl/shared.go index a3b75dc4a6b..6dac279812d 100644 --- a/flyteadmin/pkg/clusterresource/impl/shared.go +++ b/flyteadmin/pkg/clusterresource/impl/shared.go @@ -6,11 +6,6 @@ import ( "github.com/flyteorg/flyte/flyteadmin/pkg/errors" ) -const ( - getActiveProjects = true - getArchivedProjects = false -) - func NewMissingEntityError(entity string) error { return errors.NewFlyteAdminErrorf(codes.NotFound, "Failed to find [%s]", entity) } diff --git a/flyteadmin/pkg/clusterresource/interfaces/admin.go b/flyteadmin/pkg/clusterresource/interfaces/admin.go index 7cf596ff3a1..5121072442e 100644 --- a/flyteadmin/pkg/clusterresource/interfaces/admin.go +++ b/flyteadmin/pkg/clusterresource/interfaces/admin.go @@ -11,5 +11,4 @@ import ( type FlyteAdminDataProvider interface { GetClusterResourceAttributes(ctx context.Context, org, project, domain string) (*admin.ClusterResourceAttributes, error) GetProjects(ctx context.Context) (*admin.Projects, error) - GetArchivedProjects(ctx context.Context) (*admin.Projects, error) } diff --git a/flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go b/flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go index b1838e8a179..b6223c8e2a0 100644 --- a/flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go +++ b/flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go @@ -15,47 +15,6 @@ type FlyteAdminDataProvider struct { mock.Mock } -type FlyteAdminDataProvider_GetArchivedProjects struct { - *mock.Call -} - -func (_m FlyteAdminDataProvider_GetArchivedProjects) Return(_a0 *admin.Projects, _a1 error) *FlyteAdminDataProvider_GetArchivedProjects { - return &FlyteAdminDataProvider_GetArchivedProjects{Call: _m.Call.Return(_a0, _a1)} -} - -func (_m *FlyteAdminDataProvider) OnGetArchivedProjects(ctx context.Context) *FlyteAdminDataProvider_GetArchivedProjects { - c_call := _m.On("GetArchivedProjects", ctx) - return &FlyteAdminDataProvider_GetArchivedProjects{Call: c_call} -} - -func (_m *FlyteAdminDataProvider) OnGetArchivedProjectsMatch(matchers ...interface{}) *FlyteAdminDataProvider_GetArchivedProjects { - c_call := _m.On("GetArchivedProjects", matchers...) - return &FlyteAdminDataProvider_GetArchivedProjects{Call: c_call} -} - -// GetArchivedProjects provides a mock function with given fields: ctx -func (_m *FlyteAdminDataProvider) GetArchivedProjects(ctx context.Context) (*admin.Projects, error) { - ret := _m.Called(ctx) - - var r0 *admin.Projects - if rf, ok := ret.Get(0).(func(context.Context) *admin.Projects); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*admin.Projects) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - type FlyteAdminDataProvider_GetClusterResourceAttributes struct { *mock.Call } diff --git a/flyteadmin/pkg/clusterresource/sync_stats.go b/flyteadmin/pkg/clusterresource/sync_stats.go index bae4b0b2b6c..38d96091968 100644 --- a/flyteadmin/pkg/clusterresource/sync_stats.go +++ b/flyteadmin/pkg/clusterresource/sync_stats.go @@ -6,7 +6,6 @@ type ResourceSyncStats struct { Updated int AlreadyThere int Errored int - Deleted int } // Add adds the values of the other ResourceSyncStats to this one @@ -15,5 +14,4 @@ func (m *ResourceSyncStats) Add(other ResourceSyncStats) { m.Updated += other.Updated m.AlreadyThere += other.AlreadyThere m.Errored += other.Errored - m.Deleted += other.Deleted } diff --git a/flyteadmin/pkg/runtime/cluster_resource_provider.go b/flyteadmin/pkg/runtime/cluster_resource_provider.go index db0d8fedd53..f0ab808c244 100644 --- a/flyteadmin/pkg/runtime/cluster_resource_provider.go +++ b/flyteadmin/pkg/runtime/cluster_resource_provider.go @@ -16,9 +16,6 @@ var clusterResourceConfig = config.MustRegisterSection(clusterResourceKey, &inte Duration: time.Minute, }, CustomData: make(map[interfaces.DomainName]interfaces.TemplateData), - UnionProjectSyncConfig: interfaces.UnionProjectSyncConfig{ - BatchSize: 10, - }, }) // Implementation of an interfaces.ClusterResourceConfiguration @@ -44,10 +41,6 @@ func (p *ClusterResourceConfigurationProvider) IsStandaloneDeployment() bool { return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).StandaloneDeployment } -func (p *ClusterResourceConfigurationProvider) GetUnionProjectSyncConfig() interfaces.UnionProjectSyncConfig { - return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).UnionProjectSyncConfig -} - func NewClusterResourceConfigurationProvider() interfaces.ClusterResourceConfiguration { return &ClusterResourceConfigurationProvider{} } diff --git a/flyteadmin/pkg/runtime/interfaces/cluster_resource_configuration.go b/flyteadmin/pkg/runtime/interfaces/cluster_resource_configuration.go index f0adef22f18..990537450f5 100644 --- a/flyteadmin/pkg/runtime/interfaces/cluster_resource_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/cluster_resource_configuration.go @@ -20,11 +20,6 @@ type DomainName = string type TemplateData = map[string]DataSource -type UnionProjectSyncConfig struct { - CleanupNamespace bool `json:"cleanupNamespace" pflag:", Whether to clean up resources associated with archived projects"` - BatchSize int `json:"batchSize" pflag:", How many projects to process in parallel (use 0 for serial processing)"` -} - type ClusterResourceConfig struct { TemplatePath string `json:"templatePath"` // TemplateData maps template keys e.g. my_super_secret_password to a data source @@ -47,9 +42,8 @@ type ClusterResourceConfig struct { foo: value: "baz" */ - CustomData map[DomainName]TemplateData `json:"customData"` - StandaloneDeployment bool `json:"standaloneDeployment" pflag:", Whether the cluster resource sync is running in a standalone deployment and should call flyteadmin service endpoints"` - UnionProjectSyncConfig UnionProjectSyncConfig `json:"unionProjectSyncConfig"` + CustomData map[DomainName]TemplateData `json:"customData"` + StandaloneDeployment bool `json:"standaloneDeployment" pflag:", Whether the cluster resource sync is running in a standalone deployment and should call flyteadmin service endpoints"` } type ClusterResourceConfiguration interface { @@ -58,5 +52,4 @@ type ClusterResourceConfiguration interface { GetRefreshInterval() time.Duration GetCustomTemplateData() map[DomainName]TemplateData IsStandaloneDeployment() bool - GetUnionProjectSyncConfig() UnionProjectSyncConfig } diff --git a/flyteadmin/pkg/runtime/mocks/mock_cluster_resource_provider.go b/flyteadmin/pkg/runtime/mocks/mock_cluster_resource_provider.go index d9cb8d0d9c5..d658c1e3cbe 100644 --- a/flyteadmin/pkg/runtime/mocks/mock_cluster_resource_provider.go +++ b/flyteadmin/pkg/runtime/mocks/mock_cluster_resource_provider.go @@ -33,14 +33,6 @@ func (c MockClusterResourceConfiguration) IsStandaloneDeployment() bool { return c.StandaloneDeployment } -func (c MockClusterResourceConfiguration) GetArchiveProjectConfig() interfaces.UnionProjectSyncConfig { - return interfaces.UnionProjectSyncConfig{} -} - -func (c MockClusterResourceConfiguration) GetUnionProjectSyncConfig() interfaces.UnionProjectSyncConfig { - return interfaces.UnionProjectSyncConfig{} -} - func NewMockClusterResourceConfiguration() interfaces.ClusterResourceConfiguration { return &MockClusterResourceConfiguration{} }