From 033cf05cfee3eeb2fa76dbd722120398e5a55fd1 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Thu, 15 Dec 2022 13:11:00 -0600 Subject: [PATCH] Expose kubeclient configuration (#491) * expose and use kubeclient configs if available Signed-off-by: Babis Kiosidis * omit empty kubeclientconfig Signed-off-by: Babis Kiosidis * setting configuration on all kubeclients Signed-off-by: Daniel Rammer * addressing PR renaming comments Signed-off-by: Dan Rammer Signed-off-by: Babis Kiosidis Signed-off-by: Daniel Rammer Signed-off-by: Dan Rammer Co-authored-by: Babis Kiosidis --- flyteadmin/pkg/config/config.go | 22 +++++++++- flyteadmin/pkg/config/serverconfig_flags.go | 3 ++ .../pkg/config/serverconfig_flags_test.go | 42 +++++++++++++++++++ .../impl/cluster_execution_target_provider.go | 2 +- flyteadmin/pkg/flytek8s/client.go | 38 ++++++++++------- .../interfaces/cluster_configuration.go | 11 +++-- 6 files changed, 96 insertions(+), 22 deletions(-) diff --git a/flyteadmin/pkg/config/config.go b/flyteadmin/pkg/config/config.go index 5d5bc3dbcd..28996bd236 100644 --- a/flyteadmin/pkg/config/config.go +++ b/flyteadmin/pkg/config/config.go @@ -25,8 +25,9 @@ type ServerConfig struct { // Deprecated: please use auth.AppAuth.ThirdPartyConfig instead. DeprecatedThirdPartyConfig authConfig.ThirdPartyConfigOptions `json:"thirdPartyConfig" pflag:",Deprecated please use auth.appAuth.thirdPartyConfig instead."` - DataProxy DataProxyConfig `json:"dataProxy" pflag:",Defines data proxy configuration."` - ReadHeaderTimeoutSeconds int `json:"readHeaderTimeoutSeconds" pflag:",The amount of time allowed to read request headers."` + DataProxy DataProxyConfig `json:"dataProxy" pflag:",Defines data proxy configuration."` + ReadHeaderTimeoutSeconds int `json:"readHeaderTimeoutSeconds" pflag:",The amount of time allowed to read request headers."` + KubeClientConfig KubeClientConfig `json:"kubeClientConfig" pflag:",Configuration to control the Kubernetes client"` } type DataProxyConfig struct { @@ -51,6 +52,18 @@ type GrpcConfig struct { MaxMessageSizeBytes int `json:"maxMessageSizeBytes" pflag:",The max size in bytes for incoming gRPC messages"` } +// KubeClientConfig contains the configuration used by flyteadmin to configure its internal Kubernetes Client. +type KubeClientConfig struct { + // QPS indicates the maximum QPS to the master from this client. + // If it's zero, the created RESTClient will use DefaultQPS: 5 + QPS int32 `json:"qps" pflag:",Max QPS to the master for requests to KubeAPI. 0 defaults to 5."` + // Maximum burst for throttle. + // If it's zero, the created RESTClient will use DefaultBurst: 10. + Burst int `json:"burst" pflag:",Max burst rate for throttle. 0 defaults to 10"` + // The maximum length of time to wait before giving up on a server request. A value of zero means no timeout. + Timeout config.Duration `json:"timeout" pflag:",Max duration allowed for every request to KubeAPI before giving up. 0 implies no timeout."` +} + type ServerSecurityOptions struct { Secure bool `json:"secure"` Ssl SslOptions `json:"ssl"` @@ -97,6 +110,11 @@ var defaultServerConfig = &ServerConfig{ }, }, ReadHeaderTimeoutSeconds: 32, // just shy of requestTimeoutUpperBound + KubeClientConfig: KubeClientConfig{ + QPS: 100, + Burst: 25, + Timeout: config.Duration{Duration: 30 * time.Second}, + }, } var serverConfig = config.MustRegisterSection(SectionKey, defaultServerConfig) diff --git a/flyteadmin/pkg/config/serverconfig_flags.go b/flyteadmin/pkg/config/serverconfig_flags.go index ebdf12deb7..c37a826032 100755 --- a/flyteadmin/pkg/config/serverconfig_flags.go +++ b/flyteadmin/pkg/config/serverconfig_flags.go @@ -76,5 +76,8 @@ func (cfg ServerConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "dataProxy.upload.storagePrefix"), defaultServerConfig.DataProxy.Upload.StoragePrefix, "Storage prefix to use for all upload requests.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "dataProxy.download.maxExpiresIn"), defaultServerConfig.DataProxy.Download.MaxExpiresIn.String(), "Maximum allowed expiration duration.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "readHeaderTimeoutSeconds"), defaultServerConfig.ReadHeaderTimeoutSeconds, "The amount of time allowed to read request headers.") + cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "kubeClientConfig.qps"), defaultServerConfig.KubeClientConfig.QPS, "Max QPS to the master for requests to KubeAPI. 0 defaults to 5.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "kubeClientConfig.burst"), defaultServerConfig.KubeClientConfig.Burst, "Max burst rate for throttle. 0 defaults to 10") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "kubeClientConfig.timeout"), defaultServerConfig.KubeClientConfig.Timeout.String(), "Max duration allowed for every request to KubeAPI before giving up. 0 implies no timeout.") return cmdFlags } diff --git a/flyteadmin/pkg/config/serverconfig_flags_test.go b/flyteadmin/pkg/config/serverconfig_flags_test.go index 4412be6374..b16e0416dd 100755 --- a/flyteadmin/pkg/config/serverconfig_flags_test.go +++ b/flyteadmin/pkg/config/serverconfig_flags_test.go @@ -463,4 +463,46 @@ func TestServerConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_kubeClientConfig.qps", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("kubeClientConfig.qps", testValue) + if vInt32, err := cmdFlags.GetInt32("kubeClientConfig.qps"); err == nil { + testDecodeJson_ServerConfig(t, fmt.Sprintf("%v", vInt32), &actual.KubeClientConfig.QPS) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_kubeClientConfig.burst", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("kubeClientConfig.burst", testValue) + if vInt, err := cmdFlags.GetInt("kubeClientConfig.burst"); err == nil { + testDecodeJson_ServerConfig(t, fmt.Sprintf("%v", vInt), &actual.KubeClientConfig.Burst) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_kubeClientConfig.timeout", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := defaultServerConfig.KubeClientConfig.Timeout.String() + + cmdFlags.Set("kubeClientConfig.timeout", testValue) + if vString, err := cmdFlags.GetString("kubeClientConfig.timeout"); err == nil { + testDecodeJson_ServerConfig(t, fmt.Sprintf("%v", vString), &actual.KubeClientConfig.Timeout) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/flyteadmin/pkg/executioncluster/impl/cluster_execution_target_provider.go b/flyteadmin/pkg/executioncluster/impl/cluster_execution_target_provider.go index 328a288810..ad6e95bb93 100644 --- a/flyteadmin/pkg/executioncluster/impl/cluster_execution_target_provider.go +++ b/flyteadmin/pkg/executioncluster/impl/cluster_execution_target_provider.go @@ -16,7 +16,7 @@ type clusterExecutionTargetProvider struct{} // Creates a new Execution target for a cluster based on config passed in. func (c *clusterExecutionTargetProvider) GetExecutionTarget(initializationErrorCounter prometheus.Counter, k8sCluster runtime.ClusterConfig) (*executioncluster.ExecutionTarget, error) { - kubeConf, err := flytek8s.GetRestClientConfigForCluster(k8sCluster) + kubeConf, err := flytek8s.GetRestClientConfig("", "", &k8sCluster) if err != nil { return nil, err } diff --git a/flyteadmin/pkg/flytek8s/client.go b/flyteadmin/pkg/flytek8s/client.go index e22c15a855..a5baabb08a 100644 --- a/flyteadmin/pkg/flytek8s/client.go +++ b/flyteadmin/pkg/flytek8s/client.go @@ -5,6 +5,7 @@ import ( "context" "os" + "github.com/flyteorg/flyteadmin/pkg/config" "github.com/flyteorg/flyteadmin/pkg/errors" "google.golang.org/grpc/codes" @@ -37,33 +38,33 @@ func RemoteClusterConfig(host string, auth runtimeInterfaces.Auth) (*restclient. }, nil } -func GetRestClientConfigForCluster(cluster runtimeInterfaces.ClusterConfig) (*restclient.Config, error) { - kubeConfiguration, err := RemoteClusterConfig(cluster.Endpoint, cluster.Auth) - - if err != nil { - return nil, err - } - logger.Debugf(context.Background(), "successfully loaded kube configuration from %v", cluster) - return kubeConfiguration, nil -} - // Initializes a config using a variety of configurable or default fallback options that can be passed to a Kubernetes client on // initialization. -func GetRestClientConfig(kubeConfig, master string, +func GetRestClientConfig(kubeConfigPathString, master string, k8sCluster *runtimeInterfaces.ClusterConfig) (*restclient.Config, error) { var kubeConfiguration *restclient.Config var err error - if kubeConfig != "" { + kubeClientConfig := &config.GetConfig().KubeClientConfig + if kubeConfigPathString != "" { // ExpandEnv allows using $HOME in the path and it will automatically map to the right OS's user home - kubeConfigPath := os.ExpandEnv(kubeConfig) + kubeConfigPath := os.ExpandEnv(kubeConfigPathString) kubeConfiguration, err = clientcmd.BuildConfigFromFlags(master, kubeConfigPath) if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Error building kubeconfig: %v", err) } - logger.Debugf(context.Background(), "successfully loaded kube config from %s", kubeConfig) + logger.Debugf(context.Background(), "successfully loaded kube config from %s", kubeConfigPathString) } else if k8sCluster != nil { - return GetRestClientConfigForCluster(*k8sCluster) + kubeConfiguration, err = RemoteClusterConfig(k8sCluster.Endpoint, k8sCluster.Auth) + if err != nil { + return nil, err + } + logger.Debugf(context.Background(), "successfully loaded kube configuration from %v", k8sCluster) + + if k8sCluster.KubeClientConfig != nil { + logger.Debugf(context.Background(), "using rest config from remote cluster override for k8s cluster %s", k8sCluster.Name) + kubeClientConfig = k8sCluster.KubeClientConfig + } } else { kubeConfiguration, err = restclient.InClusterConfig() if err != nil { @@ -71,6 +72,13 @@ func GetRestClientConfig(kubeConfig, master string, } logger.Debug(context.Background(), "successfully loaded kube configuration from in cluster config") } + + if kubeClientConfig != nil { + kubeConfiguration.QPS = float32(kubeClientConfig.QPS) + kubeConfiguration.Burst = kubeClientConfig.Burst + kubeConfiguration.Timeout = kubeClientConfig.Timeout.Duration + } + return kubeConfiguration, nil } diff --git a/flyteadmin/pkg/runtime/interfaces/cluster_configuration.go b/flyteadmin/pkg/runtime/interfaces/cluster_configuration.go index 100e6f0047..491caed3f1 100644 --- a/flyteadmin/pkg/runtime/interfaces/cluster_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/cluster_configuration.go @@ -3,15 +3,18 @@ package interfaces import ( "io/ioutil" + "github.com/flyteorg/flyteadmin/pkg/config" + "github.com/pkg/errors" ) // Holds details about a cluster used for workflow execution. type ClusterConfig struct { - Name string `json:"name"` - Endpoint string `json:"endpoint"` - Auth Auth `json:"auth"` - Enabled bool `json:"enabled"` + Name string `json:"name"` + Endpoint string `json:"endpoint"` + Auth Auth `json:"auth"` + Enabled bool `json:"enabled"` + KubeClientConfig *config.KubeClientConfig `json:"kubeClientConfig,omitempty"` } type Auth struct {