Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remote: locally index list of checksums available on cloud remotes #3634

Merged
merged 32 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3314bf5
remote: locally index list of checksums available on cloud remotes
pmrowla Apr 6, 2020
da8a9d6
fix review issues
pmrowla Apr 7, 2020
aac712a
remote: use .dir checksums to validate index
pmrowla Apr 15, 2020
f586eca
command: add --drop-index option for push/pull/status
pmrowla Apr 15, 2020
581dd76
tests: --drop-index unit tests
pmrowla Apr 15, 2020
cc6fe9f
remote: make index threadsafe
pmrowla Apr 15, 2020
68c6a9c
remote: force re-index after gc
pmrowla Apr 15, 2020
fbd76a3
only save/dump index once per command
pmrowla Apr 15, 2020
adc50bd
remote: add helper index functions update_all/replace_all
pmrowla Apr 15, 2020
7c194bd
remote: store index as sqlite3 database
pmrowla Apr 15, 2020
e90e901
fix deepsource warnings
pmrowla Apr 16, 2020
bb1748b
rename --drop-index to --clear-index
pmrowla Apr 16, 2020
53516f9
rename index.invalidate to index.clear
pmrowla Apr 16, 2020
ebb0940
functional tests for remote.index
pmrowla Apr 16, 2020
6b2fcd2
fix index not being updated on status -c
pmrowla Apr 16, 2020
26bedbe
remote: add index.intersection()
pmrowla Apr 16, 2020
4251d88
remote: use index when making assumptions about remote .dir contents
pmrowla Apr 16, 2020
2674eaf
add missing func tests
pmrowla Apr 16, 2020
aef4973
do not index standalone files
pmrowla Apr 17, 2020
b31034b
cleanup index test fixture
pmrowla Apr 17, 2020
0d5b3f4
fix deepsource warnings
pmrowla Apr 17, 2020
7ba6305
update autocompletion scripts/cli help message for --clear-index
pmrowla Apr 17, 2020
ed77f9b
tests: remove unused mocker
pmrowla Apr 17, 2020
6f8745a
push: only index successfully pushed dirs
pmrowla Apr 20, 2020
9102838
remote: revert behavior to trust .dir files on remote
pmrowla Apr 20, 2020
dd845e0
fix missing RemoteIndexNoop functions
pmrowla Apr 20, 2020
3e0d258
separate index validation from status dir exists query
pmrowla Apr 20, 2020
739202b
bugfix: include indexed checksums for all cache_exists cases
pmrowla Apr 20, 2020
4a7d5a0
review fix: cleanup debug messages
pmrowla Apr 20, 2020
f6a684a
tests: fix DS warning
pmrowla Apr 20, 2020
63c7d9d
remove --clear-index
pmrowla Apr 21, 2020
3ec83f6
remove unused index functions
pmrowla Apr 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 53 additions & 12 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import errno
import hashlib
import itertools
import json
import logging
import tempfile
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from functools import partial
from functools import partial, wraps
from multiprocessing import cpu_count
from operator import itemgetter

Expand All @@ -23,6 +24,7 @@
from dvc.ignore import DvcIgnore
from dvc.path_info import PathInfo, URLInfo, WindowsPathInfo
from dvc.progress import Tqdm
from dvc.remote.index import RemoteIndex, RemoteIndexNoop
from dvc.remote.slow_link_detection import slow_link_guard
from dvc.state import StateNoop
from dvc.utils import tmp_fname
Expand Down Expand Up @@ -70,11 +72,24 @@ def __init__(self, checksum):
)


def index_locked(f):
efiop marked this conversation as resolved.
Show resolved Hide resolved
@wraps(f)
def wrapper(remote_obj, *args, **kwargs):
remote = kwargs.get("remote")
if remote:
with remote.index:
return f(remote_obj, *args, **kwargs)
return f(remote_obj, *args, **kwargs)

return wrapper


class RemoteBASE(object):
scheme = "base"
path_cls = URLInfo
REQUIRES = {}
JOBS = 4 * cpu_count()
INDEX_CLS = RemoteIndex

PARAM_RELPATH = "relpath"
CHECKSUM_DIR_SUFFIX = ".dir"
Expand Down Expand Up @@ -111,6 +126,15 @@ def __init__(self, repo, config):
self.cache_types = config.get("type") or copy(self.DEFAULT_CACHE_TYPES)
self.cache_type_confirmed = False

url = config.get("url")
if url:
index_name = hashlib.sha256(url.encode("utf-8")).hexdigest()
self.index = self.INDEX_CLS(
self.repo, index_name, dir_suffix=self.CHECKSUM_DIR_SUFFIX
)
else:
self.index = RemoteIndexNoop()

@classmethod
def get_missing_deps(cls):
import importlib
Expand Down Expand Up @@ -734,6 +758,7 @@ def all(self, jobs=None, name=None):
remote_size, remote_checksums, jobs, name
)

@index_locked
def gc(self, named_cache, jobs=None):
used = self.extract_used_local_checksums(named_cache)

Expand All @@ -754,6 +779,8 @@ def gc(self, named_cache, jobs=None):
self._remove_unpacked_dir(checksum)
self.remove(path_info)
removed = True
if removed:
self.index.clear()
return removed

def is_protected(self, path_info):
Expand Down Expand Up @@ -872,10 +899,18 @@ def cache_exists(self, checksums, jobs=None, name=None):
# cache_exists() (see ssh, local)
assert self.TRAVERSE_PREFIX_LEN >= 2

if len(checksums) == 1 or not self.CAN_TRAVERSE:
return self._cache_object_exists(checksums, jobs, name)
checksums = set(checksums)
indexed_checksums = set(self.index.intersection(checksums))
checksums -= indexed_checksums
logger.debug(
"Matched '{}' indexed checksums".format(len(indexed_checksums))
)
if not checksums:
return indexed_checksums

checksums = frozenset(checksums)
if len(checksums) == 1 or not self.CAN_TRAVERSE:
remote_checksums = self._cache_object_exists(checksums, jobs, name)
return list(indexed_checksums) + remote_checksums

# Max remote size allowed for us to use traverse method
remote_size, remote_checksums = self._estimate_cache_size(
Expand All @@ -898,19 +933,25 @@ def cache_exists(self, checksums, jobs=None, name=None):
len(checksums), traverse_weight
)
)
return list(
checksums & remote_checksums
) + self._cache_object_exists(
checksums - remote_checksums, jobs, name
return (
list(indexed_checksums)
+ list(checksums & remote_checksums)
+ self._cache_object_exists(
checksums - remote_checksums, jobs, name
)
)

logger.debug(
"Querying {} checksums via traverse".format(len(checksums))
"Querying '{}' checksums via traverse".format(len(checksums))
)
remote_checksums = self._cache_checksums_traverse(
remote_size, remote_checksums, jobs, name
remote_checksums = set(
self._cache_checksums_traverse(
remote_size, remote_checksums, jobs, name
)
)
return list(indexed_checksums) + list(
checksums & set(remote_checksums)
)
return list(checksums & set(remote_checksums))

def _checksums_with_limit(
self, limit, prefix=None, progress_callback=None
Expand Down
255 changes: 255 additions & 0 deletions dvc/remote/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import logging
import os
import sqlite3
import threading

from funcy import lchunks

from dvc.state import _connect_sqlite

logger = logging.getLogger(__name__)


class RemoteIndexNoop:
"""No-op class for remotes which are not indexed (i.e. local)."""

def __init__(self, *args, **kwargs):
pass

def __enter__(self):
pass

def __exit__(self, typ, value, tbck):
pass

def __iter__(self):
return iter([])

def __contains__(self, checksum):
return False

@staticmethod
def checksums():
return []

@staticmethod
def dir_checksums():
return []

def load(self):
pass

def dump(self):
pass

def clear(self):
pass

def replace(self, *args):
pass

def replace_all(self, *args):
pass

def update(self, *args):
pass

def update_all(self, *args):
pass

@staticmethod
def intersection(*args):
return []


class RemoteIndex:
"""Class for indexing remote checksums in a sqlite3 database.

Args:
repo: repo for this remote index.
name: name for this index. Index db will be loaded from and saved to
``.dvc/tmp/index/{name}.idx``.
dir_suffix: suffix used for naming directory checksums
"""

INDEX_SUFFIX = ".idx"
VERSION = 1
INDEX_TABLE = "remote_index"
INDEX_TABLE_LAYOUT = "checksum TEXT PRIMARY KEY, " "dir INTEGER NOT NULL"
Comment on lines +68 to +69
Copy link
Contributor Author

@pmrowla pmrowla Apr 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are using a database, would it be better to have a single .dvc/tmp/index db, with appropriate tables/columns for handling multiple remotes, vs having multiple single-table .dvc/tmp/index/... dbs?

I'm not sure how common it is for users to have multiple remotes configured, but in the case where you have multiple large remotes, as it is right now we would be duplicating data across multiple (potentially large) index db files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmrowla If we are talking about storing unpacked *.dir cache files, then there doesn't seem to be a point to store them for each remote, since no matter the remote if it has *.dir present we trust that it also has all cache files from within it. So maybe it is a premature optimization for later general indexes for whole remotes.


def __init__(self, repo, name, dir_suffix=".dir"):
self.path = os.path.join(
repo.index_dir, "{}{}".format(name, self.INDEX_SUFFIX)
)

self.dir_suffix = dir_suffix
self.database = None
self.cursor = None
self.modified = False
self.lock = threading.Lock()

def __iter__(self):
cmd = "SELECT checksum FROM {}".format(self.INDEX_TABLE)
for (checksum,) in self._execute(cmd):
yield checksum

def __enter__(self):
self.lock.acquire()
self.load()

def __exit__(self, typ, value, tbck):
self.dump()
self.lock.release()

def __contains__(self, checksum):
cmd = "SELECT checksum FROM {} WHERE checksum = (?)".format(
self.INDEX_TABLE
)
self._execute(cmd, (checksum,))
return self.cursor.fetchone() is not None

def checksums(self):
"""Iterate over checksums stored in the index."""
return iter(self)

def dir_checksums(self):
"""Iterate over .dir checksums stored in the index."""
cmd = "SELECT checksum FROM {} WHERE dir = 1".format(self.INDEX_TABLE)
for (checksum,) in self._execute(cmd):
yield checksum

def is_dir_checksum(self, checksum):
return checksum.endswith(self.dir_suffix)

def _execute(self, cmd, parameters=()):
return self.cursor.execute(cmd, parameters)

def _executemany(self, cmd, seq_of_parameters):
return self.cursor.executemany(cmd, seq_of_parameters)

def _prepare_db(self, empty=False):
if not empty:
cmd = "PRAGMA user_version;"
self._execute(cmd)
ret = self.cursor.fetchall()
assert len(ret) == 1
assert len(ret[0]) == 1
assert isinstance(ret[0][0], int)
version = ret[0][0]

if version != self.VERSION:
logger.error(
"Index file version '{}' will be reformatted "
"to the current version '{}'.".format(
version, self.VERSION,
)
)
cmd = "DROP TABLE IF EXISTS {};"
self._execute(cmd.format(self.INDEX_TABLE))

cmd = "CREATE TABLE IF NOT EXISTS {} ({})"
self._execute(cmd.format(self.INDEX_TABLE, self.INDEX_TABLE_LAYOUT))

cmd = "PRAGMA user_version = {};"
self._execute(cmd.format(self.VERSION))

def load(self):
"""(Re)load this index database."""
retries = 1
while True:
assert self.database is None
assert self.cursor is None

empty = not os.path.isfile(self.path)
self.database = _connect_sqlite(self.path, {"nolock": 1})
self.cursor = self.database.cursor()

try:
self._prepare_db(empty=empty)
return
except sqlite3.DatabaseError:
self.cursor.close()
self.database.close()
self.database = None
self.cursor = None
if retries > 0:
os.unlink(self.path)
retries -= 1
else:
raise

def dump(self):
"""Save this index database."""
assert self.database is not None

self.database.commit()
self.cursor.close()
self.database.close()
self.database = None
self.cursor = None

def clear(self):
"""Clear this index (to force re-indexing later).

Changes to the index will not committed until dump() is called.
"""
cmd = "DELETE FROM {}".format(self.INDEX_TABLE)
self._execute(cmd)

def replace(self, dir_checksums, file_checksums):
"""Replace the contents of this index with the specified checksums.

Changes to the index will not committed until dump() is called.
"""
self.clear()
self.update(dir_checksums, file_checksums)

def replace_all(self, checksums):
pmrowla marked this conversation as resolved.
Show resolved Hide resolved
"""Replace the contents of this index with the specified checksums.

Changes to the index will not committed until dump() is called.
"""
self.clear()
self.update_all(checksums)

def update(self, dir_checksums, file_checksums):
"""Update this index, adding the specified checksums.

Changes to the index will not committed until dump() is called.
"""
cmd = "INSERT OR IGNORE INTO {} (checksum, dir) VALUES (?, ?)".format(
self.INDEX_TABLE
)
self._executemany(
cmd, ((checksum, True) for checksum in dir_checksums)
)
self._executemany(
cmd, ((checksum, False) for checksum in file_checksums)
)

def update_all(self, checksums):
"""Update this index, adding the specified checksums.

Changes to the index will not committed until dump() is called.
"""
cmd = "INSERT OR IGNORE INTO {} (checksum, dir) VALUES (?, ?)".format(
self.INDEX_TABLE
)
self._executemany(
cmd,
(
(checksum, self.is_dir_checksum(checksum))
for checksum in checksums
),
)

def intersection(self, checksums):
"""Iterate over values from `checksums` which exist in the index."""
# sqlite has a compile time limit of 999, see:
# https://www.sqlite.org/c3ref/c_limit_attached.html#sqlitelimitvariablenumber
for chunk in lchunks(999, checksums):
cmd = "SELECT checksum FROM {} WHERE checksum IN ({})".format(
self.INDEX_TABLE, ",".join("?" for checksum in chunk)
)
for (checksum,) in self._execute(cmd, chunk):
yield checksum
Loading