[SPARK-529] allow for application args to have arguments without equals sign, test (apache#182)

* allow for application args to have arguments without equals sign, test

* change setup and teardown back, oops
Arthur Rand authored Sep 12, 2017
1 parent 5e22577 commit bab1e12
Showing 5 changed files with 82 additions and 19 deletions.
14 changes: 13 additions & 1 deletion cli/dcos-spark/submit_builder.go
@@ -315,12 +315,24 @@ ARGLOOP:
// flush the rest and exit.
for i < len(args) {
arg = args[i]
// if we have a --flag going to the application, we need to take the arg (flag) and the value ONLY
// if it's not in the --flag=val format, which scopt allows
if strings.HasPrefix(arg, "-") {
appFlags = append(appFlags, arg)
if strings.Contains(arg, "=") || (i + 1) >= len(args) {
i += 1
} else {
// if there's a value with this flag, add it
if !strings.HasPrefix(args[i + 1], "-") {
appFlags = append(appFlags, args[i + 1])
i += 1
}
i += 1
}
} else {
argsEquals = append(argsEquals, arg)
i += 1
}
i += 1
}
break
}
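For context, here is a minimal standalone sketch (not part of the diff) of the behavior this loop implements: application flags in either `--flag=val` or `--flag val` form are collected together with their values, and everything else is treated as a positional application argument. The wrapper function name and the example inputs are illustrative only.

package main

import (
	"fmt"
	"strings"
)

// splitAppArgs (illustrative name) mirrors the loop above: flags and their
// space-separated values go into appFlags, everything else into positional.
func splitAppArgs(args []string) (appFlags []string, positional []string) {
	i := 0
	for i < len(args) {
		arg := args[i]
		if strings.HasPrefix(arg, "-") {
			appFlags = append(appFlags, arg)
			if strings.Contains(arg, "=") || i+1 >= len(args) {
				i += 1
			} else {
				// the flag has a space-separated value, so take it as well
				if !strings.HasPrefix(args[i+1], "-") {
					appFlags = append(appFlags, args[i+1])
					i += 1
				}
				i += 1
			}
		} else {
			positional = append(positional, arg)
			i += 1
		}
	}
	return appFlags, positional
}

func main() {
	// mirrors the test cases further below: flags without equals signs,
	// with and without a value-less boolean flag
	flags, rest := splitAppArgs([]string{"--readUrl", "s3n://bucket/in", "--writeUrl", "s3n://bucket/out"})
	fmt.Println(flags, rest) // [--readUrl s3n://bucket/in --writeUrl s3n://bucket/out] []

	flags, rest = splitAppArgs([]string{"--countOnly", "--readUrl", "s3n://bucket/in"})
	fmt.Println(flags, rest) // [--countOnly --readUrl s3n://bucket/in] []
}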
9 changes: 4 additions & 5 deletions docs/run-job.md
@@ -8,15 +8,14 @@ enterprise: 'no'
to a location visible to the cluster (e.g., HTTP, S3, or HDFS). [Learn more][13].

1. Run the job.
Include all configuration flags before the jar URL and the arguments for your Spark job after the jar URL, generally following the template `dcos spark run --submit-args="<flags> URL [args]"`, where `<flags>` can be options such as `--conf spark.cores.max=16` and `--class my.spark.App`, `URL` is the location of the application, and `[args]` are any arguments for the application.

dcos spark run --submit-args="--class MySampleClass http://external.website/mysparkapp.jar"

dcos spark run --submit-args="--class MySampleClass http://external.website/mysparkapp.jar 30"

dcos spark run --submit-args="--py-files mydependency.py http://external.website/mysparkapp.py 30"
dcos spark run --submit-args="--py-files mydependency.py http://external.website/mysparkapp.py"

dcos spark run --submit-args="http://external.website/mysparkapp.R"

You can submit arbitrary pass-through options to this script via the `--submit-args` option.
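For example, with this change application flags may follow the application URL without equals signs (the class name and URLs below are illustrative):

dcos spark run --submit-args="--class S3Job http://external.website/mysparkapp.jar --readUrl s3n://my-bucket/linecount.txt --countOnly"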

If your job runs successfully, you will get a message with the job’s submission ID:

Run job succeeded. Submission id: driver-20160126183319-0001
3 changes: 2 additions & 1 deletion tests/jobs/scala/build.sbt
@@ -4,7 +4,8 @@ lazy val root = (project in file("."))
scalaVersion := "2.11.8",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "2.6.0"
"org.apache.hadoop" % "hadoop-aws" % "2.6.0",
"com.github.scopt" %% "scopt" % "3.7.0"
)
)

40 changes: 30 additions & 10 deletions tests/jobs/scala/src/main/scala/S3Job.scala
@@ -1,22 +1,42 @@
import scopt.OptionParser
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object S3Job {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("S3 Test")
val sc = new SparkContext(conf)
object config {
var readurl: String = null
var writeurl: String = null
var countonly: Boolean = false
}

val readURL = args(0)
val writeURL = args(1)
println(s"Reading from ${readURL}. Writing to ${writeURL}.")
val parser = new OptionParser[Unit]("S3 Job") {
opt[String]("readUrl").action((x, _) => config.readurl = x)
opt[String]("writeUrl").action((x, _) => config.writeurl = x)
opt[Unit]("countOnly").action((_, _) => config.countonly = true)
}

val textRDD = sc.textFile(readURL)
println(s"Read ${textRDD.count()} lines from${readURL}.")
if (parser.parse(args)) {
println("RUNNING S3 JOB")
val conf = new SparkConf().setAppName("S3 Test")
val sc = new SparkContext(conf)

textRDD.map(_.length).saveAsTextFile(writeURL)
println(s"Wrote ${textRDD.count()} lines to ${writeURL}.")
val readURL = config.readurl
val writeURL = config.writeurl

sc.stop()
println(s"Reading from ${readURL}. Writing to ${writeURL}.")

val textRDD = sc.textFile(readURL)
println(s"Read ${textRDD.count()} lines from ${readURL}.")

textRDD.map(_.length).saveAsTextFile(writeURL)
println(s"Wrote ${textRDD.count()} lines to ${writeURL}.")

sc.stop()
} else {
println("Error bad arguments")
System.exit(1)
}
}
}
35 changes: 33 additions & 2 deletions tests/test_spark.py
@@ -231,11 +231,12 @@ def _check_task_network_info(task):


@pytest.mark.sanity
@pytest.mark.runnow
def test_s3():
linecount_path = os.path.join(THIS_DIR, 'resources', 'linecount.txt')
s3.upload_file(linecount_path)

app_args = "{} {}".format(
app_args = "--readUrl {} --writeUrl {}".format(
s3.s3n_url('linecount.txt'),
s3.s3n_url("linecount-out"))

@@ -248,12 +249,42 @@ def test_s3():
"--class", "S3Job"]
utils.run_tests(app_url=_scala_test_jar_url(),
app_args=app_args,
expected_output="",
expected_output="Read 3 lines",
app_name="/spark",
args=args)

assert len(list(s3.list("linecount-out"))) > 0

app_args = "--readUrl {} --countOnly".format(s3.s3n_url('linecount.txt'))

args = ["--conf",
"spark.mesos.driverEnv.AWS_ACCESS_KEY_ID={}".format(
os.environ["AWS_ACCESS_KEY_ID"]),
"--conf",
"spark.mesos.driverEnv.AWS_SECRET_ACCESS_KEY={}".format(
os.environ["AWS_SECRET_ACCESS_KEY"]),
"--class", "S3Job"]
utils.run_tests(app_url=_scala_test_jar_url(),
app_args=app_args,
expected_output="Read 3 lines",
app_name="/spark",
args=args)

app_args = "--countOnly --readUrl {}".format(s3.s3n_url('linecount.txt'))

args = ["--conf",
"spark.mesos.driverEnv.AWS_ACCESS_KEY_ID={}".format(
os.environ["AWS_ACCESS_KEY_ID"]),
"--conf",
"spark.mesos.driverEnv.AWS_SECRET_ACCESS_KEY={}".format(
os.environ["AWS_SECRET_ACCESS_KEY"]),
"--class", "S3Job"]
utils.run_tests(app_url=_scala_test_jar_url(),
app_args=app_args,
expected_output="Read 3 lines",
app_name="/spark",
args=args)


# Skip DC/OS < 1.10, because it doesn't have adminrouter support for service groups.
@pytest.mark.skipif('shakedown.dcos_version_less_than("1.10")')