diff --git a/crates/sage-cli/src/input.rs b/crates/sage-cli/src/input.rs
index f17dd03..e0cbe82 100644
--- a/crates/sage-cli/src/input.rs
+++ b/crates/sage-cli/src/input.rs
@@ -9,7 +9,7 @@ use sage_core::{
 };
 use serde::{Deserialize, Serialize};
 
-#[derive(Serialize)]
+#[derive(Serialize, Clone)]
 /// Actual search parameters - may include overrides or default values not set by user
 pub struct Search {
     pub version: String,
@@ -142,7 +142,7 @@ pub struct QuantOptions {
     pub lfq_options: Option<LfqOptions>,
 }
 
-#[derive(Serialize, Default)]
+#[derive(Serialize, Default, Clone)]
 pub struct QuantSettings {
     pub tmt: Option<Isobaric>,
     pub tmt_settings: TmtSettings,
diff --git a/crates/sage-cli/src/main.rs b/crates/sage-cli/src/main.rs
index a87d3bf..842912b 100644
--- a/crates/sage-cli/src/main.rs
+++ b/crates/sage-cli/src/main.rs
@@ -4,11 +4,14 @@ use input::{Input, Search};
 use log::info;
 use rayon::prelude::*;
 use sage_cloudpath::CloudPath;
-use sage_core::database::IndexedDatabase;
+use sage_core::database::{IndexedDatabase, Parameters, PeptideIx};
+use sage_core::fasta::Fasta;
 use sage_core::mass::Tolerance;
+use sage_core::peptide::Peptide;
 use sage_core::scoring::{Feature, Scorer};
 use sage_core::spectrum::{ProcessedSpectrum, SpectrumProcessor};
 use sage_core::tmt::TmtQuant;
+use std::collections::HashSet;
 use std::time::Instant;
 
 mod input;
@@ -61,7 +64,7 @@ impl FromIterator<SageResults> for SageResults {
 }
 
 impl Runner {
-    pub fn new(parameters: Search) -> anyhow::Result<Self> {
+    pub fn new(parameters: Search, parallel: usize) -> anyhow::Result<Self> {
         let start = Instant::now();
         let fasta = sage_cloudpath::util::read_fasta(
             &parameters.database.fasta,
@@ -75,7 +78,18 @@ impl Runner {
         )
         })?;
 
-        let database = parameters.database.clone().build(fasta);
+        let database = match parameters.database.prefilter {
+            false => parameters.database.clone().build(fasta),
+            true => {
+                let mini_runner = Self {
+                    database: IndexedDatabase::default(),
+                    parameters: parameters.clone(),
+                    start,
+                };
+                let peptides = mini_runner.prefilter_peptides(parallel, fasta);
+                parameters.database.clone().build_from_peptides(peptides)
+            }
+        };
         info!(
             "generated {} fragments, {} peptides in {}ms",
             database.fragments.len(),
@@ -89,6 +103,107 @@ impl Runner {
         })
     }
 
+    pub fn prefilter_peptides(self, parallel: usize, fasta: Fasta) -> Vec<Peptide> {
+        let spectra: Option<Vec<ProcessedSpectrum>> =
+            match parallel >= self.parameters.mzml_paths.len() {
+                true => Some(self.read_processed_spectra(&self.parameters.mzml_paths, 0, 0)),
+                false => None,
+            };
+        let mut all_peptides: Vec<Peptide> = fasta
+            .iter_chunks(self.parameters.database.prefilter_chunk_size)
+            .enumerate()
+            .flat_map(|(chunk_id, fasta_chunk)| {
+                let start = Instant::now();
+                info!("pre-filtering fasta chunk {}", chunk_id,);
+                let db = &self.parameters.database.clone().build(fasta_chunk);
+                info!(
+                    "generated {} fragments, {} peptides in {}ms",
+                    db.fragments.len(),
+                    db.peptides.len(),
+                    (Instant::now() - start).as_millis()
+                );
+                let scorer = Scorer {
+                    db,
+                    precursor_tol: self.parameters.precursor_tol,
+                    fragment_tol: self.parameters.fragment_tol,
+                    min_matched_peaks: self.parameters.min_matched_peaks,
+                    min_isotope_err: self.parameters.isotope_errors.0,
+                    max_isotope_err: self.parameters.isotope_errors.1,
+                    min_precursor_charge: self.parameters.precursor_charge.0,
+                    max_precursor_charge: self.parameters.precursor_charge.1,
+                    override_precursor_charge: self.parameters.override_precursor_charge,
+                    max_fragment_charge: self.parameters.max_fragment_charge,
+                    chimera: self.parameters.chimera,
+                    report_psms: self.parameters.report_psms + 1,
+                    wide_window: self.parameters.wide_window,
+                    annotate_matches: self.parameters.annotate_matches,
+                };
+                let peptide_idxs: HashSet<PeptideIx> = match &spectra {
+                    Some(spectra) => self.peptide_filter_processed_spectra(&scorer, spectra),
+                    None => self
+                        .parameters
+                        .mzml_paths
+                        .chunks(parallel)
+                        .enumerate()
+                        .flat_map(|(chunk_idx, chunk)| {
+                            let spectra_chunk =
+                                self.read_processed_spectra(chunk, chunk_idx, parallel);
+                            self.peptide_filter_processed_spectra(&scorer, &spectra_chunk)
+                        })
+                        .collect(),
+                }
+                .into_iter()
+                .collect();
+                let peptides: Vec<Peptide> = peptide_idxs
+                    .into_iter()
+                    .map(|idx| db[idx].clone())
+                    .collect();
+                info!(
+                    "found {} pre-filtered peptides for fasta chunk {}",
+                    peptides.len(),
+                    chunk_id,
+                );
+                peptides
+            })
+            .collect();
+        Parameters::reorder_peptides(&mut all_peptides);
+        all_peptides
+    }
+
+    fn peptide_filter_processed_spectra(
+        &self,
+        scorer: &Scorer,
+        spectra: &Vec<ProcessedSpectrum>,
+    ) -> Vec<PeptideIx> {
+        use std::sync::atomic::{AtomicUsize, Ordering};
+        let counter = AtomicUsize::new(0);
+        let start = Instant::now();
+
+        let peptide_idxs: Vec<_> = spectra
+            .par_iter()
+            .filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2)
+            .map(|x| {
+                let prev = counter.fetch_add(1, Ordering::Relaxed);
+                if prev > 0 && prev % 10_000 == 0 {
+                    let duration = Instant::now().duration_since(start).as_millis() as usize;
+
+                    let rate = prev * 1000 / (duration + 1);
+                    log::trace!("- searched {} spectra ({} spectra/s)", prev, rate);
+                }
+                x
+            })
+            .flat_map(|spec| {
+                scorer.quick_score(spec, self.parameters.database.prefilter_low_memory)
+            })
+            .collect();
+
+        let duration = Instant::now().duration_since(start).as_millis() as usize;
+        let prev = counter.load(Ordering::Relaxed);
+        let rate = prev * 1000 / (duration + 1);
+        log::info!("- search: {:8} ms ({} spectra/s)", duration, rate);
+        peptide_idxs
+    }
+
     fn spectrum_fdr(&self, features: &mut [Feature]) -> usize {
         if sage_core::ml::linear_discriminant::score_psms(features, self.parameters.precursor_tol)
             .is_none()
@@ -113,8 +228,8 @@ impl Runner {
     fn search_processed_spectra(
         &self,
         scorer: &Scorer,
-        spectra: Vec<ProcessedSpectrum>,
-    ) -> SageResults {
+        spectra: &Vec<ProcessedSpectrum>,
+    ) -> Vec<Feature> {
         use std::sync::atomic::{AtomicUsize, Ordering};
         let counter = AtomicUsize::new(0);
         let start = Instant::now();
@@ -139,7 +254,14 @@ impl Runner {
         let prev = counter.load(Ordering::Relaxed);
         let rate = prev * 1000 / (duration + 1);
         log::info!("- search: {:8} ms ({} spectra/s)", duration, rate);
+        features
+    }
 
+    fn complete_features(
+        &self,
+        spectra: Vec<ProcessedSpectrum>,
+        features: Vec<Feature>,
+    ) -> SageResults {
         let quant = self
             .parameters
             .quant
@@ -169,6 +291,17 @@ impl Runner {
         chunk_idx: usize,
         batch_size: usize,
     ) -> SageResults {
+        let spectra = self.read_processed_spectra(chunk, chunk_idx, batch_size);
+        let features = self.search_processed_spectra(scorer, &spectra);
+        self.complete_features(spectra, features)
+    }
+
+    fn read_processed_spectra(
+        &self,
+        chunk: &[String],
+        chunk_idx: usize,
+        batch_size: usize,
+    ) -> Vec<ProcessedSpectrum> {
         // Read all of the spectra at once - this can help prevent memory over-consumption issues
         info!(
             "processing files {} .. {} ",
@@ -240,7 +373,7 @@ impl Runner {
         let io_time = Instant::now() - start;
         info!("- file IO: {:8} ms", io_time.as_millis());
 
-        self.search_processed_spectra(scorer, spectra)
+        spectra
     }
 
     pub fn batch_files(&self, scorer: &Scorer, batch_size: usize) -> SageResults {
@@ -520,7 +653,9 @@ fn main() -> anyhow::Result<()> {
 
     let input = Input::from_arguments(matches)?;
 
-    let runner = input.build().and_then(Runner::new)?;
+    let runner = input
+        .build()
+        .and_then(|parameters| Runner::new(parameters, parallel))?;
 
     let tel = runner.run(parallel, parquet)?;
 
diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs
index db8cd11..2db0e4c 100644
--- a/crates/sage/src/database.rs
+++ b/crates/sage/src/database.rs
@@ -84,6 +84,12 @@ pub struct Builder {
     pub generate_decoys: Option<bool>,
     /// Path to fasta database
     pub fasta: Option<String>,
+    /// Number of sequences to handle simultaneously when pre-filtering the db
+    pub prefilter_chunk_size: Option<usize>,
+    /// Pre-filter the database to minimize memory usage
+    pub prefilter: Option<bool>,
+    /// Pre-filter the database with a minimal amount of memory at the cost of speed
+    pub prefilter_low_memory: Option<bool>,
 }
 
 impl Builder {
@@ -102,6 +108,9 @@ impl Builder {
             max_variable_mods: self.max_variable_mods.map(|x| x.max(1)).unwrap_or(2),
             generate_decoys: self.generate_decoys.unwrap_or(true),
             fasta: self.fasta.expect("A fasta file must be provided!"),
+            prefilter_chunk_size: self.prefilter_chunk_size.unwrap_or(0),
+            prefilter: self.prefilter.unwrap_or(false),
+            prefilter_low_memory: self.prefilter_low_memory.unwrap_or(true),
         }
     }
 
@@ -124,6 +133,9 @@ pub struct Parameters {
     pub decoy_tag: String,
     pub generate_decoys: bool,
     pub fasta: String,
+    pub prefilter_chunk_size: usize,
+    pub prefilter: bool,
+    pub prefilter_low_memory: bool,
 }
 
 impl Parameters {
@@ -172,6 +184,12 @@ impl Parameters {
             })
             .collect::<Vec<Peptide>>();
 
+        Self::reorder_peptides(&mut target_decoys);
+
+        target_decoys
+    }
+
+    pub fn reorder_peptides(target_decoys: &mut Vec<Peptide>) {
         log::trace!("sorting and deduplicating peptides");
 
         // This is equivalent to a stable sort
@@ -187,6 +205,9 @@ impl Parameters {
                 && remove.cterm == keep.cterm
             {
                 keep.proteins.extend(remove.proteins.iter().cloned());
+                // When merging peptides from different Fastas,
+                // decoys in one fasta might be targets in another
+                keep.decoy &= remove.decoy;
                 true
             } else {
                 false
@@ -196,13 +217,15 @@ impl Parameters {
         target_decoys
             .par_iter_mut()
             .for_each(|peptide| peptide.proteins.sort_unstable());
-
-        target_decoys
     }
 
     // pub fn build(self) -> Result> {
     pub fn build(self, fasta: Fasta) -> IndexedDatabase {
         let target_decoys = self.digest(&fasta);
+        self.build_from_peptides(target_decoys)
+    }
+
+    pub fn build_from_peptides(self, target_decoys: Vec<Peptide>) -> IndexedDatabase {
         log::trace!("generating fragments");
 
         // Finally, perform in silico digest for our target sequences
@@ -321,6 +344,7 @@ pub struct Theoretical {
     pub fragment_mz: f32,
 }
 
+#[derive(Default)]
 pub struct IndexedDatabase {
     pub peptides: Vec<Peptide>,
     pub fragments: Vec<Theoretical>,
@@ -598,6 +622,9 @@ mod test {
             decoy_tag: "rev_".into(),
             generate_decoys: false,
             fasta: "none".into(),
+            prefilter: false,
+            prefilter_chunk_size: 0,
+            prefilter_low_memory: true,
         };
 
         let peptides = params.digest(&fasta);
diff --git a/crates/sage/src/fasta.rs b/crates/sage/src/fasta.rs
index 12433d9..ec11eca 100644
--- a/crates/sage/src/fasta.rs
+++ b/crates/sage/src/fasta.rs
@@ -77,4 +77,14 @@ impl Fasta {
             })
             .collect()
     }
+
+    pub fn iter_chunks(&self, chunk_size: usize) -> impl Iterator<Item = Fasta> + '_ {
+        self.targets
+            .chunks(chunk_size)
+            .map(move |target_chunk| Self {
+                targets: target_chunk.to_vec(),
+                decoy_tag: self.decoy_tag.clone(),
+                generate_decoys: self.generate_decoys,
+            })
+    }
 }
diff --git a/crates/sage/src/scoring.rs b/crates/sage/src/scoring.rs
index e2d0c0e..9515193 100644
--- a/crates/sage/src/scoring.rs
+++ b/crates/sage/src/scoring.rs
@@ -209,6 +209,45 @@ fn max_fragment_charge(max_fragment_charge: Option<u8>, precursor_charge: u8) ->
 }
 
 impl<'db> Scorer<'db> {
+    pub fn quick_score(
+        &self,
+        query: &ProcessedSpectrum,
+        prefilter_low_memory: bool,
+    ) -> Vec<PeptideIx> {
+        assert_eq!(
+            query.level, 2,
+            "internal bug, trying to score a non-MS2 scan!"
+        );
+        let precursor = query.precursors.first().unwrap_or_else(|| {
+            panic!("missing MS1 precursor for {}", query.id);
+        });
+        let hits = self.initial_hits(&query, precursor);
+
+        if prefilter_low_memory {
+            let mut score_vector = hits
+                .preliminary
+                .iter()
+                .filter(|score| score.peptide != PeptideIx::default())
+                .map(|pre| self.score_candidate(query, pre))
+                .filter(|s| (s.0.matched_b + s.0.matched_y) >= self.min_matched_peaks)
+                .collect::<Vec<_>>();
+
+            score_vector.sort_by(|a, b| b.0.hyperscore.total_cmp(&a.0.hyperscore));
+            score_vector
+                .iter()
+                .take(self.report_psms.min(score_vector.len()))
+                .map(|x| x.0.peptide)
+                .filter(|&peptide| peptide != PeptideIx::default())
+                .collect()
+        } else {
+            hits.preliminary
+                .iter()
+                .map(|x| x.peptide)
+                .filter(|&peptide| peptide != PeptideIx::default())
+                .collect()
+        }
+    }
+
     pub fn score(&self, query: &ProcessedSpectrum) -> Vec<Feature> {
         assert_eq!(
             query.level, 2,
@@ -331,8 +370,8 @@ impl<'db> Scorer<'db> {
         let mz = precursor.mz - PROTON;
 
         // Search in wide-window/DIA mode
-        if self.wide_window {
-            let mut hits = (self.min_precursor_charge..=self.max_precursor_charge).fold(
+        let mut hits = if self.wide_window {
+            (self.min_precursor_charge..=self.max_precursor_charge).fold(
                 InitialHits::default(),
                 |mut hits, precursor_charge| {
                     let precursor_mass = mz * precursor_charge as f32;
@@ -344,9 +383,7 @@ impl<'db> Scorer<'db> {
                         self.matched_peaks(query, precursor_mass, precursor_charge, precursor_tol);
                     hits
                 },
-            );
-            self.trim_hits(&mut hits);
-            hits
+            )
         } else if precursor.charge.is_some() && self.override_precursor_charge == false {
             let charge = precursor.charge.unwrap();
             // Charge state is already annotated for this precusor, only search once
@@ -356,7 +393,7 @@ impl<'db> Scorer<'db> {
             // Not all selected ion precursors have charge states annotated (or user has set
             // `override_precursor_charge`)
             // assume it could be z=2, z=3, z=4 and search all three
-            let mut hits = (self.min_precursor_charge..=self.max_precursor_charge).fold(
+            (self.min_precursor_charge..=self.max_precursor_charge).fold(
                 InitialHits::default(),
                 |mut hits, precursor_charge| {
                     let precursor_mass = mz * precursor_charge as f32;
@@ -368,10 +405,10 @@ impl<'db> Scorer<'db> {
                     );
                     hits
                 },
-            );
-            self.trim_hits(&mut hits);
-            hits
-        }
+            )
+        };
+        self.trim_hits(&mut hits);
+        hits
     }
 
     /// Score a single [`ProcessedSpectrum`] against the database
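
Note on configuration: the three new `Builder` fields are optional and, per the `impl Builder` hunk above, default to `prefilter = false`, `prefilter_chunk_size = 0`, and `prefilter_low_memory = true`. Assuming the `database` section of the JSON settings file is deserialized straight into `Builder` the same way as the existing optional fields (an assumption, not shown in this diff), enabling the pre-filter would look roughly like the sketch below; the fasta path and chunk size are placeholder values, not taken from this patch.

    {
      "database": {
        "fasta": "proteome.fasta",
        "prefilter": true,
        "prefilter_chunk_size": 50000,
        "prefilter_low_memory": true
      }
    }

With `prefilter_low_memory = true`, `Scorer::quick_score` rescores each spectrum's preliminary hits and keeps only the top-scoring peptide indices (up to the scorer's `report_psms`), so fewer peptides survive per fasta chunk at the cost of extra scoring work; with `false` it returns every preliminary hit without rescoring, trading memory for speed.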