Skip to content

Commit

Permalink
Expose kubeclient configuration (flyteorg#491)
Browse files Browse the repository at this point in the history
* expose and use kubeclient configs if available

Signed-off-by: Babis Kiosidis <[email protected]>

* omit empty kubeclientconfig

Signed-off-by: Babis Kiosidis <[email protected]>

* setting configuration on all kubeclients

Signed-off-by: Daniel Rammer <[email protected]>

* addressing PR renaming comments

Signed-off-by: Dan Rammer <[email protected]>

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Dan Rammer <[email protected]>
Co-authored-by: Babis Kiosidis <[email protected]>
  • Loading branch information
hamersaw and ckiosidis authored Dec 15, 2022
1 parent 6037335 commit 5300f92
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 22 deletions.
22 changes: 20 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type ServerConfig struct {
// Deprecated: please use auth.AppAuth.ThirdPartyConfig instead.
DeprecatedThirdPartyConfig authConfig.ThirdPartyConfigOptions `json:"thirdPartyConfig" pflag:",Deprecated please use auth.appAuth.thirdPartyConfig instead."`

DataProxy DataProxyConfig `json:"dataProxy" pflag:",Defines data proxy configuration."`
ReadHeaderTimeoutSeconds int `json:"readHeaderTimeoutSeconds" pflag:",The amount of time allowed to read request headers."`
DataProxy DataProxyConfig `json:"dataProxy" pflag:",Defines data proxy configuration."`
ReadHeaderTimeoutSeconds int `json:"readHeaderTimeoutSeconds" pflag:",The amount of time allowed to read request headers."`
KubeClientConfig KubeClientConfig `json:"kubeClientConfig" pflag:",Configuration to control the Kubernetes client"`
}

type DataProxyConfig struct {
Expand All @@ -51,6 +52,18 @@ type GrpcConfig struct {
MaxMessageSizeBytes int `json:"maxMessageSizeBytes" pflag:",The max size in bytes for incoming gRPC messages"`
}

// KubeClientConfig contains the configuration used by flyteadmin to configure its internal Kubernetes Client.
type KubeClientConfig struct {
// QPS indicates the maximum QPS to the master from this client.
// If it's zero, the created RESTClient will use DefaultQPS: 5
QPS int32 `json:"qps" pflag:",Max QPS to the master for requests to KubeAPI. 0 defaults to 5."`
// Maximum burst for throttle.
// If it's zero, the created RESTClient will use DefaultBurst: 10.
Burst int `json:"burst" pflag:",Max burst rate for throttle. 0 defaults to 10"`
// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
Timeout config.Duration `json:"timeout" pflag:",Max duration allowed for every request to KubeAPI before giving up. 0 implies no timeout."`
}

type ServerSecurityOptions struct {
Secure bool `json:"secure"`
Ssl SslOptions `json:"ssl"`
Expand Down Expand Up @@ -97,6 +110,11 @@ var defaultServerConfig = &ServerConfig{
},
},
ReadHeaderTimeoutSeconds: 32, // just shy of requestTimeoutUpperBound
KubeClientConfig: KubeClientConfig{
QPS: 100,
Burst: 25,
Timeout: config.Duration{Duration: 30 * time.Second},
},
}
var serverConfig = config.MustRegisterSection(SectionKey, defaultServerConfig)

Expand Down
3 changes: 3 additions & 0 deletions pkg/config/serverconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions pkg/config/serverconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type clusterExecutionTargetProvider struct{}

// Creates a new Execution target for a cluster based on config passed in.
func (c *clusterExecutionTargetProvider) GetExecutionTarget(initializationErrorCounter prometheus.Counter, k8sCluster runtime.ClusterConfig) (*executioncluster.ExecutionTarget, error) {
kubeConf, err := flytek8s.GetRestClientConfigForCluster(k8sCluster)
kubeConf, err := flytek8s.GetRestClientConfig("", "", &k8sCluster)
if err != nil {
return nil, err
}
Expand Down
38 changes: 23 additions & 15 deletions pkg/flytek8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"os"

"github.com/flyteorg/flyteadmin/pkg/config"
"github.com/flyteorg/flyteadmin/pkg/errors"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -37,40 +38,47 @@ func RemoteClusterConfig(host string, auth runtimeInterfaces.Auth) (*restclient.
}, nil
}

func GetRestClientConfigForCluster(cluster runtimeInterfaces.ClusterConfig) (*restclient.Config, error) {
kubeConfiguration, err := RemoteClusterConfig(cluster.Endpoint, cluster.Auth)

if err != nil {
return nil, err
}
logger.Debugf(context.Background(), "successfully loaded kube configuration from %v", cluster)
return kubeConfiguration, nil
}

// Initializes a config using a variety of configurable or default fallback options that can be passed to a Kubernetes client on
// initialization.
func GetRestClientConfig(kubeConfig, master string,
func GetRestClientConfig(kubeConfigPathString, master string,
k8sCluster *runtimeInterfaces.ClusterConfig) (*restclient.Config, error) {
var kubeConfiguration *restclient.Config
var err error

if kubeConfig != "" {
kubeClientConfig := &config.GetConfig().KubeClientConfig
if kubeConfigPathString != "" {
// ExpandEnv allows using $HOME in the path and it will automatically map to the right OS's user home
kubeConfigPath := os.ExpandEnv(kubeConfig)
kubeConfigPath := os.ExpandEnv(kubeConfigPathString)
kubeConfiguration, err = clientcmd.BuildConfigFromFlags(master, kubeConfigPath)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Error building kubeconfig: %v", err)
}
logger.Debugf(context.Background(), "successfully loaded kube config from %s", kubeConfig)
logger.Debugf(context.Background(), "successfully loaded kube config from %s", kubeConfigPathString)
} else if k8sCluster != nil {
return GetRestClientConfigForCluster(*k8sCluster)
kubeConfiguration, err = RemoteClusterConfig(k8sCluster.Endpoint, k8sCluster.Auth)
if err != nil {
return nil, err
}
logger.Debugf(context.Background(), "successfully loaded kube configuration from %v", k8sCluster)

if k8sCluster.KubeClientConfig != nil {
logger.Debugf(context.Background(), "using rest config from remote cluster override for k8s cluster %s", k8sCluster.Name)
kubeClientConfig = k8sCluster.KubeClientConfig
}
} else {
kubeConfiguration, err = restclient.InClusterConfig()
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Cannot get incluster kubeconfig : %v", err.Error())
}
logger.Debug(context.Background(), "successfully loaded kube configuration from in cluster config")
}

if kubeClientConfig != nil {
kubeConfiguration.QPS = float32(kubeClientConfig.QPS)
kubeConfiguration.Burst = kubeClientConfig.Burst
kubeConfiguration.Timeout = kubeClientConfig.Timeout.Duration
}

return kubeConfiguration, nil
}

Expand Down
11 changes: 7 additions & 4 deletions pkg/runtime/interfaces/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package interfaces
import (
"io/ioutil"

"github.com/flyteorg/flyteadmin/pkg/config"

"github.com/pkg/errors"
)

// Holds details about a cluster used for workflow execution.
type ClusterConfig struct {
Name string `json:"name"`
Endpoint string `json:"endpoint"`
Auth Auth `json:"auth"`
Enabled bool `json:"enabled"`
Name string `json:"name"`
Endpoint string `json:"endpoint"`
Auth Auth `json:"auth"`
Enabled bool `json:"enabled"`
KubeClientConfig *config.KubeClientConfig `json:"kubeClientConfig,omitempty"`
}

type Auth struct {
Expand Down

0 comments on commit 5300f92

Please sign in to comment.