We should search and find the necessary jars from python env rather than upload them again (intel-analytics#1460)

* fix path

* jars
zhichao-li committed Jun 17, 2019
1 parent df43335 commit 8a0e626
Showing 1 changed file with 44 additions and 23 deletions.
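
In brief, the change stops passing the BigDL and Analytics Zoo jars to spark-submit via --jars; instead it derives their locations inside the conda archive that is already shipped to YARN via --archives and prepends them to spark.executor.extraClassPath. A rough sketch of the idea (the python_env alias and the zoo/bigdl share/lib locations come from the diff below; the python version directory and the jar file names are hypothetical placeholders):

# Illustration only, not code from this commit: how the executor classpath is
# derived from the packed conda env instead of uploading the jars separately.
# Assumes the env is shipped as "--archives <env.tar.gz>#python_env" and
# unpacked under the alias "python_env" in each executor's working directory.
python_env_alias = "python_env"
python_dir_name = "python3.6"  # hypothetical: the single lib/python* dir in the conda env
site_packages = "{}/lib/{}/site-packages".format(python_env_alias, python_dir_name)

executor_extra_classpath = ":".join([
    "{}/zoo/share/lib/analytics-zoo-jar-with-dependencies.jar".format(site_packages),  # hypothetical jar name
    "{}/bigdl/share/lib/bigdl-jar-with-dependencies.jar".format(site_packages),        # hypothetical jar name
])

# This value is prepended to spark.executor.extraClassPath, so --jars only needs
# to carry extra application jars, if any.
print(executor_extra_classpath)
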
67 changes: 44 additions & 23 deletions python/orca/src/bigdl/orca/ray/util/spark.py
@@ -15,10 +15,11 @@
 #

 import os
+import glob

 from pyspark import SparkContext

-from zoo.common.nncontext import get_analytics_zoo_conf, init_spark_conf
+from zoo.common.nncontext import init_spark_conf

 from zoo import init_nncontext

@@ -27,10 +28,10 @@ class SparkRunner():
     def __init__(self, spark_log_level="WARN", redirect_spark_log=True):
         self.spark_log_level = spark_log_level
         self.redirect_spark_log = redirect_spark_log
+        self.PYTHON_ENV = "python_env"
         with SparkContext._lock:
             if SparkContext._active_spark_context:
                 raise Exception("There's existing SparkContext. Please close it first.")
-
         import pyspark
         print("Current pyspark location is : {}".format(pyspark.__file__))

@@ -72,7 +73,7 @@ def _pack_conda_main(self, args):
     def pack_penv(self, conda_name):
         import tempfile
         tmp_dir = tempfile.mkdtemp()
-        tmp_path = "{}/python_env.tar.gz".format(tmp_dir)
+        tmp_path = "{}/{}.tar.gz".format(tmp_dir, self.PYTHON_ENV)
         print("Start to pack current python env")
         self._pack_conda_main(["--output", tmp_path, "--n-threads", "8", "--name", conda_name])
         print("Packing has been completed: {}".format(tmp_path))
@@ -98,21 +99,34 @@ def _detect_python_location(self):
                 "Cannot detect current python location. Please set it manually by python_location")
         return process_info.out

-    def _gather_essential_jars(self):
+    def _get_bigdl_jar_name_on_driver(self):
         from bigdl.util.engine import get_bigdl_classpath
-        from zoo.util.engine import get_analytics_zoo_classpath
         bigdl_classpath = get_bigdl_classpath()
-        zoo_classpath = get_analytics_zoo_classpath()
         assert bigdl_classpath, "Cannot find bigdl classpath"
+        return bigdl_classpath.split("/")[-1]
+
+    def _get_zoo_jar_name_on_driver(self):
+        from zoo.util.engine import get_analytics_zoo_classpath
+        zoo_classpath = get_analytics_zoo_classpath()
         assert zoo_classpath, "Cannot find Analytics-Zoo classpath"
-        if bigdl_classpath == zoo_classpath:
-            return [zoo_classpath]
-        else:
-            return [zoo_classpath, bigdl_classpath]
+        return zoo_classpath.split("/")[-1]
+
+    def _assemble_zoo_classpath_for_executor(self):
+        conda_env_path = "/".join(self._detect_python_location().split("/")[:-2])
+        python_interpreters = glob.glob("{}/lib/python*".format(conda_env_path))
+        assert len(python_interpreters) == 1, \
+            "Conda env should contain a single python, but got: {}:".format(python_interpreters)
+        python_interpreter_name = python_interpreters[0].split("/")[-1]
+        prefix = "{}/lib/{}/site-packages/".format(self.PYTHON_ENV, python_interpreter_name)
+        return ["{}/zoo/share/lib/{}".format(prefix,
+                                             self._get_zoo_jar_name_on_driver()),
+                "{}/bigdl/share/lib/{}".format(prefix,
+                                               self._get_bigdl_jar_name_on_driver())
+                ]

     def init_spark_on_local(self, cores, conf=None, python_location=None):
         print("Start to getOrCreate SparkContext")
-        os.environ['PYSPARK_PYTHON'] =\
+        os.environ['PYSPARK_PYTHON'] = \
             python_location if python_location else self._detect_python_location()
         master = "local[{}]".format(cores)
         zoo_conf = init_spark_conf().setMaster(master)
@@ -140,20 +154,17 @@ def init_spark_on_yarn(self,
                            jars=None):
         os.environ["HADOOP_CONF_DIR"] = hadoop_conf
         os.environ['HADOOP_USER_NAME'] = hadoop_user_name
-        os.environ['PYSPARK_PYTHON'] = "python_env/bin/python"
+        os.environ['PYSPARK_PYTHON'] = "{}/bin/python".format(self.PYTHON_ENV)

         def _yarn_opt(jars):
-            command = " --archives {}#python_env --num-executors {} " \
-                      " --executor-cores {} --executor-memory {}".\
-                format(penv_archive, num_executor, executor_cores, executor_memory)
-            jars_list = self._gather_essential_jars()
-            if jars:
-                jars_list.append(jars)
+            command = " --archives {}#{} --num-executors {} " \
+                      " --executor-cores {} --executor-memory {}". \
+                format(penv_archive, self.PYTHON_ENV, num_executor, executor_cores, executor_memory)

             if extra_python_lib:
                 command = command + " --py-files {} ".format(extra_python_lib)
-
-            command = command + " --jars {}".format(",".join(jars_list))
+            if jars:
+                command = command + " --jars {}".format(jars)
             return command

         def _submit_opt():
@@ -176,9 +187,19 @@ def _submit_opt():
                 pack_env = True

             submit_args, conf = _submit_opt()
-            if spark_conf:
-                for item in spark_conf.items():
-                    conf[str(item[0])] = str(item[1])
+
+            if not spark_conf:
+                spark_conf = {}
+            zoo_bigdl_path_on_executor = ":".join(self._assemble_zoo_classpath_for_executor())
+
+            if "spark.executor.extraClassPath" in spark_conf:
+                spark_conf["spark.executor.extraClassPath"] = "{}:{}".format(
+                    zoo_bigdl_path_on_executor, spark_conf["spark.executor.extraClassPath"])
+            else:
+                spark_conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor
+
+            for item in spark_conf.items():
+                conf[str(item[0])] = str(item[1])
             sc = self._create_sc(submit_args, conf)
         finally:
             if conda_name and penv_archive and pack_env:
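For context, a possible call into the resulting API (a sketch only: the import path and keyword arguments are inferred from this diff, and all values are placeholders):

# Hypothetical usage: after this change the BigDL / Analytics Zoo jars are
# resolved from the packed conda env on each executor, so `jars` is only needed
# for additional application jars.
from zoo.ray.util.spark import SparkRunner  # import path assumed from the repo layout

runner = SparkRunner(spark_log_level="WARN")
sc = runner.init_spark_on_yarn(
    hadoop_conf="/etc/hadoop/conf",  # placeholder HADOOP_CONF_DIR
    conda_name="my-conda-env",       # placeholder conda env to pack and ship
    num_executor=2,
    executor_cores=4,
    executor_memory="10g",
)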
