Skip to content

Commit

Permalink
add output_shortcuts to latch metadata for nextflow
Browse files Browse the repository at this point in the history
  • Loading branch information
rahuldesai1 committed Jul 18, 2024
1 parent 4a7ce3d commit cb34381
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 48 deletions.
31 changes: 30 additions & 1 deletion latch/types/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from latch_cli.snakemake.config.utils import validate_snakemake_type
from latch_cli.utils import identifier_suffix_from_str

from .directory import LatchDir
from .directory import LatchDir, LatchOutputDir
from .file import LatchFile


Expand Down Expand Up @@ -477,6 +477,18 @@ class SnakemakeFileMetadata:
"""


@dataclass
class ShortcutPath:
display_name: str
"""
Name of shortcut as it appears in the Latch Console
"""
path: Path
"""
Sub-path to expose
"""


@dataclass
class NextflowParameter(Generic[T], LatchParameter):
type: Optional[Type[T]] = None
Expand All @@ -501,8 +513,25 @@ class NextflowParameter(Generic[T], LatchParameter):
Should return the path of the constructed samplesheet. If samplesheet_type is also specified, this takes precedence.
Only used if the provided parameter is a samplesheet (samplesheet=True)
"""
shortcut_paths: Optional[List[ShortcutPath]] = None
"""
Output paths that will be exposed in the UI as shortcuts. Should be used to
expose important workflow outputs to the user.
Only valid for LatchDir type
"""

def __post_init__(self):
if self.shortcut_paths is not None and self.type not in {
LatchDir,
LatchOutputDir,
}:
click.secho(
"Shortcut paths can only be defined for LatchDir parameters.",
fg="red",
)
raise click.exceptions.Exit(1)

if not self.samplesheet or self.samplesheet_constructor is not None:
return

Expand Down
142 changes: 95 additions & 47 deletions latch_cli/nextflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from enum import Enum
from pathlib import Path
from textwrap import dedent
from typing import Any, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

import click

import latch.types.metadata as metadata
from latch.types.directory import LatchDir, LatchOutputDir
from latch.types.file import LatchFile
from latch.types.metadata import NextflowParameter
from latch_cli.snakemake.config.utils import get_preamble, type_repr
from latch_cli.snakemake.utils import reindent
from latch_cli.utils import identifier_from_str, urljoins
Expand Down Expand Up @@ -67,59 +68,60 @@ def initialize() -> str:
@nextflow_runtime_task(cpu={cpu}, memory={memory}, storage_gib={storage_gib})
def nextflow_runtime(pvc_name: str, {param_signature}) -> None:
try:
shared_dir = Path("/nf-workdir")
shared_dir = Path("/nf-workdir")
{output_shortcuts}
{samplesheet_constructors}
ignore_list = [
"latch",
".latch",
".git",
"nextflow",
".nextflow",
"work",
"results",
"miniconda",
"anaconda3",
"mambaforge",
]
shutil.copytree(
Path("/root"),
shared_dir,
ignore=lambda src, names: ignore_list,
ignore_dangling_symlinks=True,
dirs_exist_ok=True,
)
ignore_list = [
"latch",
".latch",
".git",
"nextflow",
".nextflow",
"work",
"results",
"miniconda",
"anaconda3",
"mambaforge",
]
shutil.copytree(
Path("/root"),
shared_dir,
ignore=lambda src, names: ignore_list,
ignore_dangling_symlinks=True,
dirs_exist_ok=True,
)
profile_list = {execution_profile}
if {configurable_profiles}:
profile_list.extend([p.value for p in execution_profiles])
if len(profile_list) == 0:
profile_list.append("standard")
profiles = ','.join(profile_list)
cmd = [
"/root/nextflow",
"run",
str(shared_dir / "{nf_script}"),
"-work-dir",
str(shared_dir),
"-profile",
profiles,
"-c",
"latch.config",
"-resume",
profile_list = {execution_profile}
if {configurable_profiles}:
profile_list.extend([p.value for p in execution_profiles])
if len(profile_list) == 0:
profile_list.append("standard")
profiles = ','.join(profile_list)
cmd = [
"/root/nextflow",
"run",
str(shared_dir / "{nf_script}"),
"-work-dir",
str(shared_dir),
"-profile",
profiles,
"-c",
"latch.config",
"-resume",
{params_to_flags}
]
]
print("Launching Nextflow Runtime")
print(' '.join(cmd))
print(flush=True)
print("Launching Nextflow Runtime")
print(' '.join(cmd))
print(flush=True)
try:
env = {{
**os.environ,
"NXF_HOME": "/root/.nextflow",
Expand Down Expand Up @@ -207,6 +209,51 @@ def generate_nextflow_config(pkg_root: Path):
click.secho(f"Nextflow Latch config written to {config_path}", fg="green")


def get_output_shortcuts(parameters: Dict[str, NextflowParameter]) -> str:
output_shortcuts = [
(name, shortcut)
for name, param in parameters.items()
if param.shortcut_paths is not None
for shortcut in param.shortcut_paths
]

if len(output_shortcuts) == 0:
return ""

code_block = "output_shortcuts = []\n"

for name, shortcut in output_shortcuts:
code_block += dedent(f"""\
output_shortcuts.append({{
'display_name': '{shortcut.display_name}',
'shortcut': os.path.join({name}.remote_path, '{str(shortcut.path).lstrip("/")}'),
}})
""")

code_block += dedent(f"""\
metadata = {{"output_shortcuts": output_shortcuts}}
execute(
gql.gql(\"""
mutation UpdateNfExecutionMetadata(
$argToken: String!,
$argMetadata: JSON!
) {{
updateNfExecutionMetadata(
input: {{argToken: $argToken, argMetadata: $argMetadata}}
) {{
clientMutationId
}}
}}
\"""),
{{"argMetadata": metadata}},
)
""")

return code_block


def generate_nextflow_workflow(
pkg_root: Path,
metadata_root: Path,
Expand Down Expand Up @@ -342,6 +389,7 @@ def generate_nextflow_workflow(
execution_profile=(
execution_profile.split(",") if execution_profile is not None else []
),
output_shortcuts=reindent(get_output_shortcuts(parameters), 1),
configurable_profiles=len(profile_options) > 0,
preambles="\n\n".join(list(preambles)),
samplesheet_funs="\n".join(samplesheet_funs),
Expand Down
2 changes: 2 additions & 0 deletions latch_cli/snakemake/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def load_snakemake_metadata(pkg_root: Path, metadata_root: Path) -> Optional[Pat

# todo(maximsmol): use a stateful writer that keeps track of indent level
def reindent(x: str, level: int) -> str:
if len(x) == 0:
return x
if x[0] == "\n":
x = x[1:]
return textwrap.indent(textwrap.dedent(x), " " * level)

0 comments on commit cb34381

Please sign in to comment.