[SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Scheduler Backend #19468
Conversation
Jenkins, ok to test
Test build #82614 has finished for PR 19468 at commit
great! looks like nothing is broken :)
shall we enable the new code via the profile?
pom.xml
Outdated
@@ -2649,6 +2649,13 @@
    </profile>

    <profile>
      <id>kubernetes</id>
this new profile is not used in the build script; shall we add it so that the new contributions in this PR will be built and tested?
We should also change the sbt file to make it work using sbt.
Updated. PTAL.
@felixcheung I am adding @susanxhuynh @ArtRand (both from Mesosphere); I am not sure Michael is available. I will also have a look. Btw, wouldn't it be easier to work on the pluggable API first (maybe I'm missing something here) before moving integration tests to RiseLab etc. and before we add more dependencies?
@skonto, there was some discussion about this on the SPIP. We see them as separate and independent issues, with the pluggable API being a long-term goal. It would involve a working group of people drawn from all cluster manager integration maintainers - because the changes are extensive and require scalability/performance testing in a variety of environments - probably over 2-3 releases before we can iron out the kinks. In the short term, however, adding the package here enables supporting the growing number of K8s users of Spark that currently rely on our fork, and the integration testing makes us confident about not impacting the release process or adding complexity to the maintainers' workflow. The K8s community will set up all the requisite testing and ensure the health of this package. The integration testing would also carry forward to when we have pluggable APIs in the future.
A large chunk of the difficulty in identifying and ironing out kinks in such a project is the difficulty of writing adequate tests of the scheduler code. I'd expect test coverage to take roughly the same amount of effort as all of the rest of the scheduler plug-in effort.
Test build #82854 has finished for PR 19468 at commit
I mostly pointed out style issues. There are quite a few minor deviations from the conventions used in other modules.
I also noticed a lot of code "borrowed" from the YARN backend. While it would be nice to try to refactor the parts that make sense, the main worry is whether those make sense in this context, e.g. I saw error code values that are internal to YARN being used here.
I haven't thoroughly reviewed the scheduler backend or the tests, although I did notice some code that kind of seemed to be replicating executor accounting that is currently done in CoarseGrainedSchedulerBackend and ExecutorAllocationManager. I'd double-check that the code is really needed, since this is an area that constantly ends up with bugs because of state being tracked in too many places.
Can't really comment on k8s code. But I'll just mimic comments that ask for as many tests as possible.
kubernetes = Module(
    name="kubernetes",
    dependencies=[],
    source_file_regexes=["resource-managers/kubernetes/core"],
remove 'core'
pom.xml
Outdated
@@ -2649,6 +2649,13 @@
    </profile>

    <profile>
      <id>kubernetes</id>
      <modules>
        <module>resource-managers/kubernetes/core</module>
Is this going to be a multi-module project later? If so, you might want to create a parent pom in resource-managers/kubernetes. Otherwise, you should probably remove core.
We expected it would be multi-module (https://github.com/apache-spark-on-k8s/spark/tree/branch-2.2-kubernetes/resource-managers/kubernetes). The other modules are the configuration files for the Docker images and the integration tests. The Dockerfiles are pretty much static configuration files, so moving them instead to a directory like conf/kubernetes/ might also make sense. I'm not sure where the integration tests will eventually live.
Based on a discussion in last week's meeting with Shane Knapp from RISELab, we want to keep the integration tests as a sub-module here - in the interest of keeping test code together. We should have the additional parent pom to facilitate that.
@vanzin this isn't a multi-module project in the sense that the Kubernetes cluster manager and spark-submit implementation are split across multiple projects - but rather that there is a module for said cluster manager + spark-submit implementation, and then there are modules for integration testing said code.
@foxish The Dockerfiles feel more like application code than static configuration, but that might just be a matter of implementation. The structure of the CMD in the Dockerfiles is particular to what spark-submit will expect, for example.
Why are integration tests in a separate module? e.g. Maven has an integration-test phase which is separate from the usual test phase used for unit tests. And that's assuming that you really don't want to run them during unit tests. Then all code could potentially live in the same module.
in the interest of keeping test code together
That would mean keeping the test code in the same module as the core code, not in a separate module.
It's not absolutely necessary to have integration tests in a specific separate module. However, there are some nice organizational benefits we can get. For example, integration tests in the same module as the core code will need a specific package namespace that is omitted from the test phase and only executed in the integrationTest phase. Having a separate module means that the integration test pom can just make the test phase a no-op and integrationTest runs all tests in the test folder. (I don't know if Maven has a concept of a difference between src/test/scala and src/integrationTest/scala, which would help a lot.)
It's also IMO easier to read the pom.xml of the integration tests separately from the pom.xml of the Kubernetes core implementation. FWIW this is what we have in the integration test POM at the moment: https://github.com/apache-spark-on-k8s/spark/blob/branch-2.2-kubernetes/resource-managers/kubernetes/integration-tests/pom.xml. (The minikube-related things are going away with apache-spark-on-k8s#521.)
And that's assuming that you really don't want to run them during unit tests.
We definitely don't want to run these during unit tests - they are relatively expensive, require building Docker images, and require Minikube to be pre-installed on the given machine. Having them in at least the separate integration test phase makes these differences clear.
That (keeping them separate) is actually pretty useful for SBT.
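For reference, a minimal sbt sketch of that separation, assuming a hypothetical project name and the stock sbt IntegrationTest configuration (this is not the actual SparkBuild.scala change):

```scala
// Hypothetical sketch: a dedicated integration-test project whose unit-test phase
// is a no-op, so its suites only run under the IntegrationTest ("it") configuration.
lazy val kubernetesIntegrationTests =
  (project in file("resource-managers/kubernetes/integration-tests"))
    .configs(IntegrationTest)
    .settings(
      Defaults.itSettings,   // wires up src/it/scala and the it:test task
      test in Test := {}     // keep the regular unit-test phase a no-op here
    )
```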
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging

private[spark] object ConfigurationUtils extends Logging {
Logging is not used?
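If it is indeed unused, the fix is just to drop the mix-in - a minimal sketch of the suggested change:

```scala
import org.apache.spark.{SparkConf, SparkException}

// No Logging mix-in needed since nothing in this object logs.
private[spark] object ConfigurationUtils {
  // ... existing helper methods unchanged ...
}
```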
}

def requireSecondIfFirstIsDefined(
    opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
one param per line
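Formatted per that convention, the signature would look roughly like the sketch below (the body is a guess at the intent, included only so the sketch is complete):

```scala
// One parameter per line, double-indented, per the style used in other modules.
def requireSecondIfFirstIsDefined(
    opt1: Option[_],
    opt2: Option[_],
    errMessageWhenSecondIsMissing: String): Unit = {
  opt1.foreach { _ =>
    require(opt2.isDefined, errMessageWhenSecondIsMissing)
  }
}
```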
    maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
  val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
  val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
  val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
Why not create constants like for other config options?
This lacks context from the spark-submit implementation that is not in this PR.
We intend to have two different sets of authentication options for the Kubernetes API. The first is the credentials for creating a driver pod and all the Kubernetes resources that the application requires outside of executor pods. The second is a set of credentials that the driver can use to create executor pods. These options will have shared suffixes in the configuration keys but different prefixes (see the sketch after this list).
The reasoning for two sets of credentials is twofold:
- The driver needs strictly fewer privileges than spark-submit, because the driver only creates and deletes pods, but spark-submit needs to make pods and other Kubernetes resources. Two sets of credentials allow the driver to have an appropriately limited scope of API access.
- Part of the credentials includes TLS certificates for accessing the Kubernetes API over HTTPS. A common environment is to have the Kubernetes API server accessible through a proxy into the cluster from an outside location, while the driver accesses the API server from inside the cluster. A front door for the API server typically asks for a different certificate than the one presented when accessing the API server internally.
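A rough sketch of that prefix/suffix scheme; the concrete key names below are illustrative assumptions, not necessarily the keys used in this PR:

```scala
// Shared suffixes for the credential-related keys...
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"

// ...combined with a different prefix per credential set (names are hypothetical).
val SUBMISSION_AUTH_PREFIX = "spark.kubernetes.authenticate.submission"
val DRIVER_AUTH_PREFIX = "spark.kubernetes.authenticate.driver"

// spark-submit would read s"$SUBMISSION_AUTH_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX", while
// the driver would read s"$DRIVER_AUTH_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX" when it
// creates executor pods.
```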
}

override def applicationId(): String = conf.get("spark.app.id", super.applicationId())
Why is this method necessary?
We use the application ID for labeling the driver and executor pods in k8s so users can aggregate and filter by that label to obtain all execution units belonging to a particular job.
...execution units belonging to a particular Spark Application.
not job
Noted, thanks! (I meant "job" in the context of a "batch/streaming job", but I understand that's an overloaded term wrt Spark.)
Sorry, that doesn't answer my question. Are you saying that you let users define the application ID?
That would be a bad idea since Spark assumes those are unique in certain places (e.g. the event logger and the history server).
it's simply running the Client as one might expect.
Yeah, but what about client mode? In client mode the first time the k8s code will be invoked is when the scheduler backend is called, so you can't rely on anything else setting spark.app.id for you. In that case you'd end up with whatever the superclass defines - unless the user sets an app id themselves, and that shouldn't be allowed.
Client mode on k8s will look very different from that on other cluster managers. This is because in almost all k8s clusters, networking is set up such that pods (or simply containers) within the cluster are not routable directly from outside, and vice versa.
An alternative to support the same functionality is "in-cluster client mode". This will look similar to cluster mode (driver and executor will run within the cluster), except that we'd set a couple of additional parameters on the driver pod to allow someone outside the cluster to connect with it interactively (using the Kubernetes API server as a proxy). We don't foresee a design in which the KubernetesClusterSchedulerBackend will be invoked directly on the client side, because there is no standard mechanism by which executors can reach back out to the driver in k8s (this is by design, and the reasoning is explained in part by the requirements in https://kubernetes.io/docs/concepts/cluster-administration/networking/#kubernetes-model).
Ok, this is starting to sound more like a review of your spark-submit integration. Can you move this method implementation to that PR later on, since then the git history will actually give you more context about why this particular implementation is needed?
@vanzin, your general concern is understood. I think the key difference from YARN here is that we don't have the notion of asking k8s for a unique ID and "registering the application" in the same way, which is why we need to pass this ID to the components we create - the driver and the executors.
Like @mccheah said, we could actually avoid using spark.app.id to identify and name the various components we create in the cluster, and just use a k8s-specific randomly generated ID instead, but that might make it difficult for people to reconcile what they see in the history server or other UIs with what they see in Kubernetes (driver & executor pods). If you feel strongly about the setting of spark.app.id, we can do this. So far, from the usage we've seen, I don't think it has been surprising to people.
If you don't feel strongly about it, we can document this discussion with a code comment for now - so we don't lose this thread which has important concerns - and if we get to a point where it becomes evident that it's causing issues/confusion, we can move to the above approach.
Ok, this is starting to sound more like a review of your spark-submit integration. Can you move this method implementation to that PR later on, since then the git history will actually give you more context about why this particular implementation is needed?
Yeah, this SGTM. We can do that and revisit this later.
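For context, a hedged sketch of the labeling described earlier in this thread; the label keys and the kubectl selector are illustrative assumptions, not necessarily what the PR uses:

```scala
// Labels attached to each executor pod so that every pod of one Spark application
// can be selected in one go, e.g. `kubectl get pods -l spark-app-id=<appId>`.
def executorLabels(appId: String): Map[String, String] = Map(
  "spark-app-id" -> appId,
  "spark-role" -> "executor")
```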
}

private object KubernetesClusterSchedulerBackend {
  private val VMEM_EXCEEDED_EXIT_CODE = -103
These values are defined by YARN (not the Spark backend) - see YARN's ContainerExitStatus. Do they make any sense in k8s?
@varunkatta PTAL
  .build()

private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
private type LABELLED_PODS = FilterWatchListDeletable[
labeled
  verify(podOperations).create(SECOND_EXECUTOR_POD)
}

test("Deleting executors and then running an allocator pass after finding the loss reason" +
shorter test name?
  verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
}

test("Executors should only try to get the loss reason a number of times before giving up and" +
name is too long, try to make it fit on one line.
Test build #82856 has finished for PR 19468 at commit
@vanzin, thank you for reviewing - I've addressed some of the style and organization related issues. You're right in that the use of YARN internal constants may not make sense here - we'll be discussing this in our community meeting and addressing that shortly.
Test build #82886 has finished for PR 19468 at commit
@vanzin, you were right, the YARN constants were leftovers and made no sense wrt k8s. We discussed it in our weekly meeting - it was simply dead code. I've addressed most of the style comments and the major concern about the constants. It's ready for a more in-depth review.
Test build #82938 has finished for PR 19468 at commit
Test build #82940 has finished for PR 19468 at commit
a couple of minor comments, thanks!
} catch {
  case throwable: Throwable =>
    logError(s"Executor cannot find driver pod.", throwable)
    throw new SparkException(s"Executor cannot find driver pod", throwable)
nit: don't need s" on this and the previous line, and on L98, L103. Typically we avoid that for (minor) performance reasons.
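Applied to the lines quoted above, the nit amounts to something like:

```scala
// No interpolation is happening in these messages, so plain literals suffice and
// avoid building an unnecessary StringContext on each call.
logError("Executor cannot find driver pod.", throwable)
throw new SparkException("Executor cannot find driver pod.", throwable)
```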
private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE =
  ConfigBuilder("spark.kubernetes.allocation.batch.size")
    .doc("Number of pods to launch at once in each round of dynamic allocation. ")
nit: extra space at the end of string here and L102
  masterWithoutK8sPrefix
} else {
  val resolvedURL = s"https://$masterWithoutK8sPrefix"
  logDebug(s"No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
nit: no need for s" here.
Also, perhaps this is more useful as Info instead of Debug.
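For the quoted line, that suggestion would look roughly like the following; the tail of the message is a guess, since the diff truncates it:

```scala
// Only the second fragment interpolates anything, and the level is raised to INFO.
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. " +
  s"Resolved URL is $resolvedURL.")
```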
    .withName(kubernetesDriverPodName)
    .get()
} catch {
  case throwable: Throwable =>
+1 on Throwable - is there a more specific exception/error to catch? or just let it through
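One narrower option, sketched under the assumption that the lookup goes through a client field named kubernetesClient (the actual field name may differ): catch only non-fatal errors via scala.util.control.NonFatal and let fatal ones propagate.

```scala
import scala.util.control.NonFatal

// Wrap only non-fatal failures from the driver-pod lookup; fatal errors such as
// OutOfMemoryError are not swallowed.
val driverPod = try {
  kubernetesClient.pods()
    .withName(kubernetesDriverPodName)
    .get()
} catch {
  case NonFatal(e) =>
    throw new SparkException(s"Executor cannot find driver pod $kubernetesDriverPodName", e)
}
```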
knownExitReason.fold {
  removeExecutorOrIncrementLossReasonCheckCount(executorId)
} { executorExited =>
  logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
log as Error? or Warning?
// allows them to be debugged later on. Otherwise, mark them as to be deleted from
// the API server.
if (!executorExited.exitCausedByApp) {
  deleteExecutorFromClusterAndDataStructures(executorId)
for debuggability, it might be good to log the fact that this is exitCausedByApp
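A sketch of that suggestion, reusing the names from the quoted diff (the message wording is illustrative):

```scala
if (!executorExited.exitCausedByApp) {
  // Record why the pod is being deleted: the exit was not caused by the application,
  // so we do not keep the pod around for debugging.
  logInfo(s"Executor $executorId exited with exitCausedByApp = false; " +
    "deleting its pod from the API server.")
  deleteExecutorFromClusterAndDataStructures(executorId)
}
```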
  executorPodsByIPs.remove(podIP)
}
if (action == Action.ERROR) {
  logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
Warning or Error?
// container was probably actively killed by the driver.
val exitReason = if (isPodAlreadyReleased(pod)) {
  ExecutorExited(containerExitStatus, exitCausedByApp = false,
    s"Container in pod " + pod.getMetadata.getName +
s"Container in pod ${pod.getMetadata.getName}"
Test build #83060 has finished for PR 19468 at commit
Test build #84227 has finished for PR 19468 at commit
Test build #84228 has finished for PR 19468 at commit
  .intConf
  .checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
    "must be a positive integer")
  .createWithDefault(5)
Wasn't this 10 earlier?
We should probably keep it the same as before, as it's what we've been running with already.
Sure, changed back to 10.
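The resulting setting, modeled on the snippet quoted above; the constant name and config key are assumptions, since the diff does not show them:

```scala
private[spark] val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
  ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
    .intConf
    .checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
      "must be a positive integer")
    .createWithDefault(10)  // restored to the previous default of 10
```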
docs/configuration.md
Outdated
@@ -1397,10 +1397,10 @@ Apart from these, the following properties are also available, and may be useful
  </tr>
  <tr>
    <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
    <td>0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
    <td>2.3.0 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
0.8 for KUBERNETES mode?
Ah, totally misinterpreted it. Fixed.
Test build #84230 has finished for PR 19468 at commit
Test build #84233 has finished for PR 19468 at commit
Test build #84234 has finished for PR 19468 at commit
I see the latest changes are backported back to the fork - thanks! It's useful to do, since I think we get more comprehensive test coverage there, for now.
I think the other cases, Mesos and YARN/Docker, are quite different, as they are one of several possible environments/setups, whereas in this case there is no alternative - the Docker image must be there.
Anyway, I think we should proceed as-is and revisit if it makes sense.
@felixcheung @mridulm All the comments have been addressed. Is this good to merge?
Test build #84265 has finished for PR 19468 at commit
Thanks for the changes, and all the great work! We can leave it open for a day or so in case others want to do a last pass over it before merging it in.
LGTM, thanks for the awesome work!
Thanks - merging in master!
For future pull requests, can you create subtasks under https://issues.apache.org/jira/browse/SPARK-18278?
Done, created subtasks.
Thanks everyone and congrats!
This PR contains implementation of the basic submission client for the cluster mode of Spark on Kubernetes. It's step 2 from the step-wise plan documented [here](apache-spark-on-k8s#441 (comment)). This addition is covered by the [SPIP](http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-td22147.html) vote which passed on Aug 31. This PR and #19468 together form an MVP of Spark on Kubernetes that allows users to run Spark applications that use resources locally within the driver and executor containers on Kubernetes 1.6 and up. Some changes to the pom and build/test setup are copied over from #19468 to make this PR self-contained and testable.
The submission client is mainly responsible for creating the Kubernetes pod that runs the Spark driver. It follows a step-based approach to construct the driver pod, as the code under the `submit.steps` package shows. The steps are orchestrated by `DriverConfigurationStepsOrchestrator`. `Client` creates the driver pod and waits for the application to complete if it's configured to do so, which is the case by default. This PR also contains Dockerfiles of the driver and executor images. They are included because some of the environment variables set in the code would not make sense without referring to the Dockerfiles.
* The patch contains unit tests which are passing.
* Manual testing: ./build/mvn -Pkubernetes clean package succeeded.
* It is a subset of the entire changelist hosted at http://github.com/apache-spark-on-k8s/spark which is in active use in several organizations.
* There is integration testing enabled in the fork currently hosted by PepperData which is being moved over to RiseLAB CI.
* Detailed documentation on trying out the patch in its entirety is in: https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html
cc rxin felixcheung mateiz (shepherd)
k8s-big-data SIG members & contributors: mccheah foxish ash211 ssuchter varunkatta kimoonkim erikerlandson tnachen ifilonenko liyinan926
Author: Yinan Li <[email protected]>
Closes #19717 from liyinan926/spark-kubernetes-4.
What changes were proposed in this pull request?
This PR contains documentation on the usage of Kubernetes scheduler in Spark 2.3, and a shell script to make it easier to build docker images required to use the integration. The changes detailed here are covered by #19717 and #19468 which have merged already.
How was this patch tested?
The script has been in use for releases on our fork. Rest is documentation.
cc rxin mateiz (shepherd)
k8s-big-data SIG members & contributors: foxish ash211 mccheah liyinan926 erikerlandson ssuchter varunkatta kimoonkim tnachen ifilonenko
reviewers: vanzin felixcheung jiangxb1987 mridulm
TODO:
- [x] Add dockerfiles directory to built distribution. (#20007)
- [x] Change references to docker to instead say "container" (#19995)
- [x] Update configuration table.
- [x] Modify spark.kubernetes.allocation.batch.delay to take time instead of int (#20032)
Author: foxish <[email protected]>
Closes #19946 from foxish/update-k8s-docs.
What changes were proposed in this pull request?
This is a stripped-down version of the KubernetesClusterSchedulerBackend for Spark with the following components. It's step 1 from the step-wise plan documented here.
This addition is covered by the SPIP vote which passed on Aug 31.
How was this patch tested?
./build/mvn -Pkubernetes clean package succeeded.
cc @rxin @felixcheung @mateiz (shepherd)
k8s-big-data SIG members & contributors: @mccheah @ash211 @ssuchter @varunkatta @kimoonkim @erikerlandson @liyinan926 @tnachen @ifilonenko