Skip to content

Commit

Permalink
Change to use Optional for ExecutorResourceRequest instead of ""
Browse files Browse the repository at this point in the history
  • Loading branch information
tgravescs committed Feb 3, 2020
1 parent 5435640 commit 5449cda
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.resource

import java.util.Optional

/**
* An Executor resource request. This is used in conjunction with the ResourceProfile to
* programmatically specify the resources needed for an RDD that will be applied at the
Expand Down Expand Up @@ -53,8 +55,8 @@ package org.apache.spark.resource
private[spark] class ExecutorResourceRequest(
val resourceName: String,
val amount: Long,
val discoveryScript: String = "",
val vendor: String = "") extends Serializable {
val discoveryScript: Optional[String] = Optional.empty(),
val vendor: Optional[String] = Optional.empty()) extends Serializable {

override def equals(obj: Any): Boolean = {
obj match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.resource

import java.util.Optional
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -94,6 +95,7 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
* like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource
* that the cluster manager doesn't support the result is undefined, it may error or may just
* be ignored.
* (Java friendly) using Optional instead of scala Option
*
* @param resourceName Name of the resource.
* @param amount amount of that resource per executor to use.
Expand All @@ -106,10 +108,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
def resource(
resourceName: String,
amount: Long,
discoveryScript: String = "",
vendor: String = ""): this.type = {
// a bit weird but for Java api use empty string as meaning None because empty
// string is otherwise invalid for those parameters anyway
discoveryScript: Optional[String] = Optional.empty(),
vendor: Optional[String] = Optional.empty()): this.type = {
val req = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
_executorResources.put(resourceName, req)
this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ object ResourceProfile extends Logging {
val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
execReq.foreach { req =>
val name = req.id.resourceName
ereqs.resource(name, req.amount, req.discoveryScript.orElse(""),
req.vendor.orElse(""))
ereqs.resource(name, req.amount, req.discoveryScript,
req.vendor)
}
ereqs.requests
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,11 @@ private[spark] object ResourceUtils extends Logging {
val filteredExecreq = execReq.filterNot { case (rname, _) => fileAllocResMap.contains(rname) }
val rpAllocations = filteredExecreq.map { case (rName, execRequest) =>
val resourceId = new ResourceID(componentName, rName)
val scriptOpt = emptyStringToOptional(execRequest.discoveryScript)
val vendorOpt = emptyStringToOptional(execRequest.vendor)
val resourceReq = new ResourceRequest(resourceId, execRequest.amount, scriptOpt, vendorOpt)
val resourceReq = new ResourceRequest(
resourceId,
execRequest.amount,
execRequest.discoveryScript,
execRequest.vendor)
val addrs = discoverResource(sparkConf, resourceReq).addresses
(rName, new ResourceInformation(rName, addrs))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.resource;

import java.util.Map;
import java.util.Optional;

import static org.junit.Assert.*;
import org.junit.Test;
Expand All @@ -30,10 +31,10 @@ public class JavaResourceProfileSuite {

@Test
public void testResourceProfileAccessFromJava() throws Exception {
ExecutorResourceRequests execReqGpu =
new ExecutorResourceRequests().resource(GpuResource, 2,"myscript", "");
ExecutorResourceRequests execReqFpga =
new ExecutorResourceRequests().resource(FPGAResource, 3, "myfpgascript", "nvidia");
ExecutorResourceRequests execReqGpu = new ExecutorResourceRequests()
.resource(GpuResource, 2,Optional.of("myscript"), Optional.empty());
ExecutorResourceRequests execReqFpga = new ExecutorResourceRequests()
.resource(FPGAResource, 3, Optional.of("myfpgascript"), Optional.of("nvidia"));

ResourceProfileBuilder rprof = new ResourceProfileBuilder();
rprof.require(execReqGpu);
Expand All @@ -46,14 +47,14 @@ public void testResourceProfileAccessFromJava() throws Exception {
assert(eresources.containsKey(GpuResource));
ExecutorResourceRequest gpuReq = eresources.get(GpuResource);
assertEquals(gpuReq.amount(), 2);
assertEquals(gpuReq.discoveryScript(), "myscript");
assertEquals(gpuReq.vendor(), "");
assertEquals(gpuReq.discoveryScript().get(), "myscript");
assertEquals(gpuReq.vendor(), Optional.empty());

assert(eresources.containsKey(FPGAResource));
ExecutorResourceRequest fpgaReq = eresources.get(FPGAResource);
assertEquals(fpgaReq.amount(), 3);
assertEquals(fpgaReq.discoveryScript(), "myfpgascript");
assertEquals(fpgaReq.vendor(), "nvidia");
assertEquals(fpgaReq.discoveryScript().get(), "myfpgascript");
assertEquals(fpgaReq.vendor().get(), "nvidia");

assertEquals(rprof.taskResources().size(), 1);
Map<String, TaskResourceRequest> tresources = rprof.taskResourcesJMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.executor
import java.io.File
import java.net.URL
import java.nio.ByteBuffer
import java.util.Optional
import java.util.Properties

import scala.collection.mutable
Expand Down Expand Up @@ -239,7 +240,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
"""{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
val rpBuilder = new ResourceProfileBuilder
val ereqs = new ExecutorResourceRequests().resource(FPGA, 3, scriptPath)
val ereqs = new ExecutorResourceRequests().resource(FPGA, 3, Optional.of(scriptPath))
ereqs.resource(GPU, 2)
val rp = rpBuilder.require(ereqs).build
allocatedFileAndConfigsResourceDiscoveryTestFpga(dir, new SparkConf, rp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.resource

import java.util.Optional

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
Expand Down Expand Up @@ -58,7 +60,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
val defaultProf = rpmanager.defaultResourceProfile
val rprof = new ResourceProfileBuilder()
val gpuExecReq =
new ExecutorResourceRequests().resource("gpu", 2, "someScript")
new ExecutorResourceRequests().resource("gpu", 2, Optional.of("someScript"))
val immrprof = rprof.require(gpuExecReq).build
val error = intercept[SparkException] {
rpmanager.isSupported(immrprof)
Expand All @@ -76,7 +78,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
val defaultProf = rpmanager.defaultResourceProfile
val rprof = new ResourceProfileBuilder()
val gpuExecReq =
new ExecutorResourceRequests().resource("gpu", 2, "someScript")
new ExecutorResourceRequests().resource("gpu", 2, Optional.of("someScript"))
val immrprof = rprof.require(gpuExecReq).build
assert(rpmanager.isSupported(immrprof) == true)
}
Expand All @@ -89,7 +91,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
val defaultProf = rpmanager.defaultResourceProfile
val rprof = new ResourceProfileBuilder()
val gpuExecReq =
new ExecutorResourceRequests().resource("gpu", 2, "someScript")
new ExecutorResourceRequests().resource("gpu", 2, Optional.of("someScript"))
val immrprof = rprof.require(gpuExecReq).build
var error = intercept[SparkException] {
rpmanager.isSupported(immrprof)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.resource

import java.util.Optional

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
Expand Down Expand Up @@ -107,7 +109,8 @@ class ResourceProfileSuite extends SparkFunSuite {
val rprof = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val execReq =
new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
new ExecutorResourceRequests().resource("gpu", 2,
Optional.of("myscript"), Optional.of("nvidia"))
rprof.require(taskReq).require(execReq)
val immrprof = new ResourceProfile(rprof.executorResources, rprof.taskResources)
assert(immrprof.limitingResource(sparkConf) == "cpus")
Expand All @@ -119,7 +122,8 @@ class ResourceProfileSuite extends SparkFunSuite {
val rprof = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val execReq =
new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
new ExecutorResourceRequests().resource("gpu", 2,
Optional.of("myscript"), Optional.of("nvidia"))
rprof.require(taskReq).require(execReq)
val immrprof = new ResourceProfile(rprof.executorResources, rprof.taskResources)
assert(immrprof.limitingResource(sparkConf) == "gpu")
Expand All @@ -140,7 +144,8 @@ class ResourceProfileSuite extends SparkFunSuite {
val rprof = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val execReq =
new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
new ExecutorResourceRequests().resource("gpu", 2,
Optional.of("myscript"), Optional.of("nvidia"))
rprof.require(taskReq).require(execReq)
val immrprof = new ResourceProfile(rprof.executorResources, rprof.taskResources)
assert(immrprof.limitingResource(sparkConf) == ResourceProfile.CPUS)
Expand All @@ -152,15 +157,16 @@ class ResourceProfileSuite extends SparkFunSuite {
test("Create ResourceProfile") {
val rprof = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val eReq = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
val eReq = new ExecutorResourceRequests().resource("gpu", 2,
Optional.of("myscript"), Optional.of("nvidia"))
rprof.require(taskReq).require(eReq)

assert(rprof.executorResources.size === 1)
assert(rprof.executorResources.contains("gpu"),
"Executor resources should have gpu")
assert(rprof.executorResources.get("gpu").get.vendor === "nvidia",
assert(rprof.executorResources.get("gpu").get.vendor.get() === "nvidia",
"gpu vendor should be nvidia")
assert(rprof.executorResources.get("gpu").get.discoveryScript === "myscript",
assert(rprof.executorResources.get("gpu").get.discoveryScript.get() === "myscript",
"discoveryScript should be myscript")
assert(rprof.executorResources.get("gpu").get.amount === 2,
"gpu amount should be 2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class ResourceUtilsSuite extends SparkFunSuite
dir, "gpuDiscoveryScript",
"""{"name": "gpu", "addresses": ["0", "1"]}""")
val rpBuilder = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests().resource(GPU, 2, gpuDiscovery)
val ereqs = new ExecutorResourceRequests().resource(GPU, 2, Optional.of(gpuDiscovery))
val treqs = new TaskResourceRequests().resource(GPU, 1)

val rp = rpBuilder.require(ereqs).require(treqs).build
Expand Down

0 comments on commit 5449cda

Please sign in to comment.