Skip to content

Commit

Permalink
cleanup
Browse files (browse the repository at this point in the history)
  • Loading branch information
rahuldesai1 committed Feb 17, 2024
1 parent 0eb5042 commit bc2a1f3
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 264 deletions.
7 changes: 1 addition & 6 deletions latch/ldata/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@
from enum import Enum
from typing import Dict, List, TypedDict

try:
from functools import cache
except ImportError:
from functools import lru_cache as cache

import gql
import graphql.language as l
from latch_sdk_gql.execute import execute
Expand Down Expand Up @@ -179,7 +174,7 @@ def get_node_metadata(node_id: str) -> NodeMetadata:

return NodeMetadata(
id=node_id,
size=data["ldataObjectMeta"]["contentSize"],
size=int(data["ldataObjectMeta"]["contentSize"]),
content_type=data["ldataObjectMeta"]["contentType"],
)

Expand Down
149 changes: 26 additions & 123 deletions latch/ldata/path.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import io
from pathlib import Path
from typing import Generator, Optional, Union
from urllib.parse import urljoin
from uuid import uuid4

import gql
from gql.transport.exceptions import TransportQueryError
from latch_sdk_config.latch import NUCLEUS_URL
from latch_sdk_gql.execute import execute

Expand All @@ -16,11 +15,11 @@
get_node_metadata,
get_node_perms,
)
from latch.ldata.transfer import download, upload
from latch.types.json import JsonValue
from latch.ldata.transfer import download, remote_copy, upload
from latch.ldata.transfer.progress import Progress
from latch_cli.tinyrequests import post
from latch_cli.utils import get_auth_header, urljoins
from latch_cli.utils.path import get_name_from_path, get_path_error, is_remote_path
from latch_cli.utils.path import is_remote_path


class LPath:
Expand Down Expand Up @@ -53,7 +52,11 @@ def type(self) -> LDataNodeType:
return get_node_data(self.path).data[self.path].type

def is_dir(self) -> bool:
return self.type is LDataNodeType.dir
return self.type in {
LDataNodeType.dir,
LDataNodeType.account_root,
LDataNodeType.mount,
}

@property
def size(self) -> float:
Expand All @@ -66,6 +69,8 @@ def content_type(self) -> str:
return metadata.content_type

def iterdir(self) -> Generator[Path, None, None]:
if not self.is_dir():
raise ValueError(f"Not a directory: {self.path}")
data = execute(
gql.gql("""
query LDataChildren($argPath: String!) {
Expand Down Expand Up @@ -103,124 +108,21 @@ def rmr(self) -> None:
)

def copy(self, dst: Union["LPath", str]) -> None:
dst = str(dst)
node_data = get_node_data(self.path, dst, allow_resolve_to_parent=True)
remote_copy(self.path, str(dst))

src_data = node_data.data[self.path]
dst_data = node_data.data[dst]
acc_id = node_data.acc_id
def upload(self, src: Path, progress=Progress.tasks, verbose=False) -> None:
upload(src, self.path, progress, verbose)

path_by_id = {v.id: k for k, v in node_data.data.items()}
def download(
self, dst: Optional[Path] = None, progress=Progress.tasks, verbose=False
) -> Path:
if dst is None:
dir = Path(".") / "downloads" / str(uuid4())
dir.mkdir(parents=True, exist_ok=True)
dst = dir / self.name

if src_data.is_parent:
raise FileNotFoundError(get_path_error(self.path, "not found", acc_id))

new_name = None
if dst_data.is_parent:
new_name = get_name_from_path(dst)
elif dst_data.type in {LDataNodeType.obj, LDataNodeType.link}:
raise FileExistsError(
get_path_error(dst, "object already exists at path.", acc_id)
)

try:
execute(
gql.gql("""
mutation Copy(
$argSrcNode: BigInt!
$argDstParent: BigInt!
$argNewName: String
) {
ldataCopy(
input: {
argSrcNode: $argSrcNode
argDstParent: $argDstParent
argNewName: $argNewName
}
) {
clientMutationId
}
}"""),
{
"argSrcNode": src_data.id,
"argDstParent": dst_data.id,
"argNewName": new_name,
},
)
except TransportQueryError as e:
if e.errors is None or len(e.errors) == 0:
raise e

msg: str = e.errors[0]["message"]

if msg.startswith("Permission denied on node"):
node_id = msg.rsplit(" ", 1)[1]
path = path_by_id[node_id]

raise ValueError(get_path_error(path, "permission denied.", acc_id))
elif msg == "Refusing to make node its own parent":
raise ValueError(
get_path_error(dst, f"is a parent of {self.path}.", acc_id)
)
elif msg == "Refusing to parent node to an object node":
raise ValueError(get_path_error(dst, f"object exists at path.", acc_id))
elif msg == "Refusing to move a share link (or into a share link)":
raise ValueError(
get_path_error(
self.path if src_data.type is LDataNodeType.link else dst,
f"is a share link.",
acc_id,
)
)
elif msg.startswith("Refusing to copy account root"):
raise ValueError(
get_path_error(self.path, "is an account root.", acc_id)
)
elif msg.startswith("Refusing to copy removed node"):
raise ValueError(get_path_error(self.path, "not found.", acc_id))
elif msg.startswith("Refusing to copy already in-transit node"):
raise ValueError(
get_path_error(self.path, "copy already in progress.", acc_id)
)
elif msg == "Conflicting object in destination":
raise ValueError(get_path_error(dst, "object exists at path.", acc_id))

raise ValueError(get_path_error(self.path, str(e), acc_id))

def download(self, dst: Optional[Union[Path, io.IOBase]]) -> Optional[Path]:
# todo: perform different actions depending on dst type
return download(
self.path,
dst,
)

def read_bytes(self) -> bytes:
# todo: implement
pass

def read_text(self) -> str:
# todo: implement
pass

def read_json(self) -> JsonValue:
# todo: implement
pass

def read_chunks(self, chunk_size: int) -> Generator[bytes, None, None]:
# todo: implement
pass

def read_lines(self):
# todo: implement
pass

def read_at(self, offset: int, amount: int) -> bytes:
# todo: implement
pass

def upload(self, src: Union[Path, io.IOBase, bytes, JsonValue]) -> str:
# todo: implement
pass
download(self.path, dst, progress, verbose, confirm_overwrite=False)
return dst

@property
def perms(self) -> LDataPerms:
Expand Down Expand Up @@ -264,5 +166,6 @@ def __truediv__(self, other: Union[Path, str]) -> "LPath":


if __name__ == "__main__":
# add tests here
pass
# tests
file_path = LPath("latch://24030.account/test_dir/B.txt")
file_path.share_with("[email protected]", PermLevel.VIEWER)
1 change: 1 addition & 0 deletions latch/ldata/transfer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from latch.ldata.transfer.download import download
from latch.ldata.transfer.remote_copy import remote_copy
from latch.ldata.transfer.upload import upload
53 changes: 25 additions & 28 deletions latch/ldata/transfer/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from concurrent.futures import ProcessPoolExecutor
from contextlib import closing
from dataclasses import dataclass
from http.client import HTTPException
from itertools import repeat
from pathlib import Path
from textwrap import dedent
from typing import Dict, List, Set, TypedDict

import click
Expand Down Expand Up @@ -34,25 +36,24 @@ class DownloadJob:


def download(
src: str, dest: Path, progress: Progress = Progress.tasks, verbose: bool = False
):
src: str,
dest: Path,
progress: Progress,
verbose: bool,
confirm_overwrite: bool = True,
) -> None:
if not dest.parent.exists():
click.secho(
raise ValueError(
f"Invalid copy destination {dest}. Parent directory {dest.parent} does not"
" exist.",
fg="red",
" exist."
)
raise click.exceptions.Exit(1)

normalized = normalize_path(src)
try:
data = get_node_data(src)
except FileNotFoundError as e:
click.echo(str(e))
raise click.exceptions.Exit(1) from e
data = get_node_data(src)

node_data = data.data[src]
click.secho(f"Downloading {node_data.name}", fg="blue")
if progress != Progress.none:
click.secho(f"Downloading {node_data.name}", fg="blue")

can_have_children = node_data.type in {
LDataNodeType.account_root,
Expand All @@ -70,14 +71,11 @@ def download(
headers={"Authorization": get_auth_header()},
json={"path": normalized},
)

if res.status_code != 200:
click.secho(
raise HTTPException(
f"failed to fetch presigned url(s) for path {src} with code"
f" {res.status_code}: {res.json()['error']}",
fg="red",
f" {res.status_code}: {res.json()['error']}"
)
raise click.exceptions.Exit(1)

json_data = res.json()
if can_have_children:
Expand All @@ -89,11 +87,9 @@ def download(
try:
dest.mkdir(exist_ok=True)
except FileNotFoundError as e:
click.secho(f"No such download destination {dest}", fg="red")
raise click.exceptions.Exit(1) from e
raise ValueError(f"No such download destination {dest}")
except (FileExistsError, NotADirectoryError) as e:
click.secho(f"Download destination {dest} is not a directory", fg="red")
raise click.exceptions.Exit(1) from e
raise ValueError(f"Download destination {dest} is not a directory")

unconfirmed_jobs: List[DownloadJob] = []
confirmed_jobs: List[DownloadJob] = []
Expand All @@ -116,14 +112,17 @@ def download(
job.dest.parent.mkdir(parents=True, exist_ok=True)
confirmed_jobs.append(job)
except FileExistsError:
if click.confirm(
if confirm_overwrite and click.confirm(
f"A file already exists at {job.dest.parent}. Overwrite?",
default=False,
):
job.dest.parent.unlink()
job.dest.parent.mkdir(parents=True, exist_ok=True)
confirmed_jobs.append(job)
else:
click.secho(
f"Skipping {job.dest.parent}, file already exists", fg="yellow"
)
rejected_jobs.add(job.dest.parent)

num_files = len(confirmed_jobs)
Expand Down Expand Up @@ -193,12 +192,10 @@ def download(

total_time = end - start

click.echo(
f"""{click.style("Download Complete", fg="green")}
{click.style("Time Elapsed: ", fg="blue")}{human_readable_time(total_time)}
{click.style("Files Downloaded: ", fg="blue")}{num_files} ({with_si_suffix(total_bytes)})"""
)
click.echo(dedent(f"""{click.style("Download Complete", fg="green")}
{click.style("Time Elapsed: ", fg="blue")}{human_readable_time(total_time)}
{click.style("Files Downloaded: ", fg="blue")}{num_files} ({with_si_suffix(total_bytes)})
"""))


# dest will always be a path which includes the copied file as its leaf
Expand Down
Loading

0 comments on commit bc2a1f3

Please sign in to comment.