Skip to content

Commit

Permalink
Configurable cluster resource sync: in cluster or standalone (flyteor…
Browse files Browse the repository at this point in the history
…g#326)

* refactor

Signed-off-by: Katrina Rogan <[email protected]>

* Add mocks

Signed-off-by: Katrina Rogan <[email protected]>

* lint

Signed-off-by: Katrina Rogan <[email protected]>

* Review comments

Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
Katrina Rogan authored Jan 21, 2022
1 parent cf76955 commit a6932ce
Show file tree
Hide file tree
Showing 14 changed files with 609 additions and 97 deletions.
27 changes: 21 additions & 6 deletions flyteadmin/cmd/entrypoints/clusterresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ package entrypoints
import (
"context"

"github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/clusterresource/impl"
"github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces"
execClusterIfaces "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources"
"github.com/flyteorg/flyteadmin/pkg/repositories"
repositoryConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config"

"github.com/flyteorg/flytestdlib/promutils"

Expand All @@ -29,7 +34,7 @@ func getClusterResourceController(ctx context.Context, scope promutils.Scope, co
initializationErrorCounter := scope.MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config")
var listTargetsProvider interfaces.ListTargetsInterface
var listTargetsProvider execClusterIfaces.ListTargetsInterface
var err error
if len(configuration.ClusterConfiguration().GetClusterConfigs()) == 0 {
serverConfig := config.GetConfig()
Expand All @@ -41,12 +46,22 @@ func getClusterResourceController(ctx context.Context, scope promutils.Scope, co
panic(err)
}

clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx)
if err != nil {
panic(err)
var adminDataProvider interfaces.FlyteAdminDataProvider
if configuration.ClusterResourceConfiguration().IsStandaloneDeployment() {
clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx)
if err != nil {
panic(err)
}
adminDataProvider = impl.NewAdminServiceDataProvider(clientSet.AdminClient())
} else {
dbConfig := repositoryConfig.NewDbConfig(configuration.ApplicationConfiguration().GetDbConfig())
db := repositories.GetRepository(
repositories.POSTGRES, dbConfig, scope.NewSubScope("database"))

adminDataProvider = impl.NewDatabaseAdminDataProvider(db, configuration, resources.NewResourceManager(db, configuration.ApplicationConfiguration()))
}

return clusterresource.NewClusterResourceController(clientSet.AdminClient(), listTargetsProvider, scope)
return clusterresource.NewClusterResourceController(adminDataProvider, listTargetsProvider, scope)
}

var controllerRunCmd = &cobra.Command{
Expand Down
72 changes: 18 additions & 54 deletions flyteadmin/pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ import (

"google.golang.org/grpc/status"

"github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"

"github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces"
"github.com/flyteorg/flyteadmin/pkg/executioncluster"
executionclusterIfaces "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/runtime"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -96,14 +95,9 @@ type controller struct {
metrics controllerMetrics
lastAppliedTemplateDir string
// Map of [namespace -> [templateFileName -> last modified time]]
appliedTemplates NamespaceCache
adminClient service.AdminServiceClient
listTargets interfaces.ListTargetsInterface
}

var descCreatedAtSortParam = &admin.Sort{
Direction: admin.Sort_DESCENDING,
Key: "created_at",
appliedTemplates NamespaceCache
adminDataProvider interfaces.FlyteAdminDataProvider
listTargets executionclusterIfaces.ListTargetsInterface
}

func (c *controller) templateAlreadyApplied(namespace NamespaceName, templateFile os.FileInfo) bool {
Expand Down Expand Up @@ -199,23 +193,19 @@ func (c *controller) getCustomTemplateValues(
}
collectedErrs := make([]error, 0)
// All override values saved in the database take precedence over the domain-specific defaults.
resource, err := c.adminClient.GetProjectDomainAttributes(ctx, &admin.ProjectDomainAttributesGetRequest{
Project: project,
Domain: domain,
ResourceType: admin.MatchableResource_CLUSTER_RESOURCE,
})
attributes, err := c.adminDataProvider.GetClusterResourceAttributes(ctx, project, domain)
if err != nil {
s, ok := status.FromError(err)
if !ok || s.Code() != codes.NotFound {
collectedErrs = append(collectedErrs, err)
}
}
if resource != nil && resource.Attributes != nil && resource.Attributes.MatchingAttributes != nil &&
resource.Attributes.MatchingAttributes.GetClusterResourceAttributes() != nil {
for templateKey, templateValue := range resource.Attributes.MatchingAttributes.GetClusterResourceAttributes().Attributes {
if attributes != nil && attributes.Attributes != nil {
for templateKey, templateValue := range attributes.Attributes {
customTemplateValues[fmt.Sprintf(templateVariableFormat, templateKey)] = templateValue
}
}

if len(collectedErrs) > 0 {
return nil, errors.NewCollectedFlyteAdminError(codes.InvalidArgument, collectedErrs)
}
Expand Down Expand Up @@ -550,32 +540,6 @@ func (c *controller) createPatch(gvk schema.GroupVersionKind, currentObj *unstru
return patch, patchType, nil
}

var activeProjectsFilter = fmt.Sprintf("ne(state,%d)", admin.Project_ARCHIVED)

func (c *controller) listAllProjects(ctx context.Context) ([]*admin.Project, error) {
projects := make([]*admin.Project, 0)
listReq := &admin.ProjectListRequest{
Limit: 100,
Filters: activeProjectsFilter,
// Prefer to sync projects most newly created to ensure their resources get created first when other resources exist.
SortBy: descCreatedAtSortParam,
}

// Iterate through all pages of projects
for {
projectResp, err := c.adminClient.ListProjects(ctx, listReq)
if err != nil {
return nil, err
}
projects = append(projects, projectResp.Projects...)
if len(projectResp.Token) == 0 {
break
}
listReq.Token = projectResp.Token
}
return projects, nil
}

func (c *controller) Sync(ctx context.Context) error {
defer func() {
if err := recover(); err != nil {
Expand All @@ -586,7 +550,7 @@ func (c *controller) Sync(ctx context.Context) error {
c.metrics.SyncStarted.Inc()
logger.Debugf(ctx, "Running an invocation of ClusterResource Sync")

projects, err := c.listAllProjects(ctx)
projects, err := c.adminDataProvider.GetProjects(ctx)
if err != nil {
return err
}
Expand All @@ -603,7 +567,7 @@ func (c *controller) Sync(ctx context.Context) error {
errs = append(errs, err)
}

for _, project := range projects {
for _, project := range projects.Projects {
for _, domain := range *domains {
namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), project.Id, domain.Name)
customTemplateValues, err := c.getCustomTemplateValues(
Expand Down Expand Up @@ -669,14 +633,14 @@ func newMetrics(scope promutils.Scope) controllerMetrics {
}
}

func NewClusterResourceController(adminClient service.AdminServiceClient, listTargets interfaces.ListTargetsInterface, scope promutils.Scope) Controller {
func NewClusterResourceController(adminDataProvider interfaces.FlyteAdminDataProvider, listTargets executionclusterIfaces.ListTargetsInterface, scope promutils.Scope) Controller {
config := runtime.NewConfigurationProvider()
return &controller{
adminClient: adminClient,
config: config,
listTargets: listTargets,
poller: make(chan struct{}),
metrics: newMetrics(scope),
appliedTemplates: make(map[string]map[string]time.Time),
adminDataProvider: adminDataProvider,
config: config,
listTargets: listTargets,
poller: make(chan struct{}),
metrics: newMetrics(scope),
appliedTemplates: make(map[string]map[string]time.Time),
}
}
47 changes: 18 additions & 29 deletions flyteadmin/pkg/clusterresource/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/flyteorg/flyteadmin/pkg/errors"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyteadmin/pkg/executioncluster/mocks"
"github.com/flyteorg/flyteadmin/pkg/clusterresource/mocks"
execClusterMocks "github.com/flyteorg/flyteadmin/pkg/executioncluster/mocks"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
clientMocks "github.com/flyteorg/flyteidl/clients/go/admin/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
mockScope "github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -152,28 +154,16 @@ func TestPopulateDefaultTemplateValues(t *testing.T) {
}

func TestGetCustomTemplateValues(t *testing.T) {
adminClient := clientMocks.AdminServiceClient{}
adminClient.OnGetProjectDomainAttributesMatch(mock.Anything, mock.MatchedBy(func(req *admin.ProjectDomainAttributesGetRequest) bool {
return req.Project == proj && req.Domain == domain
})).Return(&admin.ProjectDomainAttributesGetResponse{
Attributes: &admin.ProjectDomainAttributes{
Project: proj,
Domain: domain,
MatchingAttributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_ClusterResourceAttributes{
ClusterResourceAttributes: &admin.ClusterResourceAttributes{
Attributes: map[string]string{
"var1": "val1",
"var2": "val2",
},
},
},
},
adminDataProvider := mocks.FlyteAdminDataProvider{}
adminDataProvider.OnGetClusterResourceAttributesMatch(mock.Anything, proj, domain).Return(&admin.ClusterResourceAttributes{
Attributes: map[string]string{
"var1": "val1",
"var2": "val2",
},
}, nil)

testController := controller{
adminClient: &adminClient,
adminDataProvider: &adminDataProvider,
}
domainTemplateValues := templateValuesType{
"{{ var1 }}": "i'm getting overwritten",
Expand All @@ -191,12 +181,11 @@ func TestGetCustomTemplateValues(t *testing.T) {
}

func TestGetCustomTemplateValues_NothingToOverride(t *testing.T) {
adminClient := clientMocks.AdminServiceClient{}
adminClient.OnGetProjectDomainAttributesMatch(mock.Anything, mock.MatchedBy(func(req *admin.ProjectDomainAttributesGetRequest) bool {
return req.Project == proj && req.Domain == domain
})).Return(&admin.ProjectDomainAttributesGetResponse{}, nil)
adminDataProvider := mocks.FlyteAdminDataProvider{}
adminDataProvider.OnGetClusterResourceAttributesMatch(mock.Anything, proj, domain).Return(
nil, errors.NewFlyteAdminError(codes.NotFound, "foo"))
testController := controller{
adminClient: &adminClient,
adminDataProvider: &adminDataProvider,
}
customTemplateValues, err := testController.getCustomTemplateValues(context.Background(), proj, domain, templateValuesType{
"{{ var1 }}": "val1",
Expand Down Expand Up @@ -344,11 +333,11 @@ metadata:
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
adminClient := clientMocks.AdminServiceClient{}
adminClient.OnGetProjectDomainAttributesMatch(mock.Anything, mock.Anything).Return(&admin.ProjectDomainAttributesGetResponse{}, nil)
adminDataProvider := mocks.FlyteAdminDataProvider{}
adminDataProvider.OnGetClusterResourceAttributesMatch(mock.Anything, mock.Anything, mock.Anything).Return(&admin.ClusterResourceAttributes{}, nil)
mockPromScope := mockScope.NewTestScope()

c := NewClusterResourceController(&adminClient, &mocks.ListTargetsInterface{}, mockPromScope)
c := NewClusterResourceController(&adminDataProvider, &execClusterMocks.ListTargetsInterface{}, mockPromScope)
testController := c.(*controller)

gotK8sManifest, err := testController.createResourceFromTemplate(tt.args.ctx, tt.args.templateDir, tt.args.templateFileName, tt.args.project, tt.args.domain, tt.args.namespace, tt.args.templateValues, tt.args.customTemplateValues)
Expand Down
66 changes: 66 additions & 0 deletions flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package impl

import (
"context"
"fmt"

"github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"
)

// Implementation of an interfaces.FlyteAdminDataProvider which fetches data using a flyteadmin service client
type serviceAdminProvider struct {
adminClient service.AdminServiceClient
}

func (p serviceAdminProvider) GetClusterResourceAttributes(ctx context.Context, project, domain string) (*admin.ClusterResourceAttributes, error) {
resource, err := p.adminClient.GetProjectDomainAttributes(ctx, &admin.ProjectDomainAttributesGetRequest{
Project: project,
Domain: domain,
ResourceType: admin.MatchableResource_CLUSTER_RESOURCE,
})
if err != nil {
return nil, err
}
if resource != nil && resource.Attributes != nil && resource.Attributes.MatchingAttributes != nil &&
resource.Attributes.MatchingAttributes.GetClusterResourceAttributes() != nil {
return resource.Attributes.MatchingAttributes.GetClusterResourceAttributes(), nil
}
return nil, NewMissingEntityError("cluster resource attributes")
}

var activeProjectsFilter = fmt.Sprintf("ne(state,%d)", admin.Project_ARCHIVED)

func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
projects := make([]*admin.Project, 0)
listReq := &admin.ProjectListRequest{
Limit: 100,
Filters: activeProjectsFilter,
// Prefer to sync projects most newly created to ensure their resources get created first when other resources exist.
SortBy: &descCreatedAtSortParam,
}

// Iterate through all pages of projects
for {
projectResp, err := p.adminClient.ListProjects(ctx, listReq)
if err != nil {
return nil, err
}
projects = append(projects, projectResp.Projects...)
if len(projectResp.Token) == 0 {
break
}
listReq.Token = projectResp.Token
}
return &admin.Projects{
Projects: projects,
}, nil
}

func NewAdminServiceDataProvider(
adminClient service.AdminServiceClient) interfaces.FlyteAdminDataProvider {
return &serviceAdminProvider{
adminClient: adminClient,
}
}
Loading

0 comments on commit a6932ce

Please sign in to comment.