Merge pull request #2836 from Suor/lazy-cache-paths
perf: optimize cache listing for local, ssh and hdfs
efiop authored Nov 28, 2019
2 parents 00744bc + 8681cc3 commit 838efb2
Showing 3 changed files with 17 additions and 32 deletions.
33 changes: 13 additions & 20 deletions dvc/remote/hdfs.py
@@ -1,24 +1,20 @@
 from __future__ import unicode_literals
 
+from dvc.utils.compat import FileNotFoundError, urlparse
 import io
 import logging
 import os
 import posixpath
 import re
-from contextlib import closing
-from contextlib import contextmanager
-from subprocess import PIPE
-from subprocess import Popen
+from collections import deque
+from contextlib import closing, contextmanager
+import subprocess
 
-from .base import RemoteBASE
-from .base import RemoteCmdError
+from .base import RemoteBASE, RemoteCmdError
 from .pool import get_connection
 from dvc.config import Config
 from dvc.scheme import Schemes
-from dvc.utils import fix_env
-from dvc.utils import tmp_fname
-from dvc.utils.compat import FileNotFoundError
-from dvc.utils.compat import urlparse
+from dvc.utils import fix_env, tmp_fname
 
 logger = logging.getLogger(__name__)

@@ -68,15 +64,15 @@ def hadoop_fs(self, cmd, user=None):
         close_fds = os.name != "nt"
 
         executable = os.getenv("SHELL") if os.name != "nt" else None
-        p = Popen(
+        p = subprocess.Popen(
             cmd,
             shell=True,
             close_fds=close_fds,
             executable=executable,
             env=fix_env(os.environ),
-            stdin=PIPE,
-            stdout=PIPE,
-            stderr=PIPE,
+            stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
         )
         out, err = p.communicate()
         if p.returncode != 0:
@@ -156,17 +152,14 @@ def open(self, path_info, mode="r", encoding=None):
 
     def list_cache_paths(self):
         if not self.exists(self.path_info):
-            return []
+            return
 
-        files = []
-        dirs = [self.path_info.path]
+        dirs = deque([self.path_info.path])
 
         with self.hdfs(self.path_info) as hdfs:
             while dirs:
                 for entry in hdfs.ls(dirs.pop(), detail=True):
                     if entry["kind"] == "directory":
                         dirs.append(urlparse(entry["name"]).path)
                     elif entry["kind"] == "file":
-                        files.append(urlparse(entry["name"]).path)
-
-        return files
+                        yield urlparse(entry["name"]).path
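
All three files get the same underlying change: list_cache_paths stops building the complete list of cache paths in memory and instead yields paths one at a time as a generator, so a caller can begin consuming results before the traversal of the remote finishes. For HDFS, the listing becomes a generator that walks the tree iteratively, keeping pending directories in a deque. Below is a minimal, self-contained sketch of that traversal pattern; the ls callable and the toy in-memory tree are hypothetical stand-ins for hdfs.ls(path, detail=True) and a real cluster.

from collections import deque
from urllib.parse import urlparse


def iter_file_paths(ls, root):
    # `ls` stands in for hdfs.ls(path, detail=True): it must return
    # entries shaped like {"kind": "file" | "directory", "name": <URL>}.
    dirs = deque([root])
    while dirs:
        for entry in ls(dirs.pop()):
            path = urlparse(entry["name"]).path
            if entry["kind"] == "directory":
                dirs.append(path)  # remember subdirectory, visit it later
            elif entry["kind"] == "file":
                yield path  # emitted immediately; no list is built


# Toy usage against an in-memory "filesystem":
tree = {
    "/cache": [{"kind": "directory", "name": "hdfs://nn/cache/2f"}],
    "/cache/2f": [{"kind": "file", "name": "hdfs://nn/cache/2f/abc123"}],
}
print(list(iter_file_paths(lambda p: tree.get(p, []), "/cache")))
# -> ['/cache/2f/abc123']

Since dirs.pop() takes from the same end that append() adds to, the walk is depth-first; the point of the change is the laziness, not the visit order.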
12 changes: 1 addition & 11 deletions dvc/remote/local.py
@@ -83,17 +83,7 @@ def supported(cls, config):
 
     def list_cache_paths(self):
         assert self.path_info is not None
-
-        clist = []
-        for entry in os.listdir(fspath_py35(self.path_info)):
-            subdir = self.path_info / entry
-            if not os.path.isdir(fspath_py35(subdir)):
-                continue
-            clist.extend(
-                subdir / cache for cache in os.listdir(fspath_py35(subdir))
-            )
-
-        return clist
+        return walk_files(self.path_info, None)
 
     def get(self, md5):
         if not md5:
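
For the local remote, the two-level os.listdir scan is replaced by a single call to walk_files, which yields every file under the cache directory. The helper itself is not part of this diff, so the sketch below is only a plausible shape for it, inferred from the call site walk_files(self.path_info, None); the body and the unused dvcignore parameter are assumptions, not DVC's actual implementation.

import os


def walk_files(directory, dvcignore=None):
    # Sketch only: DVC's real helper lives elsewhere in the codebase,
    # and `dvcignore` merely mirrors the second argument at the call
    # site, where None is passed.
    for root, _dirs, files in os.walk(str(directory)):
        for fname in files:
            yield os.path.join(root, fname)

Unlike the removed code, an os.walk-based helper recurses to any depth, but for a cache laid out exactly two levels deep, as the removed loop assumed, the two produce the same files.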
4 changes: 3 additions & 1 deletion dvc/remote/ssh/__init__.py
@@ -264,7 +264,9 @@ def open(self, path_info, mode="r", encoding=None):
 
     def list_cache_paths(self):
         with self.ssh(self.path_info) as ssh:
-            return list(ssh.walk_files(self.path_info.path))
+            # If we simply return an iterator then with above closes instantly
+            for path in ssh.walk_files(self.path_info.path):
+                yield path
 
     def walk_files(self, path_info):
         with self.ssh(path_info) as ssh:
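
The SSH change is the subtlest of the three, and the in-diff comment says why: returning ssh.walk_files(...) as a bare iterator would leave the with block at once and close the SSH connection before the caller pulled a single path (the old code avoided this by forcing the whole listing into a list while the connection was still open). Yielding inside the with block turns list_cache_paths itself into a generator, so execution stays suspended inside the block, and the connection stays open, until iteration finishes. The same pitfall can be reproduced with a plain file handle standing in for the SSH connection:

import os
import tempfile

# A small file plays the role of the remote connection.
tmp = tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False)
tmp.write("a\nb\n")
tmp.close()


def broken(path):
    with open(path) as fobj:
        # The file is closed the moment we return; consuming the
        # iterator afterwards fails.
        return (line.strip() for line in fobj)


def lazy(path):
    with open(path) as fobj:
        # Yielding suspends the `with` block, so the file stays open
        # until the generator is exhausted.
        for line in fobj:
            yield line.strip()


print(list(lazy(tmp.name)))  # ['a', 'b']
try:
    list(broken(tmp.name))
except ValueError as exc:
    print(exc)  # I/O operation on closed file
os.unlink(tmp.name)

The trade-off is that the connection now stays open for as long as the caller iterates, which is exactly what a listing operation wants.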
