From 8681cc3cd1b50c13ea6c90f7eca5c7f9788d78c7 Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Fri, 22 Nov 2019 14:39:56 +0100 Subject: [PATCH] perf: optimize cache listing for local, ssh and hdfs - make all three lazy - simplify local one - use deque instead of list for hdfs --- dvc/remote/hdfs.py | 33 +++++++++++++-------------------- dvc/remote/local.py | 12 +----------- dvc/remote/ssh/__init__.py | 4 +++- 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/dvc/remote/hdfs.py b/dvc/remote/hdfs.py index 75469ed18b..0301fce3ca 100644 --- a/dvc/remote/hdfs.py +++ b/dvc/remote/hdfs.py @@ -1,24 +1,20 @@ from __future__ import unicode_literals +from dvc.utils.compat import FileNotFoundError, urlparse import io import logging import os import posixpath import re -from contextlib import closing -from contextlib import contextmanager -from subprocess import PIPE -from subprocess import Popen +from collections import deque +from contextlib import closing, contextmanager +import subprocess -from .base import RemoteBASE -from .base import RemoteCmdError +from .base import RemoteBASE, RemoteCmdError from .pool import get_connection from dvc.config import Config from dvc.scheme import Schemes -from dvc.utils import fix_env -from dvc.utils import tmp_fname -from dvc.utils.compat import FileNotFoundError -from dvc.utils.compat import urlparse +from dvc.utils import fix_env, tmp_fname logger = logging.getLogger(__name__) @@ -68,15 +64,15 @@ def hadoop_fs(self, cmd, user=None): close_fds = os.name != "nt" executable = os.getenv("SHELL") if os.name != "nt" else None - p = Popen( + p = subprocess.Popen( cmd, shell=True, close_fds=close_fds, executable=executable, env=fix_env(os.environ), - stdin=PIPE, - stdout=PIPE, - stderr=PIPE, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) out, err = p.communicate() if p.returncode != 0: @@ -156,10 +152,9 @@ def open(self, path_info, mode="r", encoding=None): def list_cache_paths(self): if not self.exists(self.path_info): - return [] + return - files = [] - dirs = [self.path_info.path] + dirs = deque([self.path_info.path]) with self.hdfs(self.path_info) as hdfs: while dirs: @@ -167,6 +162,4 @@ def list_cache_paths(self): if entry["kind"] == "directory": dirs.append(urlparse(entry["name"]).path) elif entry["kind"] == "file": - files.append(urlparse(entry["name"]).path) - - return files + yield urlparse(entry["name"]).path diff --git a/dvc/remote/local.py b/dvc/remote/local.py index bd20e91b55..6e7900fd5f 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -85,17 +85,7 @@ def supported(cls, config): def list_cache_paths(self): assert self.path_info is not None - - clist = [] - for entry in os.listdir(fspath_py35(self.path_info)): - subdir = self.path_info / entry - if not os.path.isdir(fspath_py35(subdir)): - continue - clist.extend( - subdir / cache for cache in os.listdir(fspath_py35(subdir)) - ) - - return clist + return walk_files(self.path_info, None) def get(self, md5): if not md5: diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py index e8469f968c..589bea8d1e 100644 --- a/dvc/remote/ssh/__init__.py +++ b/dvc/remote/ssh/__init__.py @@ -264,7 +264,9 @@ def open(self, path_info, mode="r", encoding=None): def list_cache_paths(self): with self.ssh(self.path_info) as ssh: - return list(ssh.walk_files(self.path_info.path)) + # If we simply return an iterator then with above closes instantly + for path in ssh.walk_files(self.path_info.path): + yield path def walk_files(self, path_info): with self.ssh(path_info) as ssh: