Skip to content

Commit

Permalink
Merge pull request #430 from yairl/master
Browse files Browse the repository at this point in the history
dagshub upload: variable batch size.
  • Loading branch information
kbolashev authored Feb 13, 2024
2 parents 6ce3983 + cf33421 commit 570d20c
Showing 1 changed file with 54 additions and 25 deletions.
79 changes: 54 additions & 25 deletions dagshub/upload/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def upload_files(
new_branch: str = None,
last_commit: str = None,
force: bool = False,
quiet: bool = False,
):
"""
Upload a list of binary files to the specified directory.
Expand All @@ -313,6 +314,8 @@ def upload_files(
Exists to prevent accidental overwrites of data.
force (bool): Force the upload of a file even if it is already present on the server.
Sets last_commit to be the tip of the branch
quiet (bool): Don't show messages about starting/successfully completing an upload.
Set to True when uploading a directory
"""

if commit_message is None:
Expand Down Expand Up @@ -366,23 +369,24 @@ def upload_files(
if force:
data["last_commit"] = self._last_upload_revision

log_message(f'Uploading files ({len(files)}) to "{self._api.full_name}"...', logger)
if not quiet:
log_message(f'Uploading files ({len(files)}) to "{self._api.full_name}"...', logger)
res = s.put(
upload_url,
data=data,
files=[("files", file) for file in files],
auth=self.auth,
timeout=None,
)
self._log_upload_details(data, res, files)
self._log_upload_details(data, res, files, quiet)

# The ETag header contains the hash of the uploaded commit,
# check against the one we have to determine if anything changed
if "ETag" in res.headers:
new_tip = res.headers["ETag"]
self._last_upload_had_changes = new_tip != self._last_upload_revision

def _log_upload_details(self, data: Dict[str, Any], res: httpx.Response, files):
def _log_upload_details(self, data: Dict[str, Any], res: httpx.Response, files, quiet: bool):
"""
Write the request URL, data, and files to the debug log.
Also report the upload status to the user if the upload was successful
Expand All @@ -392,6 +396,7 @@ def _log_upload_details(self, data: Dict[str, Any], res: httpx.Response, files):
data: Executed request's body
res: Server's response
files: Uploaded file contents
quiet: If True, don't log the "Upload finished successfully!" message
"""

logger.debug(
Expand All @@ -406,7 +411,8 @@ def _log_upload_details(self, data: Dict[str, Any], res: httpx.Response, files):
if new_tip == self._last_upload_revision:
log_message("Upload successful, content was identical and no new commit was created", logger)
return
log_message("Upload finished successfully!", logger)
if not quiet:
log_message("Upload finished successfully!", logger)
elif res.status_code == HTTPStatus.NO_CONTENT:
log_message("Upload successful, content was identical and no new commit was created", logger)
elif 200 < res.status_code < 300:
Expand Down Expand Up @@ -582,8 +588,16 @@ def add_dir(self, local_path, glob_exclude="", commit_message=None, **upload_kwa
The keyword arguments are passed to :func:`Repo.upload_files() <dagshub.upload.Repo.upload_files>`.
"""
upload_file_number = 100
max_batch_file_size = 100 * 1024 * 1024
file_counter = 0

total_num_files = 0
for root, dirs, files in os.walk(local_path):
for filename in files:
rel_file_path = posixpath.join(root, filename)
if glob_exclude == "" or fnmatch.fnmatch(rel_file_path, glob_exclude) is False:
total_num_files += 1

progress = rich.progress.Progress(
rich.progress.SpinnerColumn(),
*rich.progress.Progress.get_default_columns(),
Expand All @@ -592,45 +606,60 @@ def add_dir(self, local_path, glob_exclude="", commit_message=None, **upload_kwa
transient=True,
disable=config.quiet,
)
total_task = progress.add_task("Uploading files...", total=None)
total_task = progress.add_task("Uploading files...", total=total_num_files)
self.repo.current_progress = progress

# If user hasn't specified versioning, then assume we're uploading dvc (this makes most sense for folders)
if "versioning" not in upload_kwargs:
upload_kwargs["versioning"] = "dvc"

upload_kwargs["quiet"] = True

try:
with progress:
for root, dirs, files in os.walk(local_path):
if len(files) == 0:
continue

folder_task = progress.add_task(f"Uploading files from {root}", total=len(files))

if commit_message is None:
commit_message = upload_kwargs.get("commit_message", f"Commit data points in folder {root}")
if "commit_message" in upload_kwargs:
del upload_kwargs["commit_message"]

if len(files) > 0:
for filename in files:
rel_file_path = posixpath.join(root, filename)
file_batches = []
current_file_batch = []
current_batch_file_size = 0

for filename in files:
rel_file_path = posixpath.join(root, filename)
if glob_exclude == "" or fnmatch.fnmatch(rel_file_path, glob_exclude) is False:
current_file_batch.append(rel_file_path)
current_batch_file_size += os.path.getsize(rel_file_path)

if (
len(current_file_batch) >= upload_file_number
or current_batch_file_size >= max_batch_file_size
):
file_batches.append(current_file_batch)
current_file_batch = []
current_batch_file_size = 0

if current_file_batch:
file_batches.append(current_file_batch)

for batch in file_batches:
for rel_file_path in batch:
rel_remote_file_path = rel_file_path.replace(local_path, "")
if glob_exclude == "" or fnmatch.fnmatch(rel_file_path, glob_exclude) is False:
self.add(file=rel_file_path, path=rel_remote_file_path)
if len(self.files) >= upload_file_number:
file_counter += len(self.files)
self.commit(commit_message, **upload_kwargs)
progress.update(folder_task, advance=len(self.files), refresh=True)
progress.update(total_task, completed=file_counter, refresh=True)
if len(self.files) >= upload_file_number:
file_counter += len(self.files)
self.commit(commit_message, **upload_kwargs)
progress.update(folder_task, advance=len(self.files), refresh=True)
progress.update(total_task, completed=file_counter, refresh=True)
progress.remove_task(folder_task)
self.add(file=rel_file_path, path=rel_remote_file_path)

file_counter += len(self.files)
self.commit(commit_message, **upload_kwargs)
progress.update(folder_task, advance=len(batch), refresh=True)
progress.update(total_task, completed=file_counter, refresh=True)

if len(self.files) > 0:
file_counter += len(self.files)
self.commit(commit_message, **upload_kwargs)
progress.update(total_task, completed=file_counter)
progress.remove_task(folder_task)

log_message(
f"Directory upload complete, uploaded {file_counter} files"
Expand Down

0 comments on commit 570d20c

Please sign in to comment.