Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

objects: fix gdrive CI issue #6338

Merged
merged 2 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def pull(
jobs=jobs,
src_index=get_index(odb),
cache_odb=self.repo.odb.local,
verify=odb.verify,
)

def status(
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ def info(self, path_info):
item_id = self._get_item_id(path_info)
gdrive_file = self._drive.CreateFile({"id": item_id})
gdrive_file.FetchMetadata(fields="fileSize")
return {"size": gdrive_file.get("fileSize"), "type": "file"}
return {"size": int(gdrive_file.get("fileSize")), "type": "file"}

def _upload_fobj(self, fobj, to_info, **kwargs):
dirname = to_info.parent
Expand Down
17 changes: 13 additions & 4 deletions dvc/objects/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def wrapper(odb, obj, *args, **kwargs):
return wrapper


def _transfer(src, dest, dir_objs, file_objs, missing, jobs, **kwargs):
def _transfer(src, dest, dir_objs, file_objs, missing, jobs, verify, **kwargs):
from . import save
from .stage import is_memfs_staging

Expand All @@ -54,7 +54,14 @@ def _transfer(src, dest, dir_objs, file_objs, missing, jobs, **kwargs):
func = pbar.wrap_fn(func)
with ThreadPoolExecutor(max_workers=jobs) as executor:
processor = partial(
_create_tasks, executor, jobs, func, src, dest, is_staged
_create_tasks,
executor,
jobs,
func,
src,
dest,
is_staged,
verify,
)
processor.save_func = func
_do_transfer(
Expand All @@ -69,7 +76,7 @@ def _transfer(src, dest, dir_objs, file_objs, missing, jobs, **kwargs):
return total


def _create_tasks(executor, jobs, func, src, dest, is_staged, objs):
def _create_tasks(executor, jobs, func, src, dest, is_staged, verify, objs):
fails = 0
obj_iter = iter(objs)

Expand All @@ -80,7 +87,7 @@ def create_taskset(amount):
dest,
_raw_obj(src, obj, is_staged),
move=False,
verify=src.verify,
verify=verify,
)
for obj in itertools.islice(obj_iter, amount)
}
Expand Down Expand Up @@ -180,6 +187,7 @@ def transfer(
dest: "ObjectDB",
objs: Iterable["HashFile"],
jobs: Optional[int] = None,
verify: bool = False,
**kwargs,
) -> int:
"""Transfer (copy) the specified objects from one ODB to another.
Expand Down Expand Up @@ -214,5 +222,6 @@ def transfer(
files,
status.missing,
jobs,
verify,
**kwargs,
)