Skip to content

Commit

Permalink
Orca: refactor HDFS operation in estimator (intel#5325)
Browse files Browse the repository at this point in the history
* fix: use default value if no hdfs port specified.

* fix: add default hdfs port when saving pkl

* refactor: replace pyarrow deps in estimator.

* fix: add process wait()

* fix: resolve unnecessary change
  • Loading branch information
lalalapotter authored and ForJadeForest committed Sep 20, 2022
1 parent f83e509 commit 1e24b29
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
56 changes: 28 additions & 28 deletions python/orca/src/bigdl/orca/data/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ def exists(path):
invalidOperationError(False, str(ex), cause=ex)
return True
elif path.startswith("hdfs://"):
import pyarrow as pa
host_port = path.split("://")[1].split("/")[0].split(":")
classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
stdout=subprocess.PIPE).communicate()[0]
os.environ["CLASSPATH"] = classpath.decode("utf-8")
fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
return fs.exists(path)
cmd = 'hdfs dfs -test -e {}; echo $?'.format(path)
result = subprocess.getstatusoutput(cmd)
if result[0] == 0:
return result[1] == '0'
else:
invalidOperationError(False, result[1])
return False
else:
if path.startswith("file://"):
path = path[len("file://"):]
Expand Down Expand Up @@ -193,7 +193,10 @@ def makedirs(path):
classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
stdout=subprocess.PIPE).communicate()[0]
os.environ["CLASSPATH"] = classpath.decode("utf-8")
fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
if len(host_port) > 1:
fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
else:
fs = pa.hdfs.connect(host=host_port[0])
return fs.mkdir(path)
else:
if path.startswith("file://"):
Expand Down Expand Up @@ -267,13 +270,13 @@ def is_file(path):
except Exception as ex:
invalidOperationError(False, str(ex), cause=ex)
elif path.startswith("hdfs://"):
import pyarrow as pa
host_port = path.split("://")[1].split("/")[0].split(":")
classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
stdout=subprocess.PIPE).communicate()[0]
os.environ["CLASSPATH"] = classpath.decode("utf-8")
fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
return fs.isfile(path)
cmd = 'hdfs dfs -test -f {}; echo $?'.format(path)
result = subprocess.getstatusoutput(cmd)
if result[0] == 0:
return result[1] == '0'
else:
invalidOperationError(False, result[1])
return False
else:
if path.startswith("file://"):
path = path[len("file://"):]
Expand All @@ -288,7 +291,10 @@ def put_local_dir_to_remote(local_dir, remote_dir):
classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
stdout=subprocess.PIPE).communicate()[0]
os.environ["CLASSPATH"] = classpath.decode("utf-8")
fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
if len(host_port) > 1:
fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
else:
fs = pa.hdfs.connect(host=host_port[0])
if not fs.exists(remote_dir):
fs.mkdir(remote_dir)
for file in os.listdir(local_dir):
Expand Down Expand Up @@ -369,20 +375,14 @@ def put_local_dir_tree_to_remote(local_dir, remote_dir):

def put_local_file_to_remote(local_path, remote_path, filemode=None):
if remote_path.startswith("hdfs"): # hdfs://url:port/file_path
import pyarrow as pa
host_port = remote_path.split("://")[1].split("/")[0].split(":")
classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
stdout=subprocess.PIPE).communicate()[0]
os.environ["CLASSPATH"] = classpath.decode("utf-8")
try:
fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
remote_dir = os.path.dirname(remote_path)
if not fs.exists(remote_dir):
fs.mkdir(remote_dir)
with open(local_path, "rb") as f:
fs.upload(remote_path, f)
cmd = 'hdfs dfs -put -f {} {}'.format(local_path, remote_path)
process = subprocess.Popen(cmd, shell=True)
process.wait()
if filemode:
fs.chmod(remote_path, filemode)
chmod_cmd = 'hdfs dfs -chmod {} {}'.format(filemode, remote_path)
process = subprocess.Popen(chmod_cmd, shell=True)
process.wait()
except Exception as e:
logger.error("Cannot upload file {} to {}: error: "
.format(local_path, remote_path, str(e)))
Expand Down
12 changes: 5 additions & 7 deletions python/orca/src/bigdl/orca/learn/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,14 +469,12 @@ def data_length(data):

def save_pkl(data, path):
if path.startswith("hdfs"): # hdfs://url:port/file_path
import pyarrow as pa
host_port = path.split("://")[1].split("/")[0].split(":")
classpath = subprocess.Popen(["hadoop", "classpath", "--glob"],
stdout=subprocess.PIPE).communicate()[0]
os.environ["CLASSPATH"] = classpath.decode("utf-8")
fs = pa.hdfs.connect(host=host_port[0], port=int(host_port[1]))
with fs.open(path, 'wb') as f:
import uuid
file_name = str(uuid.uuid1()) + ".pkl"
temp_path = os.path.join(tempfile.gettempdir(), file_name)
with open(temp_path, 'wb') as f:
pickle.dump(data, f)
put_local_file_to_remote(temp_path, path)
elif path.startswith("s3"): # s3://bucket/file_path
access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
Expand Down

0 comments on commit 1e24b29

Please sign in to comment.