diff --git a/CHANGELOG.md b/CHANGELOG.md
index cd98f331..4f9242f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,13 @@ Types of changes
 
 # Latch SDK Changelog
 
+## 2.50.1 - 2024-08-05
+
+### Added
+
+* Nextflow
+  - Report workdir size on workflow completion
+
 ## 2.50.0 - 2024-08-02
 
 ### Added
diff --git a/latch/executions.py b/latch/executions.py
index 44e958aa..f74db43a 100644
--- a/latch/executions.py
+++ b/latch/executions.py
@@ -121,16 +121,46 @@ def add_execution_results(results: List[str]):
 
     execute(
         gql.gql("""
-            mutation addExecutionResults(
-                $argToken: String!,
-                $argPaths: [String]!
+        mutation addExecutionResults(
+            $argToken: String!,
+            $argPaths: [String]!
+        ) {
+            executionInfoMetadataPublishResults(
+                input: {argToken: $argToken, argPaths: $argPaths}
             ) {
-                executionInfoMetadataPublishResults(
-                    input: {argToken: $argToken, argPaths: $argPaths}
-                ) {
-                    clientMutationId
-                }
+                clientMutationId
             }
-            """),
+        }
+        """),
         {"argToken": token, "argPaths": results},
     )
+
+
+def report_nextflow_used_storage(used_bytes: int):
+    """Report a Nextflow execution's used storage via a GraphQL mutation.
+
+    Does nothing when ``FLYTE_INTERNAL_EXECUTION_ID`` is unset, because that
+    value is what gets sent as the execution token.
+
+    Args:
+        used_bytes: Storage used by the execution, in bytes.
+    """
+    token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID")
+    if token is None:
+        return
+
+    execute(
+        gql.gql("""
+        mutation updateNextflowStorageSize(
+            $argToken: String!,
+            $argUsedStorageBytes: BigInt!
+        ) {
+            nfExecutionInfoUpdateUsedStorageBytes(
+                input: {argToken: $argToken, argUsedStorageBytes: $argUsedStorageBytes}
+            ) {
+                clientMutationId
+            }
+        }
+        """),
+        {"argToken": token, "argUsedStorageBytes": used_bytes},
+    )
diff --git a/latch_cli/nextflow/workflow.py b/latch_cli/nextflow/workflow.py
index 704dd92a..8cffc1af 100644
--- a/latch_cli/nextflow/workflow.py
+++ b/latch_cli/nextflow/workflow.py
@@ -31,6 +31,7 @@
 from latch.types.file import LatchFile
 from latch.types.directory import LatchDir, LatchOutputDir
 from latch.ldata.path import LPath
+from latch.executions import report_nextflow_used_storage
 from latch_cli.nextflow.workflow import get_flag
 from latch_cli.nextflow.utils import _get_execution_name
 from latch_cli.utils import urljoins
@@ -155,6 +156,28 @@ def nextflow_runtime(pvc_name: str, {param_signature}) -> None:
                 print(f"Uploading .nextflow.log to {{remote.path}}")
                 remote.upload_from(nextflow_log)
 
+        # Best-effort: failing to measure or report the size must not fail the run
+        print("Computing size of workdir... ", end="", flush=True)
+        try:
+            result = subprocess.run(
+                ['du', '-sb', str(shared_dir)],
+                check=True,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                text=True,
+                timeout=5 * 60
+            )
+
+            size = int(result.stdout.split()[0])
+            report_nextflow_used_storage(size)
+            print(f"Done. Workdir size: {{size / 1024 / 1024 / 1024:.2f}} GiB")
+        except subprocess.TimeoutExpired:
+            print("Failed to compute storage size: Operation timed out after 5 minutes.")
+        except subprocess.CalledProcessError as e:
+            print(f"Failed to compute storage size: {{e.stderr}}")
+        except Exception as e:
+            print(f"Failed to compute storage size: {{e}}")
+
     if failed:
         sys.exit(1)
 
diff --git a/setup.py b/setup.py
index 5fda16d1..7ff42560 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@
 
 setup(
     name="latch",
-    version="v2.50.0",
+    version="v2.50.1",
     author_email="kenny@latch.bio",
     description="The Latch SDK",
     packages=find_packages(),