diff --git a/python/orca/src/bigdl/orca/ray/__init__.py b/python/orca/src/bigdl/orca/ray/__init__.py new file mode 100644 index 00000000000..5976dc4df02 --- /dev/null +++ b/python/orca/src/bigdl/orca/ray/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2018 Analytics Zoo Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/orca/src/bigdl/orca/ray/util/__init__.py b/python/orca/src/bigdl/orca/ray/util/__init__.py new file mode 100755 index 00000000000..81a5766f40c --- /dev/null +++ b/python/orca/src/bigdl/orca/ray/util/__init__.py @@ -0,0 +1,46 @@ +# +# Copyright 2018 Analytics Zoo Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
def gen_shutdown_per_node(pgids, node_ips=None):
    """
    Build a function that sends SIGTERM to Ray process groups on the current node.

    :param pgids: a single process-group id, or a list/tuple of them.
    :param node_ips: optional list parallel to ``pgids`` giving the node IP each
           process group was recorded on. When given, only the groups whose
           recorded IP equals the IP of the node running the returned function
           are signalled. When omitted, every group in ``pgids`` is signalled.
    :return: a one-argument callable; the argument is ignored, so the callable
             can be handed to an RDD transformation or invoked directly.
    """
    pgids = to_list(pgids)

    def _shutdown_per_node(iter):
        print("Stopping pgids: {}".format(pgids))
        if node_ips:
            current_node_ip = rservices.get_node_ip_address()
            effect_pgids = [pgid for pgid, node_ip in zip(pgids, node_ips)
                            if node_ip == current_node_ip]
        else:
            effect_pgids = pgids
        # Only signal the groups that belong to this node: a pgid recorded on
        # another host may name an unrelated process group here.
        for pgid in effect_pgids:
            print("Stopping by pgid {}".format(pgid))
            try:
                os.killpg(pgid, signal.SIGTERM)
            except Exception:
                # Best effort: the group may already be gone.
                print("WARNING: cannot kill pgid: {}".format(pgid))

    return _shutdown_per_node


def is_local(sc):
    """
    Tell whether the SparkContext runs in Spark local mode.

    :param sc: object exposing ``_conf.get("spark.master")``.
    :return: True when the master is ``local`` or starts with ``local[``;
             False otherwise, including when ``spark.master`` is not set.
    """
    master = sc._conf.get("spark.master")
    if master is None:
        return False
    return master == "local" or master.startswith("local[")
class ProcessInfo(object):
    """
    Result of one command launched through ``session_execute``.

    Holds the stripped stdout/stderr text, the return code, the process-group
    id of the session the command ran in, the pids found in that group, a
    free-form tag and the IP of the node the command ran on. ``master_addr``
    starts as None and is filled in by the caller for the Ray head node.
    """

    def __init__(self, out, err, errorcode, pgid, tag="default", pids=None, node_ip=None):
        self.out = str(out.strip())
        self.err = str(err.strip())
        self.pgid = pgid
        self.pids = pids
        self.errorcode = errorcode
        self.tag = tag
        self.master_addr = None
        self.node_ip = node_ip

    def __str__(self):
        # Built from adjacent literals so that source indentation does not leak
        # into the rendered text.
        template = ("node_ip: {} tag: {}, pgid: {}, pids: {}, returncode: {}, "
                    "master_addr: {}, \n {} {}")
        return template.format(self.node_ip, self.tag, self.pgid, self.pids,
                               self.errorcode, self.master_addr, self.out, self.err)


def pids_from_gpid(gpid):
    """
    Return the pids of all currently running processes whose process-group id is ``gpid``.
    """
    result = []
    for proc in psutil.process_iter():
        try:
            if os.getpgid(proc.pid) == gpid:
                result.append(proc.pid)
        except OSError:
            # The process vanished or is not accessible; skip it.
            continue
    return result


def session_execute(command, env=None, tag=None, fail_fast=False, timeout=120):
    """
    Run a shell command in its own session and collect its output.

    :param command: shell command line, executed with ``shell=True``.
    :param env: environment mapping for the child, or None to inherit.
    :param tag: label stored on the returned ProcessInfo.
    :param fail_fast: when True, raise an Exception carrying stderr if the
           command exits with a non-zero code.
    :param timeout: seconds to wait for the command to finish.
    :return: a ProcessInfo describing the finished command.
    :raises subprocess.TimeoutExpired: if the command does not finish in time;
            the whole session is killed before the error propagates.
    """
    pro = subprocess.Popen(
        command,
        shell=True,
        env=env,
        cwd=None,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        preexec_fn=os.setsid)
    pgid = os.getpgid(pro.pid)
    try:
        out, err = pro.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # communicate() does not stop the child on timeout; kill the session
        # we created and reap it so nothing is left behind.
        try:
            os.killpg(pgid, signal.SIGKILL)
        except OSError:
            pass
        pro.communicate()
        raise
    out = out.decode("utf-8")
    err = err.decode("utf-8")
    print(out)
    print(err)
    errorcode = pro.returncode
    if errorcode != 0 and fail_fast:
        raise Exception(err)
    return ProcessInfo(out=out,
                       err=err,
                       errorcode=errorcode,
                       pgid=pgid,
                       pids=pids_from_gpid(pgid),
                       tag=tag)


class ProcessMonitor:
    """
    Tracks the Ray processes started on the cluster and arranges their shutdown.

    The process infos are split into the single master (the one whose
    ``master_addr`` is set) and the slaves. ``clean_fn`` is registered as a
    shutdown hook at construction time.
    """

    def __init__(self, process_infos, sc, ray_rdd, verbose=False):
        """
        :param process_infos: ProcessInfo objects collected from the start-up job.
        :param sc: the SparkContext.
        :param ray_rdd: RDD used to run the shutdown function when not in local mode.
        :param verbose: when True, print every process info after validation.
        """
        self.sc = sc
        self.verbose = verbose
        self.ray_rdd = ray_rdd
        self.master = []
        self.slaves = []
        self.pgids = []
        self.node_ips = []
        self.process_infos = process_infos
        for process_info in process_infos:
            self.pgids.append(process_info.pgid)
            self.node_ips.append(process_info.node_ip)
            if process_info.master_addr:
                self.master.append(process_info)
            else:
                self.slaves.append(process_info)
        # Registered before validation so that the started processes are still
        # cleaned up if the checks below fail.
        ProcessMonitor.register_shutdown_hook(extra_close_fn=self.clean_fn)
        assert len(self.master) == 1, \
            "We should got 1 master only, but we got {}".format(len(self.master))
        self.master = self.master[0]
        if not is_local(self.sc):
            self.print_ray_remote_err_out()

    def print_ray_remote_err_out(self):
        """Raise if any process exited non-zero; print all infos when verbose."""
        if self.master.errorcode != 0:
            raise Exception(str(self.master))
        for slave in self.slaves:
            if slave.errorcode != 0:
                raise Exception(str(slave))
        if self.verbose:
            print(self.master)
            for slave in self.slaves:
                print(slave)

    def clean_fn(self):
        """Shut down the Ray driver connection and signal the recorded process groups."""
        import ray
        ray.shutdown()
        shutdown_fn = gen_shutdown_per_node(self.pgids, self.node_ips)
        if not is_local(self.sc):
            self.ray_rdd.map(shutdown_fn).collect()
        else:
            shutdown_fn([])

    @staticmethod
    def register_shutdown_hook(pgid=None, extra_close_fn=None):
        """
        Register clean-up to run at interpreter exit and on SIGTERM/SIGINT.

        :param pgid: optional process-group id(s) to signal in this process.
        :param extra_close_fn: optional zero-argument callable run afterwards.
        """
        def _shutdown():
            if pgid:
                gen_shutdown_per_node(pgid)(0)
            if extra_close_fn:
                extra_close_fn()

        def _signal_shutdown(_signo, _stack_frame):
            _shutdown()
            sys.exit(0)

        atexit.register(_shutdown)
        signal.signal(signal.SIGTERM, _signal_shutdown)
        signal.signal(signal.SIGINT, _signal_shutdown)
class JVMGuard():
    """
    The registered pids would be put into the killing list of Spark Executor.
    """

    @staticmethod
    def registerPids(pids):
        """
        Register ``pids`` with the JVM guard; on failure kill them and re-raise.

        :param pids: iterable of process ids to register.
        :raises Exception: whatever the registration raised, after the pids
                have been sent SIGKILL on a best-effort basis.
        """
        import traceback
        try:
            from bigdl.util.common import callBigDlFunc
            import zoo
            callBigDlFunc("float",
                          "jvmGuardRegisterPids",
                          pids)
        except Exception:
            print(traceback.format_exc())
            print("Cannot sucessfully register pid into JVMGuard")
            for pid in pids:
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    # Already gone or not ours: keep killing the rest and keep
                    # the original registration error as the one reported.
                    pass
            raise


class RayServiceFuncGenerator(object):
    """
    Generates the functions that Spark tasks run to start and stop Ray processes.

    This should be a picklable class.
    """

    def __init__(self, python_loc, redis_port, ray_node_cpu_cores, mkl_cores,
                 password, object_store_memory, waitting_time_sec=6, verbose=False, env=None):
        """
        :param python_loc: path of the python executable; ``ray`` is expected
               in the same directory.
        :param redis_port: redis port of the head node.
        :param ray_node_cpu_cores: value passed to ``--num-cpus`` for each node.
        :param mkl_cores: thread count used for the MKL/OpenMP environment settings.
        :param password: redis password.
        :param object_store_memory: integer in bytes, or None to omit the option.
        :param waitting_time_sec: seconds to sleep around node start-up.
        :param verbose: when True, print the environment used for Ray.
        :param env: extra environment variables for the Ray processes.
        """
        self.env = env
        self.python_loc = python_loc
        self.redis_port = redis_port
        self.password = password
        self.ray_node_cpu_cores = ray_node_cpu_cores
        self.mkl_cores = mkl_cores
        self.ray_exec = self._get_ray_exec()
        self.object_store_memory = object_store_memory
        self.waiting_time_sec = waitting_time_sec
        self.verbose = verbose
        self.labels = """--resources='{"trainer": %s, "ps": %s }' """ % (1, 1)

    def _get_MKL_config(self, cores):
        """Return the MKL/OpenMP related environment settings for ``cores`` threads."""
        return {"intra_op_parallelism_threads": str(cores),
                "inter_op_parallelism_threads": str(cores),
                "OMP_NUM_THREADS": str(cores),
                "KMP_BLOCKTIME": "0",
                "KMP_AFFINITY": "granularity = fine, verbose, compact, 1, 0",
                "KMP_SETTINGS": "0"}

    def _prepare_env(self, cores=None):
        """
        Build the environment for a Ray process.

        Starts from the current environment, applies ``self.env``, prepends the
        python directory (relative to the working directory) to PATH, removes
        MALLOC_ARENA_MAX, RAY_BACKEND_LOG_LEVEL and every MKL setting, then
        re-adds the MKL settings when ``cores`` is given.
        """
        modified_env = os.environ.copy()
        if self.env:
            modified_env.update(self.env)
        cwd = os.getcwd()
        modified_env["PATH"] = "{}/{}:{}".format(cwd, "/".join(self.python_loc.split("/")[:-1]),
                                                 os.environ["PATH"])
        modified_env.pop("MALLOC_ARENA_MAX", None)
        modified_env.pop("RAY_BACKEND_LOG_LEVEL", None)
        # unset all MKL setting; the key list comes from the same place that
        # sets them so the two cannot drift apart
        for key in self._get_MKL_config(0):
            modified_env.pop(key, None)
        if cores:
            cores = int(cores)
            print("MKL cores is {}".format(cores))
            modified_env.update(self._get_MKL_config(cores))
        if self.verbose:
            print("Executing with these environment setting:")
            for pair in modified_env.items():
                print(pair)
            print("The $PATH is: {}".format(modified_env["PATH"]))
        return modified_env

    def gen_stop(self):
        """Return a partition function that runs ``ray stop`` and passes its input through."""
        def _stop(iter):
            command = "{} stop".format(self.ray_exec)
            print("Start to end the ray services: {}".format(command))
            session_execute(command=command, fail_fast=True)
            return iter

        return _stop

    def _gen_master_command(self):
        """Return the shell command that starts the Ray head node."""
        command = ("{} start --head --include-webui --redis-port {} "
                   "--redis-password {} --num-cpus {} ").format(
            self.ray_exec, self.redis_port, self.password, self.ray_node_cpu_cores)
        if self.object_store_memory:
            command = command + "--object-store-memory {} ".format(str(self.object_store_memory))
        return command

    def _get_raylet_command(self, redis_address):
        """Return the shell command that starts a raylet joining ``redis_address``."""
        command = "{} start --redis-address {} --redis-password {} --num-cpus {} {} ".format(
            self.ray_exec, redis_address, self.password, self.ray_node_cpu_cores, self.labels)

        if self.object_store_memory:
            command = command + "--object-store-memory {} ".format(str(self.object_store_memory))
        return command

    def _start_ray_node(self, command, tag, wait_before=5, wait_after=5):
        """
        Run ``command`` to start a Ray node and return its ProcessInfo.

        Sleeps ``wait_before`` seconds before launching and ``wait_after``
        seconds afterwards, registers the started pids with the JVM guard and
        records the node IP on the returned info.
        """
        modified_env = self._prepare_env(self.mkl_cores)
        print("Starting {} by running: {}".format(tag, command))
        print("Wait for {} sec before launching {}".format(wait_before, tag))
        time.sleep(wait_before)
        process_info = session_execute(command=command, env=modified_env, tag=tag)
        JVMGuard.registerPids(process_info.pids)
        process_info.node_ip = rservices.get_node_ip_address()
        print("Wait for {} sec before return process info for {}".format(wait_after, tag))
        time.sleep(wait_after)
        return process_info

    def _get_ray_exec(self):
        """Return ``<bin>/python <bin>/ray`` where ``<bin>`` is the directory of ``python_loc``."""
        python_bin_dir = "/".join(self.python_loc.split("/")[:-1])
        return "{}/python {}/ray".format(python_bin_dir, python_bin_dir)

    def gen_ray_start(self):
        """
        Return the barrier-stage partition function that starts Ray on each task.

        Partition 0 starts the head node and gets ``master_addr`` set; every
        other partition starts a raylet pointing at the head node's redis.
        """
        def _start_ray_services(iter):
            tc = BarrierTaskContext.get()
            # The address is sorted by partitionId according to the comments
            # Partition 0 is the Master
            task_addrs = [taskInfo.address for taskInfo in tc.getTaskInfos()]
            print(task_addrs)
            master_ip = task_addrs[0].split(":")[0]
            print("current address {}".format(task_addrs[tc.partitionId()]))
            print("master address {}".format(master_ip))
            redis_address = "{}:{}".format(master_ip, self.redis_port)
            print("partition id is : {}".format(tc.partitionId()))
            if tc.partitionId() == 0:
                process_info = self._start_ray_node(command=self._gen_master_command(),
                                                    tag="ray-master",
                                                    wait_after=self.waiting_time_sec)
                process_info.master_addr = redis_address
                yield process_info
            else:
                process_info = self._start_ray_node(
                    command=self._get_raylet_command(redis_address=redis_address),
                    tag="raylet",
                    wait_before=self.waiting_time_sec)
                yield process_info
            tc.barrier()

        return _start_ray_services


class RayContext(object):
    # NOTE(review): the default redis password below is a fixed, well-known
    # value; callers on shared clusters should pass their own.
    def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None,
                 verbose=False, env=None, local_ray_node_num=2, waitting_time_sec=8):
        """
        The RayContext would init a ray cluster on top of the configuration of the SparkContext.
        For spark cluster mode: The number of raylets is equal to number of executors.
        For Spark local mode: The number of raylets is controlled by local_ray_node_num.
        CPU cores for each raylet equals to spark_cores/local_ray_node_num.
        :param sc: the SparkContext to run on.
        :param redis_port: redis port for the "head" node.
               The value would be randomly picked if not specified
        :param password: redis password for the cluster.
        :param object_store_memory: Memory size for the object_store, e.g. "25g".
               Must not be set in Spark local mode.
        :param verbose: when True, print extra start-up information.
        :param env: The environment variable dict for running Ray.
        :param local_ray_node_num: number of raylets to be created in local mode.
        :param waitting_time_sec: seconds to wait around node start-up.
        """
        self.sc = sc
        self.stopped = False
        self.is_local = is_local(sc)
        self.local_ray_node_num = local_ray_node_num
        self.ray_node_cpu_cores = self._get_ray_node_cpu_cores()
        self.num_ray_nodes = self._get_num_ray_nodes()
        self.python_loc = os.environ['PYSPARK_PYTHON']
        self.ray_processesMonitor = None
        self.verbose = verbose
        self.redis_port = self._new_port() if not redis_port else redis_port
        self.ray_service = RayServiceFuncGenerator(
            python_loc=self.python_loc,
            redis_port=self.redis_port,
            ray_node_cpu_cores=self.ray_node_cpu_cores,
            mkl_cores=self._get_mkl_cores(),
            password=password,
            object_store_memory=self._enrich_object_sotre_memory(sc, object_store_memory),
            verbose=verbose,
            env=env,
            waitting_time_sec=waitting_time_sec)
        self._gather_cluster_ips()
        from bigdl.util.common import init_executor_gateway
        print("Start to launch the JVM guarding process")
        init_executor_gateway(sc)
        print("JVM guarding process has been successfully launched")

    def _new_port(self):
        """Pick a random port in [10000, 65535]; availability is not checked."""
        return random.randint(10000, 65535)

    def _enrich_object_sotre_memory(self, sc, object_store_memory):
        """
        Resolve the object store size in bytes.

        Local mode derives it from the machine memory and rejects an explicit
        value; cluster mode converts the given size string, or returns None
        when nothing was given.
        """
        if is_local(sc):
            assert not object_store_memory, "you should not set object_store_memory on spark local"
            return resourceToBytes(self._get_ray_plasma_memory_local())
        if object_store_memory:
            return resourceToBytes(str(object_store_memory))
        return None

    def _gather_cluster_ips(self):
        """Run a barrier job with one task per core and return the task IPs."""
        total_cores = int(self._get_num_ray_nodes()) * int(self._get_ray_node_cpu_cores())

        def info_fn(iter):
            tc = BarrierTaskContext.get()
            task_addrs = [taskInfo.address.split(":")[0] for taskInfo in tc.getTaskInfos()]
            yield task_addrs
            tc.barrier()

        ips = self.sc.range(0, total_cores,
                            numSlices=total_cores).barrier().mapPartitions(info_fn).collect()
        return ips[0]

    def stop(self):
        """Disconnect the driver from Ray and terminate the started Ray processes."""
        if self.stopped:
            print("This instance has been stopped.")
            return
        import ray
        ray.shutdown()
        if not self.ray_processesMonitor:
            print("Please start the runner first before closing it")
        else:
            self.ray_processesMonitor.clean_fn()
        self.stopped = True

    def purge(self):
        """
        Invoke ray stop to clean ray processes
        """
        if self.stopped:
            print("This instance has been stopped.")
            return
        self.sc.range(0,
                      self.num_ray_nodes,
                      numSlices=self.num_ray_nodes).barrier().mapPartitions(
            self.ray_service.gen_stop()).collect()
        self.stopped = True

    def _get_mkl_cores(self):
        """Return 1 in local mode, otherwise ``spark.executor.cores`` as int."""
        if "local" in self.sc.master:
            return 1
        return int(self.sc._conf.get("spark.executor.cores"))

    def _get_ray_node_cpu_cores(self):
        """
        Return the CPU cores per Ray node as an int.

        Local mode divides the Spark local cores evenly by
        ``local_ray_node_num``; cluster mode uses ``spark.executor.cores``.
        """
        if "local" in self.sc.master:
            local_cores = self._get_spark_local_cores()
            assert local_cores % self.local_ray_node_num == 0, \
                "Spark cores number: {} should be divided by local_ray_node_num {} ".format(
                    local_cores, self.local_ray_node_num)
            return int(local_cores / self.local_ray_node_num)
        # Same type as the local branch, so callers need not care about mode.
        return int(self.sc._conf.get("spark.executor.cores"))

    def _get_ray_driver_memory(self):
        """
        :return: memory in bytes
        """
        if "local" in self.sc.master:
            from psutil import virtual_memory
            # Memory in bytes
            total_mem = virtual_memory().total
            return int(total_mem)
        return resourceToBytes(self.sc._conf.get("spark.driver.memory"))

    def _get_ray_plasma_memory_local(self):
        """Return 40% of the per-node share of driver memory as a ``<n>b`` string."""
        return "{}b".format(int(self._get_ray_driver_memory() / self._get_num_ray_nodes() * 0.4))

    def _get_spark_local_cores(self):
        """
        Return the number of cores named by a Spark local master URL.

        Handles ``local`` (one core), ``local[K]``, ``local[*]`` and the
        ``local[K,F]`` form where F is the failure count.
        :raises ValueError: if the master is not one of these forms.
        """
        master = self.sc.master
        if master == "local":
            return 1
        matched = re.match(r"local\[(.*)\]", master)
        if matched is None:
            raise ValueError("Cannot get local cores from spark master: {}".format(master))
        # "local[K,F]" carries the max task failures after the comma.
        local_symbol = matched.group(1).split(",")[0].strip()
        if local_symbol == "*":
            return multiprocessing.cpu_count()
        return int(local_symbol)

    def _get_num_ray_nodes(self):
        """Return ``local_ray_node_num`` in local mode, else ``spark.executor.instances``."""
        if "local" in self.sc.master:
            return int(self.local_ray_node_num)
        return int(self.sc._conf.get("spark.executor.instances"))

    def init(self):
        """Start the Ray cluster on the Spark executors, then connect the driver to it."""
        self._start_cluster()
        self._start_driver(object_store_memory=self._get_ray_plasma_memory_local())

    def _start_cluster(self):
        """Launch Ray through a barrier job and record the head node's redis address."""
        print("Start to launch ray on cluster")
        ray_rdd = self.sc.range(0,
                                self.num_ray_nodes,
                                numSlices=self.num_ray_nodes)
        process_infos = ray_rdd.barrier().mapPartitions(
            self.ray_service.gen_ray_start()).collect()

        self.ray_processesMonitor = ProcessMonitor(process_infos, self.sc, ray_rdd,
                                                   verbose=self.verbose)
        self.redis_address = self.ray_processesMonitor.master.master_addr
        return self

    def _start_restricted_worker(self, redis_address, redis_password, object_store_memory):
        """Start a zero-CPU Ray node on the driver host and register its shutdown."""
        num_cores = 0
        command = ("ray start --redis-address {} "
                   "--redis-password {} --num-cpus {} --object-store-memory {}").format(
            redis_address, redis_password, num_cores, object_store_memory)
        print("Executing command: {}".format(command))
        process_info = session_execute(command=command, fail_fast=True)
        ProcessMonitor.register_shutdown_hook(pgid=process_info.pgid)

    def _start_driver(self, object_store_memory="10g"):
        """Connect this process to the Ray cluster, starting a local worker node first if needed."""
        print("Start to launch ray on local")
        import ray
        if not self.is_local:
            self._start_restricted_worker(self.redis_address, self.ray_service.password,
                                          object_store_memory=resourceToBytes(object_store_memory))
        ray.shutdown()
        ray.init(redis_address=self.redis_address,
                 redis_password=self.ray_service.password)
class SparkRunner():
    """
    Creates a SparkContext for Analytics Zoo, either in local mode or on YARN.
    """

    def __init__(self, spark_log_level="WARN", redirect_spark_log=True):
        """
        :param spark_log_level: log level applied to the created SparkContext.
        :param redirect_spark_log: forwarded to ``init_nncontext``.
        :raises Exception: if a SparkContext is already active.
        """
        self.spark_log_level = spark_log_level
        self.redirect_spark_log = redirect_spark_log
        with SparkContext._lock:
            if SparkContext._active_spark_context:
                raise Exception("There's existing SparkContext. Please close it first.")

        import pyspark
        print("Current pyspark location is : {}".format(pyspark.__file__))

    # This is adopted from conda-pack.
    def _pack_conda_main(self, args):
        """Run conda-pack with the given command-line style ``args``."""
        import sys
        import traceback
        from conda_pack.cli import fail, PARSER, context
        import conda_pack
        from conda_pack import pack, CondaPackException
        args = PARSER.parse_args(args=args)
        # Manually handle version printing to output to stdout in python < 3.4
        if args.version:
            print('conda-pack %s' % conda_pack.__version__)
            sys.exit(0)

        try:
            with context.set_cli():
                pack(name=args.name,
                     prefix=args.prefix,
                     output=args.output,
                     format=args.format,
                     force=args.force,
                     compress_level=args.compress_level,
                     n_threads=args.n_threads,
                     zip_symlinks=args.zip_symlinks,
                     zip_64=not args.no_zip_64,
                     arcroot=args.arcroot,
                     dest_prefix=args.dest_prefix,
                     verbose=not args.quiet,
                     filters=args.filters)
        except CondaPackException as e:
            fail("CondaPackError: %s" % e)
        except KeyboardInterrupt:
            fail("Interrupted")
        except Exception:
            fail(traceback.format_exc())

    def pack_penv(self, conda_name):
        """
        Pack the conda environment ``conda_name`` into a fresh temporary directory.

        :return: path of the created ``python_env.tar.gz`` archive.
        """
        import tempfile
        tmp_dir = tempfile.mkdtemp()
        tmp_path = "{}/python_env.tar.gz".format(tmp_dir)
        print("Start to pack current python env")
        self._pack_conda_main(["--output", tmp_path, "--n-threads", "8", "--name", conda_name])
        print("Packing has been completed: {}".format(tmp_path))
        return tmp_path

    def _create_sc(self, submit_args, conf):
        """Set PYSPARK_SUBMIT_ARGS, apply ``conf`` on top of the zoo conf and create the context."""
        print("pyspark_submit_args is: {}".format(submit_args))
        os.environ['PYSPARK_SUBMIT_ARGS'] = submit_args
        zoo_conf = init_spark_conf()
        for key, value in conf.items():
            zoo_conf.set(key=key, value=value)
        sc = init_nncontext(conf=zoo_conf, redirect_spark_log=self.redirect_spark_log)
        sc.setLogLevel(self.spark_log_level)

        return sc

    def _detect_python_location(self):
        """Return the path printed by ``command -v python``; raise if it cannot be found."""
        from zoo.ray.util.process import session_execute
        process_info = session_execute("command -v python")
        if 0 != process_info.errorcode:
            raise Exception(
                "Cannot detect current python location. Please set it manually by python_location")
        return process_info.out

    def init_spark_on_local(self, cores, conf=None, python_location=None):
        """
        Create a SparkContext in local mode.

        :param cores: value placed inside ``local[...]``.
        :param conf: optional dict of extra Spark settings.
        :param python_location: python executable for PYSPARK_PYTHON; detected
               from the shell when omitted.
        :return: the SparkContext.
        """
        print("Start to getOrCreate SparkContext")
        os.environ['PYSPARK_PYTHON'] =\
            python_location if python_location else self._detect_python_location()
        master = "local[{}]".format(cores)
        zoo_conf = init_spark_conf().setMaster(master)
        if conf:
            zoo_conf.setAll(conf.items())
        sc = init_nncontext(conf=zoo_conf, redirect_spark_log=self.redirect_spark_log)
        sc.setLogLevel(self.spark_log_level)
        print("Successfully got a SparkContext")
        return sc

    @staticmethod
    def _yarn_spark_conf(driver_memory, driver_cores,
                         extra_executor_memory_for_ray=None, spark_yarn_archive=None):
        """Build the Spark conf dict used for a YARN submission."""
        conf = {
            "spark.driver.memory": driver_memory,
            "spark.driver.cores": driver_cores,
            "spark.scheduler.minRegisteredResourcesRatio": "1.0"}
        if extra_executor_memory_for_ray:
            conf["spark.executor.memoryOverhead"] = extra_executor_memory_for_ray
        if spark_yarn_archive:
            conf["spark.yarn.archive"] = spark_yarn_archive
        return conf

    def init_spark_on_yarn(self,
                           hadoop_conf,
                           conda_name,
                           num_executor,
                           executor_cores,
                           executor_memory="10g",
                           driver_memory="1g",
                           driver_cores=4,
                           extra_executor_memory_for_ray=None,
                           extra_python_lib=None,
                           penv_archive=None,
                           hadoop_user_name="root",
                           spark_yarn_archive=None,
                           jars=None):
        """
        Create a SparkContext on YARN, shipping a packed python environment.

        :param hadoop_conf: directory exported as HADOOP_CONF_DIR.
        :param conda_name: conda environment to pack when ``penv_archive`` is not given.
        :param num_executor: value for ``--num-executors``.
        :param executor_cores: value for ``--executor-cores``.
        :param executor_memory: value for ``--executor-memory``.
        :param driver_memory: value for ``spark.driver.memory``.
        :param driver_cores: value for ``spark.driver.cores``.
        :param extra_executor_memory_for_ray: value for ``spark.executor.memoryOverhead``.
        :param extra_python_lib: value for ``--py-files``.
        :param penv_archive: existing packed environment archive; skips packing.
        :param hadoop_user_name: value exported as HADOOP_USER_NAME.
        :param spark_yarn_archive: value for ``spark.yarn.archive``.
        :param jars: extra jars, placed before the zoo jar in ``--jars``.
        :return: the SparkContext.
        """
        os.environ["HADOOP_CONF_DIR"] = hadoop_conf
        os.environ['HADOOP_USER_NAME'] = hadoop_user_name
        os.environ['PYSPARK_PYTHON'] = "python_env/bin/python"

        def _yarn_opt(jars):
            from zoo.util.engine import get_analytics_zoo_classpath
            command = (" --archives {}#python_env --num-executors {} "
                       " --executor-cores {} --executor-memory {}").format(
                penv_archive, num_executor, executor_cores, executor_memory)
            path_to_zoo_jar = get_analytics_zoo_classpath()

            if extra_python_lib:
                command = command + " --py-files {} ".format(extra_python_lib)

            if jars:
                command = command + " --jars {},{} ".format(jars, path_to_zoo_jar)
            elif path_to_zoo_jar:
                command = command + " --jars {} ".format(path_to_zoo_jar)

            if path_to_zoo_jar:
                command = command + " --conf spark.driver.extraClassPath={} ".format(
                    path_to_zoo_jar)
            return command

        def _submit_opt():
            conf = SparkRunner._yarn_spark_conf(
                driver_memory, driver_cores,
                extra_executor_memory_for_ray=extra_executor_memory_for_ray,
                spark_yarn_archive=spark_yarn_archive)
            return " --master yarn " + _yarn_opt(jars) + 'pyspark-shell', conf

        pack_env = False
        assert penv_archive or conda_name, \
            "You should either specify penv_archive or conda_name explicitly"
        try:
            if not penv_archive:
                penv_archive = self.pack_penv(conda_name)
                pack_env = True

            submit_args, conf = _submit_opt()
            sc = self._create_sc(submit_args, conf)
        finally:
            if conda_name and penv_archive and pack_env:
                # Only clean up what pack_penv created: the archive and the
                # temporary directory that holds it.
                os.remove(penv_archive)
                try:
                    os.rmdir(os.path.dirname(penv_archive))
                except OSError:
                    pass
        return sc
# Multipliers for the supported size suffixes (decimal, not 1024-based).
_RESOURCE_UNITS = {"b": 1, "k": 1000, "m": 1000 * 1000, "g": 1000 * 1000 * 1000}
_RESOURCE_PATTERN = re.compile(r"([0-9]+)([a-z]+)?")
_FRACTION_PATTERN = re.compile(r"([0-9]+\.[0-9]+)([a-z]+)?")
_RESOURCE_FORMAT_ERROR = ("Size must be specified as bytes(b),"
                          "kilobytes(k), megabytes(m), gigabytes(g). "
                          "E.g. 50b, 100k, 250m, 30g")


def to_list(input):
    """Return ``input`` as a list: lists/tuples are copied, anything else is wrapped."""
    if isinstance(input, (list, tuple)):
        return list(input)
    else:
        return [input]


def split(v, num_slices):
    """
    Split the sequence ``v`` into ``num_slices`` contiguous slices.

    Slice lengths differ by at most one and the longer slices come first,
    e.g. 10 items in 4 slices gives lengths 3, 3, 2, 2.

    :param v: any sliceable sequence supporting ``len``.
    :param num_slices: positive number of slices.
    :return: list of ``num_slices`` slices of ``v``.
    :raises ValueError: if ``num_slices`` is not positive.
    """
    if num_slices <= 0:
        raise ValueError("num_slices must be positive, but got {}".format(num_slices))
    base, extra = divmod(len(v), num_slices)
    slices = []
    start = 0
    for index in range(num_slices):
        end = start + base + (1 if index < extra else 0)
        slices.append(v[start:end])
        start = end
    return slices


def resourceToBytes(resource_str):
    """
    Convert a size string such as ``"10g"`` into a number of bytes.

    Supported suffixes are b, k, m and g (case-insensitive, decimal
    multiples). A suffix is required and the whole string must match.

    :param resource_str: size string, e.g. "50b", "100k", "250m", "30g".
    :return: the size in bytes as int.
    :raises Exception: for fractional values, or for anything that is not
            an integer followed by one supported suffix.
    """
    text = str(resource_str).lower()
    if _FRACTION_PATTERN.match(text):
        raise Exception(
            "Fractional values are not supported. Input was: {}".format(resource_str))
    # fullmatch so that trailing garbage ("10k5") is rejected, not ignored.
    matched = _RESOURCE_PATTERN.fullmatch(text)
    if matched is None or matched.group(2) not in _RESOURCE_UNITS:
        raise Exception(_RESOURCE_FORMAT_ERROR)
    return int(matched.group(1)) * _RESOURCE_UNITS[matched.group(2)]
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/orca/src/test/bigdl/orca/ray/integration/__init__.py b/python/orca/src/test/bigdl/orca/ray/integration/__init__.py new file mode 100644 index 00000000000..5976dc4df02 --- /dev/null +++ b/python/orca/src/test/bigdl/orca/ray/integration/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2018 Analytics Zoo Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/orca/src/test/bigdl/orca/ray/integration/ray_on_yarn.py b/python/orca/src/test/bigdl/orca/ray/integration/ray_on_yarn.py new file mode 100644 index 00000000000..cc9207bf087 --- /dev/null +++ b/python/orca/src/test/bigdl/orca/ray/integration/ray_on_yarn.py @@ -0,0 +1,71 @@ +# +# Copyright 2018 Analytics Zoo Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# Integration script: starts Spark on YARN, brings up Ray on the executors,
# creates one actor per executor and prints where each one runs.
slave_num = 2

sc = init_spark_on_yarn(
    hadoop_conf="/opt/work/almaren-yarn-config/",
    conda_name="ray36-dev",
    num_executor=slave_num,
    executor_cores=28,
    executor_memory="10g",
    driver_memory="2g",
    driver_cores=4,
    extra_executor_memory_for_ray="30g")

ray_ctx = RayContext(sc=sc,
                     object_store_memory="25g",
                     env={"http_proxy": "http://child-prc.intel.com:913",
                          "https_proxy": "http://child-prc.intel.com:913"})
ray_ctx.init()


@ray.remote
class TestRay():
    """Actor exposing a few probes of the node it is scheduled on."""

    def hostname(self):
        """Return the host name of the node running this actor."""
        import socket
        return socket.gethostname()

    def check_cv2(self):
        """Return the OpenCV version available to the actor."""
        # conda install -c conda-forge opencv==3.4.2
        import cv2
        return cv2.__version__

    def ip(self):
        """Return the node IP address as reported by Ray."""
        import ray.services as rservices
        return rservices.get_node_ip_address()

    def network(self):
        """Return True if an outbound HTTP request succeeds within 3 seconds."""
        from urllib.request import urlopen
        try:
            urlopen('http://www.baidu.com', timeout=3)
            return True
        except Exception:
            return False


try:
    actors = [TestRay.remote() for i in range(0, slave_num)]
    print([ray.get(actor.hostname.remote()) for actor in actors])
    print([ray.get(actor.ip.remote()) for actor in actors])
    # print([ray.get(actor.network.remote()) for actor in actors])
finally:
    # Always tear the Ray cluster down, even if an actor call failed.
    ray_ctx.stop()
np.random.seed(1337)  # for reproducibility


@ray.remote
class TestRay():
    """Minimal actor used to check that Ray tasks can be scheduled."""

    def hostname(self):
        """Return the host name of the node running this actor."""
        import socket
        return socket.gethostname()


class TestUtil(TestCase):

    def test_local(self):
        """Start Ray on a local SparkContext, run actors, and check clean-up."""
        node_num = 4
        sc = init_spark_on_local(cores=node_num)
        try:
            ray_ctx = RayContext(sc=sc)
            ray_ctx.init()
            actors = [TestRay.remote() for i in range(0, node_num)]
            print([ray.get(actor.hostname.remote()) for actor in actors])
            ray_ctx.stop()
        finally:
            # Stop Spark exactly once, whether or not the Ray part succeeded.
            sc.stop()
        # After both Ray and Spark are stopped no Ray process may survive.
        for process_info in ray_ctx.ray_processesMonitor.process_infos:
            for pid in process_info.pids:
                assert not psutil.pid_exists(pid)


if __name__ == "__main__":
    pytest.main([__file__])
np.random.seed(1337)  # for reproducibility


class TestUtil(TestCase):

    def test_split(self):
        """A 10-element vector split 4 ways gives slices of length 3, 3, 2, 2."""
        # Report a missing helper as an explicit skip, not an AttributeError.
        if not hasattr(rutils, "split"):
            self.skipTest("zoo.ray.util.utils does not provide split()")
        vector = np.ones([10])
        result = rutils.split(vector, 4)
        assert len(result) == 4
        assert len(result[0]) == 3
        assert len(result[1]) == 3
        assert len(result[2]) == 2
        assert len(result[3]) == 2

    def test_resource_to_bytes(self):
        """Size suffixes are decimal multiples; fractional sizes are rejected."""
        assert 10 == rutils.resourceToBytes("10b")
        assert 10000 == rutils.resourceToBytes("10k")
        assert 10000000 == rutils.resourceToBytes("10m")
        assert 10000000000 == rutils.resourceToBytes("10g")
        with self.assertRaises(Exception):
            rutils.resourceToBytes("1.5g")


if __name__ == "__main__":
    pytest.main([__file__])