From 6831b213a9ded465f24f2c438e3175c63b707855 Mon Sep 17 00:00:00 2001
From: Ayush Kamat
Date: Wed, 14 Aug 2024 13:12:55 -0700
Subject: [PATCH] init

Signed-off-by: Ayush Kamat
---
 latch/ldata/_transfer/upload.py | 16 ++++++++++---
 latch_cli/constants.py          |  2 +-
 latch_cli/main.py               |  7 ++++++
 latch_cli/services/cp/main.py   | 16 ++++++++++++-
 latch_cli/tinyrequests.py       | 40 ++++++++++++++++++++++++---------
 5 files changed, 66 insertions(+), 15 deletions(-)

diff --git a/latch/ldata/_transfer/upload.py b/latch/ldata/_transfer/upload.py
index 5d42f117..bc87f335 100644
--- a/latch/ldata/_transfer/upload.py
+++ b/latch/ldata/_transfer/upload.py
@@ -59,6 +59,7 @@ def upload(
     verbose: bool,
     create_parents: bool = False,
     cores: Optional[int] = None,
+    chunk_size_mib: Optional[int] = None,
 ) -> UploadResult:
     src_path = Path(src)
     if not src_path.exists():
@@ -168,6 +169,7 @@ def upload(
                     url_generation_bar,
                     throttle,
                     latency_q,
+                    chunk_size_mib,
                 )
             )
 
@@ -247,7 +249,9 @@ def upload(
         pbar_index = progress_bars.get_free_task_bar_index()
 
         start = time.monotonic()
-        res = start_upload(src_path, normalized)
+        res = start_upload(
+            src_path, normalized, chunk_size_mib=chunk_size_mib
+        )
 
         if res is not None:
             progress_bars.set(
@@ -304,6 +308,7 @@ def start_upload(
     progress_bars: Optional[ProgressBars] = None,
     throttle: Optional[Throttle] = None,
     latency_q: Optional["LatencyQueueType"] = None,
+    chunk_size_mib: Optional[int] = None,
 ) -> Optional[StartUploadReturnType]:
     if not src.exists():
         raise ValueError(f"could not find {src}: no such file or link")
@@ -330,12 +335,17 @@ def start_upload(
             " upload size (5TiB)",
         )
 
+    if chunk_size_mib is None:
+        chunk_size = latch_constants.file_chunk_size
+    else:
+        chunk_size = chunk_size_mib * Units.MiB
+
     part_count = min(
         latch_constants.maximum_upload_parts,
-        math.ceil(file_size / latch_constants.file_chunk_size),
+        math.ceil(file_size / chunk_size),
     )
     part_size = max(
-        latch_constants.file_chunk_size,
+        chunk_size,
         math.ceil(file_size / latch_constants.maximum_upload_parts),
     )
 
diff --git a/latch_cli/constants.py b/latch_cli/constants.py
index 8907c437..9a768f0f 100644
--- a/latch_cli/constants.py
+++ b/latch_cli/constants.py
@@ -25,7 +25,7 @@ class LatchConstants:
     nextflow_latest_version: str = "v1.1.7"
 
     file_max_size: int = 4 * Units.MiB
-    file_chunk_size: int = 256 * Units.MiB
+    file_chunk_size: int = 64 * Units.MiB
 
     # https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
     maximum_upload_parts = 10000
diff --git a/latch_cli/main.py b/latch_cli/main.py
index 578411f9..9767ea47 100644
--- a/latch_cli/main.py
+++ b/latch_cli/main.py
@@ -718,6 +718,11 @@ def get_executions():
     help="Manually specify number of cores to parallelize over",
     type=int,
 )
+@click.option(
+    "--chunk-size-mib",
+    help="Manually specify the upload chunk size in MiB. Must be >= 5",
+    type=int,
+)
 @requires_login
 def cp(
     src: List[str],
@@ -726,6 +731,7 @@ def cp(
     verbose: bool,
     no_glob: bool,
     cores: Optional[int] = None,
+    chunk_size_mib: Optional[int] = None,
 ):
     """
     Copy files between Latch Data and local, or between two Latch Data locations.
@@ -744,6 +750,7 @@ def cp(
         verbose=verbose,
         expand_globs=not no_glob,
         cores=cores,
+        chunk_size_mib=chunk_size_mib,
     )
 
 
diff --git a/latch_cli/services/cp/main.py b/latch_cli/services/cp/main.py
index 82c6c8c6..4fbbcb09 100644
--- a/latch_cli/services/cp/main.py
+++ b/latch_cli/services/cp/main.py
@@ -44,7 +44,16 @@ def cp(
     verbose: bool,
     expand_globs: bool,
     cores: Optional[int] = None,
+    chunk_size_mib: Optional[int] = None,
 ):
+    if chunk_size_mib is not None and chunk_size_mib < 5:
+        click.secho(
+            "The chunk size specified by --chunk-size-mib must be at least 5. You"
+            f" provided `{chunk_size_mib}`",
+            fg="red",
+        )
+        raise click.exceptions.Exit(1)
+
     dest_remote = is_remote_path(dest)
 
     for src in srcs:
@@ -63,7 +72,12 @@ def cp(
             if progress != Progress.none:
                 click.secho(f"Uploading {src}", fg="blue")
             res = _upload(
-                src, dest, progress=progress, verbose=verbose, cores=cores
+                src,
+                dest,
+                progress=progress,
+                verbose=verbose,
+                cores=cores,
+                chunk_size_mib=chunk_size_mib,
             )
             if progress != Progress.none:
                 click.echo(dedent(f"""
diff --git a/latch_cli/tinyrequests.py b/latch_cli/tinyrequests.py
index 2b1c46b6..f787dcd2 100644
--- a/latch_cli/tinyrequests.py
+++ b/latch_cli/tinyrequests.py
@@ -67,6 +67,9 @@ def __exit__(self, type, value, tb):
         self._resp.close()
 
 
+_cache: Dict[str, HTTPSConnection] = {}
+
+
 def _req(
     method: str,
     url: str,
@@ -88,16 +91,33 @@ def _req(
         body = _json.dumps(json)
         headers["Content-Type"] = "application/json"
 
-    conn = HTTPSConnection(
-        parts.hostname, parts.port if parts.port is not None else 443, timeout=90
-    )
-    conn.request(
-        method,
-        urlunparse(parts._replace(scheme="", netloc="")),
-        headers=headers,
-        body=body,
-    )
-    resp = conn.getresponse()
+    port = parts.port if parts.port is not None else 443
+    key = f"{parts.hostname}:{port}"
+
+    # ayush: this is not threadsafe (as in the connection could be created
+    # multiple times) but its probably fine
+    if _cache.get(key) is None:
+        _cache[key] = HTTPSConnection(parts.hostname, port, timeout=90)
+
+    retries = 0
+    while True:
+        conn = _cache[key]
+
+        try:
+            conn.request(
+                method,
+                urlunparse(parts._replace(scheme="", netloc="")),
+                headers=headers,
+                body=body,
+            )
+            resp = conn.getresponse()
+            break
+        except ConnectionError as e:
+            _cache[key] = HTTPSConnection(parts.hostname, port, timeout=90)
+
+            retries += 1
+            if retries > 3:
+                raise e
 
     return TinyResponse(resp, url, stream=stream)
 
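
Note: the standalone sketch below is not part of the patch. It mirrors the part-sizing arithmetic that start_upload applies once chunk_size_mib is plumbed through, with Units.MiB and the relevant LatchConstants values reproduced inline rather than imported, and with illustrative file sizes. It also shows why the >= 5 check in latch_cli/services/cp/main.py matters: S3 multipart uploads reject parts smaller than 5 MiB (except the final part), and the 10,000-part cap means very large files end up with a larger effective part size than the requested chunk.

import math
from typing import Optional, Tuple

# Values mirrored from latch_cli.constants (not imported, to keep the sketch
# self-contained). DEFAULT_CHUNK_SIZE reflects the new 64 MiB default.
MiB = 1024 * 1024
DEFAULT_CHUNK_SIZE = 64 * MiB
MAX_UPLOAD_PARTS = 10_000  # S3 multipart upload part limit


def plan_parts(file_size: int, chunk_size_mib: Optional[int] = None) -> Tuple[int, int]:
    # Same selection as start_upload: fall back to the constant when the flag is unset.
    chunk_size = DEFAULT_CHUNK_SIZE if chunk_size_mib is None else chunk_size_mib * MiB

    part_count = min(MAX_UPLOAD_PARTS, math.ceil(file_size / chunk_size))
    part_size = max(chunk_size, math.ceil(file_size / MAX_UPLOAD_PARTS))
    return part_count, part_size


# 100 MiB file with the minimum 5 MiB chunk: 20 parts of 5 MiB each.
print(plan_parts(100 * MiB, chunk_size_mib=5))  # (20, 5242880)

# 1 TiB file with the default 64 MiB chunk: the 10,000-part cap kicks in, so the
# effective part size grows to ~105 MiB regardless of the requested chunk size.
print(plan_parts(1024 * 1024 * MiB))  # (10000, 109951163)

The returned tuple is (part_count, part_size_in_bytes); part_size * part_count always covers file_size, which is why the cap inflates the part size rather than truncating the upload.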
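
Note: the inline comment in _req acknowledges that the per-host connection cache is not thread-safe (two callers can race to create the same connection). The patch accepts that; if it ever mattered, a small lock around the lookup is one way to close the race. The sketch below is illustrative only and not part of the patch; _get_connection is a hypothetical helper, and only the _cache name mirrors tinyrequests.py.

import threading
from http.client import HTTPSConnection
from typing import Dict

# Hypothetical thread-safe variant of the per-host connection cache in _req.
# One lock guards creation so concurrent callers cannot create (and leak)
# duplicate connections for the same host:port key.
_cache: Dict[str, HTTPSConnection] = {}
_cache_lock = threading.Lock()


def _get_connection(hostname: str, port: int, timeout: int = 90) -> HTTPSConnection:
    key = f"{hostname}:{port}"
    with _cache_lock:
        conn = _cache.get(key)
        if conn is None:
            conn = HTTPSConnection(hostname, port, timeout=timeout)
            _cache[key] = conn
        return conn

Either way, the retry loop in the patch replaces the cached connection whenever a request raises ConnectionError, so a stale keep-alive socket is recreated and the request is retried up to three times before the error propagates.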