Rearchitecture: Kubernetes Backend #198
cc @dankerrigan
Small, admittedly pedantic, point of clarification because it can lead to some confusion: it's important to disambiguate between the custom resource definition (CRD) and individual custom resources (CRs). The former are like a class, the latter, instances of the class. Another way to think about it is that the CRD is like a table and CRs are like rows in that table. The above is currently implemented in the operator.
Easy to implement owner reference for secret. It's already being done for the deployments, services, and ingresses.
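With operator-sdk / controller-runtime it's a single controllerutil.SetControllerReference call before creating the object. A minimal sketch -- the function, secret naming, and surrounding types here are illustrative, not the actual implementation:

```go
package controller

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// newClusterSecret builds the Secret holding a cluster's TLS credentials and api token,
// with the owning object (e.g. the dask cluster CR) set as its controller, so the Secret
// is garbage-collected automatically when the CR is deleted.
func newClusterSecret(owner metav1.Object, scheme *runtime.Scheme, namespace string, data map[string][]byte) (*corev1.Secret, error) {
	secret := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      owner.GetName() + "-credentials", // naming convention is illustrative
			Namespace: namespace,
		},
		Data: data,
	}
	// Same call pattern already used for the deployments, services, and ingresses.
	if err := controllerutil.SetControllerReference(owner, secret, scheme); err != nil {
		return nil, err
	}
	return secret, nil
}
```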
OSDK supports both workflows. Here's some documentation that touches on the pros and cons. If we're not relying on Kubernetes-native horizontal pod autoscaling, watching a single namespace may be sufficient, since scaling directives will always flow from the CRs down. All of the CRs could reside in a single namespace. That's all the controller would need to watch. The child deployments, on the other hand, could be deployed to any namespace, and the native controller could handle recovery in the event of pod failure or rescheduling.
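For reference, restricting the watch to a single namespace is just a manager option in controller-runtime (which OSDK scaffolds). A rough sketch, assuming a controller-runtime release from around this time -- newer versions moved this setting under the cache options:

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
)

// newManager builds the controller manager. An empty watchNamespace means cluster scope
// (watch CRs everywhere); a non-empty value restricts caches and watches to that one
// namespace, e.g. the single namespace holding all of the dask cluster CRs.
func newManager(watchNamespace string) (ctrl.Manager, error) {
	return ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Namespace: watchNamespace,
	})
}
```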
OSDK provides a testing framework. We've written some pretty comprehensive unit and acceptance tests. Those will need to be refactored alongside the forthcoming changes of course.
Oh, your controller implementation uses deployments?
We're using deployments currently, but we can definitely strip that and use replica sets or even just pods instead. The main drawback of managing pods directly would be that we would need to recreate some fantastic, universally adopted, and thoroughly tested benefits provided by the ReplicationController. If there were a way to leverage the Kubernetes native functionality that doesn't infringe on the level of control that Dask Gateway requires, that would be ideal in my opinion. Definitely open to trying out / implementing any approach though. I'm definitely not a go native or go home zealot.

One other advantage of using deployments / replica sets is that it would permit the operator to be modular and function as a standalone solution that would provide an alternative to dask-kubernetes. I could see frameworks like Kubeflow or Flyte using it in a pluggable way (engineers from both products expressed interest in the Dask operator being open-sourced at KubeCon), which would ultimately promote Dask adoption. That being said, we could always maintain two versions of the operator, one that plugs into Dask Gateway and another that functions as a standalone solution.
I also would vastly prefer to reuse things, but am not sure the existing abstractions would work for our use case. We don't need all the fancy features that deployments provide either, just pod management and rescheduling, so implementing the subset we care about may not be too terrible. What we want is a ReplicaSet-like thing, with the following behavior:
Right now I think we should focus on solving the dask-gateway use case. If after we get things working something seems generalizable, then we may extract things out.
This comes up most times that we interact with Kubernetes. When Dask scales things down, it wants to carefully select which dask-workers to kill, move data off of those workers, and then kill them. This sort of stateful handling isn't something that Kubernetes Deployments were comfortable thinking about, so historically we've managed this ourselves. Fortunately, our implementation is also decently well tested and used (although obviously less well than Kubernetes') and so ends up handling this process ok.

My guess is that Operators give us more flexibility to handle this in a more native Kubernetes fashion. I'm not familiar enough with them to say one way or the other though.
What criteria does Dask use when selecting workers to kill? I haven't used it before, but we may be able to use the preStop lifecycle hook to ensure data transfer before termination. Happy to work towards either implementation! Thanks for taking my input into consideration.
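For concreteness, a sketch of what that hook might look like on the worker pod -- the retirement command is just a placeholder, and recent k8s.io/api releases name the handler type LifecycleHandler (older ones call it Handler):

```go
package controller

import corev1 "k8s.io/api/core/v1"

// withGracefulStop attaches a preStop hook and a generous termination grace period
// to the worker pod, so Kubernetes runs the hook before sending SIGTERM.
func withGracefulStop(pod *corev1.Pod) {
	grace := int64(300) // allow up to 5 minutes for data to migrate off the worker
	pod.Spec.TerminationGracePeriodSeconds = &grace
	for i := range pod.Spec.Containers {
		pod.Spec.Containers[i].Lifecycle = &corev1.Lifecycle{
			PreStop: &corev1.LifecycleHandler{
				Exec: &corev1.ExecAction{
					// Placeholder: some command that asks the scheduler to retire this worker.
					Command: []string{"/usr/local/bin/retire-worker.sh"},
				},
			},
		}
	}
}
```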
We want to close workers that aren't working on anything right now, and aren't storing much. In principle this could get more complex if there are some data that we can't move easily (maybe it requires a GPU for example). The full implementation is here; you might be interested in the docstring. The implementation of actually retiring those workers gracefully is just below.

In my ideal world a Kubernetes Operator would query the Scheduler pod and ask it to gracefully close the right things down, and then would clean up the underlying pods. This is the kind of logic that we may want to change in the future, and I think that it would be good to keep that logic in one place.
This isn't exactly ideal. What if the scheduler pod is unresponsive, or slow to respond? When scaling down many clusters you'd have to limit the number of open requests. Scaling down a worker nicely can also take time. Instead, in dask-gateway we have the scheduler manage its own
Yeah, I guess I was thinking of this from the Kubernetes Operator perspective, where a user might want to scale a Dask deployment thing to n replicas. In that case it would be nice for the operator to ask the scheduler what to do. You're right though that I wasn't thinking of this in the full context of Dask-Gateway. Happy to retract my comment.
Maybe not for choosing which pod to terminate, but what about ensuring data is transferred to other workers before termination? |
It could be used for that, but it's not ideal. We'd like to select a pod that has the least data/work to minimize the cost of shutting it down. Kubernetes could select a bad pod (say one with most of the data) and the cluster could rebalance itself before scaling down, but it'd be less efficient than if we let dask make the decision itself.
That makes sense. I see where the data transfer is initiated in dask now: https://github.com/dask/distributed/blob/f561e9646a121f74d1aecc6d1ee31baeabffad49/distributed/scheduler.py#L3423-L3429
Is the

If so, the operator does provide support for autoscaling through a horizontal pod autoscaler. It relies on metrics server to gather information about the worker pods and increases / decreases the worker replica count accordingly. That functionality, which can be toggled on and off with a flag, in conjunction with

Just because it's possible doesn't mean we should do it, of course. It may be out of scope for a preliminary implementation or insufficiently advantageous downstream. I don't mind stripping it and relying on a more DG-native approach to expedite things.

Also, here's an interesting/relevant Kubernetes issue.
Here's one potential approach (among many) to structuring the custom resource and controller in a way that's responsive to DG's worker-specific deletion requests. The custom resource would include the following fields (among others):

```go
type DaskSpec struct {
	ToDelete []string
	Replicas int
}

type DaskStatus struct {
	Replicas int
	Workers  map[string]string
}
```
Scale Up

When DG initially instantiates a CR, the `Replicas` field is set to the requested number of workers. When a scale up action is triggered, DG updates `Replicas` in the CR spec.

The reconciliation loop is triggered by the change in the CR. It includes logic that compares the desired `Replicas` count against the existing worker pods and creates any that are missing. At the end of the reconciliation loop, the status subfield is updated. The `Workers` status map records which pod backs each worker, keyed by worker IP.

Scale Down

When a scale down is triggered, DG updates the CR's `ToDelete` list with the IPs of the workers the scheduler has selected. This triggers the reconciliation loop in the controller. The logic forks based on the presence of content in `ToDelete`. The controller uses the IP to look up the specified pod name(s) in the `Workers` map. It deletes the specified worker(s), and updates the CR accordingly.
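To make the fork concrete, here's a rough reconciliation sketch over those fields -- the helper names and the import path for the CR types are hypothetical, not actual code:

```go
package controller

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	gatewayv1alpha1 "example.com/dask-gateway-operator/pkg/apis/gateway/v1alpha1" // hypothetical path; CR spec/status as sketched above
)

// reconcileWorkers sketches the fork described above. Spec.ToDelete holds worker IPs
// chosen by DG/the scheduler; Status.Workers maps worker IP -> pod name.
func reconcileWorkers(ctx context.Context, c client.Client, cr *gatewayv1alpha1.DaskCluster) error {
	if len(cr.Spec.ToDelete) > 0 {
		// Scale down: delete exactly the pods backing the listed worker IPs.
		for _, ip := range cr.Spec.ToDelete {
			name, ok := cr.Status.Workers[ip]
			if !ok {
				continue // unknown or already-removed worker
			}
			pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: cr.Namespace}}
			if err := c.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
				return err
			}
			delete(cr.Status.Workers, ip)
		}
		// Clearing Spec.ToDelete after processing is left out for brevity.
	} else {
		// Scale up: create workers until the desired count is reached. Status.Workers is
		// filled in later (e.g. by a pod watch) once the pods are running and have IPs.
		for created := len(cr.Status.Workers); created < cr.Spec.Replicas; created++ {
			pod := newWorkerPod(cr) // hypothetical helper that renders the worker template
			if err := c.Create(ctx, pod); err != nil {
				return err
			}
		}
	}
	// Persist the updated bookkeeping on the status subresource.
	return c.Status().Update(ctx, cr)
}
```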
I've been thinking about potential ways to preserve the ReplicaSet abstraction and have confirmed that at least one approach works. Whether or not we should use it is a separate question, but it's nice to know that we have the option.

Solution

Delete the existing ReplicaSet with --cascade=false so its pods are orphaned rather than deleted, then apply a modified ReplicaSet manifest with the new replica count; the surviving pods are adopted by the new ReplicaSet. The transcript below walks through exactly this.
I tested this out on a local cluster and confirmed it works.

```shell
# Initial ReplicaSet with three replicas.
❯ ~ kgrs
NAME       DESIRED   CURRENT   READY   AGE
frontend   3         3         3       16s
❯ ~ kgp
NAME             READY   STATUS    RESTARTS   AGE
frontend-g9tns   1/1     Running   0          3m33s
frontend-smqw5   1/1     Running   0          3m33s
frontend-xm8m4   1/1     Running   0          3m33s

# Delete ReplicaSet with --cascade=false flag.
❯ ~ k delete rs frontend --cascade=false
replicaset.extensions "frontend" deleted

# Confirm ReplicaSet has been deleted.
❯ ~ kgrs
No resources found in default namespace.

# Confirm pods are still up.
❯ ~ kgp
NAME             READY   STATUS    RESTARTS   AGE
frontend-g9tns   1/1     Running   0          3m53s
frontend-smqw5   1/1     Running   0          3m53s
frontend-xm8m4   1/1     Running   0          3m53s

# Delete one pod.
❯ ~ k delete pod frontend-g9tns
pod "frontend-g9tns" deleted

# Confirm that there are now only two pods.
❯ ~ kgp
NAME             READY   STATUS    RESTARTS   AGE
frontend-smqw5   1/1     Running   0          7m5s
frontend-xm8m4   1/1     Running   0          7m5s

# Apply a modified ReplicaSet manifest with only two replicas.
❯ ~ k apply -f code/frontend.yaml
replicaset.apps/frontend created

# Confirm that the ReplicaSet was created.
❯ ~ kgrs
NAME       DESIRED   CURRENT   READY   AGE
frontend   2         2         2       5s

# Confirm that no new pods were created.
❯ ~ kgp
NAME             READY   STATUS    RESTARTS   AGE
frontend-smqw5   1/1     Running   0          7m27s
frontend-xm8m4   1/1     Running   0          7m27s

# Delete one of the pods to confirm that the old pods are linked to the new ReplicaSet.
❯ ~ k delete pod frontend-smqw5
pod "frontend-smqw5" deleted

# Confirmed
❯ ~ kgp
NAME             READY   STATUS    RESTARTS   AGE
frontend-vx7dj   1/1     Running   0          40s
frontend-xm8m4   1/1     Running   0          26m
```
Hmmm, that's interesting. I still think we'd be better served by managing the pods ourselves directly though. While the client-side operations you've done above may happen quickly, I suspect the kubernetes core bits managing the ReplicaSet being created/deleted in quick succession will have some measurable overhead compared to a more custom solution. That's a clever solution though, wouldn't have thought of that.
@jcrist Going through my meeting notes and just want to clarify one discussion item. The scheduler will want to terminate specific pods. Will DG attempt to do so directly using KPC or do you want to pass a list of pods to terminate to the CR and let the controller handle that logic? Thanks!
What is KPC? Here's the logic I'd implement in the controller:
So if replicas is 3, and we have 4 active workers and 2 pending workers, cancel the pending workers. The scheduler will properly terminate the one extra active worker, which will exit with exit code 0. Never terminate a running pod unless the whole cluster is being shut down.
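A sketch of the "cancel pending pods only" half of that (the helper and its names are made up, not existing code):

```go
package controller

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// cancelExcessPending deletes at most `excess` worker pods that are still Pending
// (not yet running). Running workers are never touched here; they shut themselves
// down when the scheduler retires them and exit 0.
func cancelExcessPending(ctx context.Context, c client.Client, workers []corev1.Pod, excess int) error {
	for i := range workers {
		if excess <= 0 {
			return nil
		}
		if workers[i].Status.Phase != corev1.PodPending {
			continue
		}
		if err := c.Delete(ctx, &workers[i]); err != nil {
			return err
		}
		excess--
	}
	return nil
}
```

In the example above (replicas = 3, 4 running + 2 pending), excess is 3: the two pending pods get cancelled here, and the scheduler retires the one extra running worker on its own.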
Which of the four paths that you described corresponds to the scheduler wanting to delete a specific worker? What do you mean by pending worker? Maybe that's where my confusion is coming from. If that's the terminology used to describe pods that the Dask Scheduler has marked for deletion, we may want to disambiguate that from pending in the Kubernetes lexicon.

I'm specifically wondering about a scale down action. Let's say there are 10 workers. The scheduler decides that 2 specific workers should be deleted. What's responsible for actually executing the deletion?
Pending meaning not-yet-running. The kubernetes lexicon version is correct (https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase). Note that changes to replicas always come from the scheduler -> gateway api server -> k8s api server -> controller. So scale down events can never be run with

A few scenarios:

Scenario 1

Scenario 2

Scenario 3
Thanks for the clarification, @jcrist! I didn't realize that you wanted the pods to terminate without mediation from a call to the Kubernetes API. That clarifies a lot. Regarding scenario 1, step 8:
Would that discrepancy trigger a scale up action? I haven't dug deep enough to confirm yet, but we may be able to leverage the job abstraction.
How would that trigger a scale up? It sees 10 workers running and replicas is set to 8. Replicas < running. If the workers stopped before replicas was changed then this would trigger a scale up, but they stop after the k8s api server has been updated. Kubernetes events arrive in order across watch streams, so we'll always see the decrease in replicas before the pod stop events.
I looked at that earlier, but wasn't sure if it would fit. Looking closer, it may work for us, provided we can patch

The controller actions would then be:
Just tried,
I was thinking in replicaset and deployment controller terms where scale ups are determined by a comparison between spec.replicas and status.replicas. We don't have to follow that model though.
In this case
@jcrist have you walked through CR updates beyond replica count? What's the game plan for when a user passes a config change to the CR (an annotation change, for example)? On the DG side, will that patch the CR? If so, the controller will need to include logic that compares the fields of the updated CR with fields in the workers to determine whether or not an update is necessary.
I'm not sure I understand the question? The only changes a user will ever make are scale up, scale down, or shutdown. I'm not thinking about this so much as a kubernetes controller, more as extracting the management of pods out of the gateway api server and into a separate smaller service. So most things other controllers would have to implement we can completely ignore - the only supported interactions are through the gateway api server.
When users call

A good example of that is role ARNs, which will be injected into the pod annotations so that the resulting pods can integrate with Kube2IAM to pull outside data. Another example is custom labels that we intend to use for cost tracking in conjunction with Kubecost.

I was wondering if users could subsequently update those values without creating a new cluster (by having DG patch the CR -- I understand that users will not interact with the CR directly). Going through the client API documentation though, it looks like the answer to that question is no. Please correct me if that's not the case.
Yeah, modifying the configuration of an existing cluster is out-of-scope. Dask clusters should be ephemeral and usually fairly short lived. If you created one with an incorrect configuration, just shut it down and start a new one.
Fantastic. Thanks, Jim. That makes my life so much easier. FYI @gforsyth.
The PodTemplate already includes a template field, so we're stacking Template fields:

```go
func newDaskCluster() *gatewayv1alpha1.DaskCluster {
	labels := map[string]string{
		"app": "foo",
	}
	return &gatewayv1alpha1.DaskCluster{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo" + "-pod",
			Namespace: "default",
			Labels:    labels,
		},
		Spec: gatewayv1alpha1.DaskClusterSpec{
			Scheduler: gatewayv1alpha1.Scheduler{
				Template: corev1.PodTemplate{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "foo",
						Namespace: "default",
						Labels:    labels,
					},
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							Containers: []corev1.Container{
								{
									Name:    "busybox",
									Image:   "busybox",
									Command: []string{"sleep", "3600"},
								},
							},
						},
					},
				},
			},
	...
```

The downstream calls in the controller require invoking the doubly nested `Template.Template` to get at the pod spec.
Ah, good catch. Yeah, perhaps we should swap that out.
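Assuming the swap is from corev1.PodTemplate to corev1.PodTemplateSpec (my reading of the suggestion, not something spelled out above), the scheduler template would flatten to something like:

```go
package controller

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// schedulerTemplate shows the flattened shape: the CR's Scheduler field would hold a
// corev1.PodTemplateSpec directly, so there is a single ObjectMeta for pod metadata
// and the pod spec is reachable as Template.Spec rather than Template.Template.Spec.
func schedulerTemplate(labels map[string]string) corev1.PodTemplateSpec {
	return corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels: labels,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:    "busybox",
					Image:   "busybox",
					Command: []string{"sleep", "3600"},
				},
			},
		},
	}
}
```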
👍 Sounds good.
In #195 the server architecture was redone to improve resiliency/performance and make the kubernetes backend more kubernetes-native. In that PR we dropped support for kubernetes entirely; support will be added back in a subsequent PR. Here I'll outline how I think the new Kubernetes backend should be architected.
When running on Kubernetes, `dask-gateway` will be composed of the following components:

- The `dask-gateway` api server, running behind a ClusterIP Service. These provide the internal api routing, but users won't (usually) connect directly to the api server.
- `traefik`, configured to use the Kubernetes CRD provider (https://docs.traefik.io/providers/kubernetes-crd/). These will handle proxying both the scheduler and dashboard routes.

When initially installed, only a single `IngressRoute` object will be created to expose the `dask-gateway` api server through the proxy. As dask-clusters come and go, additional routes will be added/removed. Users will only communicate with the `traefik` proxy service directly, remaining network traffic doesn't need to be exposed directly.

Walking through the components individually.
Dask-Gateway CRD
An instance of a Dask-Gateway CRD object needs to contain information on how to run a single dask cluster. Here's a sketch of a proposed schema (in pseudocode):
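Roughly something along these lines -- the field names here are illustrative only, not a settled schema:

```go
package v1alpha1

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Illustrative sketch of the per-cluster object: pod templates for the scheduler and
// workers, the desired worker count, and observed status.
type DaskClusterSpec struct {
	Scheduler corev1.PodTemplateSpec // template for the scheduler pod
	Worker    corev1.PodTemplateSpec // template for worker pods
	Replicas  int32                  // desired number of workers
}

type DaskClusterStatus struct {
	Phase    string // e.g. PENDING, RUNNING, STOPPED
	Replicas int32  // observed number of workers
}

type DaskCluster struct {
	metav1.TypeMeta
	metav1.ObjectMeta
	Spec   DaskClusterSpec
	Status DaskClusterStatus
}
```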
Each dask cluster created by the gateway will have one of these objects created.
Backend Class
To plug in to the dask-gateway api server, we'd need to write a kubernetes backend. We may need to tweak the interface exposed by the `dask_gateway_server.backends.base.Backend` class, but I think the existing interface is pretty close to what we'll want.

Walking through the methods:
`start_cluster(self, user, cluster_options)`
This is called when a new cluster is requested for a `user`. This should validate the request, then compose and create the dask-gateway CRD object and corresponding Secret (storing the TLS credentials and api token). It should return the object name (or some proxy for it) so that it can be looked up again. Note that when this method returns the cluster doesn't need to be running, only submitted.

`stop_cluster(self, cluster_name, failed=False)`
This is called when a cluster is requested to stop. This should do whatever is needed to stop the CRD object, and should be a no-op if it's already stopped. As above, this doesn't need to wait for the cluster to stop, only for it to be stopping.
If we want to keep some job history then `stop_cluster` can't delete the CRD object, since that's the only record that the job happened. If we don't care about this then deleting the CRD is fine. Other backends keep around a short job history, but that may not matter here.

`get_cluster(self, cluster_name, wait=False)`
This is called when a user wants to lookup or connect to a cluster. It provides the cluster name (as returned by `start_cluster`). It should return a `dask_gateway_server.models.Cluster` object.

I think what we'll want is for each instance of the api server to watch for objects required to compose the `Cluster` model (dask-gateway CRD objects, secrets with a label selector, etc...). If the needed object is already reflected locally, we can use that. If it's not, we can try querying the k8s server in case the object was created and hasn't propagated to us yet (and then store it in the reflector locally). It should be fine to return slightly outdated information from this call.

`list_clusters(self, username=None, statuses=None)`
This is used to query clusters, optionally filtering on username and statuses. Like `get_cluster` above, this can be slightly out of date, so I think returning only the reflected view is fine. If not, this should map pretty easily onto a k8s query with a label selector (a cluster's `username` will be stored in a label).

`on_cluster_heartbeat(self, cluster_name, msg)`
This responds to messages from the cluster to the gateway. For the k8s backend, we only really need to handle messages that contain changes to the cluster worker count. These will translate to patch updates to the CRD objects, updating the `replicas` field.

Controller
The controller handles the bulk of the logic around when/how to run pods. It should watch for updates to the CRD objects, and update the child resources (pods, ...) accordingly. Walking through the logic:
When a new CRD is created, the controller will create a scheduler pod from the scheduler template, along with any `IngressRoute` objects specified in the CRD. The scheduler pod should have the CRD as an owner reference so it'll be automatically cleaned up on CRD deletion. Likewise the `IngressRoute` objects should have the scheduler pod as an owner reference.

When the scheduler pod is running, it will transition the CRD status to RUNNING. This signals to the API server that the cluster is now available.
When the `replicas` value is higher than the current number of child workers (scale up event), additional worker pods will be created. The worker pods should be created with the scheduler pod as an owner reference so that they'll be automatically cleaned up.

When the `replicas` value is lower than the current number of child workers, the controller should do nothing. All worker pods should be configured with `restartPolicy: OnFailure`. When a cluster is scaling down, the workers will exit successfully and the pods will stop themselves.

When the CRD is signaled to shut down, the scheduler pod should be deleted, which will delete the worker pods as well. We'll also need to delete the corresponding Secret. Perhaps `start_cluster` sets the CRD as the owner reference of the secret, and the controller is robust to CRDs being created before their corresponding Secret? Not sure.

We want to support running clusters in multiple namespaces in the same gateway. I'm not sure what the performance implications of watching all namespaces are vs a single namespace - if it's significant we may want to make this configurable, otherwise we may watch all namespaces always even if we only deploy in a single namespace.
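To make the worker-pod handling above concrete, a rough sketch of how the controller might build a worker pod (owned by the scheduler pod, restartPolicy: OnFailure) -- names and labels here are illustrative:

```go
package controller

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// workerPodFor renders a worker pod from the cluster's worker pod spec. The scheduler
// pod is set as the controlling owner so deleting the scheduler cascades to its workers,
// and RestartPolicy is OnFailure so a worker that exits 0 during scale down stays stopped.
func workerPodFor(clusterName, namespace string, scheduler *corev1.Pod, spec corev1.PodSpec) *corev1.Pod {
	isController := true
	spec.RestartPolicy = corev1.RestartPolicyOnFailure
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: clusterName + "-worker-",
			Namespace:    namespace,
			Labels:       map[string]string{"dask-gateway-cluster": clusterName}, // illustrative label
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion: "v1",
				Kind:       "Pod",
				Name:       scheduler.Name,
				UID:        scheduler.UID,
				Controller: &isController,
			}},
		},
		Spec: spec,
	}
}
```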
We'll also want to be resilient to pod creation being denied due to a Resource Quota.
I've written a controller with the Python k8s api and it was pretty simple, but I'd be happy writing this using operator-sdk or kubebuilder as well. No strong opinions.
Since this splits up the kubernetes implementation into a couple of smaller services, any meaningful testing will likely require all of them running. I'm not familiar with the tooling people use for this (see #196), advice here would be useful. We can definitely write unit tests for the smaller components, but an integration test will require everything running.