Skip to content

Commit

Permalink
dur
Browse files Browse the repository at this point in the history
Signed-off-by: Ayush Kamat <[email protected]>
  • Loading branch information
ayushkamat committed Feb 15, 2024
1 parent 4b26ef4 commit 333fe99
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 7 deletions.
6 changes: 4 additions & 2 deletions latch_cli/extras/nextflow/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ def build_from_nextflow_dag(wf: NextflowWorkflow):
var = "res"

for o in dep.outputNames:
if edge.label.endswith(o):
parts = edge.label.split(".")
if parts[-1] == o:
param_name = var = o
break

Expand Down Expand Up @@ -419,6 +420,7 @@ def generate_nf_entrypoint(
import stat
import subprocess
import sys
import time
import traceback
import typing
from dataclasses import asdict, dataclass, fields, is_dataclass
Expand All @@ -429,7 +431,7 @@ def generate_nf_entrypoint(
from typing import Dict, List, NamedTuple
from flytekit.extras.persistence import LatchPersistence
from latch_cli.extras.nextflow.file_persistence import download_files, upload_files
from latch_cli.extras.nextflow.file_persistence import download_files, stage_for_output, upload_files
from latch_cli.utils import check_exists_and_rename, get_parameter_json_value, urljoins
from latch.resources.tasks import custom_task
Expand Down
21 changes: 21 additions & 0 deletions latch_cli/extras/nextflow/file_persistence.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Union

Expand Down Expand Up @@ -158,3 +159,23 @@ def upload_files(channels: Dict[str, List[JSONValue]], outdir: LatchDir):
lp.upload_directory(local, remote)

click.echo("Done.")


def stage_for_output(channels: List[List[JSONValue]]) -> Dict[str, List[JSONValue]]:
old: List[Path] = []
for channel in channels:
for param in channel:
_extract_paths(param, old)

new = []
for local_path in old:
new_path = Path("/root/outputs") / local_path.name

if local_path.is_dir():
shutil.copytree(local_path, new_path)
else:
shutil.copy(local_path, new_path)

new.append({"path": str(new_path)})

return {"output": new}
5 changes: 2 additions & 3 deletions latch_cli/extras/nextflow/tasks/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def get_fn_code(self, nf_script_path_in_container: Path):
code_block += reindent(
f"""
if wf_{k} is not None:
{k}_p = Path(default.wf_{k}).resolve()
{k}_p = Path(wf_{k}).resolve()
{k}_dest_p = Path({repr(v)}).resolve()
check_exists_and_rename(
Expand All @@ -129,12 +129,11 @@ def get_fn_code(self, nf_script_path_in_container: Path):
)
""",
1,
2,
)

code_block += reindent(
rf"""
channel_vals = [{", ".join([f"json.loads({x})" for x in self.channel_inputs])}]
download_files(channel_vals, LatchDir({repr(self.wf.output_directory.remote_path)}))
Expand Down
13 changes: 13 additions & 0 deletions latch_cli/extras/nextflow/tasks/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,18 @@ def get_fn_code(self, nf_script_path_in_container: Path):
code_block = self.get_fn_interface()
code_block += self.get_fn_conditions()

code_block += reindent(
rf"""
if cond:
channel_vals = [{", ".join([f"json.loads({x})" for x in self.channel_inputs])}]
download_files(channel_vals, LatchDir({repr(self.wf.output_directory.remote_path)}))
staged = stage_for_output(channel_vals)
upload_files(staged, LatchDir({repr(self.wf.output_directory.remote_path)}))
""",
1,
)

code_block += self.get_fn_return_stmt()
return code_block
4 changes: 3 additions & 1 deletion latch_cli/extras/nextflow/tasks/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def get_fn_code(self, nf_script_path_in_container: Path):
for k, v in self.wf.downloadable_params.items():
code_block += reindent(
f"""
if wf_{k} is not None:
if default.wf_{k} is not None:
{k}_p = Path(default.wf_{k}).resolve()
{k}_dest_p = Path({repr(v)}).resolve()
Expand Down Expand Up @@ -163,6 +163,8 @@ def get_fn_code(self, nf_script_path_in_container: Path):
for file in files:
out_channels[file.stem] = file.read_text()
print(out_channels)
upload_files({{k: json.loads(v) for k, v in out_channels.items()}}, LatchDir({repr(self.wf.output_directory.remote_path)}))
""",
Expand Down
2 changes: 1 addition & 1 deletion latch_cli/services/cp/remote_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def remote_copy(
elif msg.startswith("Refusing to copy already in-transit node"):
click.echo(get_path_error(src, "copy already in progress.", acc_id))
raise click.exceptions.Exit(1) from e
elif msg == "Conflicting object in destination":
elif msg == "Conflicting node in destination":
click.echo(get_path_error(dest, "object exists at path.", acc_id))
raise click.exceptions.Exit(1) from e

Expand Down

0 comments on commit 333fe99

Please sign in to comment.