Skip to content

Commit

Permalink
Add basic-sequence-qc and begin implementing QC filtering (#10)
Browse files Browse the repository at this point in the history
* Add basic-sequence-qc and begin implementing QC filtering

* Implement input sequence quantity-based QC filter
  • Loading branch information
dfornika authored Aug 31, 2023
1 parent 0173c26 commit ee7ba4e
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 10 deletions.
2 changes: 2 additions & 0 deletions auto_cpo/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def main():

scan_start_timestamp = datetime.datetime.now()
for run in core.scan(config):
if quit_when_safe:
exit(0)
if run is not None:
try:
config = auto_cpo.config.load_config(args.config)
Expand Down
134 changes: 124 additions & 10 deletions auto_cpo/core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import csv
import datetime
import glob
import json
Expand Down Expand Up @@ -114,6 +115,34 @@ def check_analysis_dependencies_complete(pipeline: dict[str, object], analysis:
return all_dependencies_complete


def get_library_fastq_paths(fastq_input_dir: str) -> dict[str, dict[str, str]]:
    """
    Get the paths to all of the fastq files in a directory.

    Library IDs are taken from the portion of each fastq filename before the
    first underscore. A file whose name contains neither '_R1' nor '_R2' still
    creates an entry for its library ID, but leaves both read paths as None.

    :param fastq_input_dir: Path to a directory containing fastq files.
    :type fastq_input_dir: str
    :return: Paths to R1 and R2 fastq files, indexed by library ID. Keys of the dict are library IDs, values are dicts with keys: ['ID', 'R1', 'R2'].
    :rtype: dict[str, dict[str, str]]
    """
    fastq_paths_by_library_id = {}
    # The '*.f*q.gz' pattern matches both '.fastq.gz' and '.fq.gz' extensions.
    for fastq_file in glob.glob(os.path.join(fastq_input_dir, '*.f*q.gz')):
        fastq_file_basename = os.path.basename(fastq_file)
        fastq_file_abspath = os.path.abspath(fastq_file)
        library_id = fastq_file_basename.split('_')[0]
        # setdefault creates the record on first sight of a library ID and
        # returns the existing record on subsequent files for the same library.
        library_record = fastq_paths_by_library_id.setdefault(library_id, {
            'ID': library_id,
            'R1': None,
            'R2': None,
        })
        # '_R1' / '_R2' in the filename identifies the read orientation.
        if '_R1' in fastq_file_basename:
            library_record['R1'] = fastq_file_abspath
        elif '_R2' in fastq_file_basename:
            library_record['R2'] = fastq_file_abspath

    return fastq_paths_by_library_id


def analyze_run(config: dict[str, object], run: dict[str, object], assembly_mode: str):
"""
Initiate an analysis on one directory of fastq files. We assume that the directory of fastq files is named using
Expand All @@ -127,14 +156,20 @@ def analyze_run(config: dict[str, object], run: dict[str, object], assembly_mode
:param config:
:type config: dict[str, object]
:param analysis:
:type analysis: dict[str, object]
:param run: Dictionary describing the run to be analyzed. Keys: ['run_id', 'fastq_directory', 'instrument_type', 'analysis_parameters']
:type run: dict[str, object]
:assembly_mode: Whether to run the short or long assembly pipeline.
:type assembly_mode: str
:return: None
:rtype: NoneType
"""
base_analysis_outdir = config['analysis_output_dir']
base_analysis_work_dir = config['analysis_work_dir']
analysis_run_id = run['run_id']
analysis_run_output_dir = os.path.abspath(os.path.join(base_analysis_outdir, analysis_run_id, assembly_mode))

no_value_flags_by_pipeline_name = {
"BCCDC-PHL/basic-sequence-qc": [],
"BCCDC-PHL/taxon-abundance": [],
"BCCDC-PHL/routine-assembly": ["hybrid", "long_only", "bakta", "prokka"],
"BCCDC-PHL/mlst-nf": [],
Expand All @@ -151,22 +186,21 @@ def analyze_run(config: dict[str, object], run: dict[str, object], assembly_mode
pipeline_short_name = pipeline['pipeline_name'].split('/')[1]
pipeline_minor_version = ''.join(pipeline['pipeline_version'].rsplit('.', 1)[0])

if pipeline['pipeline_name'] == 'BCCDC-PHL/routine-assembly' and assembly_mode == "short":
if pipeline['pipeline_name'] == 'BCCDC-PHL/basic-sequence-qc' and assembly_mode == "short":
pipeline['pipeline_parameters']['prefix'] = analysis_run_id
elif pipeline['pipeline_name'] == 'BCCDC-PHL/routine-assembly' and assembly_mode == "short":
pipeline['pipeline_parameters'].pop('fastq_input_long', None)
pipeline['pipeline_parameters']['samplesheet_input'] = os.path.join(analysis_run_output_dir, 'samplesheets', analysis_run_id + '_routine-assembly_samplesheet.csv')
elif pipeline['pipeline_name'] == 'BCCDC-PHL/mlst-nf':
analysis_run_id = os.path.basename(run['analysis_parameters']['fastq_input'])
stashed_fastq_input = run['analysis_parameters'].pop('fastq_input', None)
analysis_run_output_dir = os.path.abspath(os.path.join(base_analysis_outdir, analysis_run_id, assembly_mode))
assemblies_dir = os.path.join(analysis_run_output_dir, 'assemblies')
run['analysis_parameters']['assembly_input'] = assemblies_dir
elif pipeline['pipeline_name'] == 'BCCDC-PHL/plasmid-screen':
analysis_run_id = os.path.basename(run['analysis_parameters']['fastq_input'])
pipeline['pipeline_parameters']['samplesheet_input'] = os.path.join(analysis_run_output_dir, 'samplesheets', analysis_run_id + '_plasmid-screen_samplesheet.csv')
analysis_run_output_dir = os.path.abspath(os.path.join(base_analysis_outdir, analysis_run_id, assembly_mode))
assemblies_dir = os.path.join(analysis_run_output_dir, 'assemblies')
run['analysis_parameters']['assembly_input'] = assemblies_dir
else:
analysis_run_id = os.path.basename(run['analysis_parameters']['fastq_input'])
analysis_run_output_dir = os.path.abspath(os.path.join(base_analysis_outdir, analysis_run_id, assembly_mode))

analysis_output_dir_name = '-'.join([pipeline_short_name, pipeline_minor_version, 'output'])
analysis_pipeline_output_dir = os.path.abspath(os.path.join(analysis_run_output_dir, analysis_output_dir_name))
Expand Down Expand Up @@ -247,14 +281,94 @@ def analyze_run(config: dict[str, object], run: dict[str, object], assembly_mode
logging.info(json.dumps({"event_type": "analysis_completed", "sequencing_run_id": analysis_run_id, "pipeline_command": " ".join(pipeline_command)}))
shutil.rmtree(analysis_work_dir, ignore_errors=True)
logging.info(json.dumps({"event_type": "analysis_work_dir_deleted", "sequencing_run_id": analysis_run_id, "analysis_work_dir_path": analysis_work_dir}))
if pipeline['pipeline_name'] == 'BCCDC-PHL/routine-assembly':
if pipeline['pipeline_name'] == 'BCCDC-PHL/basic-sequence-qc':
samplesheets_dir = os.path.join(analysis_run_output_dir, "samplesheets")
os.makedirs(samplesheets_dir, exist_ok=True)
qc_summary_path = os.path.join(analysis_run_output_dir, analysis_run_id + "_auto-cpo_qc_summary.csv")
basic_sequence_qc_csv_path = os.path.join(analysis_pipeline_output_dir, analysis_run_id + '_basic_qc_stats.csv')
basic_sequence_qc = {}
if os.path.exists(basic_sequence_qc_csv_path):
total_bases_by_library = {}
with open(basic_sequence_qc_csv_path, 'r') as f:
reader = csv.DictReader(f, dialect='unix')
for row in reader:
library_id = row['sample_id']
total_bases_input = 0
try:
total_bases_input = int(row['total_bases_before_filtering'])
except ValueError as e:
pass
total_bases_by_library[library_id] = {
"sequencing_run_id": analysis_run_id,
"library_id": library_id,
"total_bases_input": total_bases_input,
}
qc_pass_fail = "FAIL"
if total_bases_input > config['qc_filters']['minimum_total_bases_input']:
qc_pass_fail = "WARN"
if total_bases_input > config['qc_filters']['warning_total_bases_input']:
qc_pass_fail = "PASS"
total_bases_by_library[library_id]['total_bases_input_pass_fail'] = qc_pass_fail
with open(qc_summary_path, 'w') as f:
writer = csv.DictWriter(f, fieldnames=['sequencing_run_id', 'library_id', 'total_bases_input', 'total_bases_input_pass_fail'], dialect='unix', quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for library_id, library_info in total_bases_by_library.items():
writer.writerow(library_info)
logging.info(json.dumps({"event_type": "qc_summary_written", "sequencing_run_id": analysis_run_id, "qc_summary_path": qc_summary_path}))
library_fastq_paths = get_library_fastq_paths(run['analysis_parameters']['fastq_input'])
assembly_samplesheet_path = os.path.join(samplesheets_dir, analysis_run_id + '_routine-assembly_samplesheet.csv')
with open(assembly_samplesheet_path, 'w') as f:
output_fieldnames = ['ID', 'R1', 'R2']
writer = csv.DictWriter(f, fieldnames=output_fieldnames, dialect='unix', quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for library_id, library_qc in total_bases_by_library.items():
pass_fail = library_qc['total_bases_input_pass_fail']
if pass_fail == "PASS" or pass_fail == "WARN":
writer.writerow(library_fastq_paths[library_id])
logging.info(json.dumps({
"event_type": "samplesheet_written",
"sequencing_run_id": analysis_run_id,
"pipeline_name": "BCCDC-PHL/routine-assembly",
"samplesheet_path": assembly_samplesheet_path,
}))
elif pipeline['pipeline_name'] == 'BCCDC-PHL/routine-assembly':
assemblies_dir = os.path.join(analysis_run_output_dir, "assemblies")
os.makedirs(assemblies_dir, exist_ok=True)
for assembly in glob.glob(os.path.join(analysis_pipeline_output_dir, '*', '*.fa')):
src = assembly
dest = os.path.join(assemblies_dir, os.path.basename(assembly))
os.symlink(src, dest)
logging.info(json.dumps({"event_type": "assemblies_symlinked", "sequencing_run_id": analysis_run_id, "assembly_analysis_output_dir": analysis_pipeline_output_dir, "assemblies_symlink_dir": assemblies_dir}))
logging.info(json.dumps({
"event_type": "assemblies_symlinked",
"sequencing_run_id": analysis_run_id,
"assembly_analysis_output_dir": analysis_pipeline_output_dir,
"assemblies_symlink_dir": assemblies_dir
}))
assembly_samplesheet_path = os.path.join(samplesheets_dir, analysis_run_id + '_routine-assembly_samplesheet.csv')
plasmid_screen_samplesheet_path = os.path.join(samplesheets_dir, analysis_run_id + '_plasmid-screen_samplesheet.csv')
plasmid_screen_samplesheet = []
with open(assembly_samplesheet_path, 'r') as f:
reader = csv.DictReader(f, dialect='unix')
for row in reader:
plasmid_screen_samplesheet.append(row)
assembly_paths = list(map(lambda x: os.path.abspath(x), glob.glob(os.path.join(assemblies_dir, '*.fa'))))
for record in plasmid_screen_samplesheet:
for assembly_path in assembly_paths:
assembly_basename = os.path.basename(assembly_path)
assembly_library_id = assembly_basename.split('_')[0]
if record['ID'] == assembly_library_id:
record['ASSEMBLY'] = assembly_path
with open(plasmid_screen_samplesheet_path, 'w') as f:
writer = csv.DictWriter(f, fieldnames=['ID', 'R1', 'R2', 'ASSEMBLY'], dialect='unix', quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for record in plasmid_screen_samplesheet:
writer.writerow(record)
logging.info(json.dumps({
"event_type": "samplesheet_written",
"sequencing_run_id": analysis_run_id,
"pipeline_name": "BCCDC-PHL/plasmid-screen",
"samplesheet_path": plasmid_screen_samplesheet_path
}))
elif pipeline['pipeline_name'] == 'BCCDC-PHL/mlst-nf':
run['analysis_parameters']['fastq_input'] = stashed_fastq_input
except subprocess.CalledProcessError as e:
Expand Down
89 changes: 89 additions & 0 deletions config-template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{
"fastq_by_run_dir": "/path/to/fastq_symlinks_by_run",
"analysis_output_dir": "/path/to/analysis_by_run",
"analysis_work_dir": "/path/to/scratch/work-auto-cpo",
"notification_email_addresses": [
"[email protected]"
],
"send_notification_emails": false,
"scan_interval_seconds": 60,
"analyze_runs_in_reverse_order": true,
    "qc_filters": {
"minimum_total_bases_input": 50000000,
"warning_total_bases_input": 100000000
},
"pipelines": [
{
"pipeline_name": "BCCDC-PHL/basic-sequence-qc",
"pipeline_version": "v0.2.0",
"dependencies": null,
"pipeline_parameters": {
"fastq_input": null,
"prefix": null,
"outdir": null
}
},
{
"pipeline_name": "BCCDC-PHL/taxon-abundance",
"pipeline_version": "v0.1.2",
"dependencies": [
{
"pipeline_name": "BCCDC-PHL/basic-sequence-qc",
"pipeline_version": "v0.2.0"
}
],
"pipeline_parameters": {
"fastq_input": null,
"kraken_db": "/path/to/ref_databases/kraken2/2022-09-26_standard",
"bracken_db": "/path/to/ref_databases/kraken2/2022-09-26_standard",
"read_length": null,
"outdir": null
}
},
{
"pipeline_name": "BCCDC-PHL/routine-assembly",
"pipeline_version": "v0.4.4",
"dependencies": [
{
"pipeline_name": "BCCDC-PHL/basic-sequence-qc",
"pipeline_version": "v0.2.0"
}
],
"pipeline_parameters": {
"samplesheet_input": null,
"prokka": null,
"outdir": null
}
},
{
"pipeline_name": "BCCDC-PHL/mlst-nf",
"pipeline_version": "v0.1.3",
"dependencies": [
{
"pipeline_name": "BCCDC-PHL/routine-assembly",
"pipeline_version": "v0.4.4"
}
],
"pipeline_parameters": {
"assembly_input": null,
"outdir": null
}
},
{
"pipeline_name": "BCCDC-PHL/plasmid-screen",
"pipeline_version": "v0.2.0",
"dependencies": [
{
"pipeline_name": "BCCDC-PHL/routine-assembly",
"pipeline_version": "v0.4.4"
}
],
"pipeline_parameters": {
"samplesheet_input": null,
"pre_assembled": null,
"mob_db": "/path/to/ref_databases/mob-suite/local_2019-06-09_v1.0.0",
"outdir": null
}
}
]
}

0 comments on commit ee7ba4e

Please sign in to comment.