This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Add initial dask plugin IDL #minor #339

Merged
merged 8 commits into flyteorg:master on Dec 28, 2022

Conversation

bstadlbauer
Member

@bstadlbauer bstadlbauer commented Nov 1, 2022

TL;DR

Adds the IDL for creating a DaskJob

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Tracking Issue

https://github.com/flyteorg/flyte/issues/

Follow-up issue

NA

@welcome

welcome bot commented Nov 1, 2022

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

@bstadlbauer bstadlbauer changed the title from "Add initial dask plugin IDL" to "WIP: Add initial dask plugin IDL" on Nov 1, 2022
@codecov

codecov bot commented Nov 1, 2022

Codecov Report

Merging #339 (6ae78d6) into master (fd208b7) will not change coverage.
The diff coverage is n/a.

@@           Coverage Diff           @@
##           master     #339   +/-   ##
=======================================
  Coverage   73.12%   73.12%           
=======================================
  Files          18       18           
  Lines        1362     1362           
=======================================
  Hits          996      996           
  Misses        315      315           
  Partials       51       51           
Flag Coverage Δ
unittests 73.12% <ø> (ø)


@bstadlbauer bstadlbauer marked this pull request as ready for review November 14, 2022 14:17
@bstadlbauer bstadlbauer changed the title from "WIP: Add initial dask plugin IDL" to "Add initial dask plugin IDL #minor" on Nov 14, 2022
@bstadlbauer bstadlbauer requested a review from hamersaw November 14, 2022 14:33
Contributor

@hamersaw hamersaw left a comment

Looks great to me. One small question - I see from the docs that the DaskJob creates a number of Pods (more complex than other clusters, ex. Spark / Ray). This is why we have separate configurable image fields here, which override the default set by the container_image flytekit parameter. Presumably we can have finer control, ex:

@task(container_image="foo2", task_config=DaskConfig(
    image="foo1",
    # totally not correct syntax, but you get the idea
))
def bar():
    ...  # random work

Is the main usage of this to allow a JobSpec image to have additional python dependencies? I'm guessing the dask version needs to (or at least should) be the same between the cluster pods (ie. scheduler and worker) and the JobSpec Pod right?

Hopefully very soon we want to offer Flyte support for persistent clusters, so that rather than ephemerally creating / deleting one for each task, we can have a cluster (Dask / Spark / Ray / etc.) start at the beginning of a workflow and be used for multiple tasks within that workflow - or even across workflows. Just making sure none of this definition conflicts with that vision.

@bstadlbauer
Member Author

@hamersaw Thanks for looking at this!

Is the main usage of this to allow a JobSpec image to have additional python dependencies? I'm guessing the dask version needs to (or at least should) be the same between the cluster pods (ie. scheduler and worker) and the JobSpec Pod right?

I mostly wanted to give users flexibility in choosing the image if they want to, but would strongly advise against doing so (unless folks know the pros/cons). I've also tried making that clear in the plugin docs. One case I could see is when folks want to use GPU nodes as workers, which may or may not require a different image. Especially as I might add support for Additional Worker Groups to the DaskJob CRD, which would enable CPU and GPU workers next to each other.

All that being said, I've also thought about getting rid of the ability to override the image, so no strong opinion here.

@hamersaw
Contributor

Oh perfect. Can you say a little more about what configuring additional worker groups might look like for this config? IIUC, for ephemeral clusters, since there will only be one job running on each cluster, this may not be necessary.

I'm wondering if it makes sense to go another step further here and split the DaskCluster configuration into SchedulerSpec and WorkerSpec as well. It seems like resource requirements would be much larger for worker pods and users may want to configure them separately. Is that a correct assumption?
cc @cosmicBboy ^^^ in your experience with Dask does it make sense for this level of configuration?

Also, right now it looks like we're using the existing container_image and resources fields in the flytekit task decorator as defaults, then offering overrides for both the JobPodSpec and DaskCluster. Is there a scenario where users would override both the JobPodSpec and the DaskCluster? IIUC that could be done by just overriding a single one and using the existing fields, right? For example, from your resource specification docs:

@task(
  task_config=Dask(
      job_pod_spec=JobPodSpec(
          image="foo:bar".
          limits=Resources(cpu="1", mem="2Gi"),  # Will be applied to the job pod
      ),
      cluster=DaskCluster(
          image:"foo:baz",
          limits=Resources(cpu="4", mem="10Gi"), # Will be applied to the scheduler and worker pods
      ),
  ),
)

and

@task(
  task_config=Dask(
    cluster=DaskCluster(
          image:"foo:baz",
          limits=Resources(cpu="4", mem="10Gi"), # Will be applied to the scheduler and worker pods
      ),
  ),
  container_image="foo:bar",
  limits=Resources(cpu="1", mem="2Gi")  # Will be applied to all components
)

The two task configurations will result in the same task right? In the latter case we can use the existing flytekit configuration instead of including additional Dask configuration. I don't feel very strongly about this, just trying to make sure the API is ergonomic and relatively similar to other Flyte constructs.
cc @wild-endeavor @eapolinario ^^^ re: flytekit API for dask tasks?

@bstadlbauer
Member Author

Super glad to have this conversation! All in for making this more ergonomic and similar to other Flyte constructs 👍

cc @jacobtomlinson feel free to add any opinion to this thread.

Oh perfect. Can you say a little more about what configuring additional worker groups might look like for this config? IIUC, for ephemeral clusters, since there will only be one job running on each cluster, this may not be necessary.

So there's two more concepts that the operator would support (but they cannot be passed into a DaskJob CRD as of now): DaskWorkerGroup (docs) and DaskAutoscaler (docs). I could see myself adding those to the DaskJob in the future (given that's something dask-kubernetes wants) and afterwards add Flyte support. DaskWorkerGroups would be especially nice, as those would allow a mix of workers. One usecase we would currently have is to mix GPU and CPU workers for some jobs. Here is the full doc on how that works for client code in case you're interested.

I was thinking that both of these would then fit into the python DaskCluster object, e.g. something like the following:

@task(
    task_config=Dask(
        cluster=DaskCluster(
          ...,
          additional_worker_groups=[
              WorkerGroup(
                  image=...,
                  requests=..., 
                  limits=...,
              )
          ], 
          autoscaler=Autoscaler(
              min_workers=...,
              max_workers=...
          )
        ),
    ),
)

I'm wondering if it makes sense to go another step further here and split the DaskCluster configuration into SchedulerSpec and WorkerSpec as well. It seems like resource requirements would be much larger for worker pods and users may want to configure them separately. Is that a correct assumption?
cc @cosmicBboy ^^^ in your experience with Dask does it make sense for this level of configuration?

That seems reasonable 👍 Given the thoughts on worker groups and autoscaling, would you replace the DaskCluster or keep it and split it up within the class?
Also, even if support for additional worker groups does not exist yet, we might already create the WorkerGroup class and use it to specify the default workers? E.g. something like the following:

@task(
    task_config=Dask(
        ...,
        # I've kept the `cluster=` in the example, but we could also remove it
        cluster=DaskCluster(
          scheduler=Scheduler(),
          default_workers=WorkerGroup(
              count=...,
              image=...,
              requests=...,
              limits=...,
            ),
        ),
    ),
)

The two task configurations will result in the same task right? In the latter case we can use the existing flytekit configuration instead of including additional Dask configuration. I don't feel very strongly about this, just trying to make sure the API is ergonomic and relatively similar to other Flyte constructs.
cc @wild-endeavor @eapolinario ^^^ re: flytekit API for dask tasks?

Ok, how about removing the job_pod_spec=JobPodSpec altogether? Then we could also get rid of the DaskCluster class and pull everything up one level.

By default, all dask job components (the runner, scheduler and worker) would then use the image and resources specified directly in @task() and folks have the ability to override the image as well as resources for the scheduler and workers (separately).

So combining all of the above, my new proposal would be:

@task(
    task_config=Dask(
        # All of these are optional and default to settings in `@task`
        scheduler=Scheduler(
            image=...,
            limits=...,
            requests=...,
        ),
        # All of these are optional and default to settings in `@task`
        workers=WorkerGroup(
            count=...,
            image=...,
            requests=...,
            limits=...,
        ),
    ),
    container_image=...,
    requests=..., 
    limits=...
)

@hamersaw
Contributor

I really like this ^^^. Hoping for a bit more input before moving forward, can we give this a day?

@bstadlbauer
Member Author

Sure thing, no rush on my end

@jacobtomlinson

jacobtomlinson commented Dec 14, 2022

A few thoughts.

Hopefully very soon we want to offer Flyte support for persistent clusters, so that rather than ephemerally creating / deleting one for each task, we can have a cluster (Dask / Spark / Ray / etc.) start at the beginning of a workflow and be used for multiple tasks within that workflow - or even across workflows. Just making sure none of this definition conflicts with that vision.

@hamersaw This would be great. This is one of the use cases we had in mind for workflow engines when we designed the new Dask Kubernetes Operator. You could create a DaskCluster resource at the start of your workflow and tear it down at the end while reusing it through the various tasks.

So there's two more concepts that the operator would support (but they cannot be passed into a DaskJob CRD as of now): DaskWorkerGroup (docs) and DaskAutoscaler (docs). I could see myself adding those to the DaskJob in the future (given that's something dask-kubernetes wants) and afterwards add Flyte support.

@bstadlbauer This is interesting. I don't imagine that we will nest additional DaskWorkerGroup and DaskAutoscaler resources within the DaskJob CRD. The resources are intended to be used together anyway. Once you have created a DaskJob (or DaskCluster) resource you can create more DaskWorkerGroup and DaskAutoscaler resources in the future that use a selector to attach them to the cluster.

Is there any limitation in Flyte that requires everything to be a single resource, or can you create multiple resources?

@wild-endeavor
Contributor

@hamersaw @bstadlbauer yup, I like the final syntax a lot, thank you both for all the iterating. Should we just drop the JobPodSpec message from the proto file then?

To answer the question from @jacobtomlinson, I'll defer to @hamersaw - I'm not sure what the vision for this plugin is on the backend. If this takes the shape of a custom plugin on the backend, that would certainly have the ability to control more than one resource, though admittedly it makes the code more complex.

@hamersaw
Contributor

Is there any limitation in Flyte that requires everything to be a single resource, or can you create multiple resources?

@jacobtomlinson so right now the k8s plugins are designed to operate over a single resource. So basically running a task means creating a k8s resource and then monitoring its status, which is why adding the status to the top-level CRD for the DaskJob was important (thanks again both of you for getting this through!).

The vision for enabling persistent clusters, while admittedly very early in design, is to create a new set of Flyte plugins that allow for managing clusters. So as you suggested, a workflow begins by calling the DaskCluster plugin which creates a cluster using the DaskCluster CRD, then calls a Flyte Dask task which could create a new DaskJob CRD attached to the existing cluster, etc. For most use-cases this seems to fit, ex. Ray, Spark. However, it may be a little more complicated here to dynamically add worker groups.

I'm wondering about the advantages of supporting dynamically adding worker groups to a cluster. Is it much better than creating a cluster with two worker groups at the beginning of the workflow? Or creating two separate clusters at the beginning of the workflow?

Not sure this has to be addressed in these PRs - maybe we should start a new issue for supporting "persistent clusters" where these conversations can be tracked.

@cosmicBboy
Contributor

The final syntax LGTM too! One question about it: would the workers kwarg also accept a list of WorkerGroup? Just asking because the previous iteration had the arg as additional_worker_groups.

@bstadlbauer
Member Author

bstadlbauer commented Dec 15, 2022

@jacobtomlinson Thank you so much for having a look here!

@bstadlbauer This is interesting. I don't imagine that we will nest additional DaskWorkerGroup and DaskAutoscaler resources within the DaskJob CRD. The resources are intended to be used together anyway. Once you have created a DaskJob (or DaskCluster) resource you can create more DaskWorkerGroup and DaskAutoscaler resources in the future that use a selector to attach them to the cluster.

@hamersaw already answered this, but yes, currently this is a Flyte limitation of one plugin task corresponding to one k8s CRD.

@wild-endeavor

@hamersaw @bstadlbauer yup, I like the final syntax a lot, thank you both for all the iterating. Should we just drop the JobPodSpec message from the proto file then?

I would change up the protobuf structure to match the one proposed here, which would roughly entail:

  • Dropping the JobPodSpec
  • Dropping the DaskCluster
  • Adding both Scheduler and WorkerGroup (and adding those to the top-level DaskJob)

Or would anyone prefer to keep the current protobuf (without JobPodSpec) and then do the "translation" in Python?

@hamersaw

I'm wondering about the advantages of supporting dynamically adding worker groups to a cluster. Is it much better than creating a cluster with two worker groups at the beginning of the workflow? Or creating two separate clusters at the beginning of the workflow?

For the Flyte use case, I think it would be sufficient if additional worker groups are created as part of the cluster creation. Creating two clusters would not work, as you'd want to build a (dask) task graph where some of the graph's nodes could be run on different workers (e.g. ones with more memory).
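
To make the mixed-worker point concrete: with dask.distributed, individual tasks of a single graph can be pinned to workers that advertise a given resource, so memory- or GPU-heavy nodes land on the larger worker group while everything else runs on the default one. A minimal sketch, assuming the additional worker group is started with --resources "GPU=1" (the scheduler address, resource name, and functions below are illustrative only):

from dask.distributed import Client

# Assumes the additional worker group was started with
# `dask-worker ... --resources "GPU=1"`; the default workers advertise no extra resources.
client = Client("tcp://dask-scheduler:8786")  # address is illustrative

def preprocess(x):
    return x * 2

def reduce_on_big_worker(values):
    # Placeholder for work that should land on the larger / GPU worker group.
    return sum(values)

# These tasks can run on any worker.
prepared = client.map(preprocess, range(100))

# This task is restricted to workers that advertise the `GPU` resource.
result = client.submit(reduce_on_big_worker, prepared, resources={"GPU": 1})
print(result.result())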

@cosmicBboy

The final syntax LGTM too! One question about it: would the workers kwarg also accept a list of WorkerGroup? Just asking because the previous iteration had the arg as additional_worker_groups.

I was thinking workers would take one WorkerGroup, and in case support for additional worker groups is added later, a new argument additional_worker_groups: List[WorkerGroup] = [] could be introduced. We could also collapse that down to one argument, maybe calling it worker_groups: List[WorkerGroup] = [WorkerGroup()] and then being very explicit in the docs that the first WorkerGroup is the "default" one?
I would be in favor of the first option (two arguments; a rough sketch follows below the list), because:

  • As there is currently no support for additional worker groups, having a list with more than one element could be confusing
  • I feel like it matches dask-kubernetes a bit more closely, where you always have a "default" worker group and can then add "additional" ones in case you need them. I would assume that adding more worker groups is also fairly uncommon.
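
For concreteness, a rough sketch of what the two-argument variant could look like on the flytekit side (the class and field names here are illustrative only, not the final API):

from dataclasses import dataclass, field
from typing import List, Optional

from flytekit import Resources


@dataclass
class Scheduler:
    image: Optional[str] = None            # falls back to the `@task` image when unset
    requests: Optional[Resources] = None
    limits: Optional[Resources] = None


@dataclass
class WorkerGroup:
    number_of_workers: int = 1
    image: Optional[str] = None
    requests: Optional[Resources] = None
    limits: Optional[Resources] = None


@dataclass
class Dask:
    scheduler: Scheduler = field(default_factory=Scheduler)
    # The "default" worker group; unset fields fall back to the `@task` settings.
    workers: WorkerGroup = field(default_factory=WorkerGroup)
    # Possible later addition once the DaskJob CRD supports extra worker groups.
    additional_worker_groups: List[WorkerGroup] = field(default_factory=list)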

@jacobtomlinson

jacobtomlinson commented Dec 15, 2022

so right now the k8s plugins are designed to operate over a single resource

Right that makes sense, I understand the motivation now.

creates a cluster using the DaskCluster CRD, then calls a Flyte Dask task which could create a new DaskJob CRD attached to the existing cluster

For reference, the DaskJob resource is a meta resource that creates a DaskCluster + Pod with some env vars injected into the Pod to enable configless connectivity to the cluster. So I would imagine you would create the DaskCluster resource at the start and your task would also just be a Pod or Job with the appropriate env var config.

I'm wondering about the advantages of supporting dynamically adding worker groups to a cluster.

It's less about being dynamic and more about being composable. Folks compose together a DaskCluster, multiple DaskWorkerGroups and a DaskAutoscaler resource to define their cluster. This is the same model as folks composing together a few Deployment, Service and HorizontalPodAutoscaler resources to make up an application.

We made the decision to nest one DaskWorkerGroup within the DaskCluster resource to try and keep simple use cases simple. You can get a fully functioning cluster with one resource. In hindsight, I wonder if we should've created a DaskScheduler resource to contain the scheduler logic rather than having the DaskCluster create the scheduler Pod and Service directly. Perhaps the DaskCluster should also take a list of worker groups instead of one.

Then we could push more in the direction of having some primitive Dask building blocks like DaskScheduler, DaskWorkerGroup and DaskAutoscaler. Then modify the DaskCluster to be a purely meta resource where you can specify a DaskScheduler, a list of DaskWorkerGroup and a DaskAutoscaler as one resource. Then the DaskJob is another level higher and allows you to specify your workload Pod and a DaskCluster as a single resource.

Users who want to be super composable can create manifests or helm charts with DaskScheduler, DaskWorkerGroup, DaskAutoscaler, etc combined with Deployment, Pod, Service, etc to build up their application. But workflow tools like Flyte with a single resource limitation can specify everything Dask related as part of one huge meta resource.

Is that what you had in mind @bstadlbauer?

@hamersaw
Contributor

But workflow tools like Flyte with a single resource limitation can specify everything Dask related as part of one huge meta resource.

Totally understand the breakdown of different components (ie. DaskWorkerGroup, DaskScheduler, DaskAutoscaler). For this work I think we're just setting up a plugin for executing a single task on an ephemeral cluster. So as long as we cover the DaskCluster config with enough depth, or at least offer extensibility, that should suffice for the initial implementation.

Once we work on persistent clusters (ie. starting at the beginning of a workflow and executing multiple tasks) we should prioritize the ability to manage multiple resources; I don't foresee this being an issue. Thanks @jacobtomlinson for the thorough explanation of the requirements to fully utilize the k8s Dask offerings; this model sounds like it should apply well to other cluster types as well.

@jacobtomlinson

So as long as we cover the DaskCluster config with enough depth, or at least offer extensibility, that should suffice for the initial implementation.

Yeah this is what I had in mind because the DaskCluster resource can only specify one DaskWorkerGroup and no DaskAutoscaler resources. The user is expected to create those separately. So if your plugin can only create one resource for the ephemeral clusters maybe we should expand DaskCluster to allow for an autoscaler and list of worker groups. Which I think is what @bstadlbauer was proposing.

@cosmicBboy
Contributor

Just leaving a +1 here for the additional_worker_groups argument to differentiate default worker from additional workers.

@bstadlbauer
Member Author

Thank you all for the quick responses as well as the great feedback! I've made the changes according to this discussion, but happy to revisit if there are any further comments.

@hamersaw
Please note that I've changed up a few details, but no strong opinion here, so happy to change back:

  • Renamed nWorkers to number_of_workers in the .proto file, as Google recommends snake_case and other files in this repo do that as well (the Spark IDL, however, doesn't)
  • Converted number_of_workers from an int32 to a uint32, as it should never be negative in the first place. Also guarded against this in the flytekit plugin (a sketch of such a guard is shown below the list)
  • All field names are now lowercase (as that also seems to be the standard)
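
As an illustration of the guard mentioned above (a hypothetical helper, not the actual plugin code), the flytekit side could validate the count before writing it into the uint32 proto field:

def _validate_number_of_workers(number_of_workers: int) -> int:
    """Reject non-positive worker counts before they reach the uint32 proto field."""
    if number_of_workers < 1:
        raise ValueError(f"Expected at least one worker, got {number_of_workers}")
    return number_of_workers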

@jacobtomlinson

Yeah this is what I had in mind because the DaskCluster resource can only specify one DaskWorkerGroup and no DaskAutoscaler resources. The user is expected to create those separately. So if your plugin can only create one resource for the ephemeral clusters maybe we should expand DaskCluster to allow for an autoscaler and list of worker groups. Which I think is what @bstadlbauer was proposing.

Exactly, yes!

hamersaw
hamersaw previously approved these changes Dec 27, 2022
eapolinario
eapolinario previously approved these changes Dec 27, 2022
Comment on lines 20 to 41
message Scheduler {
// Optional image to use. If unset, will use the default image.
string image = 1;

// Resources assigned to the scheduler pod.
core.Resources resources = 2;
}

message WorkerGroup {
// Number of workers in the group.
uint32 number_of_workers = 1;

// Optional image to use for the pods of the worker group. If unset, will use the default image.
string image = 2;

// Resources assigned to all pods of the worker group.
// As per https://kubernetes.dask.org/en/latest/kubecluster.html?highlight=limit#best-practices
// it is advised to only set limits. If requests are not explicitly set, the plugin will make
// sure to set requests==limits.
// The plugin sets `--memory-limit` as well as `--nthreads` for the workers according to the limit.
core.Resources resources = 3;
}
@EngHabu
Contributor

Can we either start prefixing all messages with the name of the plugin or create separate Go packages for them?
In Python, these will be created under dask_pb2.Scheduler, which is fine, but in Go it'll be just plugins.Scheduler, which isn't very distinguishable ...

Contributor
@hamersaw hamersaw Dec 27, 2022

Great point, thanks! I would vote for just prefixing each with Dask, but could be convinced either way if anyone else feels strongly.

@bstadlbauer
Member Author

Thanks @EngHabu! I've prefixed everything with Dask in 6ae78d6 - let me know if you'd rather have a distinct package

@bstadlbauer bstadlbauer dismissed stale reviews from eapolinario and hamersaw via 6ae78d6 December 28, 2022 14:58
Contributor

@EngHabu EngHabu left a comment

Awesome! Thank you

@EngHabu EngHabu merged commit ac5d3d8 into flyteorg:master Dec 28, 2022
@welcome

welcome bot commented Dec 28, 2022

Congrats on merging your first pull request! 🎉

wild-endeavor pushed a commit that referenced this pull request Jan 4, 2023
@eapolinario eapolinario mentioned this pull request Apr 24, 2023
eapolinario pushed a commit that referenced this pull request Sep 8, 2023