Skip to content

Commit

Permalink
update storage bytes at end of workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
rahuldesai1 committed Aug 3, 2024
1 parent 5d90fd2 commit c9b096d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 9 deletions.
40 changes: 31 additions & 9 deletions latch/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,38 @@ def add_execution_results(results: List[str]):

execute(
gql.gql("""
mutation addExecutionResults(
$argToken: String!,
$argPaths: [String]!
mutation addExecutionResults(
$argToken: String!,
$argPaths: [String]!
) {
executionInfoMetadataPublishResults(
input: {argToken: $argToken, argPaths: $argPaths}
) {
executionInfoMetadataPublishResults(
input: {argToken: $argToken, argPaths: $argPaths}
) {
clientMutationId
}
clientMutationId
}
"""),
}
"""),
{"argToken": token, "argPaths": results},
)


def report_nextflow_used_storage(used_bytes: int):
token = os.environ.get("FLYTE_INTERNAL_EXECUTION_ID", None)
if token is None:
return

execute(
gql.gql("""
mutation updateNextflowStorageSize(
$argToken: String!,
$argUsedStorageBytes: BigInt!
) {
nfExecutionInfoUpdateUsedStorageBytes(
input: {argToken: $argToken, argUsedStorageBytes: $argUsedStorageBytes}
) {
clientMutationId
}
}
"""),
{"argToken": token, "argUsedStorageBytes": used_bytes},
)
21 changes: 21 additions & 0 deletions latch_cli/nextflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from latch.types.file import LatchFile
from latch.types.directory import LatchDir, LatchOutputDir
from latch.ldata.path import LPath
from latch.executions import report_nextflow_used_storage
from latch_cli.nextflow.workflow import get_flag
from latch_cli.nextflow.utils import _get_execution_name
from latch_cli.utils import urljoins
Expand Down Expand Up @@ -155,6 +156,26 @@ def nextflow_runtime(pvc_name: str, {param_signature}) -> None:
print(f"Uploading .nextflow.log to {{remote.path}}")
remote.upload_from(nextflow_log)
print("Computing size of workdir... ", end="")
try:
result = subprocess.run(
['du', '-sb', str(shared_dir)],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=5 * 60
)
size = int(result.stdout.split()[0])
report_nextflow_used_storage(size)
except subprocess.TimeoutExpired:
print("Failed to compute storage size: Operation timed out after 5 minutes.")
except subprocess.CalledProcessError as e:
print(f"Failed to compute storage size: {{e.stderr}}")
except Exception as e:
print(f"Failed to compute storage size: {{e}}")
if failed:
sys.exit(1)
Expand Down

0 comments on commit c9b096d

Please sign in to comment.