Support init_spark_on_yarn and RayContext (intel-analytics#1344)
* rayrunner

* add a jvm killer

* disable killer from spark job and rely on jvm killer only

* add env and verify the cv2 installation

* enhance

* minor

* style

* comments

* local and enhancement

* better local strategy

* doc and style

* doc

* style

* revert

* doc

* disable

* comments

* fix test
zhichao-li committed Jun 10, 2019
1 parent 0debd4e commit d12bd76
Showing 11 changed files with 982 additions and 0 deletions.
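
For orientation, a rough usage sketch of the two entry points this commit's title introduces. It is not part of the diff: the import paths, argument names, and RayContext method names below are assumptions for illustration only; the files defining them are among the changed files not shown here.

# Hedged sketch, assuming hypothetical module paths and parameters.
from zoo.ray.util.spark import init_spark_on_yarn      # assumed import path
from zoo.ray.util.raycontext import RayContext          # assumed import path

# Start a Spark application on YARN from a conda environment (parameter names assumed).
sc = init_spark_on_yarn(hadoop_conf="/path/to/hadoop/conf",
                        conda_name="zoo",
                        num_executor=2,
                        executor_cores=4)

# Wrap the SparkContext and launch a Ray process group on each executor.
ray_ctx = RayContext(sc=sc)
ray_ctx.init()    # assumed method name for starting the Ray cluster
# ... run Ray code ...
ray_ctx.stop()    # assumed method name for shutting the Ray cluster down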
15 changes: 15 additions & 0 deletions python/orca/src/bigdl/orca/ray/__init__.py
@@ -0,0 +1,15 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
46 changes: 46 additions & 0 deletions python/orca/src/bigdl/orca/ray/util/__init__.py
@@ -0,0 +1,46 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import signal

from zoo.ray.util.utils import to_list
import ray.services as rservices


def gen_shutdown_per_node(pgids, node_ips=None):
    pgids = to_list(pgids)

    def _shutdown_per_node(iter):
        print("Stopping pgids: {}".format(pgids))
        if node_ips:
            # Only kill the process groups that were started on this node.
            current_node_ip = rservices.get_node_ip_address()
            effect_pgids = [pair[0] for pair in zip(pgids, node_ips)
                            if pair[1] == current_node_ip]
        else:
            effect_pgids = pgids
        for pgid in effect_pgids:
            print("Stopping by pgid {}".format(pgid))
            try:
                os.killpg(pgid, signal.SIGTERM)
            except Exception:
                print("WARNING: cannot kill pgid: {}".format(pgid))

    return _shutdown_per_node


def is_local(sc):
    master = sc._conf.get("spark.master")
    return master == "local" or master.startswith("local[")
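
A hedged sketch of how gen_shutdown_per_node is meant to be consumed: the returned closure can be called directly on the driver in local mode, or mapped over an RDD with one partition per node to kill the Ray process groups on each executor (this mirrors ProcessMonitor.clean_fn in process.py below). The sc, ray_rdd, pgid, and IP values are assumptions for illustration.

# Hedged usage sketch; `sc` is an existing SparkContext and `ray_rdd` is assumed
# to have one partition per executor node.
shutdown_fn = gen_shutdown_per_node(pgids=[12345, 23456],
                                    node_ips=["10.0.0.1", "10.0.0.2"])
if is_local(sc):
    shutdown_fn([])                       # run directly on the driver
else:
    ray_rdd.map(shutdown_fn).collect()    # run once on each executor node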
147 changes: 147 additions & 0 deletions python/orca/src/bigdl/orca/ray/util/process.py
@@ -0,0 +1,147 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import subprocess
import signal
import atexit
import sys
import psutil

from zoo.ray.util import gen_shutdown_per_node, is_local


class ProcessInfo(object):
    def __init__(self, out, err, errorcode, pgid, tag="default", pids=None, node_ip=None):
        self.out = str(out.strip())
        self.err = str(err.strip())
        self.pgid = pgid
        self.pids = pids
        self.errorcode = errorcode
        self.tag = tag
        self.master_addr = None
        self.node_ip = node_ip

    def __str__(self):
        return "node_ip: {} tag: {}, pgid: {}, pids: {}, returncode: {}, \
master_addr: {}, \n {} {}".format(self.node_ip, self.tag, self.pgid,
                                  self.pids,
                                  self.errorcode,
                                  self.master_addr,
                                  self.out,
                                  self.err)


def pids_from_gpid(gpid):
    processes = psutil.process_iter()
    result = []
    for proc in processes:
        try:
            if os.getpgid(proc.pid) == gpid:
                result.append(proc.pid)
        except Exception:
            pass
    return result


def session_execute(command, env=None, tag=None, fail_fast=False, timeout=120):
    pro = subprocess.Popen(
        command,
        shell=True,
        env=env,
        cwd=None,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        preexec_fn=os.setsid)
    pgid = os.getpgid(pro.pid)
    out, err = pro.communicate(timeout=timeout)
    out = out.decode("utf-8")
    err = err.decode("utf-8")
    print(out)
    print(err)
    errorcode = pro.returncode
    if errorcode != 0:
        if fail_fast:
            raise Exception(err)
        print(err)
    else:
        print(out)
    return ProcessInfo(out=out,
                       err=err,
                       errorcode=pro.returncode,
                       pgid=pgid,
                       pids=pids_from_gpid(pgid),
                       tag=tag)


class ProcessMonitor:
    def __init__(self, process_infos, sc, ray_rdd, verbose=False):
        self.sc = sc
        self.verbose = verbose
        self.ray_rdd = ray_rdd
        self.master = []
        self.slaves = []
        self.pgids = []
        self.node_ips = []
        self.process_infos = process_infos
        for process_info in process_infos:
            self.pgids.append(process_info.pgid)
            self.node_ips.append(process_info.node_ip)
            if process_info.master_addr:
                self.master.append(process_info)
            else:
                self.slaves.append(process_info)
        ProcessMonitor.register_shutdown_hook(extra_close_fn=self.clean_fn)
        assert len(self.master) == 1, \
            "We should get exactly 1 master, but we got {}".format(len(self.master))
        self.master = self.master[0]
        if not is_local(self.sc):
            self.print_ray_remote_err_out()

    def print_ray_remote_err_out(self):
        if self.master.errorcode != 0:
            raise Exception(str(self.master))
        for slave in self.slaves:
            if slave.errorcode != 0:
                raise Exception(str(slave))
        if self.verbose:
            print(self.master)
            for slave in self.slaves:
                print(slave)

    def clean_fn(self):
        import ray
        ray.shutdown()
        if not is_local(self.sc):
            self.ray_rdd.map(gen_shutdown_per_node(self.pgids, self.node_ips)).collect()
        else:
            gen_shutdown_per_node(self.pgids, self.node_ips)([])

    @staticmethod
    def register_shutdown_hook(pgid=None, extra_close_fn=None):
        def _shutdown():
            if pgid:
                gen_shutdown_per_node(pgid)(0)
            if extra_close_fn:
                extra_close_fn()

        def _signal_shutdown(_signo, _stack_frame):
            _shutdown()
            sys.exit(0)

        atexit.register(_shutdown)
        signal.signal(signal.SIGTERM, _signal_shutdown)
        signal.signal(signal.SIGINT, _signal_shutdown)
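
A hedged example of driving session_execute directly, e.g. to launch a command in its own process group and inspect the resulting ProcessInfo. The command string and environment tweak are assumptions for illustration only.

# Hedged usage sketch of session_execute, assuming a hypothetical command line.
import os

env = dict(os.environ, OMP_NUM_THREADS="1")               # assumed environment tweak
proc_info = session_execute(command="ray start --head",   # assumed command line
                            env=env,
                            tag="ray-master",
                            fail_fast=True)
print(proc_info)                         # ProcessInfo.__str__ reports pgid, pids and return code
print(proc_info.pgid, proc_info.pids)    # process group id and member pids for later cleanup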