implement caching
rahuldesai1 committed Feb 18, 2024
1 parent dca9c49 commit 9c6d7c3
Showing 8 changed files with 170 additions and 215 deletions.
260 changes: 151 additions & 109 deletions latch/ldata/path.py
@@ -1,7 +1,7 @@
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Generator, Optional, Type, Union
from urllib.parse import urljoin
from uuid import uuid4
from typing import Generator, Optional, Type

import gql
from flytekit import (
@@ -14,78 +14,141 @@
Scalar,
)
from flytekit.extend import TypeEngine, TypeTransformer
from latch_sdk_config.latch import NUCLEUS_URL
from latch_sdk_gql.execute import execute

from latch.ldata.node import (
LDataNodeType,
LDataPerms,
PermLevel,
get_node_data,
get_node_metadata,
get_node_perms,
)
from latch.ldata.transfer import download, remote_copy, upload
from latch.ldata.transfer.download import download
from latch.ldata.transfer.node import LDataNodeType
from latch.ldata.transfer.progress import Progress
from latch_cli.tinyrequests import post
from latch_cli.utils import get_auth_header, urljoins
from latch_cli.utils.path import is_remote_path
from latch.ldata.transfer.remote_copy import remote_copy
from latch.ldata.transfer.upload import upload
from latch_cli.utils import urljoins

node_id_regex = re.compile(r"^latch://(?P<id>[0-9]+)\.node$")

dir_types = {
LDataNodeType.dir,
LDataNodeType.account_root,
LDataNodeType.mount,
}


@dataclass
class _Cache:
"""Internal cache class to organize information for a `LPath`."""

path: Optional[str] = None
node_id: Optional[str] = None
name: Optional[str] = None
type: Optional[LDataNodeType] = None
size: Optional[int] = None
content_type: Optional[str] = None


@dataclass
class LPath:
_cache: _Cache = field(
default_factory=lambda: _Cache(),
init=False,
repr=False,
hash=False,
compare=False,
)

path: str

def __init__(self, path: str):
if not is_remote_path(path):
if not path.startswith("latch://"):
raise ValueError(f"Invalid LPath: {path} is not a Latch path")
self.path = path
self._node_id = None

@property
def node_id(self) -> str:
if self._node_id is None:
self._node_id = get_node_data(self.path).data[self.path].id
return self._node_id

@property
def exists(self) -> bool:
try:
node_data = get_node_data(self.path).data[self.path]
except FileNotFoundError:
return False
return not node_data.removed

@property
def name(self) -> str:
return get_node_data(self.path).data[self.path].name

@property
def type(self) -> LDataNodeType:
return get_node_data(self.path).data[self.path].type
self._download_idx = 0

def load(self):
"""(Re-)populate this LPath's instance's cache.
Future calls to most getters will return immediately without making a network request.
Always makes a network request.
"""
data = execute(
gql.gql("""
query GetNodeData($path: String!) {
ldataResolvePathToNode(path: $path) {
path
ldataNode {
finalLinkTarget {
id
name
type
removed
ldataObjectMeta {
contentSize
contentType
}
}
}
}
}"""),
{"path": self.path},
)["ldataResolvePathToNode"]

self._cache.path = self.path

final_link_target = data["ldataNode"]["finalLinkTarget"]
self._cache.node_id = final_link_target["id"]
self._cache.name = final_link_target["name"]
self._cache.type = LDataNodeType(final_link_target["type"].lower())
self._cache.size = int(final_link_target["ldataObjectMeta"]["contentSize"])
self._cache.content_type = final_link_target["ldataObjectMeta"]["contentType"]

def node_id(self, *, load_if_missing: bool = True) -> str:
match = node_id_regex.match(self.path)
if match:
self._node_id = match.group("id")

if self._cache.node_id is None or self._cache.path != self.path:
if not load_if_missing:
return None
self.load()
return self._cache.node_id

def name(self, *, load_if_missing: bool = True) -> str:
if self._cache.name is None or self._cache.path != self.path:
if not load_if_missing:
return None
self.load()
return self._cache.name

def type(self, *, load_if_missing: bool = True) -> LDataNodeType:
if self._cache.type is None or self._cache.path != self.path:
if not load_if_missing:
return None
self.load()
return self._cache.type

def size(self, *, load_if_missing: bool = True) -> float:
if self._cache.size is None or self._cache.path != self.path:
if not load_if_missing:
return None
self.load()
return self._cache.size

def content_type(self, *, load_if_missing: bool = True) -> str:
if self._cache.content_type is None or self._cache.path != self.path:
if not load_if_missing:
return None
self.load()
return self._cache.content_type

def is_dir(self) -> bool:
return self.type in {
LDataNodeType.dir,
LDataNodeType.account_root,
LDataNodeType.mount,
}

@property
def size(self) -> float:
metadata = get_node_metadata(self.node_id)
return metadata.size

@property
def content_type(self) -> str:
metadata = get_node_metadata(self.node_id)
return metadata.content_type

def iterdir(self) -> Generator[Path, None, None]:
if not self.is_dir():
raise ValueError(f"Not a directory: {self.path}")
return self.type() in dir_types

def iterdir(self) -> Generator["LPath", None, None]:
data = execute(
gql.gql("""
query LDataChildren($argPath: String!) {
ldataResolvePathData(argPath: $argPath) {
finalLinkTarget {
type
childLdataTreeEdges(filter: { child: { removed: { equalTo: false } } }) {
nodes {
child {
@@ -100,10 +163,12 @@ def iterdir(self) -> Generator[Path, None, None]:
)["ldataResolvePathData"]

if data is None:
raise ValueError(f"No directory found at path: {self.path}")
raise FileNotFoundError(f"No such Latch file or directory: {self.path}")
if data["finalLinkTarget"]["type"].lower() not in dir_types:
raise ValueError(f"{self.path} is not a directory")

for node in data["finalLinkTarget"]["childLdataTreeEdges"]["nodes"]:
yield urljoins(self.path, node["child"]["name"])
yield LPath(urljoins(self.path, node["child"]["name"]))

def rmr(self) -> None:
execute(
@@ -117,62 +182,39 @@ def rmr(self) -> None:
{"nodeId": self.node_id},
)

def copy(self, dst: Union["LPath", str]) -> None:
remote_copy(self.path, str(dst))
def copy(self, dst: "LPath") -> None:
remote_copy(self.path, dst.path)

def upload(self, src: Path, progress=Progress.tasks, verbose=False) -> None:
upload(src, self.path, progress, verbose)
def upload(self, src: Path, *, show_progress_bar: bool = False) -> None:
upload(
src,
self.path,
progress=Progress.tasks if show_progress_bar else Progress.none,
verbose=show_progress_bar,
)

def download(
self, dst: Optional[Path] = None, progress=Progress.tasks, verbose=False
self, dst: Optional[Path] = None, *, show_progress_bar: bool = False
) -> Path:
if dst is None:
dir = Path(".") / "downloads" / str(uuid4())
dir = Path.home() / "lpath" / str(self._download_idx)
self._download_idx += 1
dir.mkdir(parents=True, exist_ok=True)
dst = dir / self.name

download(self.path, dst, progress, verbose, confirm_overwrite=False)
return dst

@property
def perms(self) -> LDataPerms:
return get_node_perms(self.node_id)

def share_with(self, email: str, perm_level: PermLevel) -> None:
resp = post(
url=urljoin(NUCLEUS_URL, "/ldata/send-share-email"),
json={
"node_id": self.node_id,
"perm_level": str(perm_level),
"receiver_email": email,
},
headers={"Authorization": get_auth_header()},
dst = dir / self.name()

download(
self.path,
dst,
progress=Progress.tasks if show_progress_bar else Progress.none,
verbose=show_progress_bar,
confirm_overwrite=False,
)
resp.raise_for_status()

def _toggle_share_link(self, enable: bool) -> None:
execute(
gql.gql("""
mutation LDataShare($nodeId: BigInt!, $value: Boolean!) {
ldataShare(input: { argNodeId: $nodeId, argValue: $value }) {
clientMutationId
}
}
"""),
{"nodeId": self.node_id, "value": enable},
)

def enable_share_link(self) -> None:
self._toggle_share_link(True)

def disable_share_link(self) -> None:
self._toggle_share_link(False)

def __str__(self) -> str:
return self.path
return dst

def __truediv__(self, other: Union[Path, str]) -> "LPath":
return LPath(f"{Path(self.path) / other}")
def __truediv__(self, other: object) -> "LPath":
if not isinstance(other, (LPath, str)):
return NotImplemented
return LPath(urljoins(self.path, other))


class LPathTransformer(TypeTransformer[LPath]):
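
For context, a minimal sketch of how the cached getters behave after this change; the path below is made up and an authenticated Latch session is assumed:

from latch.ldata.path import LPath

p = LPath("latch://12345.account/data/sample.fastq")  # hypothetical path

# Cold cache: opt out of the implicit fetch and get None back.
print(p.size(load_if_missing=False))  # None, no network request

# The first getter that needs data calls load(), which fills the whole cache
# (id, name, type, size, content type) with a single GraphQL query.
print(p.name())

# Subsequent getters are served from the cache without further requests.
print(p.size())
print(p.type())
print(p.content_type())

# Re-fetch explicitly if the remote object may have changed.
p.load()
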
3 changes: 0 additions & 3 deletions latch/ldata/transfer/__init__.py
@@ -1,3 +0,0 @@
from latch.ldata.transfer.download import download
from latch.ldata.transfer.remote_copy import remote_copy
from latch.ldata.transfer.upload import upload
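
Since latch/ldata/transfer/__init__.py no longer re-exports these helpers, code that imported them from the package root presumably needs the submodule paths now, mirroring the import changes in path.py above:

# before this commit
# from latch.ldata.transfer import download, remote_copy, upload

# after this commit
from latch.ldata.transfer.download import download
from latch.ldata.transfer.remote_copy import remote_copy
from latch.ldata.transfer.upload import upload
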
11 changes: 6 additions & 5 deletions latch/ldata/transfer/download.py
@@ -11,8 +11,8 @@
import click
from latch_sdk_config.latch import config as latch_config

from latch.ldata.node import LDataNodeType, get_node_data
from latch.ldata.transfer.manager import TransferStateManager
from latch.ldata.transfer.node import LDataNodeType, get_node_data
from latch.ldata.transfer.progress import Progress, ProgressBars, get_free_index
from latch.ldata.transfer.utils import get_max_workers, human_readable_time
from latch_cli import tinyrequests
@@ -192,10 +192,11 @@ def download(

total_time = end - start

click.echo(dedent(f"""{click.style("Download Complete", fg="green")}
{click.style("Time Elapsed: ", fg="blue")}{human_readable_time(total_time)}
{click.style("Files Downloaded: ", fg="blue")}{num_files} ({with_si_suffix(total_bytes)})
"""))
if progress != Progress.none:
click.echo(dedent(f"""{click.style("Download Complete", fg="green")}
{click.style("Time Elapsed: ", fg="blue")}{human_readable_time(total_time)}
{click.style("Files Downloaded: ", fg="blue")}{num_files} ({with_si_suffix(total_bytes)})
"""))


# dest will always be a path which includes the copied file as its leaf
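
A short sketch of the reworked transfer entry points after this change (paths are hypothetical; progress output is now opt-in via a keyword-only flag):

from pathlib import Path

from latch.ldata.path import LPath

results = LPath("latch://12345.account/results")  # hypothetical path

# Upload a local file; no progress bar unless requested.
(results / "metrics.csv").upload(Path("./metrics.csv"))

# iterdir() now yields LPath objects instead of plain path strings.
for child in results.iterdir():
    print(child.name(), child.type())

# Download with a progress bar. When the bar is off, Progress.none also
# suppresses the completion summary (per the download.py hunk above).
# dst defaults to ~/lpath/<download index>/<remote name>.
local = (results / "report.txt").download(show_progress_bar=True)
print(local)
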
