Spark on Kubernetes - basic scheduler backend #498
Conversation
I think this is excellent content-wise for an initial push. We could have considered moving the executor failure handling to a separate commit, but I'm not strongly opinionated either way.
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging

object ConfigurationUtils extends Logging {
Mark with private[spark]
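For example, a minimal sketch of the suggested change, assuming the object lives under org.apache.spark.deploy.kubernetes:

```scala
package org.apache.spark.deploy.kubernetes

import org.apache.spark.internal.Logging

// private[spark] keeps the helper visible to everything under
// org.apache.spark while hiding it from user code.
private[spark] object ConfigurationUtils extends Logging
```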
import org.apache.spark.internal.Logging

object ConfigurationUtils extends Logging {
  def parseKeyValuePairs(
Where do we use this, at least in the context of this PR?
requestExecutorsService)

private val driverPod = try {
  kubernetesClient.pods().inNamespace(kubernetesNamespace).
Might prefer to stylize a bit differently:
kubernetesClient.pods()
  .inNamespace(kubernetesNamespace)
  .withName(kubernetesDriverPodName)
  .get
// by the executor pod watcher. If the loss reason was discovered by the watcher,
// inform the parent class with removeExecutor.
val disconnectedPodsByExecutorIdPendingRemovalCopy =
  Map.empty ++ disconnectedPodsByExecutorIdPendingRemoval
I think there might be a more idiomatic way to copy here. Can you try disconnectedPodsByExecutorIdPendingRemoval.toMap? As an experiment, explicitly set the type of disconnectedPodsByExecutorIdPendingRemovalCopy to Map[String, Pod]. Basically, as long as this collection is not mutable, we'll be safe.
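For reference, a minimal self-contained sketch of the suggested idiom:

```scala
import scala.collection.mutable

import io.fabric8.kubernetes.api.model.Pod

val disconnectedPodsByExecutorIdPendingRemoval = mutable.Map.empty[String, Pod]

// .toMap materializes an immutable snapshot, so later mutations of the
// source map cannot leak into the copy; the explicit type ascription
// verifies at compile time that the copy is the immutable Map.
val disconnectedPodsByExecutorIdPendingRemovalCopy: Map[String, Pod] =
  disconnectedPodsByExecutorIdPendingRemoval.toMap
```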
import org.apache.spark.deploy.kubernetes.constants
import org.apache.spark.network.netty.SparkTransportConf

class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter {
Nit: ExecutorPodFactorySuite. Our convention so far has been to name the test after the trait rather than the impl, since there's only ever one impl for each trait. But this might change upstream.
Checkstyle on Java is what's causing the build failures.
Compared this with branch-2.2-kubernetes. Looks reasonable to me. Just one minor comment below. PTAL.
private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
// Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
private val executorPodsByIPs = new mutable.HashMap[String, Pod]
private val podsWithKnownExitReasons: concurrent.Map[String, ExecutorExited] =
@foxish @varunkatta This part seems a bit outdated compared with branch-2.2-kubernetes, which uses a ConcurrentHashMap on this line.
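A sketch of the ConcurrentHashMap-backed version (the enclosing class name and package are made up for illustration; the package must sit under org.apache.spark because ExecutorExited is private[spark]):

```scala
package org.apache.spark.scheduler.cluster.kubernetes

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.concurrent

import org.apache.spark.scheduler.ExecutorExited

class ExitReasonTrackingSketch {
  // asScala wraps the ConcurrentHashMap as a scala.collection.concurrent.Map,
  // making reads and writes thread-safe without an external lock.
  private val podsWithKnownExitReasons: concurrent.Map[String, ExecutorExited] =
    new ConcurrentHashMap[String, ExecutorExited]().asScala
}
```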
Addressed comments. Thanks @mccheah and @kimoonkim!
Unit tests look good. There's not much else that can be tested at this point. If there are no further comments, I'll turn this into a PR against upstream tomorrow at 5pm.
Sounds good. BTW, is there a reason for the private[spark] scope? I think it would be better to start with a more constrained scope, like deploy or even k8s. But we could see if anyone complains :)
@felixcheung The relevant parts of #491 are already covered in this PR.
So I see - this is good to go.
...Once the build is fixed, of course =)
It should; for example, sql has private[sql] and mesos has private[mesos]:
https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
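Illustrative only (the object names here are made up): Scala's package-qualified private modifiers give progressively narrower visibility, which is what the deploy/k8s suggestion is about:

```scala
package org.apache.spark.deploy.k8s

// Visible anywhere under org.apache.spark:
private[spark] object SparkWideHelper

// Visible only under org.apache.spark.deploy and its subpackages:
private[deploy] object DeployScopedHelper

// Visible only within org.apache.spark.deploy.k8s:
private[k8s] object K8sScopedHelper
```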
Force-pushed from 8b57934 to f6fdd6a.
 * options for different components.
 */
private[spark] object SparkKubernetesClientFactory {
nit: Clean up this whitespace, or at least be consistent.
This is being reviewed in apache#19468
- Move Kubernetes client calls out of synchronized blocks to prevent locking with HTTP connection lag
- Fix a bug where pods that fail to launch through the API are not retried
- Remove the map from executor pod name to executor ID by using the pod's labels to get the same information without having to track extra state
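A minimal sketch of the first bullet's pattern (all names here are illustrative, not the PR's actual code): snapshot shared state while holding the lock, then make the slow HTTP call to the API server outside of it.

```scala
import scala.collection.mutable

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

class ExecutorPodCleanupSketch(kubernetesClient: KubernetesClient) {
  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
  private val runningExecutorPods = mutable.Map.empty[String, Pod]

  def deleteAllExecutorPods(): Unit = {
    // Copy and clear the shared map while holding the lock...
    val podsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      val snapshot = runningExecutorPods.values.toList
      runningExecutorPods.clear()
      snapshot
    }
    // ...then issue the Kubernetes API calls without the lock, so a laggy
    // HTTP connection cannot block other threads that need it.
    podsToDelete.foreach(pod => kubernetesClient.pods().delete(pod))
  }
}
```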
Force-pushed from df03462 to 0ab9ca7.
Force-pushed from e5a6a67 to cb12fec.
Could you check the integration test? It seems to have failed to build.
@felixcheung we shouldn't be running integration tests when we submit upstream. Also, shouldn't we instead be tracking apache#19717, and thus this PR be closed?
This PR is tracking the changes in the upstream one. It's the same branch used for both PRs, and is simply for tracking.
Closing this one as the upstream PR has been merged.
Continuing #492
Stripped out a lot of extraneous things to create this. Our first PR upstream will likely be similar to this. (Note that it is created against the master branch, which is up to date.)
Following PRs will have:
etc
TODO before we can retarget this to apache/spark:master:
cc @ash211 @mccheah @apache-spark-on-k8s/contributors