Enable Resource Manager for k8s array plugin #71
Conversation
@DouglasCurbelo / @migueltol22 can we please create an issue for this and add it to the description?
@@ -78,6 +80,15 @@ func CheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, kub

	actualPhase := phaseInfo.Phase()
	if phaseInfo.Phase().IsSuccess() {

		// Release token
		resourceNamespace := core.ResourceNamespace(tCtx.TaskExecutionMetadata().GetOwnerID().Namespace)
Release has to be done in case of success, failure, or aborts, right? I would do this in Finalize(): no matter what, once you want to finish the task we invoke Finalize, so that would be the right place to do the token release.
Also, writing a simplified interface that launches, monitors, aborts, and finalizes one job might be a nicer way of arranging the code - wdyt?
So my question with doing this in finalize is that currently finalize will be called after all of the subtasks get run. What if we have 1000 subtasks and only 300 resources? In that case we'll never release a resource (assuming my understanding is correct). I do agree that this should be done where each subtask has its own launch, monitor, etc. Any ideas on the best way to handle this? Would introducing a new phase such as processingChildren be a good approach?
@migueltol22 good question. I would always release on successful completion of a task. The finalize of the one job should probably be invoked in two places: immediately after success and once in the finalize of the parent. If the finalize previously succeeded it would just be a no-op, so you would not have to worry about not releasing resources.
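For concreteness, a minimal sketch of that idea, assuming the pluginmachinery ResourceManager exposes ReleaseResource(ctx, namespace, token) and that releasing an already-released token is a no-op; the helper names here are made up:

package k8s

import (
	"context"

	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

// releaseToken gives one subtask's token back to the resource manager. It gets called
// right after the subtask succeeds and again from the parent's Finalize; the second
// call is expected to be a no-op.
func releaseToken(ctx context.Context, tCtx core.TaskExecutionContext, pool core.ResourceNamespace, token string) error {
	return tCtx.ResourceManager().ReleaseResource(ctx, pool, token)
}

// finalizeAll is what the parent's Finalize could do: release every subtask's token,
// regardless of how far each subtask got.
func finalizeAll(ctx context.Context, tCtx core.TaskExecutionContext, pool core.ResourceNamespace, tokens []string) error {
	for _, token := range tokens {
		if err := releaseToken(ctx, tCtx, pool, token); err != nil {
			return err
		}
	}
	return nil
}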
@@ -78,6 +80,15 @@ func CheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, kub

	actualPhase := phaseInfo.Phase()
	if phaseInfo.Phase().IsSuccess() {
What happens if the pod fails? Do we release the token as well?
@@ -94,6 +96,16 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kubeCli

	pod = ApplyPodPolicies(ctx, config, pod)

	// Allocate Token
	resourceNamespace := core.ResourceNamespace(pod.Namespace)
	allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespace, pod.Name, core.ResourceConstraintsSpec{})
Noob question: here we allocate resources using pod.Namespace, but in the initialization step we call:
iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(primaryLabel), tokenLimit)
Does that mean we can only request tokens with the same label we used during registration? If so, would enforcing limits per namespace require registering quotas for each namespace on initialization?
@kumare3 ?
I also have the same question about how resource namespace and requests are used
I don't think I understand your question fully, but let me provide some details about RM to see if that resolves your concern.
RM is basically a pooling system for plugins. A plugin requests to create one or more pools during setup time (using ResourceRegistrar().RegisterResourceQuota()). At the end of setup time, the RM is created based on the valid pool-creation requests, and the actual pools are created. During execution time, the plugin sends token-allocate and token-release requests to RM, and RM tries to put and remove tokens to and from the pools specified in the requests. So the term ResourceNamespace here just means the name of a token pool, nothing else. It has nothing to do with the users' namespace or the k8s namespace (i.e. <project>-<domain>). How you want to compose a ResourceNamespace is totally up to you.
I just realized that the argument names used in the function signature are misleading in this case. Those names made sense when I first wrote RM, but they have become confusing in its current form. I'll try to fix the naming soon. Sorry for the confusion.
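To make the setup-time half concrete, a minimal sketch of registering one pool; the import path and the int quota type are assumptions, and as described above the pool name is arbitrary:

package k8s

import (
	"context"

	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

// registerPool creates one token pool at plugin setup time. The "resource namespace"
// is just the pool's name; it does not need to match any k8s or project-domain namespace.
func registerPool(ctx context.Context, iCtx core.SetupContext, poolName string, tokenLimit int) error {
	return iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(poolName), tokenLimit)
}

Execution-time allocation and release then target the same pool name; that half is sketched in the state-machine example further down.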
This is an awesome start! But I think this will need to be a more involved change than this...
The state machine is currently simple:
- Launch -> Monitor -> Compute Output/Error -> Succeed/Fail
Instead, I think you will need to combine the first two:
- Launch&Monitor -> Compute Output/Error -> Succeed/Fail
And in Launch&Monitor it should check the subtask's phase (which is kind of its own state machine):
case NotLaunched:
  -> AcquireToken
case HasToken:
  -> Launch
case Launched:
  -> Monitor
case Finished:
  -> ReleaseResource
You should also implement Finalize on the plugin and release all resources (it's idempotent; even if it was called before, it's ok to call it again). A sketch of this per-subtask loop follows below.
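A rough, non-authoritative sketch of that per-subtask state machine, reusing the AllocateResource/ReleaseResource calls already shown in this PR; the AllocationStatusGranted constant is assumed from the core package, and the phase names and the launch/poll callbacks are placeholders:

package k8s

import (
	"context"

	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

// Illustrative per-subtask phases; the real plugin will have its own names.
type subTaskPhase int

const (
	phaseNotLaunched subTaskPhase = iota
	phaseHasToken
	phaseLaunched
	phaseFinished
)

// launchAndMonitorOne advances a single subtask through the mini state machine described
// above. launch and poll are hypothetical callbacks standing in for the pod create/check code.
func launchAndMonitorOne(
	ctx context.Context,
	tCtx core.TaskExecutionContext,
	pool core.ResourceNamespace,
	token string,
	phase subTaskPhase,
	launch func(context.Context) error,
	poll func(context.Context) (bool, error),
) (subTaskPhase, error) {
	switch phase {
	case phaseNotLaunched:
		// AcquireToken: only advance once the resource manager grants a token.
		status, err := tCtx.ResourceManager().AllocateResource(ctx, pool, token, core.ResourceConstraintsSpec{})
		if err != nil || status != core.AllocationStatusGranted {
			return phaseNotLaunched, err
		}
		return phaseHasToken, nil
	case phaseHasToken:
		// Launch: create the pod.
		if err := launch(ctx); err != nil {
			return phaseHasToken, err
		}
		return phaseLaunched, nil
	case phaseLaunched:
		// Monitor: check whether the pod reached a terminal phase.
		done, err := poll(ctx)
		if err != nil || !done {
			return phaseLaunched, err
		}
		return phaseFinished, nil
	default: // phaseFinished
		// ReleaseResource: hand the token back so the next subtask can run.
		return phaseFinished, tCtx.ResourceManager().ReleaseResource(ctx, pool, token)
	}
}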
go/tasks/plugins/array/k8s/config.go
Outdated
// Defines custom config for K8s Array plugin
type Config struct {
	DefaultScheduler     string `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."`
	MaxErrorStringLength int    `json:"maxErrLength" pflag:",Determines the maximum length of the error string returned for the array."`
	MaxArrayJobSize      int64  `json:"maxArrayJobSize" pflag:",Maximum size of array job."`
	OutputAssembler      workqueue.Config
	ErrorAssembler       workqueue.Config
	TokenConfigs         TokenConfig
Maybe ResourcesConfig?
go/tasks/plugins/array/k8s/config.go
Outdated
@@ -32,13 +32,19 @@ var (
	configSection = config.MustRegisterSection(configSectionKey, defaultConfig)
)

type TokenConfig struct {
	primaryLabel string
These will need to be exported, e.g.:
-	primaryLabel string
+	PrimaryLabel string `json:"primaryLabel" pflag:",What is this?"`
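For reference, a sketch of the exported struct based on the PrimaryLabel/Limit fields that show up later in this PR; the json tag for Limit and the pflag descriptions are guesses:

package k8s

// ResourceConfig is a sketch of the exported form so that json/pflag can pick the fields up.
type ResourceConfig struct {
	PrimaryLabel string `json:"primaryLabel" pflag:",Primary label (token pool name) registered with the resource manager."`
	Limit        int    `json:"limit" pflag:",Maximum number of tokens, i.e. concurrently running array pods."`
}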
go/tasks/plugins/array/core/state.go
Outdated
		} else {
			totalRunning += count
		}
	}

	if totalWaitingForResources > 0 {
		logger.Infof(ctx, "Array is still running and waiting for resources totalWaitingForResources[%v]", totalWaitingForResources)
		return PhaseCheckingSubTaskExecutions
why not PhaseWaitingForResources?
Good point. Updated.
		Summary:  arraystatus.ArraySummary{},
		Detailed: arrayCore.NewPhasesCompactArray(uint(currentState.GetExecutionArraySize())),
	}

	if int64(currentState.GetExecutionArraySize()) > config.MaxArrayJobSize {
nit: why not do this check before declaring all the variables above?
updated.
	// The first time we enter this state we will launch every subtask. On subsequent rounds, the pod
	// has already been created so we return a Success value and continue with the Monitor step.
	var status TaskStatus
	status, err = task.Launch(ctx, tCtx, kubeClient)
sorry if i'm missing something obvious here, but if the task is not terminal we always launch? what if it's already running?
Yes, that is correct. If it's already running we will get an alreadyExists error and proceed with the rest of the flow in Monitor.
thanks for explaining
go/tasks/plugins/array/k8s/task.go
Outdated
	Success TaskStatus = iota
	Error
	Waiting
	ReturnState
it's not immediately clear what this TaskStatus represents. can you add a comment explaining 'ReturnState'?
So there are essentially two state machines: one for the entire job and one for the subtasks. There are cases where we would like to return the entire job's state based off the outcome of a subtask. I couldn't come up with a great name. Any ideas? (Will add a comment after deciding on a name.)
I think it's also confusing that we return Success from Launch() when we successfully launch and we return the same Success from Monitor() when the task finishes successfully.
Maybe you just need LaunchResult and MonitorResult (two separate enums).
Updated to use two separate enums. Still unsure about the name for returnState. Any ideas for a better name?
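A small sketch of the two-enum split being discussed; the identifier names here are only suggestions, not the final ones:

package k8s

// LaunchResult describes the outcome of trying to launch one subtask.
type LaunchResult int8

const (
	LaunchSuccess     LaunchResult = iota // pod created (or it already existed)
	LaunchError                           // unrecoverable error while creating the pod
	LaunchWaiting                         // no token granted yet; retry on the next round
	LaunchReturnState                     // stop processing children and return the current array state
)

// MonitorResult describes the outcome of checking one already-launched subtask.
type MonitorResult int8

const (
	MonitorSuccess MonitorResult = iota // subtask finished successfully
	MonitorError                        // subtask failed
	MonitorRunning                      // subtask is still running
)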
	var args []string
	if len(podTemplate.Spec.Containers) > 0 {
		args = append(podTemplate.Spec.Containers[0].Command, podTemplate.Spec.Containers[0].Args...)
do we always assume that there is at most one container in the podspec? should we error if there are more? 0? in line 60 and elsewhere you reference the Containers[0] again without this check
I'm not entirely sure. This was the code that was there beforehand.
Hm, it seems odd that we check before de-referencing here but not below. Can we maybe check and just throw an error?
Updated to throw an error. Not sure if this is what you meant, or should the error be thrown below?
@katrogan ^^
yes, thank you!
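One possible shape for that check, returning an error rather than silently skipping; buildArgs and the use of fmt.Errorf are illustrative stand-ins for the plugin's actual helpers:

package k8s

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// buildArgs concatenates the first container's command and args, and errors out
// instead of silently skipping when the pod template has no containers.
func buildArgs(podTemplate v1.Pod) ([]string, error) {
	if len(podTemplate.Spec.Containers) == 0 {
		// fmt.Errorf stands in for whatever error helper the plugin already uses.
		return nil, fmt.Errorf("pod template must contain at least one container")
	}
	c := podTemplate.Spec.Containers[0]
	return append(c.Command, c.Args...), nil
}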
go/tasks/plugins/array/k8s/task.go
Outdated
	return Success, nil
}

func (t Task) Abort() {}
why is this a no-op?
So abort in the handle step had never been implemented; it was just calling finalize. I can update it to actually call abort. I just have it like that because it was the preexisting implementation.
go/tasks/plugins/array/k8s/task.go
Outdated
}

func allocateResource(ctx context.Context, tCtx core.TaskExecutionContext, config *Config, podName string, childIdx int, arrayStatus *arraystatus.ArrayStatus) (bool, error) {
What does the bool in the response represent?
So in the case where we do not get a resource, we do not want to continue with the flow; we'd like to proceed to the next iteration of the for loop. I'm thinking a better solution would be to just return the allocation status and have the check done by the caller of allocateResource. Thoughts?
that sounds reasonable!
Updated.
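A sketch of the agreed direction: the helper returns the raw allocation status and the caller decides whether to skip to the next child. AllocationStatusGranted is assumed to be the "granted" value in the core package, and the loop body is only indicative:

package k8s

import (
	"context"

	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

// allocateResource now just surfaces the allocation status; the caller decides what to do with it.
func allocateResource(ctx context.Context, tCtx core.TaskExecutionContext, pool core.ResourceNamespace, podName string) (core.AllocationStatus, error) {
	return tCtx.ResourceManager().AllocateResource(ctx, pool, podName, core.ResourceConstraintsSpec{})
}

// launchLoop shows the caller-side check: children that do not get a token this round
// are skipped and retried on the next evaluation.
func launchLoop(ctx context.Context, tCtx core.TaskExecutionContext, pool core.ResourceNamespace, podNames []string) error {
	for _, name := range podNames {
		status, err := allocateResource(ctx, tCtx, pool, name)
		if err != nil {
			return err
		}
		if status != core.AllocationStatusGranted {
			// No token available right now; mark this child as waiting and move on.
			continue
		}
		// ... create the pod for this child ...
	}
	return nil
}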
go/tasks/plugins/array/k8s/config.go
Outdated
// Defines custom config for K8s Array plugin
type Config struct {
	DefaultScheduler     string         `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."`
	MaxErrorStringLength int            `json:"maxErrLength" pflag:",Determines the maximum length of the error string returned for the array."`
	MaxArrayJobSize      int64          `json:"maxArrayJobSize" pflag:",Maximum size of array job."`
	ResourceConfig       ResourceConfig `json:"resourceConfig" pflag:"-,ResourceConfiguration to limit number of resources used by k8s-array."`
Any reason for the - in pflag? This will make it not generate pflag options... Desired, or a copy-paste error?
Good catch. Had it originally like that because I was using a map and pflag doesn't support that type. Updated.
go/tasks/plugins/array/k8s/task.go
Outdated
func (t Task) Abort() {}

func (t Task) Finalize(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient) error {
So this is the implementation of Abort. Finalize should just try to deallocateResource.
I see. Updated abort and finalize.
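A hedged sketch of that split: Abort deletes the pod, Finalize only releases the token. The KubeClient.GetClient().Delete call and the helper names are assumptions:

package k8s

import (
	"context"

	v1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

// abortSubTask deletes the subtask's pod; a NotFound error just means it is already gone.
func abortSubTask(ctx context.Context, kubeClient core.KubeClient, name, namespace string) error {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}}
	if err := kubeClient.GetClient().Delete(ctx, pod); err != nil && !k8serrors.IsNotFound(err) {
		return err
	}
	return nil
}

// finalizeSubTask only tries to give the token back; releasing an already-released
// token is expected to be a no-op.
func finalizeSubTask(ctx context.Context, tCtx core.TaskExecutionContext, pool core.ResourceNamespace, token string) error {
	return tCtx.ResourceManager().ReleaseResource(ctx, pool, token)
}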
Apologies about the delay, just noticed you requested feedback :)
Awesome! I hear we've already used this in some experiments? Fantastic... Please make sure Katrina is ok with your responses/resolutions to the comments she left.
I think that's as far as I'll be able to spot by just reviewing the code. I haven't seen many unit tests added though. Are you working on that?
@katrogan @migueltol22, sorry I stopped following this change. Is the change ready to merge? If not, what is missing?
if IsResourceConfigSet() {
	primaryLabel := GetConfig().ResourceConfig.PrimaryLabel
	limit := GetConfig().ResourceConfig.Limit
	if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(primaryLabel), limit); err != nil {
@EngHabu regarding the resource name: We discussed earlier that we might need to add flyte cluster information to the resource name as a preparation step for moving toward a centralized Redis. Do you think the change should be done in this PR or a separate one? cc @migueltol22 just FYI
I might have been mistaken. Maybe there is no way to get the cluster name... we might have to do something in our deployment to do that... might not be so trivial... let me ask on the channel..
Alright, found out we already pass the cluster name to the spark deployment, so we can do the same for flytepropeller to make it aware of its cluster name.
This is great! We should then be able to move to elasticache without a problem
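If the cluster name does get plumbed into flytepropeller, one purely illustrative way to scope pools per cluster would be a prefixed pool name; this is not part of this PR:

package k8s

import (
	"fmt"

	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

// poolNamespace is a hypothetical helper: prefix the cluster name so token pools stored
// in a shared (centralized) Redis do not collide across Flyte clusters.
func poolNamespace(clusterName, primaryLabel string) core.ResourceNamespace {
	return core.ResourceNamespace(fmt.Sprintf("%s-%s", clusterName, primaryLabel))
}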
TL;DR
Use resource manager for k8s array plugin as a rate limiter to k8s.
Example Config that could be used for resource manager:
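For illustration, one way such a config could look, expressed through the Config/ResourceConfig structs added in this PR; the values are made up:

package k8s

// Illustrative values only: a k8s-array plugin config with the resource manager enabled.
var exampleConfig = Config{
	MaxArrayJobSize: 100,
	ResourceConfig: ResourceConfig{
		PrimaryLabel: "token", // pool name registered with the resource manager
		Limit:        300,     // at most 300 array pods running at once
	},
}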
Tracking Issue
flyteorg/flyte#201
Follow-up issue
NA