Skip to content

Commit

Permalink
Add cleanup job working directory as celery task
Browse files Browse the repository at this point in the history
See the inital PR: galaxyproject#15618
  • Loading branch information
sanjaysrikakulam committed Mar 16, 2023
1 parent 1dc25d1 commit 3b792e9
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions lib/galaxy/celery/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import json
from concurrent.futures import TimeoutError
import datetime
from functools import lru_cache
import os
from pathlib import Path
import shutil
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -34,6 +37,7 @@
from galaxy.managers.tool_data import ToolDataImportManager
from galaxy.metadata.set_metadata import set_metadata_portable
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.objectstore import BaseObjectStore
from galaxy.schema.tasks import (
ComputeDatasetHashTaskRequest,
GenerateHistoryContentDownload,
Expand Down Expand Up @@ -389,3 +393,53 @@ def prune_history_audit_table(sa_session: galaxy_scoped_session):
def cleanup_short_term_storage(storage_monitor: ShortTermStorageMonitor):
"""Cleanup short term storage."""
storage_monitor.cleanup()


@galaxy_task(action="clean up job working directories")
def cleanup_jwds(sa_session: galaxy_scoped_session, object_store: BaseObjectStore):
"""Cleanup job working directories for failed jobs that are older than X days"""
def get_failed_jobs():
failed_jobs = {}
jobs = sa_session.query(model.Job).filter(
model.Job.state == "error", model.Job.update_time > datetime.datetime.now() - datetime.timedelta(days=days), model.Job.object_store_id is not None
)

for job in jobs:
failed_jobs[job.id] = object_store.get_filename(job, base_dir="job_work", dir_only=True, obj_dir=True)

return failed_jobs

def delete_jwd(jwd_path):
try:
# Validate that the path is a JWD
# It is a JWD if the following conditions are true:
# 1. Check if tool_script.sh exists
# 2. Check if directories 'inputs', and 'outputs' exist
if (
os.path.exists(jwd_path)
and os.path.exists(f"{jwd_path}/tool_script.sh")
and os.path.exists(f"{jwd_path}/inputs")
and os.path.exists(f"{jwd_path}/outputs")
):
shutil.rmtree(jwd_path)
except OSError as e:
log.error(f"Error deleting job working directory: {jwd_path} : {e.strerror}")

# days should be converted to a config option
days = 5
galaxy_log_dir = "/var/log/galaxy"
log_file_name = f"jwds_cleanup_{datetime.datetime.now().strftime('%d_%m_%Y-%I_%M_%S')}.log"

failed_jobs = get_failed_jobs()

if not failed_jobs:
log.info("No failed jobs found within the last %s days", days)

with open(f"{galaxy_log_dir}/{log_file_name}", "w") as jwd_log:
jwd_log.write(
"The following job working directories (JWDs) belonging "
"to the failed jobs are deleted\nJob id: JWD path\n"
)
for job_id, job_working_directory in failed_jobs.items():
delete_jwd(job_working_directory)
jwd_log.write(f"{job_id}: {job_working_directory}")

0 comments on commit 3b792e9

Please sign in to comment.