diff --git a/lib/galaxy/celery/tasks.py b/lib/galaxy/celery/tasks.py index d452a29dfd80..15f6c989af18 100644 --- a/lib/galaxy/celery/tasks.py +++ b/lib/galaxy/celery/tasks.py @@ -1,7 +1,10 @@ import json from concurrent.futures import TimeoutError +import datetime from functools import lru_cache +import os from pathlib import Path +import shutil from typing import ( Any, Callable, @@ -34,6 +37,7 @@ from galaxy.managers.tool_data import ToolDataImportManager from galaxy.metadata.set_metadata import set_metadata_portable from galaxy.model.scoped_session import galaxy_scoped_session +from galaxy.objectstore import BaseObjectStore from galaxy.schema.tasks import ( ComputeDatasetHashTaskRequest, GenerateHistoryContentDownload, @@ -389,3 +393,53 @@ def prune_history_audit_table(sa_session: galaxy_scoped_session): def cleanup_short_term_storage(storage_monitor: ShortTermStorageMonitor): """Cleanup short term storage.""" storage_monitor.cleanup() + + +@galaxy_task(action="clean up job working directories") +def cleanup_jwds(sa_session: galaxy_scoped_session, object_store: BaseObjectStore): + """Cleanup job working directories for failed jobs that are older than X days""" + def get_failed_jobs(): + failed_jobs = {} + jobs = sa_session.query(model.Job).filter( + model.Job.state == "error", model.Job.update_time > datetime.datetime.now() - datetime.timedelta(days=days), model.Job.object_store_id is not None + ) + + for job in jobs: + failed_jobs[job.id] = object_store.get_filename(job, base_dir="job_work", dir_only=True, obj_dir=True) + + return failed_jobs + + def delete_jwd(jwd_path): + try: + # Validate that the path is a JWD + # It is a JWD if the following conditions are true: + # 1. Check if tool_script.sh exists + # 2. Check if directories 'inputs', and 'outputs' exist + if ( + os.path.exists(jwd_path) + and os.path.exists(f"{jwd_path}/tool_script.sh") + and os.path.exists(f"{jwd_path}/inputs") + and os.path.exists(f"{jwd_path}/outputs") + ): + shutil.rmtree(jwd_path) + except OSError as e: + log.error(f"Error deleting job working directory: {jwd_path} : {e.strerror}") + + # days should be converted to a config option + days = 5 + galaxy_log_dir = "/var/log/galaxy" + log_file_name = f"jwds_cleanup_{datetime.datetime.now().strftime('%d_%m_%Y-%I_%M_%S')}.log" + + failed_jobs = get_failed_jobs() + + if not failed_jobs: + log.info("No failed jobs found within the last %s days", days) + + with open(f"{galaxy_log_dir}/{log_file_name}", "w") as jwd_log: + jwd_log.write( + "The following job working directories (JWDs) belonging " + "to the failed jobs are deleted\nJob id: JWD path\n" + ) + for job_id, job_working_directory in failed_jobs.items(): + delete_jwd(job_working_directory) + jwd_log.write(f"{job_id}: {job_working_directory}")