-
Notifications
You must be signed in to change notification settings - Fork 53
Conversation
PTAL @EngHabu |
f616f34
to
50ccc6b
Compare
if totalRunning+totalSuccesses < minSuccesses { | ||
logger.Infof(ctx, "Array failed early because totalRunning[%v] + totalSuccesses[%v] < minSuccesses[%v]", | ||
totalRunning, totalSuccesses, minSuccesses) | ||
if totalRunning+totalSuccesses+totalWaitingForResources < minSuccesses { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow how was this working correctly before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we never have totalWaitingForResources
? Is that your question?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I didn't realize that phase was introduced here. yes, thanks for explaining
return string(token), nil | ||
} | ||
|
||
// Reads secret values from paths specified in the config to initialize a Kubernetes rest client Config. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should move this to flytestdlib since it also exists in flyteadmin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. Will add TODO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@katrogan / @anandswaminathan I prefer we dont do this at all, I know you want to do this to test, but keep it for shorter time. My preference is that we run the entire workflow in a separate cluster if need be
// Continue with execution if successful | ||
case LaunchError: | ||
return currentState, logLinks, err | ||
// If Resource manager is enabled and there are currently not enough resources we can skip this round |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the only case when we get to LaunchWaiting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Only resourcemanager has introduced LaunchWaiting.
go/tasks/plugins/array/k8s/task.go
Outdated
func (t Task) Launch(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient) (LaunchResult, error) { | ||
podTemplate, _, err := FlyteArrayJobToK8sPodTemplate(ctx, tCtx) | ||
// Do not owner references for remote cluster execution | ||
if t.Config.RemoteClusterConfig.Enabled { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move this check after if err != nil {
below? or is it necessary to clear regardless?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aah good catch
go/tasks/plugins/array/k8s/task.go
Outdated
return MonitorError, errors2.Wrapf(ErrCheckPodStatus, err, "Failed to check pod status.") | ||
} | ||
|
||
logger.Debugf(ctx, "Monitor - phase info %v", phaseInfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add task index to this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed. For debug. Not useful.
|
||
return nil | ||
} | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be wrapped flytestdlib.error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. Any reason to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have been wrapping it in stdlib error if we want to achieve some specific behavior. But for plugins we want to make it simpler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 got it
|
||
resourceNamespace := core.ResourceNamespace(config.ResourceConfig.PrimaryLabel) | ||
resourceConstraintSpec := core.ResourceConstraintsSpec{ | ||
ProjectScopeResourceConstraint: nil, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are these nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we do not want any Project/Namespace based restriction today. We can add later if needed. I think it will not be used at all as resource quota will get a lot sooner. On top there is global limit as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused though, don't we need to specify some constraint in order to leverage resource manager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or is the resourceNamespace that constraint and sufficient?
Codecov Report
@@ Coverage Diff @@
## master #120 +/- ##
==========================================
- Coverage 61.84% 61.20% -0.65%
==========================================
Files 104 105 +1
Lines 5797 5941 +144
==========================================
+ Hits 3585 3636 +51
- Misses 1845 1930 +85
- Partials 367 375 +8
Continue to review full report at Codecov.
|
ce45427
to
cf9c388
Compare
@anandswaminathan can you please associate this with the issue? |
import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" | ||
import mock "github.com/stretchr/testify/mock" | ||
import ( | ||
context "context" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
too many blank lines?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will check
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) for the cluster"` | ||
} | ||
|
||
type ClusterConfig struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we put some comments here, explaining why we have a cluster config and why we need to do this? This seems a little complicated IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
go/tasks/plugins/array/core/state.go
Outdated
logger.Infof(ctx, "Array failed early because totalRunning[%v] + totalSuccesses[%v] < minSuccesses[%v]", | ||
totalRunning, totalSuccesses, minSuccesses) | ||
if totalRunning+totalSuccesses+totalWaitingForResources < minSuccesses { | ||
logger.Infof(ctx, "Array failed early because totalRunning[%v] + totalSuccesses[%v] + totalWaitingForResource[%v] < minSuccesses[%v]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Array failed early because number of failures are > minimum success requirements. Snapshot: TotalRunning[%d].....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have concurrency limits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
if strings.Contains(err.Error(), "exceeded quota") { | ||
// TODO: Quota errors are retried forever, it would be good to have support for backoff strategy. | ||
logger.Infof(ctx, "Failed to launch job, resource quota exceeded. Err: %v", err) | ||
t.State = t.State.SetPhase(arrayCore.PhaseWaitingForResources, 0).SetReason("Not enough resources to launch job") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the same as plugin manager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As in ? yes. If you are referring to same phase? then yes
podName := formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), indexStr) | ||
resourceNamespace := core.ResourceNamespace(config.ResourceConfig.PrimaryLabel) | ||
|
||
err := tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespace, podName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is not found handled here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. I checked and confirmed logs. Will confirm with resource manager API as well.
|
||
// TODO: Move logic to flytestdlib | ||
// Reads secret values from paths specified in the config to initialize a Kubernetes rest client Config. | ||
func RemoteClusterConfig(host string, auth Auth) (*restclient.Config, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of remoteCluster?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is our V0 milestone. We do not have a way to prioritize regular tasks over array tasks in the same namespace. Array task will end up utilizing all the resource quotas, that will prevent all the other tasks from progressing.
} | ||
|
||
if phaseInfo.Info() != nil { | ||
logLinks = append(logLinks, phaseInfo.Info().Logs...) | ||
// The first time we enter this state we will launch every subtask. On subsequent rounds, the pod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which state? i don't see this wrapped in a check for a specific phase
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the entire function - LaunchAndCheckSubTasksState
. The specific phase is the caller - PhaseCheckingSubTaskExecutions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so we call Launch everytime we're in PhaseCheckingSubTaskExecutions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is launch idempotent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. And we handle K8s already exists.
Not just launch, but also resource allocation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, but not sure if @kumare3 has more comments
* Support for remote endpoint * Unit tests for resource manager and k8s array. * Few minor fixes and updates. * Remove owner refs for remote execution. * Removed project/namespace limit for number of pods. * abort then finalize * rm unnecessary code & update state transition * Adding unit tests, remote endpoint and fixes Co-authored-by: Miguel Toledo <[email protected]>
Continued from #71 - Unable to push to mtoledo/k8-array-resource-manager
New changes:
cc @EngHabu @migueltol22