Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] cache/remote: drop dos2unix MD5 by default #5449

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
67b7187
objects: add odb config/versioning methods
pmrowla Feb 11, 2021
fc9e13a
cache/remote: update ODB config file as needed on writes
pmrowla Feb 11, 2021
894884a
utils: add dos2unix flag for file_md5
pmrowla Feb 11, 2021
67706f3
fix schema issues
pmrowla Feb 11, 2021
977949f
update file_md5/dos2unix tests
pmrowla Feb 11, 2021
b134901
mock out config load calls for unit tests with no tree
pmrowla Feb 11, 2021
fd512b7
fix leftovers
efiop Feb 14, 2021
1ce4eae
tree -> fs
efiop Feb 14, 2021
6af643d
remote: use self.cache for odb config/versioning
pmrowla Feb 15, 2021
dcd8f3e
add enum for supported hash names
pmrowla Feb 15, 2021
1b64c0f
fs: replace PARAM_CHECKSUM with hash_name property
pmrowla Feb 15, 2021
5d75220
empty config should load version info by default
pmrowla Feb 15, 2021
556eede
oid: account for state not storing hash name
pmrowla Feb 15, 2021
821cd04
tests: update func/test_add
pmrowla Feb 15, 2021
99d741a
odb/cache: protect config file after writing
pmrowla Feb 15, 2021
764b962
tests: update func/test_cache
pmrowla Feb 15, 2021
7320534
tests: update func/test_gc
pmrowla Feb 15, 2021
345d81b
tests: update func/test_run_single_stage
pmrowla Feb 15, 2021
16cdb85
restore config option that was removed on rebase
pmrowla Feb 15, 2021
8f54529
nonexistent config should be loaded as v1 not latest
pmrowla Feb 15, 2021
b7f29d6
tests: s/PARAM_CHECKSUM/hash_name/
pmrowla Feb 15, 2021
14cd4d6
odb/cache: lazily load config
pmrowla Feb 15, 2021
33ff7fd
add odb config tests
pmrowla Feb 15, 2021
304afbb
fix hash_name in HashedStreamReader
pmrowla Feb 15, 2021
507dab5
use local tempfile and upload_fobj when dumping odb config
pmrowla Feb 15, 2021
8148835
ci fixes
pmrowla Feb 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions dvc/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from copy import copy
from typing import Optional

from funcy import cached_property

from dvc.exceptions import DvcException
from dvc.progress import Tqdm

from ..objects import HashFile, ObjectFormatError
Expand All @@ -26,6 +29,18 @@ def __init__(self, fs):
self.DEFAULT_CACHE_TYPES
)
self.cache_type_confirmed = False
self._config_modified = False

@property
def version(self):
    """ODB schema version recorded in this cache's config.

    Falls back to LATEST_VERSION when the config has no schema key
    (e.g. a freshly created / empty config).
    """
    from dvc.odb.versions import LATEST_VERSION
    from dvc.parsing.versions import SCHEMA_KWD

    return self.config.get(SCHEMA_KWD, LATEST_VERSION)

@cached_property
def config(self):
    """Lazily-loaded ODB config (read once per instance via _load_config)."""
    return self._load_config()

def move(self, from_info, to_info):
self.fs.move(from_info, to_info)
Expand All @@ -49,6 +64,8 @@ def add(self, path_info, fs, hash_info, **kwargs):
except (ObjectFormatError, FileNotFoundError):
pass

self.dump_config()

cache_info = self.hash_to_path_info(hash_info.value)
# using our makedirs to create dirs with proper permissions
self.makedirs(cache_info.parent)
Expand Down Expand Up @@ -411,3 +428,36 @@ def hashes_exist(self, hashes, jobs=None, name=None):
self.list_hashes_traverse(remote_size, remote_hashes, jobs, name)
)
return list(hashes & set(remote_hashes))

def _load_config(self):
    """Load and validate the ODB config file for this cache.

    Returns the parsed config dict. Raises DvcException when
    ``core.dos2unix`` is enabled but the ODB is already at a version
    newer than v1 (dos2unix MD5s are only valid for v1 ODBs).

    Sets ``self._config_modified`` when the config needs to be written
    back (migration to v2 was applied, or no config file exists yet),
    so that a later ``dump_config()`` call persists it.
    """
    from dvc.odb.config import CONFIG_FILENAME, load_config, migrate_config
    from dvc.odb.versions import LATEST_VERSION, ODB_VERSION
    from dvc.parsing.versions import SCHEMA_KWD

    config_path = self.fs.path_info / CONFIG_FILENAME
    # Reuse config_path instead of rebuilding the same path expression.
    config = load_config(config_path, self.fs)
    version = config.get(SCHEMA_KWD, LATEST_VERSION)

    dos2unix = self.repo.config["core"].get("dos2unix", False)
    if dos2unix and version != ODB_VERSION.V1:
        raise DvcException(
            f"dos2unix MD5 is incompatible with ODB version '{version}'"
        )

    if not dos2unix and version != ODB_VERSION.V2:
        # Default (non-dos2unix) mode requires a v2 ODB; upgrade in memory
        # and flag it so the upgraded config gets written out.
        migrate_config(config)
        self._config_modified = True
    elif not self.fs.exists(config_path):
        # Config is already at the right version but was never persisted.
        self._config_modified = True
    return config

def dump_config(self):
    """Persist the in-memory ODB config, but only when it was modified."""
    from dvc.odb.config import CONFIG_FILENAME, dump_config

    # Nothing to do unless a config is loaded and flagged as dirty.
    if not (self.config and self._config_modified):
        return

    target = self.fs.path_info / CONFIG_FILENAME
    # An existing config file is protected (read-only); lift that
    # before overwriting, then re-protect the fresh copy.
    if self.fs.exists(target):
        self.unprotect(target)
    dump_config(self.config, target, self.fs)
    self.protect(target)
    self._config_modified = False
2 changes: 1 addition & 1 deletion dvc/cache/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def hashes_exist(
unit="file",
desc="Querying " + ("cache in " + name if name else "local cache"),
):
hash_info = HashInfo(self.fs.PARAM_CHECKSUM, hash_)
hash_info = HashInfo(self.fs.hash_name, hash_)
try:
self.check(hash_info)
ret.append(hash_)
Expand Down
2 changes: 1 addition & 1 deletion dvc/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _remove(path_info, fs, cache, force=False):
fs.remove(path_info)
return

current = get_hash(path_info, fs, fs.PARAM_CHECKSUM)
current = get_hash(path_info, fs, fs.hash_name)
try:
obj = load(cache, current)
check(cache, obj)
Expand Down
1 change: 1 addition & 0 deletions dvc/config_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class RelPath(str):
Optional("autostage", default=False): Bool,
Optional("experiments"): Bool, # obsoleted
Optional("check_update", default=True): Bool,
Optional("dos2unix", default=False): Bool,
},
"cache": {
"local": str,
Expand Down
5 changes: 3 additions & 2 deletions dvc/fs/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from funcy import cached_property, wrap_prop

from dvc.hash_info import HashName
from dvc.path_info import CloudURLInfo
from dvc.progress import Tqdm
from dvc.scheme import Schemes
Expand All @@ -21,8 +22,8 @@ class AzureFileSystem(BaseFileSystem):
"azure-storage-blob": "azure.storage.blob",
"knack": "knack",
}
PARAM_CHECKSUM = "etag"
DETAIL_FIELDS = frozenset(("etag", "size"))
_DEFAULT_HASH = HashName.ETAG
DETAIL_FIELDS = frozenset((str(HashName.ETAG.value), "size"))

COPY_POLL_SECONDS = 5
LIST_OBJECT_PAGE_SIZE = 5000
Expand Down
13 changes: 12 additions & 1 deletion dvc/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from funcy import cached_property, decorator

from dvc.exceptions import DvcException
from dvc.hash_info import HashName
from dvc.path_info import URLInfo
from dvc.progress import Tqdm
from dvc.state import StateNoop
Expand Down Expand Up @@ -59,7 +60,7 @@ class BaseFileSystem:
# Needed for some providers, and http open()
CHUNK_SIZE = 64 * 1024 * 1024 # 64 MiB

PARAM_CHECKSUM: ClassVar[Optional[str]] = None
_DEFAULT_HASH: ClassVar[Optional["HashName"]] = None
DETAIL_FIELDS: FrozenSet[str] = frozenset()

state = StateNoop()
Expand All @@ -72,6 +73,16 @@ def __init__(self, repo, config):

self.path_info = None

@cached_property
def hash_name(self) -> Optional[str]:
    """Name of the hash function this filesystem uses.

    When the default hash is MD5 and the repo has ``core.dos2unix``
    enabled, the legacy dos2unix MD5 name is reported instead.
    """
    default = self._DEFAULT_HASH
    if default is None:
        return None
    wants_d2u = (
        default == HashName.MD5
        and self.repo
        and self.repo.config["core"].get("dos2unix", False)
    )
    return HashName.MD5_D2U.value if wants_d2u else default.value

@cached_property
def jobs(self):
return (
Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import typing

from dvc.exceptions import OutputNotFoundError
from dvc.hash_info import HashName
from dvc.path_info import PathInfo
from dvc.utils import relpath

Expand All @@ -24,7 +25,7 @@ class DvcFileSystem(BaseFileSystem): # pylint:disable=abstract-method
"""

scheme = "local"
PARAM_CHECKSUM = "md5"
_DEFAULT_HASH = HashName.MD5

def __init__(self, repo):
super().__init__(repo, {"url": repo.root_dir})
Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from funcy.py3 import cat

from dvc.exceptions import DvcException, FileMissingError
from dvc.hash_info import HashName
from dvc.path_info import CloudURLInfo
from dvc.progress import Tqdm
from dvc.scheme import Schemes
Expand Down Expand Up @@ -88,7 +89,7 @@ def __init__(self, url):
class GDriveFileSystem(BaseFileSystem):
scheme = Schemes.GDRIVE
PATH_CLS = GDriveURLInfo
PARAM_CHECKSUM = "checksum"
_DEFAULT_HASH = HashName.CHECKSUM
REQUIRES = {"pydrive2": "pydrive2"}
# Always prefer traverse for GDrive since API usage quotas are a concern.
TRAVERSE_WEIGHT_MULTIPLIER = 1
Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from funcy import cached_property, wrap_prop

from dvc.exceptions import DvcException
from dvc.hash_info import HashName
from dvc.path_info import CloudURLInfo
from dvc.progress import Tqdm
from dvc.scheme import Schemes
Expand Down Expand Up @@ -54,7 +55,7 @@ class GSFileSystem(BaseFileSystem):
scheme = Schemes.GS
PATH_CLS = CloudURLInfo
REQUIRES = {"google-cloud-storage": "google.cloud.storage"}
PARAM_CHECKSUM = "md5"
_DEFAULT_HASH = HashName.MD5
DETAIL_FIELDS = frozenset(("md5", "size"))

def __init__(self, repo, config):
Expand Down
6 changes: 3 additions & 3 deletions dvc/fs/hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from contextlib import closing, contextmanager
from urllib.parse import urlparse

from dvc.hash_info import HashInfo
from dvc.hash_info import HashInfo, HashName
from dvc.progress import Tqdm
from dvc.scheme import Schemes
from dvc.utils import fix_env, tmp_fname
Expand Down Expand Up @@ -62,7 +62,7 @@ class HDFSFileSystem(BaseFileSystem):
scheme = Schemes.HDFS
REQUIRES = {"pyarrow": "pyarrow"}
REGEX = r"^hdfs://((?P<user>.*)@)?.*$"
PARAM_CHECKSUM = "checksum"
_DEFAULT_HASH = HashName.CHECKSUM
TRAVERSE_PREFIX_LEN = 2

def __init__(self, repo, config):
Expand Down Expand Up @@ -229,7 +229,7 @@ def info(self, path_info):

def checksum(self, path_info):
    """Return HashInfo carrying the HDFS-native checksum for *path_info*."""
    value = _hadoop_fs_checksum(path_info)
    size = self.getsize(path_info)
    return HashInfo(HashName.CHECKSUM.value, value, size=size)
Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import dvc.prompt as prompt
from dvc.exceptions import DvcException, HTTPError
from dvc.hash_info import HashName
from dvc.path_info import HTTPURLInfo
from dvc.progress import Tqdm
from dvc.scheme import Schemes
Expand All @@ -27,7 +28,7 @@ def ask_password(host, user):
class HTTPFileSystem(BaseFileSystem): # pylint:disable=abstract-method
scheme = Schemes.HTTP
PATH_CLS = HTTPURLInfo
PARAM_CHECKSUM = "etag"
_DEFAULT_HASH = HashName.ETAG
CAN_TRAVERSE = False

SESSION_RETRIES = 5
Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from funcy import cached_property

from dvc.hash_info import HashName
from dvc.path_info import PathInfo
from dvc.scheme import Schemes
from dvc.system import System
Expand All @@ -18,7 +19,7 @@
class LocalFileSystem(BaseFileSystem):
scheme = Schemes.LOCAL
PATH_CLS = PathInfo
PARAM_CHECKSUM = "md5"
_DEFAULT_HASH = HashName.MD5
PARAM_PATH = "path"
TRAVERSE_PREFIX_LEN = 2

Expand Down
6 changes: 4 additions & 2 deletions dvc/fs/memory.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from dvc.hash_info import HashName

from .base import BaseFileSystem


class MemoryFileSystem(BaseFileSystem):
scheme = "local"
PARAM_CHECKSUM = "md5"
_DEFAULT_HASH = HashName.MD5

def __init__(self, repo, config):
from fsspec.implementations.memory import MemoryFileSystem as MemFS
Expand All @@ -13,7 +15,7 @@ def __init__(self, repo, config):
self.fs = MemFS()

def exists(self, path_info, use_dvcignore=True):
return self.fs.exists(path_info.path)
return self.fs.exists(path_info.fspath)

def open(self, path_info, mode="r", encoding=None, **kwargs):
return self.fs.open(
Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from funcy import cached_property, wrap_prop

from dvc.hash_info import HashName
from dvc.path_info import CloudURLInfo
from dvc.progress import Tqdm
from dvc.scheme import Schemes
Expand Down Expand Up @@ -35,7 +36,7 @@ class OSSFileSystem(BaseFileSystem): # pylint:disable=abstract-method
scheme = Schemes.OSS
PATH_CLS = CloudURLInfo
REQUIRES = {"oss2": "oss2"}
PARAM_CHECKSUM = "etag"
_DEFAULT_HASH = HashName.ETAG
COPY_POLL_SECONDS = 5
LIST_OBJECT_PAGE_SIZE = 100

Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from funcy import lfilter, wrap_with

from dvc.hash_info import HashName
from dvc.path_info import PathInfo

from .base import BaseFileSystem
Expand All @@ -32,7 +33,7 @@ class RepoFileSystem(BaseFileSystem): # pylint:disable=abstract-method
"""

scheme = "local"
PARAM_CHECKSUM = "md5"
_DEFAULT_HASH = HashName.MD5

def __init__(
self, repo, subrepos=False, repo_factory: RepoFactory = None,
Expand Down
6 changes: 3 additions & 3 deletions dvc/fs/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from dvc.config import ConfigError
from dvc.exceptions import DvcException, ETagMismatchError
from dvc.hash_info import HashInfo
from dvc.hash_info import HashInfo, HashName
from dvc.path_info import CloudURLInfo
from dvc.progress import Tqdm
from dvc.scheme import Schemes
Expand All @@ -24,7 +24,7 @@ class S3FileSystem(BaseFileSystem):
scheme = Schemes.S3
PATH_CLS = CloudURLInfo
REQUIRES = {"boto3": "boto3"}
PARAM_CHECKSUM = "etag"
_DEFAULT_HASH = HashName.ETAG
DETAIL_FIELDS = frozenset(("etag", "size"))

def __init__(self, repo, config):
Expand Down Expand Up @@ -423,7 +423,7 @@ def _copy(cls, s3, from_info, to_info, extra_args):

def etag(self, path_info):
    """Return HashInfo with the S3 object's ETag and size.

    Bug fix: the hash *value* positional argument was missing — only
    the hash name and size were passed, producing a HashInfo with no
    etag value, inconsistent with the HashInfo(name, value, size=...)
    calls in the HDFS and SSH filesystems.
    """
    with self._get_obj(path_info) as obj:
        # boto3 returns the ETag wrapped in double quotes; strip them.
        return HashInfo(
            HashName.ETAG.value,
            obj.e_tag.strip('"'),
            size=obj.content_length,
        )

def _upload_fobj(self, fobj, to_info):
with self._get_obj(to_info) as obj:
Expand Down
6 changes: 3 additions & 3 deletions dvc/fs/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from funcy import first, memoize, silent, wrap_with

import dvc.prompt as prompt
from dvc.hash_info import HashInfo
from dvc.hash_info import HashInfo, HashName
from dvc.scheme import Schemes

from ..base import BaseFileSystem
Expand All @@ -35,7 +35,7 @@ class SSHFileSystem(BaseFileSystem):
REQUIRES = {"paramiko": "paramiko"}
JOBS = 4

PARAM_CHECKSUM = "md5"
_DEFAULT_HASH = HashName.MD5
# At any given time some of the connections will go over network and
# paramiko stuff, so we would ideally have it double of server processors.
# We use conservative setting of 4 instead to not exhaust max sessions.
Expand Down Expand Up @@ -238,7 +238,7 @@ def reflink(self, from_info, to_info):
def md5(self, path_info):
    """Compute the remote file's MD5 over SSH and wrap it in HashInfo."""
    with self.ssh(path_info) as ssh:
        digest = ssh.md5(path_info.path)
        length = ssh.getsize(path_info.path)
    return HashInfo(HashName.MD5.value, digest, size=length)
Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/webdav.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from dvc.config import ConfigError
from dvc.exceptions import DvcException
from dvc.hash_info import HashName
from dvc.path_info import HTTPURLInfo, WebDAVURLInfo
from dvc.progress import Tqdm
from dvc.scheme import Schemes
Expand Down Expand Up @@ -42,7 +43,7 @@ class WebDAVFileSystem(BaseFileSystem): # pylint:disable=abstract-method
# Chunk size for buffered upload/download with progress bar
CHUNK_SIZE = 2 ** 16

PARAM_CHECKSUM = "etag"
_DEFAULT_HASH = HashName.ETAG
DETAIL_FIELDS = frozenset(("etag", "size"))

# Constructor
Expand Down
Loading