-
Notifications
You must be signed in to change notification settings - Fork 53
K8 array resource manager #120
Changes from 40 commits
d4efa26
74523da
e2c3aae
e12e287
e7c226d
8ed48d4
6090a7b
3bd1ef4
b913806
61f4751
d39b457
78ee66d
e371f27
f2a16bf
ae4f146
286425c
9cd4e4e
1a5c62c
a72846d
94f695d
98409ba
08b75e7
ba2a74d
dd5db90
438b0fd
edd233e
5f5d1a9
ad4ce20
050e8b2
f6c4d72
b031292
726c37b
86444fb
089f9ba
0b5580a
e54c081
50ccc6b
68912a6
0d1961c
cf9c388
24c18e0
85455ba
fac4413
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -226,6 +226,7 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus | |
totalSuccesses := int64(0) | ||
totalFailures := int64(0) | ||
totalRunning := int64(0) | ||
totalWaitingForResources := int64(0) | ||
for phase, count := range summary { | ||
totalCount += count | ||
if phase.IsTerminal() { | ||
|
@@ -238,6 +239,8 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus | |
// TODO: preferable to auto-combine to array tasks for now. | ||
totalFailures += count | ||
} | ||
} else if phase.IsWaitingForResources() { | ||
totalWaitingForResources += count | ||
} else { | ||
totalRunning += count | ||
} | ||
|
@@ -249,12 +252,16 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus | |
} | ||
|
||
// No chance to reach the required success numbers. | ||
if totalRunning+totalSuccesses < minSuccesses { | ||
logger.Infof(ctx, "Array failed early because totalRunning[%v] + totalSuccesses[%v] < minSuccesses[%v]", | ||
totalRunning, totalSuccesses, minSuccesses) | ||
if totalRunning+totalSuccesses+totalWaitingForResources < minSuccesses { | ||
logger.Infof(ctx, "Array failed early because totalRunning[%v] + totalSuccesses[%v] + totalWaitingForResource[%v] < minSuccesses[%v]", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Array failed early because number of failures are > minimum success requirements. Snapshot: TotalRunning[%d]..... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we have concurrency limits? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
totalRunning, totalSuccesses, totalWaitingForResources, minSuccesses) | ||
return PhaseWriteToDiscoveryThenFail | ||
} | ||
|
||
if totalWaitingForResources > 0 { | ||
logger.Infof(ctx, "Array is still running and waiting for resources totalWaitingForResources[%v]", totalWaitingForResources) | ||
return PhaseWaitingForResources | ||
} | ||
if totalSuccesses >= minSuccesses && totalRunning == 0 { | ||
logger.Infof(ctx, "Array succeeded because totalSuccesses[%v] >= minSuccesses[%v]", totalSuccesses, minSuccesses) | ||
return PhaseWriteToDiscovery | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,10 +5,16 @@ | |
package k8s | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
|
||
"github.com/pkg/errors" | ||
v1 "k8s.io/api/core/v1" | ||
restclient "k8s.io/client-go/rest" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config" | ||
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" | ||
"github.com/lyft/flytestdlib/config" | ||
) | ||
|
||
//go:generate pflags Config --default-var=defaultConfig | ||
|
@@ -31,14 +37,80 @@ var ( | |
}, | ||
} | ||
|
||
configSection = config.MustRegisterSection(configSectionKey, defaultConfig) | ||
configSection = pluginsConfig.MustRegisterSubSection(configSectionKey, defaultConfig) | ||
) | ||
|
||
// ResourceConfig holds the settings used to limit how many resources the
// k8s-array plugin may consume.
type ResourceConfig struct {
	// PrimaryLabel is the primary label of a given service cluster.
	PrimaryLabel string `json:"primaryLabel" pflag:",PrimaryLabel of a given service cluster"`
	// Limit is the resource quota for the cluster, expressed as a number of
	// outstanding requests.
	Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) for the cluster"`
}
|
||
type ClusterConfig struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we put some comments here, explaining why we have a cluster config and why we need to do this? This seems a little complicated IMO There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. |
||
Name string `json:"name" pflag:",Friendly name of the remote cluster"` | ||
Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"` | ||
Auth Auth `json:"auth" pflag:"-, Auth setting for the cluster"` | ||
Enabled bool `json:"enabled" pflag:", Boolean flag to enable or disable"` | ||
} | ||
|
||
// Auth describes how to authenticate against a cluster. The token and the
// certificate are referenced by file path rather than stored inline.
type Auth struct {
	// Type is the authentication type.
	Type string `json:"type" pflag:", Authentication type"`
	// TokenPath is the path of the file holding the token.
	TokenPath string `json:"tokenPath" pflag:", Token path"`
	// CertPath is the path of the file holding the certificate.
	CertPath string `json:"certPath" pflag:", Certificate path"`
}
|
||
func (auth Auth) GetCA() ([]byte, error) { | ||
cert, err := ioutil.ReadFile(auth.CertPath) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "failed to read k8s CA cert from configured path") | ||
} | ||
return cert, nil | ||
} | ||
|
||
func (auth Auth) GetToken() (string, error) { | ||
token, err := ioutil.ReadFile(auth.TokenPath) | ||
if err != nil { | ||
return "", errors.Wrap(err, "failed to read k8s bearer token from configured path") | ||
} | ||
return string(token), nil | ||
} | ||
|
||
// TODO: Move logic to flytestdlib | ||
// Reads secret values from paths specified in the config to initialize a Kubernetes rest client Config. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should move this to flytestdlib since it also exists in flyteadmin. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yup. Will add TODO. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @katrogan / @anandswaminathan I prefer we don't do this at all. I know you want to do this to test, but keep it for a shorter time. My preference is that we run the entire workflow in a separate cluster if need be. |
||
func RemoteClusterConfig(host string, auth Auth) (*restclient.Config, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the purpose of remoteCluster? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is our V0 milestone. We do not have a way to prioritize regular tasks over array tasks in the same namespace. Array task will end up utilizing all the resource quotas, that will prevent all the other tasks from progressing. |
||
tokenString, err := auth.GetToken() | ||
if err != nil { | ||
return nil, errors.New(fmt.Sprintf("Failed to get auth token: %+v", err)) | ||
} | ||
|
||
caCert, err := auth.GetCA() | ||
if err != nil { | ||
return nil, errors.New(fmt.Sprintf("Failed to get auth CA: %+v", err)) | ||
} | ||
|
||
tlsClientConfig := restclient.TLSClientConfig{} | ||
tlsClientConfig.CAData = caCert | ||
return &restclient.Config{ | ||
Host: host, | ||
TLSClientConfig: tlsClientConfig, | ||
BearerToken: tokenString, | ||
}, nil | ||
} | ||
|
||
func GetK8sClient(config ClusterConfig) (client.Client, error) { | ||
kubeConf, err := RemoteClusterConfig(config.Endpoint, config.Auth) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return client.New(kubeConf, client.Options{}) | ||
} | ||
|
||
// Defines custom config for K8s Array plugin | ||
type Config struct { | ||
DefaultScheduler string `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."` | ||
MaxErrorStringLength int `json:"maxErrLength" pflag:",Determines the maximum length of the error string returned for the array."` | ||
MaxErrorStringLength int `json:"maxErrorLength" pflag:",Determines the maximum length of the error string returned for the array."` | ||
MaxArrayJobSize int64 `json:"maxArrayJobSize" pflag:",Maximum size of array job."` | ||
ResourceConfig ResourceConfig `json:"resourceConfig" pflag:"-,ResourceConfiguration to limit number of resources used by k8s-array."` | ||
RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"-,Configuration of remote K8s cluster for array jobs"` | ||
NodeSelector map[string]string `json:"node-selector" pflag:"-,Defines a set of node selector labels to add to the pod."` | ||
Tolerations []v1.Toleration `json:"tolerations" pflag:"-,Tolerations to be applied for k8s-array pods"` | ||
OutputAssembler workqueue.Config | ||
|
@@ -48,3 +120,8 @@ type Config struct { | |
func GetConfig() *Config { | ||
return configSection.GetConfig().(*Config) | ||
} | ||
|
||
func IsResourceConfigSet(resourceConfig ResourceConfig) bool { | ||
emptyResouceConfig := ResourceConfig{} | ||
return resourceConfig != emptyResouceConfig | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow how was this working correctly before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we never have
totalWaitingForResources
? Is that your question? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I didn't realize that phase was introduced here. yes, thanks for explaining