Skip to content

Commit

Permalink
Merge pull request #483 from latchbio/rahuldesai1/nf-storage-bytes
Browse files Browse the repository at this point in the history
Report nextflow used_storage_bytes
  • Loading branch information
rahuldesai1 authored Aug 6, 2024
2 parents 5d90fd2 + 203ea4e commit e29d709
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 10 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ Types of changes

# Latch SDK Changelog

## 2.50.1 - 2024-08-05

### Added

* Nextflow
- Report workdir size on workflow completion

## 2.50.0 - 2024-08-02

### Added
Expand Down
40 changes: 31 additions & 9 deletions latch/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,38 @@ def add_execution_results(results: List[str]):

execute(
gql.gql("""
mutation addExecutionResults(
$argToken: String!,
$argPaths: [String]!
mutation addExecutionResults(
$argToken: String!,
$argPaths: [String]!
) {
executionInfoMetadataPublishResults(
input: {argToken: $argToken, argPaths: $argPaths}
) {
executionInfoMetadataPublishResults(
input: {argToken: $argToken, argPaths: $argPaths}
) {
clientMutationId
}
clientMutationId
}
"""),
}
"""),
{"argToken": token, "argPaths": results},
)


def report_nextflow_used_storage(used_bytes: int):
token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID")
if token is None:
return

execute(
gql.gql("""
mutation updateNextflowStorageSize(
$argToken: String!,
$argUsedStorageBytes: BigInt!
) {
nfExecutionInfoUpdateUsedStorageBytes(
input: {argToken: $argToken, argUsedStorageBytes: $argUsedStorageBytes}
) {
clientMutationId
}
}
"""),
{"argToken": token, "argUsedStorageBytes": used_bytes},
)
22 changes: 22 additions & 0 deletions latch_cli/nextflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from latch.types.file import LatchFile
from latch.types.directory import LatchDir, LatchOutputDir
from latch.ldata.path import LPath
from latch.executions import report_nextflow_used_storage
from latch_cli.nextflow.workflow import get_flag
from latch_cli.nextflow.utils import _get_execution_name
from latch_cli.utils import urljoins
Expand Down Expand Up @@ -155,6 +156,27 @@ def nextflow_runtime(pvc_name: str, {param_signature}) -> None:
print(f"Uploading .nextflow.log to {{remote.path}}")
remote.upload_from(nextflow_log)
print("Computing size of workdir... ", end="")
try:
result = subprocess.run(
['du', '-sb', str(shared_dir)],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=5 * 60
)
size = int(result.stdout.split()[0])
report_nextflow_used_storage(size)
print(f"Done. Workdir size: {{size / 1024 / 1024 / 1024: .2f}} GiB")
except subprocess.TimeoutExpired:
print("Failed to compute storage size: Operation timed out after 5 minutes.")
except subprocess.CalledProcessError as e:
print(f"Failed to compute storage size: {{e.stderr}}")
except Exception as e:
print(f"Failed to compute storage size: {{e}}")
if failed:
sys.exit(1)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name="latch",
version="v2.50.0",
version="v2.50.1",
author_email="[email protected]",
description="The Latch SDK",
packages=find_packages(),
Expand Down

0 comments on commit e29d709

Please sign in to comment.