Skip to content

Commit

Permalink
Migrate cluster resource controller to call admin service endpoints (f…
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Jan 15, 2022
1 parent 19897af commit 17ff041
Show file tree
Hide file tree
Showing 23 changed files with 785 additions and 261 deletions.
102 changes: 33 additions & 69 deletions flyteadmin/cmd/entrypoints/clusterresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,50 @@ package entrypoints
import (
"context"

"github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/flyteorg/flyteidl/clients/go/admin"

"github.com/flyteorg/flyteadmin/pkg/clusterresource"
"github.com/flyteorg/flyteadmin/pkg/config"
executioncluster "github.com/flyteorg/flyteadmin/pkg/executioncluster/impl"
"github.com/flyteorg/flyteadmin/pkg/runtime"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flyteadmin/pkg/config"
"github.com/flyteorg/flyteadmin/pkg/repositories"
repositoryConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/spf13/cobra"
_ "gorm.io/driver/postgres" // Required to import database driver.
gormLogger "gorm.io/gorm/logger"
)

var parentClusterResourceCmd = &cobra.Command{
Use: "clusterresource",
Short: "This command administers the ClusterResourceController. Please choose a subcommand.",
}

func GetLocalDbConfig() repositoryConfig.DbConfig {
return repositoryConfig.DbConfig{
Host: "localhost",
Port: 5432,
DbName: "postgres",
User: "postgres",
func getClusterResourceController(ctx context.Context, scope promutils.Scope, configuration runtimeInterfaces.Configuration) clusterresource.Controller {
initializationErrorCounter := scope.MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config")
var listTargetsProvider interfaces.ListTargetsInterface
var err error
if len(configuration.ClusterConfiguration().GetClusterConfigs()) == 0 {
serverConfig := config.GetConfig()
listTargetsProvider, err = executioncluster.NewInCluster(initializationErrorCounter, serverConfig.KubeConfig, serverConfig.Master)
} else {
listTargetsProvider, err = executioncluster.NewListTargets(initializationErrorCounter, executioncluster.NewExecutionTargetProvider(), configuration.ClusterConfiguration())
}
if err != nil {
panic(err)
}

clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx)
if err != nil {
panic(err)
}

return clusterresource.NewClusterResourceController(clientSet.AdminClient(), listTargetsProvider, scope)
}

var controllerRunCmd = &cobra.Command{
Expand All @@ -38,36 +56,9 @@ var controllerRunCmd = &cobra.Command{
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
scope := promutils.NewScope(configuration.ApplicationConfiguration().GetTopLevelConfig().MetricsScope).NewSubScope("clusterresource")
dbConfigValues := configuration.ApplicationConfiguration().GetDbConfig()
dbLogLevel := gormLogger.Silent
if dbConfigValues.Debug {
dbLogLevel = gormLogger.Info
}
dbConfig := repositoryConfig.DbConfig{
BaseConfig: repositoryConfig.BaseConfig{
LogLevel: dbLogLevel,
},
Host: dbConfigValues.Host,
Port: dbConfigValues.Port,
DbName: dbConfigValues.DbName,
User: dbConfigValues.User,
Password: dbConfigValues.Password,
ExtraOptions: dbConfigValues.ExtraOptions,
}
db := repositories.GetRepository(
repositories.POSTGRES, dbConfig, scope.NewSubScope("database"))

cfg := config.GetConfig()
executionCluster := executioncluster.GetExecutionCluster(
scope.NewSubScope("cluster"),
cfg.KubeConfig,
cfg.Master,
configuration,
db)

clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope)
clusterResourceController := getClusterResourceController(ctx, scope, configuration)
clusterResourceController.Run()
logger.Infof(ctx, "ClusterResourceController started successfully")
logger.Infof(ctx, "ClusterResourceController started running successfully")
},
}

Expand All @@ -78,39 +69,12 @@ var controllerSyncCmd = &cobra.Command{
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
scope := promutils.NewScope(configuration.ApplicationConfiguration().GetTopLevelConfig().MetricsScope).NewSubScope("clusterresource")
dbConfigValues := configuration.ApplicationConfiguration().GetDbConfig()
dbLogLevel := gormLogger.Silent
if dbConfigValues.Debug {
dbLogLevel = gormLogger.Info
}
dbConfig := repositoryConfig.DbConfig{
BaseConfig: repositoryConfig.BaseConfig{
LogLevel: dbLogLevel,
},
Host: dbConfigValues.Host,
Port: dbConfigValues.Port,
DbName: dbConfigValues.DbName,
User: dbConfigValues.User,
Password: dbConfigValues.Password,
ExtraOptions: dbConfigValues.ExtraOptions,
}
db := repositories.GetRepository(
repositories.POSTGRES, dbConfig, scope.NewSubScope("database"))

cfg := config.GetConfig()
executionCluster := executioncluster.GetExecutionCluster(
scope.NewSubScope("cluster"),
cfg.KubeConfig,
cfg.Master,
configuration,
db)

clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope)
clusterResourceController := getClusterResourceController(ctx, scope, configuration)
err := clusterResourceController.Sync(ctx)
if err != nil {
logger.Fatalf(ctx, "Failed to sync cluster resources [%+v]", err)
}
logger.Infof(ctx, "ClusterResourceController started successfully")
logger.Infof(ctx, "ClusterResourceController synced successfully")
},
}

Expand Down
17 changes: 11 additions & 6 deletions flyteadmin/cmd/entrypoints/k8s_secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"path/filepath"
"strings"

"k8s.io/client-go/rest"

"github.com/flyteorg/flytestdlib/logger"
kubeErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/flyteorg/flyteadmin/auth"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flyteadmin/pkg/config"
"github.com/flyteorg/flyteadmin/pkg/executioncluster/impl"
executioncluster "github.com/flyteorg/flyteadmin/pkg/executioncluster/impl"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flytestdlib/errors"
"github.com/flyteorg/flytestdlib/promutils"
Expand Down Expand Up @@ -93,24 +94,28 @@ func buildK8sSecretData(_ context.Context, localPath string) (map[string][]byte,
}

func persistSecrets(ctx context.Context, _ *pflag.FlagSet) error {
serverCfg := config.GetConfig()
configuration := runtime.NewConfigurationProvider()
scope := promutils.NewScope(configuration.ApplicationConfiguration().GetTopLevelConfig().MetricsScope)
initializationErrorCounter := scope.NewSubScope("secrets").MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config")
clusterClient, err := impl.NewInCluster(initializationErrorCounter, serverCfg.KubeConfig, serverCfg.Master)
listTargetsProvider, err := executioncluster.NewListTargets(initializationErrorCounter, executioncluster.NewExecutionTargetProvider(), configuration.ClusterConfiguration())
if err != nil {
return err
}

targets := clusterClient.GetAllValidTargets()
targets := listTargetsProvider.GetValidTargets()
// Since we are targeting the cluster Admin is running in, this list should contain exactly one item
if len(targets) != 1 {
return fmt.Errorf("expected exactly 1 valid target cluster. Found [%v]", len(targets))
}
var clusterCfg rest.Config
for _, target := range targets {
// We've just ascertained targets contains exactly 1 item, so we can safely assume we'll assign the clusterCfg
// from that one item now.
clusterCfg = target.Config
}

clusterCfg := targets[0].Config
kubeClient, err := kubernetes.NewForConfig(&clusterCfg)
if err != nil {
return errors.Wrapf("INIT", err, "Error building kubernetes clientset")
Expand Down
3 changes: 3 additions & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/zalando/go-keyring v0.1.1
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/api v0.42.0
Expand Down Expand Up @@ -74,6 +75,7 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/coocood/freecache v1.1.1 // indirect
github.com/danieljoos/wincred v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v3 v3.0.0 // indirect
github.com/dgraph-io/ristretto v0.0.3 // indirect
Expand All @@ -85,6 +87,7 @@ require (
github.com/go-logr/logr v0.4.0 // indirect
github.com/go-test/deep v1.0.7 // indirect
github.com/goccy/go-json v0.4.8 // indirect
github.com/godbus/dbus/v5 v5.0.3 // indirect
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/go-cmp v0.5.6 // indirect
Expand Down
6 changes: 6 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/danieljoos/wincred v1.1.0 h1:3RNcEpBg4IhIChZdFRSdlQt1QjCp1sMAPIrOnm7Yf8g=
github.com/danieljoos/wincred v1.1.0/go.mod h1:XYlo+eRTsVA9aHGp7NGjFkPla4m+DCL7hqDjlFjiygg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -644,6 +646,8 @@ github.com/gobuffalo/x v0.0.0-20181003152136-452098b06085/go.mod h1:WevpGD+5YOre
github.com/gobuffalo/x v0.0.0-20181007152206-913e47c59ca7/go.mod h1:9rDPXaB3kXdKWzMc4odGQQdG2e2DIEmANy5aSJ9yesY=
github.com/goccy/go-json v0.4.8 h1:TfwOxfSp8hXH+ivoOk36RyDNmXATUETRdaNWDaZglf8=
github.com/goccy/go-json v0.4.8/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.3 h1:ZqHaoEF7TBzh4jzPmqVhE/5A1z9of6orkAe5uHoAeME=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v3.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
Expand Down Expand Up @@ -1419,6 +1423,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zalando/go-keyring v0.1.1 h1:w2V9lcx/Uj4l+dzAf1m9s+DJ1O8ROkEHnynonHjTcYE=
github.com/zalando/go-keyring v0.1.1/go.mod h1:OIC+OZ28XbmwFxU/Rp9V7eKzZjamBJwRzC8UFJH9+L8=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
go.elastic.co/apm v1.8.0/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0=
Expand Down
Loading

0 comments on commit 17ff041

Please sign in to comment.