Skip to content

Commit

Permalink
[SPARK-27061][K8S] Expose Driver UI port on driver service to access …
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Expose Spark UI port on driver service to access logs from service.

## How was this patch tested?

The patch was tested using unit tests being contributed as a part of the PR

Closes #23990 from chandulal/SPARK-27061.

Authored-by: chandulal.kavar <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
  • Loading branch information
chandulal.kavar authored and Marcelo Vanzin committed Mar 11, 2019
1 parent 31878c9 commit d4542a8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private[spark] class DriverServiceFeatureStep(
config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt(
config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
private val driverUIPort = kubernetesConf.get(config.UI.UI_PORT)

override def configurePod(pod: SparkPod): SparkPod = pod

Expand Down Expand Up @@ -82,6 +83,11 @@ private[spark] class DriverServiceFeatureStep(
.withPort(driverBlockManagerPort)
.withNewTargetPort(driverBlockManagerPort)
.endPort()
.addNewPort()
.withName(UI_PORT_NAME)
.withPort(driverUIPort)
.withNewTargetPort(driverUIPort)
.endPort()
.endSpec()
.build()
Seq(driverService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.util.ManualClock

class DriverServiceFeatureStepSuite extends SparkFunSuite {
Expand All @@ -38,10 +39,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
"label1key" -> "label1value",
"label2key" -> "label2value")

test("Headless service has a port for the driver RPC and the block manager.") {
test("Headless service has a port for the driver RPC, the block manager and driver ui.") {
val sparkConf = new SparkConf(false)
.set(DRIVER_PORT, 9000)
.set(DRIVER_BLOCK_MANAGER_PORT, 8080)
.set(UI_PORT, 4080)
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
labels = DRIVER_LABELS)
Expand All @@ -56,6 +58,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
verifyService(
9000,
8080,
4080,
s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
driverService)
}
Expand Down Expand Up @@ -85,6 +88,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
verifyService(
DEFAULT_DRIVER_PORT,
DEFAULT_BLOCKMANAGER_PORT,
UI_PORT.defaultValue.get,
s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
resolvedService)
val additionalProps = configurationStep.getAdditionalPodSystemProperties()
Expand Down Expand Up @@ -152,20 +156,24 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
private def verifyService(
driverPort: Int,
blockManagerPort: Int,
drierUIPort: Int,
expectedServiceName: String,
service: Service): Unit = {
assert(service.getMetadata.getName === expectedServiceName)
assert(service.getSpec.getClusterIP === "None")
DRIVER_LABELS.foreach { case (k, v) =>
assert(service.getSpec.getSelector.get(k) === v)
}
assert(service.getSpec.getPorts.size() === 2)
assert(service.getSpec.getPorts.size() === 3)
val driverServicePorts = service.getSpec.getPorts.asScala
assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
assert(driverServicePorts.head.getPort.intValue() === driverPort)
assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
assert(driverServicePorts(2).getName === UI_PORT_NAME)
assert(driverServicePorts(2).getPort.intValue() === drierUIPort)
assert(driverServicePorts(2).getTargetPort.getIntVal === drierUIPort)
}
}

0 comments on commit d4542a8

Please sign in to comment.