Add Galaxy JWD Python script #15618
Changes from 2 commits
@@ -0,0 +1,347 @@
#!/usr/bin/env python
# Description: Galaxy job working directory (JWD) script. Can get you
# the path of a JWD and can delete JWD's of jobs that failed within the last X days.

import argparse
import os
import shutil
import sys
from datetime import datetime
from xml.dom.minidom import parse

import psycopg2

def main():
    """
    JWD script
    1. Can get you the path of a JWD
    2. Can delete JWD's of jobs that failed within the last X days
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(
        dest="subcommand",
        required=True,
        title="""
        Use one of the following subcommands:
        get_jwd: Get JWD path of a given Galaxy job id
        clean_jwds: Clean JWD's of jobs that failed within the last X days

        The following ENVs (same as gxadmin's) should be set:
        GALAXY_CONFIG_FILE: Path to the galaxy.yml file
        GALAXY_LOG_DIR: Path to the Galaxy log directory
        PGDATABASE: Name of the Galaxy database
        PGUSER: Galaxy database user
        PGHOST: Galaxy database host
        We also need a ~/.pgpass file (same as gxadmin's) in format:
        <pg_host>:5432:*:<pg_user>:<pg_password>

        Example:
        python galaxy_jwd.py get_jwd 12345678
        python galaxy_jwd.py clean_jwds --dry_run True --days 30
        """,
    )

    # Parser for the get_jwd subcommand
    get_jwd_parser = subparsers.add_parser("get_jwd", help="Get JWD path of a given Galaxy job id")
    get_jwd_parser.add_argument(
        "job_id",
        help="Galaxy job id",
    )

    # Parser for the clean_jwds subcommand
    clean_jwds_parser = subparsers.add_parser("clean_jwds", help="Clean JWD's of jobs that failed within the last X days")
    clean_jwds_parser.add_argument(
        "--dry_run",
        help="If True, do NOT delete JWD's; only print them (default: True)",
        # Convert the CLI string to a real boolean so that "--dry_run False" is not truthy
        type=lambda value: str(value).lower() in ("true", "1", "yes"),
        default=True,
    )
    clean_jwds_parser.add_argument(
        "--days",
        help="Number of days within which the jobs were last updated to be considered for deletion (default: 5)",
        type=int,
        default=5,
    )

    args = parser.parse_args(args=None if sys.argv[1:] else ["--help"])

    # Check if environment variables are set
    if not os.environ.get("GALAXY_CONFIG_FILE"):
        raise ValueError("Please set ENV GALAXY_CONFIG_FILE")
    if not os.environ.get("GALAXY_LOG_DIR"):
        raise ValueError("Please set ENV GALAXY_LOG_DIR")
    if not os.environ.get("PGDATABASE"):
        raise ValueError("Please set ENV PGDATABASE")
    if not os.environ.get("PGUSER"):
        raise ValueError("Please set ENV PGUSER")
    if not os.environ.get("PGHOST"):
        raise ValueError("Please set ENV PGHOST")

    # Check if ~/.pgpass file exists and is not empty
    if not os.path.isfile(os.path.expanduser("~/.pgpass")) or os.stat(os.path.expanduser("~/.pgpass")).st_size == 0:
        raise ValueError("Please create a ~/.pgpass file in format: <pg_host>:5432:*:<pg_user>:<pg_password>")

    # Check if the given galaxy.yml file exists
    if not os.path.isfile(os.environ.get("GALAXY_CONFIG_FILE")):
        raise ValueError(f"The given galaxy.yml file {os.environ.get('GALAXY_CONFIG_FILE')} does not exist")

    # Set variables
    galaxy_config_file = os.environ.get("GALAXY_CONFIG_FILE").strip()
    galaxy_log_dir = os.environ.get("GALAXY_LOG_DIR").strip()
    db_name = os.environ.get("PGDATABASE").strip()
    db_user = os.environ.get("PGUSER").strip()
    db_host = os.environ.get("PGHOST").strip()
    db_password = extract_password_from_pgpass(pgpass_file=os.path.expanduser("~/.pgpass"))
    object_store_conf = get_object_store_conf_path(galaxy_config_file)
    backends = parse_object_store(object_store_conf)

    # Connect to Galaxy database
    db = Database(
        dbname=db_name,
        dbuser=db_user,
        dbhost=db_host,
        dbpassword=db_password,
    )

    # For the get_jwd subcommand
    if args.subcommand == "get_jwd":
        job_id = args.job_id
        object_store_id = db.get_object_store_id(job_id)
        jwd_path = decode_path(job_id, [object_store_id], backends)

        # Check
        if jwd_path:
            print(jwd_path)
        else:
            print(f"INFO: Job working directory (of {job_id}) does not exist")
            sys.exit(1)

    # For the clean_jwds subcommand
    if args.subcommand == "clean_jwds":
        # Check if the given Galaxy log directory exists
        if not os.path.isdir(galaxy_log_dir):
            raise ValueError(f"The given Galaxy log directory {galaxy_log_dir} does not exist")

        # Set variables
        dry_run = args.dry_run
        days = args.days
        jwd_cleanup_log = f"{galaxy_log_dir}/jwd_cleanup_{datetime.now().strftime('%d_%m_%Y-%I_%M_%S')}.log"
        failed_jobs = db.get_failed_jobs(days=days)

        # Delete JWD folders if dry_run is False
        # Log the folders that will be deleted
        if not dry_run:
            with open(jwd_cleanup_log, "w") as jwd_log:
                jwd_log.write(
                    "The following job working directories (JWDs) belonging "
                    "to failed jobs have been deleted\nJob id: JWD path\n"
                )
                for job_id, metadata in failed_jobs.items():
                    # Delete JWD folders of jobs that failed within the last X days
                    jwd_path = decode_path(job_id, metadata, backends)
                    if jwd_path:
                        jwd_log.write(f"{job_id}: {jwd_path}\n")
                        delete_jwd(jwd_path)
        else:
            # Print JWD folders of jobs that failed within the last X days
            for job_id, metadata in failed_jobs.items():
                jwd_path = decode_path(job_id, metadata, backends)
                if jwd_path:
                    print(f"{job_id}: {jwd_path}")


def extract_password_from_pgpass(pgpass_file):
    """Extract the password from the ~/.pgpass file

    The ~/.pgpass file should have the following format:
    <pg_host>:5432:*:<pg_user>:<pg_password>

    Args:
        pgpass_file (str): Path to the ~/.pgpass file

    Returns:
        str: Password for the given pg_host
    """
    pgpass_format = "<pg_host>:5432:*:<pg_user>:<pg_password>"
    with open(pgpass_file, "r") as pgpass:
        for line in pgpass:
            if line.startswith(os.environ.get("PGHOST")):
                return line.split(":")[4].strip()
            else:
                raise ValueError(
                    f"Please add the password for '{os.environ.get('PGHOST')}' to the ~/.pgpass file in format: {pgpass_format}"
                )


def get_object_store_conf_path(galaxy_config_file):
    """Get the path to the object_store_conf.xml file

    Args:
        galaxy_config_file (str): Path to the galaxy.yml file

    Returns:
        str: Path to the object_store_conf.xml file
    """
    object_store_conf = ""
    with open(galaxy_config_file, "r") as config:

Review comment: The config file is a yaml file, please parse it as such.
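A rough sketch of what the reviewer may have in mind, parsing galaxy.yml with PyYAML instead of scanning lines. The helper name, the top-level `galaxy` key, and the error handling are assumptions for illustration, not taken from this PR:

```python
import os

import yaml  # PyYAML, assumed to be available in the Galaxy virtualenv


def get_object_store_conf_path_yaml(galaxy_config_file):
    """Sketch: read object_store_config_file from galaxy.yml via a YAML parser."""
    with open(galaxy_config_file) as config:
        galaxy_config = yaml.safe_load(config)
    # The option normally lives under the top-level "galaxy" section (assumption)
    object_store_conf = galaxy_config.get("galaxy", {}).get("object_store_config_file")
    if not object_store_conf or not os.path.isfile(object_store_conf):
        raise ValueError(f"object_store_config_file not found or does not exist: {object_store_conf}")
    return object_store_conf
```

Parsing the YAML also avoids false matches on commented-out `object_store_config_file` lines, which the line-scanning approach would pick up.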
        for line in config:
            if line.strip().startswith("object_store_config_file"):

Review comment: The object store file doesn't have to be an XML file; there are different ways to parse them.
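Along those lines, a minimal sketch that dispatches on the config file extension and falls back to the existing XML parser below. The YAML key layout (`backends` / `extra_dirs`) and the helper name are assumptions for illustration:

```python
import yaml  # PyYAML, assumed available


def parse_object_store_any(object_store_conf):
    """Sketch: handle both XML and YAML object store configs."""
    if object_store_conf.endswith((".yml", ".yaml")):
        with open(object_store_conf) as conf:
            config_dict = yaml.safe_load(conf)
        backends = {}
        # Assumed YAML shape: a "backends" list, each entry with an "id" and an "extra_dirs" list
        for backend in config_dict.get("backends", []):
            for extra_dir in backend.get("extra_dirs", []):
                if extra_dir.get("type") == "job_work":
                    backends[backend["id"]] = extra_dir["path"]
        return backends
    # Fall back to the existing XML handling defined below
    return parse_object_store(object_store_conf)
```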
                object_store_conf = line.split(":")[1].strip()

    # Check if the object_store_conf.xml file exists
    if not os.path.isfile(object_store_conf):
        raise ValueError(f"{object_store_conf} does not exist")

    return object_store_conf


def parse_object_store(object_store_conf):
    """Get the path of type 'job_work' from the extra_dirs of each backend

    Args:
        object_store_conf (str): Path to the object_store_conf.xml file

    Returns:
        dict: Dictionary of backend id and path of type 'job_work'
    """
    dom = parse(object_store_conf)
    backends = {}
    for backend in dom.getElementsByTagName("backend"):
        backend_id = backend.getAttribute("id")
        backends[backend_id] = {}
        # Get the extra_dir's path for each backend if type is "job_work"
        for extra_dir in backend.getElementsByTagName("extra_dir"):
            if extra_dir.getAttribute("type") == "job_work":
                backends[backend_id] = extra_dir.getAttribute("path")
    return backends
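For context, a small self-contained snippet showing the XML shape this parser walks and the dict it returns. The backend id and paths are made up for illustration; a real distributed object store config carries more options:

```python
from xml.dom.minidom import parseString

# Hypothetical minimal object store XML of the shape parse_object_store expects
EXAMPLE_XML = """
<object_store type="distributed">
    <backends>
        <backend id="files1" type="disk" weight="1">
            <files_dir path="/data/dataset_files"/>
            <extra_dir type="job_work" path="/data/jobs_directory"/>
        </backend>
    </backends>
</object_store>
"""

dom = parseString(EXAMPLE_XML)
# Mirrors the loop in parse_object_store: collect job_work paths per backend id
backends = {
    backend.getAttribute("id"): extra_dir.getAttribute("path")
    for backend in dom.getElementsByTagName("backend")
    for extra_dir in backend.getElementsByTagName("extra_dir")
    if extra_dir.getAttribute("type") == "job_work"
}
print(backends)  # {'files1': '/data/jobs_directory'}
```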


def decode_path(job_id, metadata, backends_dict):
    """Decode the path of JWD's and check if the path exists

    Args:
        job_id (int): Job id
        metadata (list): List of object_store_id and update_time
        backends_dict (dict): Dictionary of backend id and path of type 'job_work'

    Returns:
        str: Path to the JWD
    """
    job_id = str(job_id)

    # Check if object_store_id exists in our object store config
    if metadata[0] not in backends_dict:
        raise ValueError(f"Object store id '{metadata[0]}' does not exist in the object_store_conf.xml file")

    jwd_path = f"{backends_dict[metadata[0]]}/0{job_id[0:2]}/{job_id[2:5]}/{job_id}"

Review comment: That's very brittle, I think you just want to load up the object store itself.
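Short of loading the object store through Galaxy's own code as the reviewer suggests, a slightly less brittle sketch mirrors Galaxy's three-digit directory hashing instead of slicing the id by hand. The padding and chunking below are assumptions about how Galaxy lays out job directories, not verified against this PR:

```python
import os


def jwd_path_for(job_id, job_work_base):
    """Sketch: build <job_work_base>/<hash dirs>/<job_id> from the numeric job id."""
    s = str(job_id)
    # Pad with leading zeros to a multiple of three, then drop the last three
    # digits so each leaf directory holds at most 1000 jobs (assumed layout).
    padded = ((3 - len(s) % 3) % 3) * "0" + s
    hash_dirs = [padded[i:i + 3] for i in range(0, len(padded) - 3, 3)] or ["000"]
    return os.path.join(job_work_base, *hash_dirs, s)
```

For an 8-digit id such as 12345678 this yields `<base>/012/345/12345678`, matching the hard-coded slicing above, but it also copes with ids of other lengths.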

    # Validate that the path is a JWD
    # It is a JWD if the following conditions are true:
    # 1. Check if tool_script.sh exists
    # 2. Check if directories 'inputs' and 'outputs' exist
    # 3. Additionally, we can also try and find the file '__instrument_core_epoch_end'
    #    and compare the timestamp in that with the 'update_time' (metadata[1]) of the
    #    job (see the sketch after this function).
    if (
        os.path.exists(jwd_path)
        and os.path.exists(f"{jwd_path}/tool_script.sh")
        and os.path.exists(f"{jwd_path}/inputs")
        and os.path.exists(f"{jwd_path}/outputs")
    ):
        return jwd_path
    else:
        return None
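A sketch of the optional third check listed in decode_path above, comparing the core-instrumentation end epoch with the job's update_time. It assumes `__instrument_core_epoch_end` holds a plain Unix timestamp; the helper name and the one-hour tolerance are illustrative choices:

```python
import os
from datetime import datetime


def epoch_matches_update_time(jwd_path, update_time, tolerance_seconds=3600):
    """Sketch: cross-check a JWD against the job's update_time via
    __instrument_core_epoch_end (assumed to contain a Unix timestamp)."""
    epoch_file = os.path.join(jwd_path, "__instrument_core_epoch_end")
    if not os.path.isfile(epoch_file):
        return True  # nothing to compare against, do not block on it
    with open(epoch_file) as fh:
        end_epoch = float(fh.read().strip())
    end_time = datetime.fromtimestamp(end_epoch)
    return abs((update_time - end_time).total_seconds()) <= tolerance_seconds
```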


def delete_jwd(jwd_path):
    """Delete JWD folder and all its contents

    Args:
        jwd_path (str): Path to the JWD folder
    """
    try:
        shutil.rmtree(jwd_path)
    except OSError as e:
        print(f"Error deleting JWD: {jwd_path} : {e.strerror}")


class Database:
    """Class to connect to the Galaxy database and query it

    Args:
        dbname (str): Name of the database
        dbuser (str): Name of the database user
        dbhost (str): Hostname of the database
        dbpassword (str): Password of the database user
    """

    def __init__(self, dbname, dbuser, dbhost, dbpassword):
        try:
            self.conn = psycopg2.connect(dbname=dbname, user=dbuser, host=dbhost, password=dbpassword)
        except psycopg2.OperationalError as e:
            print(f"Unable to connect to database: {e}")
            sys.exit(1)

    def get_failed_jobs(self, days):
        """Get failed jobs from the DB

        Args:
            days (int): Number of days to look back for failed jobs

        Returns:
            dict: Dictionary with job_id as key and [object_store_id, update_time] as value
        """
        cur = self.conn.cursor()
        cur.execute(
            f"""
            SELECT id, object_store_id, update_time
            FROM job
            WHERE state = 'error'
            AND update_time IS NOT NULL
            AND object_store_id IS NOT NULL
            AND update_time > NOW() - INTERVAL '{days} days'
            """
        )
        failed_jobs = cur.fetchall()
        cur.close()
        self.conn.close()

        # Create a dictionary with job_id as key and object_store_id and update_time as values
        failed_jobs_dict = {}
        for job_id, object_store_id, update_time in failed_jobs:
            failed_jobs_dict[job_id] = [object_store_id, update_time]

        if not failed_jobs_dict:
            print(f"No failed jobs found within the last {days} days")
            sys.exit(1)

        return failed_jobs_dict

    def get_object_store_id(self, job_id):
        """Get object_store_id for a job id

        Args:
            job_id (int): Job id

        Returns:
            object_store_id (str): Object store id
        """
        cur = self.conn.cursor()
        cur.execute(
            f"""
            SELECT object_store_id
            FROM job
            WHERE id = '{job_id}' AND object_store_id IS NOT NULL
            """
        )
        row = cur.fetchone()
        cur.close()
        self.conn.close()

        # fetchone() returns None when there is no matching row
        if not row:
            print(f"Job id {job_id} not found in the database")
            sys.exit(1)

        return row[0]


if __name__ == "__main__":
    main()
Review comment: Can you use SQLAlchemy please?
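A hedged sketch of what the failed-jobs query could look like with SQLAlchemy instead of raw psycopg2. The connection-URL format and interval arithmetic reflect standard SQLAlchemy/PostgreSQL usage, not code from this PR; bound parameters also replace the f-string interpolation used above:

```python
from sqlalchemy import create_engine, text


def get_failed_jobs_sqlalchemy(dbname, dbuser, dbhost, dbpassword, days):
    """Sketch: same query as Database.get_failed_jobs, via SQLAlchemy with bound parameters."""
    engine = create_engine(f"postgresql://{dbuser}:{dbpassword}@{dbhost}/{dbname}")
    query = text(
        """
        SELECT id, object_store_id, update_time
        FROM job
        WHERE state = 'error'
          AND update_time IS NOT NULL
          AND object_store_id IS NOT NULL
          AND update_time > NOW() - INTERVAL '1 day' * :days
        """
    )
    with engine.connect() as conn:
        rows = conn.execute(query, {"days": int(days)}).fetchall()
    # Same shape as the current method: {job_id: [object_store_id, update_time]}
    return {job_id: [object_store_id, update_time] for job_id, object_store_id, update_time in rows}
```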