Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
Signed-off-by: Ayush Kamat <[email protected]>
  • Loading branch information
ayushkamat committed Aug 14, 2024
1 parent f03abb4 commit 6831b21
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 15 deletions.
16 changes: 13 additions & 3 deletions latch/ldata/_transfer/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def upload(
verbose: bool,
create_parents: bool = False,
cores: Optional[int] = None,
chunk_size_mib: Optional[int] = None,
) -> UploadResult:
src_path = Path(src)
if not src_path.exists():
Expand Down Expand Up @@ -168,6 +169,7 @@ def upload(
url_generation_bar,
throttle,
latency_q,
chunk_size_mib,
)
)

Expand Down Expand Up @@ -247,7 +249,9 @@ def upload(
pbar_index = progress_bars.get_free_task_bar_index()

start = time.monotonic()
res = start_upload(src_path, normalized)
res = start_upload(
src_path, normalized, chunk_size_mib=chunk_size_mib
)

if res is not None:
progress_bars.set(
Expand Down Expand Up @@ -304,6 +308,7 @@ def start_upload(
progress_bars: Optional[ProgressBars] = None,
throttle: Optional[Throttle] = None,
latency_q: Optional["LatencyQueueType"] = None,
chunk_size_mib: Optional[int] = None,
) -> Optional[StartUploadReturnType]:
if not src.exists():
raise ValueError(f"could not find {src}: no such file or link")
Expand All @@ -330,12 +335,17 @@ def start_upload(
" upload size (5TiB)",
)

if chunk_size_mib is None:
chunk_size = latch_constants.file_chunk_size
else:
chunk_size = chunk_size_mib * Units.MiB

part_count = min(
latch_constants.maximum_upload_parts,
math.ceil(file_size / latch_constants.file_chunk_size),
math.ceil(file_size / chunk_size),
)
part_size = max(
latch_constants.file_chunk_size,
chunk_size,
math.ceil(file_size / latch_constants.maximum_upload_parts),
)

Expand Down
2 changes: 1 addition & 1 deletion latch_cli/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class LatchConstants:
nextflow_latest_version: str = "v1.1.7"

file_max_size: int = 4 * Units.MiB
file_chunk_size: int = 256 * Units.MiB
file_chunk_size: int = 64 * Units.MiB

# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
maximum_upload_parts = 10000
Expand Down
7 changes: 7 additions & 0 deletions latch_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,11 @@ def get_executions():
help="Manually specify number of cores to parallelize over",
type=int,
)
@click.option(
"--chunk-size-mib",
help="Manually specify the upload chunk size in MiB. Must be >= 5",
type=int,
)
@requires_login
def cp(
src: List[str],
Expand All @@ -726,6 +731,7 @@ def cp(
verbose: bool,
no_glob: bool,
cores: Optional[int] = None,
chunk_size_mib: Optional[int] = None,
):
"""
Copy files between Latch Data and local, or between two Latch Data locations.
Expand All @@ -744,6 +750,7 @@ def cp(
verbose=verbose,
expand_globs=not no_glob,
cores=cores,
chunk_size_mib=chunk_size_mib,
)


Expand Down
16 changes: 15 additions & 1 deletion latch_cli/services/cp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ def cp(
verbose: bool,
expand_globs: bool,
cores: Optional[int] = None,
chunk_size_mib: Optional[int] = None,
):
if chunk_size_mib is not None and chunk_size_mib < 5:
click.secho(
"The chunk size specified by --chunk-size-mib must be at least 5. You"
f" provided `{chunk_size_mib}`",
fg="red",
)
raise click.exceptions.Exit(1)

dest_remote = is_remote_path(dest)

for src in srcs:
Expand All @@ -63,7 +72,12 @@ def cp(
if progress != Progress.none:
click.secho(f"Uploading {src}", fg="blue")
res = _upload(
src, dest, progress=progress, verbose=verbose, cores=cores
src,
dest,
progress=progress,
verbose=verbose,
cores=cores,
chunk_size_mib=chunk_size_mib,
)
if progress != Progress.none:
click.echo(dedent(f"""
Expand Down
40 changes: 30 additions & 10 deletions latch_cli/tinyrequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def __exit__(self, type, value, tb):
self._resp.close()


_cache: Dict[str, HTTPSConnection] = {}


def _req(
method: str,
url: str,
Expand All @@ -88,16 +91,33 @@ def _req(
body = _json.dumps(json)
headers["Content-Type"] = "application/json"

conn = HTTPSConnection(
parts.hostname, parts.port if parts.port is not None else 443, timeout=90
)
conn.request(
method,
urlunparse(parts._replace(scheme="", netloc="")),
headers=headers,
body=body,
)
resp = conn.getresponse()
port = parts.port if parts.port is not None else 443
key = f"{parts.hostname}:{port}"

# ayush: this is not threadsafe (as in the connection could be created
# multiple times) but its probably fine
if _cache.get(key) is None:
_cache[key] = HTTPSConnection(parts.hostname, port, timeout=90)

retries = 3
while True:
conn = _cache[key]

try:
conn.request(
method,
urlunparse(parts._replace(scheme="", netloc="")),
headers=headers,
body=body,
)
resp = conn.getresponse()
break
except ConnectionError as e:
_cache[key] = HTTPSConnection(parts.hostname, port, timeout=90)

retries += 1
if retries > 3:
raise e

return TinyResponse(resp, url, stream=stream)

Expand Down

0 comments on commit 6831b21

Please sign in to comment.