diff --git a/pyzoo/zoo/common/nncontext.py b/pyzoo/zoo/common/nncontext.py
index d66bfc13861..7bbfb7dacbe 100644
--- a/pyzoo/zoo/common/nncontext.py
+++ b/pyzoo/zoo/common/nncontext.py
@@ -122,7 +122,7 @@ def init_spark_standalone(num_executors,
                           executor_memory="10g",
                           driver_memory="1g",
                           driver_cores=4,
-                          spark_master=None,
+                          master=None,
                           extra_executor_memory_for_ray=None,
                           extra_python_lib=None,
                           spark_log_level="WARN",
@@ -142,8 +142,8 @@ def init_spark_standalone(num_executors,
     :param executor_memory: The memory for each executor. Default to be '2g'.
     :param driver_cores: The number of cores for the Spark driver. Default to be 4.
     :param driver_memory: The memory for the Spark driver. Default to be '1g'.
-    :param spark_master: The master URL of an existing Spark standalone cluster starting with
-    'spark://'. You only need to specify this if you have already started a standalone cluster.
+    :param master: The master URL of an existing Spark standalone cluster: 'spark://master:port'.
+    You only need to specify this if you have already started a standalone cluster.
     Default to be None and a new standalone cluster would be started in this case.
     :param extra_executor_memory_for_ray: The extra memory for Ray services. Default to be None.
     :param extra_python_lib: Extra python files or packages needed for distribution.
@@ -167,7 +167,7 @@ def init_spark_standalone(num_executors,
         executor_memory=executor_memory,
         driver_memory=driver_memory,
         driver_cores=driver_cores,
-        spark_master=spark_master,
+        master=master,
         extra_executor_memory_for_ray=extra_executor_memory_for_ray,
         extra_python_lib=extra_python_lib,
         conf=conf,
diff --git a/pyzoo/zoo/util/spark.py b/pyzoo/zoo/util/spark.py
index d84714eb24c..1792c72028a 100644
--- a/pyzoo/zoo/util/spark.py
+++ b/pyzoo/zoo/util/spark.py
@@ -235,7 +235,7 @@ def init_spark_standalone(self,
                               executor_memory="10g",
                               driver_memory="1g",
                               driver_cores=4,
-                              spark_master=None,
+                              master=None,
                               extra_executor_memory_for_ray=None,
                               extra_python_lib=None,
                               conf=None,
@@ -248,7 +248,7 @@ def init_spark_standalone(self,
         if 'PYSPARK_PYTHON' not in os.environ:
             os.environ["PYSPARK_PYTHON"] = self._detect_python_location()
 
-        if not spark_master:
+        if not master:
             pyspark_home = os.path.abspath(pyspark.__file__ + "/../")
             zoo_standalone_home = os.path.abspath(__file__ + "/../../share/bin/standalone")
             node_ip = get_node_ip()
@@ -266,18 +266,19 @@ def init_spark_standalone(self,
                 "{}/sbin/start-master.sh".format(zoo_standalone_home),
                 shell=True, env=SparkRunner.standalone_env)
             os.waitpid(start_master_pro.pid, 0)
-            spark_master = "spark://{}:7077".format(node_ip)  # 7077 is the default port
+            master = "spark://{}:7077".format(node_ip)  # 7077 is the default port
             # Start worker
             start_worker_pro = subprocess.Popen(
-                "{}/sbin/start-worker.sh {}".format(zoo_standalone_home, spark_master),
+                "{}/sbin/start-worker.sh {}".format(zoo_standalone_home, master),
                 shell=True, env=SparkRunner.standalone_env)
             os.waitpid(start_worker_pro.pid, 0)
         else:  # A Spark standalone cluster has already been started by the user.
-            assert spark_master.startswith("spark://"), \
-                "Please input a valid master address for Spark standalone: spark://master:port"
+            assert master.startswith("spark://"), \
+                "Please input a valid master address for your Spark standalone cluster: " \
+                "spark://master:port"
 
         # Start pyspark-shell
-        submit_args = " --master " + spark_master
+        submit_args = " --master " + master
         submit_args = submit_args + " --driver-cores {} --driver-memory {} --num-executors {}" \
                                     " --executor-cores {} --executor-memory {}"\
             .format(driver_cores, driver_memory, num_executors, executor_cores, executor_memory)
@@ -318,6 +319,8 @@ def stop_spark_standalone():
         import pyspark
         pyspark_home = os.path.abspath(pyspark.__file__ + "/../")
         zoo_standalone_home = os.path.abspath(__file__ + "/../../share/bin/standalone")
+        pro = subprocess.Popen(["chmod", "-R", "+x", "{}/sbin".format(zoo_standalone_home)])
+        os.waitpid(pro.pid, 0)
         env = {"SPARK_HOME": pyspark_home,
                "ZOO_STANDALONE_HOME": zoo_standalone_home}
         stop_worker_pro = subprocess.Popen(