diff --git a/python/orca/src/bigdl/orca/data/file.py b/python/orca/src/bigdl/orca/data/file.py
index 5cff0eaa660..d859a7832b4 100644
--- a/python/orca/src/bigdl/orca/data/file.py
+++ b/python/orca/src/bigdl/orca/data/file.py
@@ -155,13 +155,13 @@ def exists(path):
             invalidOperationError(False, str(ex), cause=ex)
         return True
     elif path.startswith("hdfs://"):
-        import pyarrow as pa
-        host_port = path.split("://")[1].split("/")[0].split(":")
-        classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
-                                     stdout=subprocess.PIPE).communicate()[0]
-        os.environ["CLASSPATH"] = classpath.decode("utf-8")
-        fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
-        return fs.exists(path)
+        cmd = 'hdfs dfs -test -e {}; echo $?'.format(path)
+        result = subprocess.getstatusoutput(cmd)
+        if result[0] == 0:
+            return result[1] == '0'
+        else:
+            invalidOperationError(False, result[1])
+            return False
     else:
         if path.startswith("file://"):
             path = path[len("file://"):]
@@ -193,7 +193,10 @@ def makedirs(path):
         classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
                                      stdout=subprocess.PIPE).communicate()[0]
         os.environ["CLASSPATH"] = classpath.decode("utf-8")
-        fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
+        if len(host_port) > 1:
+            fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
+        else:
+            fs = pa.hdfs.connect(host=host_port[0])
         return fs.mkdir(path)
     else:
         if path.startswith("file://"):
@@ -267,13 +270,13 @@ def is_file(path):
         except Exception as ex:
             invalidOperationError(False, str(ex), cause=ex)
     elif path.startswith("hdfs://"):
-        import pyarrow as pa
-        host_port = path.split("://")[1].split("/")[0].split(":")
-        classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
-                                     stdout=subprocess.PIPE).communicate()[0]
-        os.environ["CLASSPATH"] = classpath.decode("utf-8")
-        fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
-        return fs.isfile(path)
+        cmd = 'hdfs dfs -test -f {}; echo $?'.format(path)
+        result = subprocess.getstatusoutput(cmd)
+        if result[0] == 0:
+            return result[1] == '0'
+        else:
+            invalidOperationError(False, result[1])
+            return False
     else:
         if path.startswith("file://"):
             path = path[len("file://"):]
@@ -288,7 +291,10 @@ def put_local_dir_to_remote(local_dir, remote_dir):
         classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
                                      stdout=subprocess.PIPE).communicate()[0]
         os.environ["CLASSPATH"] = classpath.decode("utf-8")
-        fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
+        if len(host_port) > 1:
+            fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
+        else:
+            fs = pa.hdfs.connect(host=host_port[0])
         if not fs.exists(remote_dir):
             fs.mkdir(remote_dir)
         for file in os.listdir(local_dir):
@@ -369,20 +375,14 @@ def put_local_dir_tree_to_remote(local_dir, remote_dir):
 
 def put_local_file_to_remote(local_path, remote_path, filemode=None):
     if remote_path.startswith("hdfs"):  # hdfs://url:port/file_path
-        import pyarrow as pa
-        host_port = remote_path.split("://")[1].split("/")[0].split(":")
-        classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
-                                     stdout=subprocess.PIPE).communicate()[0]
-        os.environ["CLASSPATH"] = classpath.decode("utf-8")
         try:
-            fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
-            remote_dir = os.path.dirname(remote_path)
-            if not fs.exists(remote_dir):
-                fs.mkdir(remote_dir)
-            with open(local_path, "rb") as f:
-                fs.upload(remote_path, f)
+            cmd = 'hdfs dfs -put -f {} {}'.format(local_path, remote_path)
+            process = subprocess.Popen(cmd, shell=True)
+            process.wait()
             if filemode:
-                fs.chmod(remote_path, filemode)
+                chmod_cmd = 'hdfs dfs -chmod {} {}'.format(filemode, remote_path)
+                process = subprocess.Popen(chmod_cmd, shell=True)
+                process.wait()
         except Exception as e:
             logger.error("Cannot upload file {} to {}: error: "
                          .format(local_path, remote_path, str(e)))
diff --git a/python/orca/src/bigdl/orca/learn/utils.py b/python/orca/src/bigdl/orca/learn/utils.py
index 2c8e9f4777e..deb59d7d2ce 100644
--- a/python/orca/src/bigdl/orca/learn/utils.py
+++ b/python/orca/src/bigdl/orca/learn/utils.py
@@ -469,14 +469,12 @@ def data_length(data):
 
 def save_pkl(data, path):
     if path.startswith("hdfs"):  # hdfs://url:port/file_path
-        import pyarrow as pa
-        host_port = path.split("://")[1].split("/")[0].split(":")
-        classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
-                                     stdout=subprocess.PIPE).communicate()[0]
-        os.environ["CLASSPATH"] = classpath.decode("utf-8")
-        fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
-        with fs.open(path, 'wb') as f:
+        import uuid
+        file_name = str(uuid.uuid1()) + ".pkl"
+        temp_path = os.path.join(tempfile.gettempdir(), file_name)
+        with open(temp_path, 'wb') as f:
             pickle.dump(data, f)
+        put_local_file_to_remote(temp_path, path)
     elif path.startswith("s3"):  # s3://bucket/file_path
         access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
         secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
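
For reference, a minimal illustrative sketch (not part of the patch) of the CLI-based check that the diff switches exists()/is_file() to, assuming the hdfs binary is on PATH; hdfs_path_exists and the example URL are hypothetical names used only for illustration:

# Sketch only -- mirrors the pattern the patch adopts: shell out to
# `hdfs dfs -test` instead of the deprecated pyarrow.hdfs API.
import subprocess

def hdfs_path_exists(path):  # hypothetical helper, not in the patch
    # `-test -e` exits 0 when the path exists; `echo $?` puts that exit code
    # on stdout so getstatusoutput can read it back as the command's output.
    status, output = subprocess.getstatusoutput(
        "hdfs dfs -test -e {}; echo $?".format(path))
    if status == 0:
        return output.strip() == "0"
    raise RuntimeError("hdfs dfs -test failed: " + output)

# e.g. hdfs_path_exists("hdfs://namenode:9000/tmp/data.parquet")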