Skip to content

Commit

Permalink
Clear SPARK_WORKER_INSTANCES when using YARN
Browse files — browse the repository at this point in the history
  • Loading branch information
shivaram committed May 27, 2015
1 parent 8f20824 commit 0593d1b
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions ec2/spark_ec2.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -219,7 +219,8 @@ def parse_args():
"(default: %default).")
parser.add_option(
"--hadoop-major-version", default="1",
help="Major version of Hadoop (default: %default)")
help="Major version of Hadoop. Valid options are 1 (Hadoop 1.0.4), 2 (CDH 4.2.0), yarn " +
"(Hadoop 2.4.0) (default: %default)")
parser.add_option(
"-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
Expand Down Expand Up @@ -271,7 +272,8 @@ def parse_args():
help="Launch fresh slaves, but use an existing stopped master if possible")
parser.add_option(
"--worker-instances", type="int", default=1,
help="Number of instances per worker: variable SPARK_WORKER_INSTANCES (default: %default)")
help="Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN " +
"is used as Hadoop major version (default: %default)")
parser.add_option(
"--master-opts", type="string", default="",
help="Extra options to give to master through SPARK_MASTER_OPTS variable " +
Expand Down Expand Up @@ -761,6 +763,11 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
if opts.ganglia:
modules.append('ganglia')

# Clear SPARK_WORKER_INSTANCES if running on YARN
if opts.hadoop_major_version == "yarn":
opts.worker_instances = ""


# NOTE: We should clone the repository before running deploy_files to
# prevent ec2-variables.sh from being overwritten
print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
Expand Down Expand Up @@ -998,6 +1005,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):

master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else ""
template_vars = {
"master_list": '\n'.join(master_addresses),
"active_master": active_master,
Expand All @@ -1011,7 +1019,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
"spark_version": spark_v,
"tachyon_version": tachyon_v,
"hadoop_major_version": opts.hadoop_major_version,
"spark_worker_instances": "%d" % opts.worker_instances,
"spark_worker_instances": worker_instances_str,
"spark_master_opts": opts.master_opts
}

Expand Down

0 comments on commit 0593d1b

Please sign in to comment.