Skip to content

Commit

Permalink
Merge pull request #494 from latchbio/ayush/cp-conn-reuse
Browse files Browse the repository at this point in the history
switch to session
  • Loading branch information
ayushkamat authored Oct 2, 2024
2 parents 7abc4ab + 469ba35 commit e39509d
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 23 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ Types of changes

# Latch SDK Changelog

## v2.53.2 - 2024-10-02

### Changed

* All requests done by `latch cp` now
- use connection pooling / reuses
- have automatic retries with backoff

## v2.53.1 - 2024-10-02

### Added
Expand Down
9 changes: 4 additions & 5 deletions latch/ldata/_transfer/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
from latch_sdk_config.latch import config as latch_config

from latch.ldata.type import LDataNodeType
from latch_cli import tinyrequests
from latch_cli.constants import Units
from latch_cli.utils import get_auth_header, human_readable_time, with_si_suffix
from latch_cli.utils.path import normalize_path

from .manager import TransferStateManager
from .node import get_node_data
from .progress import Progress, ProgressBars, get_free_index
from .utils import get_max_workers
from .utils import get_max_workers, http_session


class GetSignedUrlData(TypedDict):
Expand All @@ -39,7 +38,7 @@ class DownloadJob:
class DownloadResult:
num_files: int
total_bytes: int
total_time: int
total_time: float


def download(
Expand Down Expand Up @@ -73,7 +72,7 @@ def download(
else:
endpoint = latch_config.api.data.get_signed_url

res = tinyrequests.post(
res = http_session.post(
endpoint,
headers={"Authorization": get_auth_header()},
json={"path": normalized},
Expand Down Expand Up @@ -212,7 +211,7 @@ def download_file(
) -> int:
# todo(ayush): benchmark parallelized downloads using the range header
with open(job.dest, "wb") as f:
res = tinyrequests.get(job.signed_url, stream=True)
res = http_session.get(job.signed_url, stream=True)
if res.status_code != 200:
raise RuntimeError(
f"failed to download {job.dest.name}: {res.status_code}:"
Expand Down
13 changes: 4 additions & 9 deletions latch/ldata/_transfer/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from typing_extensions import TypeAlias

from latch.ldata.type import LatchPathError, LDataNodeType
from latch_cli import tinyrequests
from latch_cli.constants import Units, latch_constants
from latch_cli.utils import get_auth_header, urljoins, with_si_suffix
from latch_cli.utils.path import normalize_path
Expand All @@ -25,7 +24,7 @@
from .node import get_node_data
from .progress import Progress, ProgressBars
from .throttle import Throttle
from .utils import get_max_workers
from .utils import get_max_workers, http_session

if TYPE_CHECKING:
PathQueueType: TypeAlias = "Queue[Optional[Path]]"
Expand Down Expand Up @@ -299,9 +298,6 @@ class StartUploadReturnType:
dest: str


MAX_RETRIES = 5


def start_upload(
src: Path,
dest: str,
Expand Down Expand Up @@ -353,15 +349,14 @@ def start_upload(
time.sleep(throttle.get_delay())

start = time.monotonic()
res = tinyrequests.post(
res = http_session.post(
latch_config.api.data.start_upload,
headers={"Authorization": get_auth_header()},
json={
"path": dest,
"content_type": content_type,
"part_count": part_count,
},
num_retries=MAX_RETRIES,
)
end = time.monotonic()

Expand Down Expand Up @@ -412,7 +407,7 @@ def upload_file_chunk(
f.seek(part_size * part_index)
data = f.read(part_size)

res = tinyrequests.put(url, data=data)
res = http_session.put(url, data=data)
if res.status_code != 200:
raise HTTPException(
f"failed to upload part {part_index} of {src}: {res.status_code}"
Expand Down Expand Up @@ -467,7 +462,7 @@ def end_upload(
parts: List[CompletedPart],
progress_bars: Optional[ProgressBars] = None,
):
res = tinyrequests.post(
res = http_session.post(
latch_config.api.data.end_upload,
headers={"Authorization": get_auth_header()},
json={
Expand Down
22 changes: 19 additions & 3 deletions latch/ldata/_transfer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@
import time
from typing import Any, Dict, Optional

from gql.gql import DocumentNode
import requests
import requests.adapters
from gql.transport.exceptions import TransportClosed, TransportServerError
from graphql.language import DocumentNode
from latch_sdk_gql import JsonValue
from latch_sdk_gql.execute import execute

from latch_cli import tinyrequests
http_session = requests.Session()

_adapter = requests.adapters.HTTPAdapter(
max_retries=requests.adapters.Retry(
status_forcelist=[429, 500, 502, 503, 504],
backoff_factor=1,
allowed_methods=["GET", "PUT", "POST"],
)
)
http_session.mount("https://", _adapter)
http_session.mount("http://", _adapter)


# todo(rahul): move this function into latch_sdk_gql.execute
Expand All @@ -21,6 +33,7 @@ def query_with_retry(
Send GraphQL query request. Retry on Server or Connection failures.
Implements exponential backoff between retries
"""
err = None
attempt = 0
while attempt < num_retries:
attempt += 1
Expand All @@ -34,7 +47,10 @@ def query_with_retry(
# todo(rahul): tune the sleep interval based on the startup time of the vacuole
time.sleep(2**attempt * 5)

raise err
if err is not None:
raise err

raise RuntimeError("gql retries exceeded")


def get_max_workers() -> int:
Expand Down
5 changes: 0 additions & 5 deletions latch_cli/tinyrequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ def _req(

port = parts.port if parts.port is not None else 443

# ayush: this is not threadsafe (as in the connection could be created
# multiple times) but its probably fine

# todo(rteqs): removed caching single connections, implement a connection pool instead.

retries = 3
while True:
conn = HTTPSConnection(parts.hostname, port, timeout=90)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name="latch",
version="v2.53.1",
version="v2.53.2",
author_email="[email protected]",
description="The Latch SDK",
packages=find_packages(),
Expand Down

0 comments on commit e39509d

Please sign in to comment.