Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize cache listing for local, ssh and hdfs #2836

Merged
merged 1 commit on Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions dvc/remote/hdfs.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
from __future__ import unicode_literals

from dvc.utils.compat import FileNotFoundError, urlparse
import io
import logging
import os
import posixpath
import re
from contextlib import closing
from contextlib import contextmanager
from subprocess import PIPE
from subprocess import Popen
from collections import deque
from contextlib import closing, contextmanager
import subprocess

from .base import RemoteBASE
from .base import RemoteCmdError
from .base import RemoteBASE, RemoteCmdError
from .pool import get_connection
from dvc.config import Config
from dvc.scheme import Schemes
from dvc.utils import fix_env
from dvc.utils import tmp_fname
from dvc.utils.compat import FileNotFoundError
from dvc.utils.compat import urlparse
from dvc.utils import fix_env, tmp_fname

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,15 +64,15 @@ def hadoop_fs(self, cmd, user=None):
close_fds = os.name != "nt"

executable = os.getenv("SHELL") if os.name != "nt" else None
p = Popen(
p = subprocess.Popen(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

cmd,
shell=True,
close_fds=close_fds,
executable=executable,
env=fix_env(os.environ),
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
out, err = p.communicate()
if p.returncode != 0:
Expand Down Expand Up @@ -156,17 +152,14 @@ def open(self, path_info, mode="r", encoding=None):

def list_cache_paths(self):
if not self.exists(self.path_info):
return []
return

files = []
dirs = [self.path_info.path]
dirs = deque([self.path_info.path])

with self.hdfs(self.path_info) as hdfs:
while dirs:
for entry in hdfs.ls(dirs.pop(), detail=True):
if entry["kind"] == "directory":
dirs.append(urlparse(entry["name"]).path)
elif entry["kind"] == "file":
files.append(urlparse(entry["name"]).path)

return files
yield urlparse(entry["name"]).path
12 changes: 1 addition & 11 deletions dvc/remote/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,7 @@ def supported(cls, config):

def list_cache_paths(self):
assert self.path_info is not None

clist = []
for entry in os.listdir(fspath_py35(self.path_info)):
subdir = self.path_info / entry
if not os.path.isdir(fspath_py35(subdir)):
continue
clist.extend(
subdir / cache for cache in os.listdir(fspath_py35(subdir))
)

return clist
return walk_files(self.path_info, None)

def get(self, md5):
if not md5:
Expand Down
4 changes: 3 additions & 1 deletion dvc/remote/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ def open(self, path_info, mode="r", encoding=None):

def list_cache_paths(self):
with self.ssh(self.path_info) as ssh:
return list(ssh.walk_files(self.path_info.path))
# If we simply return an iterator then with above closes instantly
for path in ssh.walk_files(self.path_info.path):
yield path
Comment on lines +267 to +269
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't that mean that connection will be open until the iterator is closed?
Considering that it is now used only in remote.all, which was supposed to create a generator in the first place, this change is good.

I am just wondering if it will not become fragile if, in the future, we start to wrap some logic around this method (for any reason). But also, I am worrying prematurely here. I just wanted to ask this and keep it in mind for the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this will hold the connection open. Not sure this will be an issue.


def walk_files(self, path_info):
with self.ssh(path_info) as ssh:
Expand Down