Skip to content

Commit

Permalink
Merge pull request #473 from latchbio/ayush/latch-cp-link-fixies
Browse files Browse the repository at this point in the history
fix cp issues
  • Loading branch information
ayushkamat authored Jul 17, 2024
2 parents 4a7ce3d + 7ed20db commit b976b31
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 44 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ Types of changes

# Latch SDK Changelog

## 2.49.1 - 2024-07-17

### Fixed

* `latch cp` bug where directories containing symlinks would hang and not complete uploading

## 2.49.0 - 2024-07-17

### Changed
Expand Down
106 changes: 63 additions & 43 deletions latch/ldata/_transfer/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class UploadJob:
class UploadResult:
num_files: int
total_bytes: int
total_time: int
total_time: float


def upload(
Expand Down Expand Up @@ -218,7 +218,10 @@ def upload(
)
)

wait(chunk_futs)
for fut in as_completed(chunk_futs):
exc = fut.exception()
if exc is not None:
raise exc

if progress != Progress.none:
print("\x1b[0GFinalizing uploads...")
Expand Down Expand Up @@ -261,7 +264,10 @@ def upload(
)
)

wait(chunk_futs)
for fut in as_completed(chunk_futs):
exc = fut.exception()
if exc is not None:
raise exc

end_upload(
normalized,
Expand Down Expand Up @@ -298,12 +304,13 @@ def start_upload(
if not src.exists():
raise ValueError(f"could not find {src}: no such file or link")

resolved = src
if src.is_symlink():
src = src.resolve()
resolved = src.resolve()

content_type, _ = mimetypes.guess_type(src)
content_type, _ = mimetypes.guess_type(resolved)
if content_type is None:
with open(src, "rb") as f:
with open(resolved, "rb") as f:
sample = f.read(Units.KiB)

try:
Expand All @@ -312,7 +319,7 @@ def start_upload(
except UnicodeDecodeError:
content_type = "application/octet-stream"

file_size = src.stat().st_size
file_size = resolved.stat().st_size
if file_size > latch_constants.maximum_upload_size:
raise ValueError(
f"file is {with_si_suffix(file_size)} which exceeds the maximum"
Expand Down Expand Up @@ -383,48 +390,61 @@ def upload_file_chunk(
upload_id: Optional[str] = None,
dest: Optional[str] = None,
) -> CompletedPart:
time.sleep(0.1 * random.random())

with open(src, "rb") as f:
f.seek(part_size * part_index)
data = f.read(part_size)
# todo(ayush): proper exception handling that aborts everything
try:
time.sleep(0.1 * random.random())

res = tinyrequests.put(url, data=data)
if res.status_code != 200:
raise HTTPException(
f"failed to upload part {part_index} of {src}: {res.status_code}"
with open(src, "rb") as f:
f.seek(part_size * part_index)
data = f.read(part_size)

res = tinyrequests.put(url, data=data)
if res.status_code != 200:
raise HTTPException(
f"failed to upload part {part_index} of {src}: {res.status_code}"
)

etag = res.headers["ETag"]
assert etag is not None, (
f"Malformed response from chunk upload for {src}, Part {part_index},"
f" Headers: {res.headers}"
)

ret = CompletedPart(
src=src,
etag=res.headers["ETag"],
part_number=part_index + 1,
)

if parts_by_source is not None:
parts_by_source[src].append(ret)
ret = CompletedPart(
src=src,
etag=etag,
part_number=part_index + 1,
)

if progress_bars is not None:
progress_bars.update(pbar_index, len(data))
pending_parts = progress_bars.dec_usage(str(src))
if parts_by_source is not None:
parts_by_source[src].append(ret)

if progress_bars is not None:
progress_bars.update(pbar_index, len(data))
pending_parts = progress_bars.dec_usage(str(src))

if pending_parts == 0:
progress_bars.return_task_bar(pbar_index)
progress_bars.update_total_progress(1)
progress_bars.write(f"Copied {src}")

if (
dest is not None
and parts_by_source is not None
and upload_id is not None
):
end_upload(
dest=dest,
upload_id=upload_id,
parts=list(parts_by_source[src]),
)

if pending_parts == 0:
return ret
except:
if progress_bars is not None:
progress_bars.return_task_bar(pbar_index)
progress_bars.update_total_progress(1)
progress_bars.write(f"Copied {src}")

if (
dest is not None
and parts_by_source is not None
and upload_id is not None
):
end_upload(
dest=dest,
upload_id=upload_id,
parts=list(parts_by_source[src]),
)

return ret

raise


def end_upload(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name="latch",
version="v2.49.0",
version="v2.49.1",
author_email="[email protected]",
description="The Latch SDK",
packages=find_packages(),
Expand Down

0 comments on commit b976b31

Please sign in to comment.