[SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Scheduler Backend #19468

Closed. Wants to merge 29 commits.

Changes shown are from 1 commit.

Commits (29):
f6fdd6a  Spark on Kubernetes - basic scheduler backend (foxish, Sep 15, 2017)
75e31a9  Adding to modules.py and SparkBuild.scala (foxish, Oct 17, 2017)
cf82b21  Exclude from unidoc, update travis (foxish, Oct 17, 2017)
488c535  Address a bunch of style and other comments (foxish, Oct 17, 2017)
82b79a7  Fix some style concerns (foxish, Oct 18, 2017)
c052212  Clean up YARN constants, unit test updates (foxish, Oct 20, 2017)
c565c9f  Couple of more style comments (foxish, Oct 20, 2017)
2fb596d  Address CR comments. (mccheah, Oct 25, 2017)
992acbe  Extract initial executor count to utils class (mccheah, Oct 25, 2017)
b0a5839  Fix scalastyle (mccheah, Oct 25, 2017)
a4f9797  Fix more scalastyle (mccheah, Oct 25, 2017)
2b5dcac  Pin down app ID in tests. Fix test style. (mccheah, Oct 26, 2017)
018f4d8  Address comments. (mccheah, Nov 1, 2017)
4b32134  Various fixes to the scheduler (mccheah, Nov 1, 2017)
6cf4ed7  Address comments (mccheah, Nov 4, 2017)
1f271be  Update fabric8 client version to 3.0.0 (foxish, Nov 13, 2017)
71a971f  Addressed more comments (liyinan926, Nov 13, 2017)
0ab9ca7  One more round of comments (liyinan926, Nov 14, 2017)
7f14b71  Added a comment regarding how failed executor pods are handled (liyinan926, Nov 15, 2017)
7afce3f  Addressed more comments (liyinan926, Nov 21, 2017)
b75b413  Fixed Scala style error (liyinan926, Nov 21, 2017)
3b587b4  Removed unused parameter in parsePrefixedKeyValuePairs (liyinan926, Nov 22, 2017)
cb12fec  Another round of comments (liyinan926, Nov 22, 2017)
ae396cf  Addressed latest comments (liyinan926, Nov 27, 2017)
f8e3249  Addressed comments around licensing on new dependencies (liyinan926, Nov 27, 2017)
a44c29e  Fixed unit tests and made maximum executor lost reason checks configu… (liyinan926, Nov 27, 2017)
4bed817  Removed default value for executor Docker image (liyinan926, Nov 27, 2017)
c386186  Close the executor pod watcher before deleting the executor pods (liyinan926, Nov 27, 2017)
b85cfc4  Addressed more comments (liyinan926, Nov 28, 2017)
docs/configuration.md (4 changes: 2 additions & 2 deletions)

@@ -1397,10 +1397,10 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
- <td>0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
+ <td>2.3.0 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
Contributor: 0.8 for KUBERNETES mode?

Contributor: Ah, totally mis-interpreted it. Fixed.
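
For context, here is a minimal sketch (not from this PR) of how the setting under review is supplied in application code; the values are illustrative only:

```scala
import org.apache.spark.SparkConf

// Illustrative only: wait until 80% of the requested executors have
// registered before scheduling tasks, bounded by the companion wait-time
// setting.
val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
```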

<td>
The minimum ratio of registered resources (registered resources / total expected resources)
- (resources are executors in yarn mode, CPU cores in standalone mode and Mesos coarsed-grained
+ (resources are executors in yarn mode and Kubernetes mode, CPU cores in standalone mode and Mesos coarsed-grained
mode ['spark.cores.max' value is total expected resources for Mesos coarse-grained mode] )
to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
Regardless of whether the minimum ratio of resources has been reached,
@@ -46,8 +46,6 @@ private[spark] trait ExecutorPodFactory {
private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
extends ExecutorPodFactory {

import ExecutorPodFactoryImpl._

private val executorExtraClasspath =
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)

@@ -76,7 +74,6 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)

private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
private val blockManagerPort = sparkConf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
Contributor: Are we assuming the ports will always be available (for executor and driver) to bind to?

Contributor: Yes - because we're deploying the driver and executors in a containerized setting, we're a lot more free to make looser assumptions about available ports. The exception will be if Spark applications are running sidecar containers along with the main driver/executor containers, but support for that is further out in the future when/if we expect Pod Presets to interact with our code.

Contributor: Sounds good, I wanted to make sure I understood it right that we are making the assumption about the port being unbound and available for Spark.
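
To make that assumption concrete: because every executor pod runs in its own network namespace, the same fixed port values can be configured for all executors without colliding on a node. A hedged sketch with illustrative values (not the PR's defaults):

```scala
import org.apache.spark.SparkConf

// Each executor pod has its own IP and network namespace, so reusing the
// same block manager port in every pod does not cause bind conflicts.
val conf = new SparkConf()
  .set("spark.blockmanager.port", "7079") // same literal value in every executor pod
  .set("spark.driver.port", "7078")       // fixed port inside the driver pod
```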


@@ -139,7 +136,6 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
}
}.getOrElse(Seq.empty[EnvVar])
Contributor: How is this getting used? I see it getting set, but not used anywhere.

Contributor: This is used in the executor Docker file included in #19717.

Contributor: Thanks, somehow it did not show up in my searches.
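
For readers unfamiliar with the fabric8 client, a hedged sketch (not the PR's exact code) of how name/value pairs like the ones below are typically turned into EnvVar objects for the container spec:

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

// Map plain (name, value) pairs into fabric8 EnvVar objects that can be
// attached to the executor container definition.
def toEnvVars(pairs: Seq[(String, String)]): Seq[EnvVar] =
  pairs.map { case (name, value) =>
    new EnvVarBuilder()
      .withName(name)
      .withValue(value)
      .build()
  }
```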

val executorEnv = (Seq(
(ENV_EXECUTOR_PORT, executorPort.toString),
(ENV_DRIVER_URL, driverUrl),
// Executor backend expects integral value for executor cores, so round it up to an int.
(ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
@@ -159,7 +155,6 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
.build()
Contributor: Can we replace the use of IPs with hostnames?
Typically within Spark, hostnames are used and not IPs.

Contributor: There is no stable hostname for a Kubernetes Pod unless we put the Pod behind a Service. We can't put executors behind Services because Services are considered heavyweight objects, and we want to constrain the number of them we have to create.

Contributor: Hm, the above comment should be more precise.

DNS is handled by kube-dns in any given Kubernetes context. (kube-dns is an optional component, but in future commits it will become clear that Spark will require kube-dns to be installed on a cluster, and we will document it as such.) kube-dns creates a DNS entry for each Service, which routes to the IPs of the pods behind it. But kube-dns does not create a DNS entry for every pod, unless there is a Service that maps to it.

Contributor: There are various parts of Spark which assert that a hostname is used and not an IP - that is, Spark makes the assumption that executors and drivers are reachable via hostnames.
Am I right in understanding that this assumption does not hold for the Kubernetes case?
That is, containers need not have a routable hostname assigned to them?

mridulm (Contributor), Oct 25, 2017: Just to clarify, the hostname need not be stable indefinitely - it only needs to be stable for the duration the container is 'alive', and must not be immediately reused (in case some other executor container comes up with the exact same hostname after the previous one exits for the same Spark app).

Contributor: To add, if the list above is not accurate (needs additions/mods/deletions), that is fine - I was listing potential concerns from the top of my head.

Contributor: @mridulm In response to each of your four points:

  • IPv6: IP addresses are strictly managed by the Kubernetes framework, so it's unlikely we're going to run into differences between IPv4 and IPv6 in different Kubernetes clusters. We should assume that one of these two address types is being used across all clusters and work with that. So far I've only seen Kubernetes assign IPv4 addresses.
  • No support for multihomed machines or multi-routable IPs: Again, since Kubernetes is managing the IP address and routability of pods, we can understand what the framework will do and work with that. I don't think the framework does anything fancy in this space, but maybe @foxish or others have ideas?
  • Distributed stores: We've thought about this and have some work done on our fork in this regard that we will eventually contribute back here. @kimoonkim has done some work on this. The short version of the situation we've had to work around is that the container that runs the Spark processes now has a different IP address from the HDFS node that might be colocated with it physically.
  • Rack awareness: @kimoonkim has also done work on this, and similar concerns to the point above have come up.

As a general note, locality is non-trivial in Kubernetes because no two pods will ever share the same IP address, and pods do not share the same IP address as the physical host that runs them. The Kubernetes code needs to be intelligent about knowing which pods are co-located on the same underlying kubelet. And of course, it's reasonable to believe that the above four considerations are not exhaustive, but we'll address unforeseen factors as they come up.
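
As an illustration of the co-location point above, here is a hedged sketch (not part of this PR) of how a pod-IP to node-name mapping could be read from the API server with the fabric8 client; the label selector is hypothetical:

```scala
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}
import scala.collection.JavaConverters._

// Build a map from executor pod IP to the name of the node (kubelet) the
// pod runs on; this is the kind of information HOST-level locality needs.
val client: KubernetesClient = new DefaultKubernetesClient()
val podIpToNode: Map[String, String] = client.pods()
  .inNamespace("default")
  .withLabel("spark-role", "executor") // hypothetical label
  .list()
  .getItems.asScala
  .flatMap { pod =>
    Option(pod.getStatus.getPodIP).map(ip => ip -> pod.getSpec.getNodeName)
  }
  .toMap
```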

Contributor: Thanks for clarifying @mccheah.

One clarification though:

> IP addresses are strictly managed by the Kubernetes framework, so it's unlikely we're going to run into differences between IPv4 and IPv6 in different Kubernetes clusters. We should assume that one of these two address types is being used across all clusters and work with that.

When I referred to IPv6 support, I was not referring to interop between IPv4 and IPv6 (that is a different can of worms!). What I wanted to clarify was that IPv6 is not supported as an IP (it is supported via hostnames though).
If all we are getting are IPv4 addresses, I do not see any obvious issues.

> The Kubernetes code needs to be intelligent about knowing which pods are co-located on the same underlying kubelet.

This would be critical to get the HOST locality level working properly. The performance implications of not getting this right would be non-trivial.

Looking forward to how this has been addressed!

Contributor (PR author): Was doing another pass over this thread. I think Matt covered this mostly. IPv6 support is being added in kubernetes/enhancements#508 but will be in alpha for now. We should target testing with that once it goes beta (probably the Kubernetes 1.10 timeframe). As for routing, that's entirely handled by k8s network plugins and we shouldn't have to do anything special to take advantage of it.

Conceptually, the notion of needing a network identity like DNS is separated from containers/pods themselves by design. We could, if necessary, create headless services if there's a strong need for a DNS name for each executor, but that would be a heavyweight approach that I would prefer we avoid unless we absolutely have to.

Lastly, +1 on the locality comment. I do think that's very important, and there are ways we can use the k8s control plane to figure out a global view - the different executors and their corresponding physical (or virtual) nodes.
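
For reference, the headless-service option mentioned above would look roughly like this with the fabric8 client (a hedged sketch; the names, label, and port are hypothetical, and this is not something the PR creates):

```scala
import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder}

// A headless Service (clusterIP: None) gives the selected executor pod a
// stable DNS name via kube-dns, at the cost of one Service object per pod.
val executorDnsService: Service = new ServiceBuilder()
  .withNewMetadata()
    .withName("spark-exec-1") // hypothetical per-executor service name
  .endMetadata()
  .withNewSpec()
    .withClusterIP("None")                 // headless
    .addToSelector("spark-exec-id", "1")   // hypothetical pod label
    .addNewPort().withName("blockmanager").withPort(7079).endPort()
  .endSpec()
  .build()
```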

Contributor: Sounds good, thanks for the details.

) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
val requiredPorts = Seq(
(EXECUTOR_PORT_NAME, executorPort),
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
.map { case (name, port) =>
new ContainerPortBuilder()
@@ -220,7 +215,3 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
.build()
}
Contributor: How is logging handled? Aggregation of stdout/stderr/other custom log files configured via log4j?
If they are to be specified via driver/executor pod config (I assume?), we need a way to introduce user-specified key-value pairs while creating it here.

Contributor: Logs are written to stdout/stderr, so they are handled by the Kubernetes logging system. See https://kubernetes.io/docs/concepts/cluster-administration/logging/#logging-at-the-node-level.

Contributor: Is this a limitation of what can be supported in the current integration? Or a general restriction of Kubernetes itself?
For example, in the YARN integration, files written to spark.yarn.app.container.log.dir get log-aggregated - not specific file names. (Actually it does not matter what the contents of the files are - they can even be arbitrary binaries, iirc.)

This allows us to have multiple loggers in log4j based on user customization.

Contributor: It's somewhat a limitation of the current integration. Allowing users to use a customized log4j configuration needs the ability to inject user-specified configuration files like log4j.properties through ConfigMaps into the driver and executor pods. This is not yet supported but will likely be supported in the near future. On the other hand, it is recommended in Kubernetes to log to stdout/stderr so that logs are handled by kubelets and can be retrieved using kubectl logs.

Contributor: Thanks for clarifying, that sounds fine - do we have a docs/running-on-kubernetes.md?
It would be a good idea to list these design choices, restrictions, and suggestions there to set user expectations.
For reference, there is a docs/running-on-yarn.md and a docs/running-on-mesos.md.

Of course, this does not need to be part of this PR! Just something to add in a follow-up PR before 2.3.
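
A hedged sketch of the ConfigMap approach discussed above (not implemented in this PR); the ConfigMap name and properties content are illustrative, and the map would still need to be volume-mounted into the driver/executor pods:

```scala
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder}

// Package a user-supplied log4j.properties as a ConfigMap so it could be
// mounted into Spark pods instead of relying solely on stdout/stderr logging.
val log4jProperties: String =
  """log4j.rootCategory=INFO, console
    |log4j.appender.console=org.apache.log4j.ConsoleAppender
    |log4j.appender.console.layout=org.apache.log4j.PatternLayout
    |""".stripMargin

val log4jConfigMap: ConfigMap = new ConfigMapBuilder()
  .withNewMetadata()
    .withName("spark-log4j-conf") // hypothetical name
  .endMetadata()
  .addToData("log4j.properties", log4jProperties)
  .build()
```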

}

private object ExecutorPodFactoryImpl {
private val DEFAULT_STATIC_PORT = 10000
}
@@ -224,6 +224,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def stop(): Unit = {
// stop allocation of new resources and caches.
allocatorExecutor.shutdown()
Contributor: allocatorExecutor.awaitTermination?

Contributor: Added allocatorExecutor.awaitTermination(30, TimeUnit.SECONDS). Ultimately I think we should use https://google.github.io/guava/releases/18.0/api/docs/com/google/common/util/concurrent/MoreExecutors.html#shutdownAndAwaitTermination(java.util.concurrent.ExecutorService, long, java.util.concurrent.TimeUnit). But the version of Guava used by Spark is too old and does not have that method.

allocatorExecutor.awaitTermination(30, TimeUnit.SECONDS)

// send stop message to executors so they shut down cleanly
super.stop()
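
As a side note, a hedged sketch of the fuller shutdown-and-await pattern referenced in the comment above, in the spirit of Guava's MoreExecutors.shutdownAndAwaitTermination (which Spark's bundled Guava version lacks):

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

// Request an orderly shutdown, wait a bounded time, then force-cancel any
// tasks that are still running.
def shutdownAndAwait(executor: ExecutorService, timeoutSeconds: Long): Unit = {
  executor.shutdown()
  if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
    executor.shutdownNow()
  }
}
```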