
Commit 79aae45

hdfs remote: add support for directories as output for dvc run
amjadsaadeh committed Oct 30, 2018
1 parent c4295fa commit 79aae45
Showing 1 changed file with 36 additions and 5 deletions.
dvc/remote/hdfs.py (36 additions, 5 deletions)
@@ -3,13 +3,14 @@
 import getpass
 import posixpath
 from subprocess import Popen, PIPE
+import json
 
 from dvc.config import Config
 from dvc.remote.base import RemoteBase
 from dvc.remote.local import RemoteLOCAL
 from dvc.exceptions import DvcException
 from dvc.logger import Logger
-from dvc.utils import fix_env
+from dvc.utils import fix_env, bytes_md5
 
 
 class RemoteHDFS(RemoteBase):
@@ -45,7 +46,7 @@ def hadoop_fs(self, cmd, user=None):
                   stderr=PIPE)
         out, err = p.communicate()
         if p.returncode != 0:
-            raise DvcException('HDFS command failed: {}: {}'.format(cmd, err))
+            raise DvcException('HDFS command failed: {}: {}'.format(cmd, err), cause=p.returncode)
         return out.decode('utf-8')
 
     @staticmethod
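The cause=p.returncode change above is what lets callers tell an expected non-zero exit status (for example, `hadoop fs -test -d` exiting with 1 when the path is not a directory) apart from a genuine failure. A minimal standalone sketch of that exit-code convention, outside DVC's classes (the helper name hdfs_is_dir is invented for illustration):

import subprocess

def hdfs_is_dir(url):
    # Illustrative only, not DVC code: `hadoop fs -test -d <path>` exits
    # with 0 when the path is a directory and non-zero otherwise, so the
    # return code alone answers the question without parsing any output.
    p = subprocess.run(['hadoop', 'fs', '-test', '-d', url],
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return p.returncode == 0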
@@ -56,9 +57,39 @@ def _group(regex, s, gname):
 
     def checksum(self, path_info):
         regex = r'.*\t.*\t(?P<checksum>.*)'
-        stdout = self.hadoop_fs('checksum {}'.format(path_info['url']),
-                                user=path_info['user'])
-        return self._group(regex, stdout, 'checksum')
+        # Check for dir
+        try:
+            self.hadoop_fs('test -d {}'.format(path_info['url']),
+                           user=path_info['user'])
+            is_dir = True
+        except DvcException as e:
+            if e.cause == 1:
+                is_dir = False
+            else:
+                raise e
+
+        if is_dir:
+            msg = "Computing md5 for a directory hdfs path {}. " \
+                  "This might take a while, due to latency of hadoop command."
+            Logger.info(msg.format(os.path.relpath(path_info['url'])))
+            out = self.hadoop_fs('ls {}'.format(path_info['url']),
+                                 user=path_info['user'])
+            # First line is summary, last line is empty
+            file_list = [file_path.split(' ')[-1] for file_path in out.split('\n')[1:-1]]
+            checksums = list()
+            for file_path in file_list:
+                stdout = self.hadoop_fs('checksum {}'.format(file_path),
+                                        user=path_info['user'])
+                checksums.append(self._group(regex, stdout, 'checksum'))
+            # Sort to guarantee deterministic checksum
+            checksums = sorted(checksums)
+            byts = json.dumps(checksums, sort_keys=True).encode('utf-8')
+            checksum = bytes_md5(byts)
+        else:
+            stdout = self.hadoop_fs('checksum {}'.format(path_info['url']),
+                                    user=path_info['user'])
+            checksum = self._group(regex, stdout, 'checksum')
+        return checksum
 
     def cp(self, from_info, to_info):
         self.hadoop_fs('mkdir -p {}'.format(posixpath.dirname(to_info['url'])),
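For readers who want to see the aggregation step on its own, here is a rough sketch of how the directory checksum above is assembled from per-file checksums, assuming those have already been collected; hashlib.md5 stands in for dvc.utils.bytes_md5, and the sample values are made up:

import hashlib
import json

def dir_checksum(file_checksums):
    # Sort so the result does not depend on the order in which
    # `hadoop fs -ls` happened to list the files.
    ordered = sorted(file_checksums)
    # Serialize deterministically and hash the bytes, mirroring
    # json.dumps(..., sort_keys=True) followed by bytes_md5 in the diff.
    payload = json.dumps(ordered, sort_keys=True).encode('utf-8')
    return hashlib.md5(payload).hexdigest()

# Example with made-up per-file checksums:
print(dir_checksum(['checksum-of-file-a', 'checksum-of-file-b']))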
