From 04a65d5d3e1b2b31ae4ec03459916acf5cf95a83 Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 16:52:00 +0200 Subject: [PATCH 01/12] FEAT: added option to iterate over fasta chunks --- crates/sage/src/fasta.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/sage/src/fasta.rs b/crates/sage/src/fasta.rs index 12433d9..ec11eca 100644 --- a/crates/sage/src/fasta.rs +++ b/crates/sage/src/fasta.rs @@ -77,4 +77,14 @@ impl Fasta { }) .collect() } + + pub fn iter_chunks(&self, chunk_size: usize) -> impl Iterator + '_ { + self.targets + .chunks(chunk_size) + .map(move |target_chunk| Self { + targets: target_chunk.to_vec(), + decoy_tag: self.decoy_tag.clone(), + generate_decoys: self.generate_decoys, + }) + } } From 1cb92388202bb597814aae67d25353eb090ac7ff Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 16:52:45 +0200 Subject: [PATCH 02/12] FEAT: Added cloning to Search and QuantSettings --- crates/sage-cli/src/input.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/sage-cli/src/input.rs b/crates/sage-cli/src/input.rs index f17dd03..e0cbe82 100644 --- a/crates/sage-cli/src/input.rs +++ b/crates/sage-cli/src/input.rs @@ -9,7 +9,7 @@ use sage_core::{ }; use serde::{Deserialize, Serialize}; -#[derive(Serialize)] +#[derive(Serialize, Clone)] /// Actual search parameters - may include overrides or default values not set by user pub struct Search { pub version: String, @@ -142,7 +142,7 @@ pub struct QuantOptions { pub lfq_options: Option, } -#[derive(Serialize, Default)] +#[derive(Serialize, Default, Clone)] pub struct QuantSettings { pub tmt: Option, pub tmt_settings: TmtSettings, From fcef97b3f92a8259f3cee550b63b34592780d734 Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 16:54:53 +0200 Subject: [PATCH 03/12] CHORE: made reordering of target_decoys a seperate function outside of digest --- crates/sage/src/database.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index db8cd11..3ba1305 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -172,6 +172,12 @@ impl Parameters { }) .collect::>(); + Self::reorder_peptides(&mut target_decoys); + + target_decoys + } + + pub fn reorder_peptides(target_decoys: &mut Vec) { log::trace!("sorting and deduplicating peptides"); // This is equivalent to a stable sort From f7f77dc0808acc78c62a2d9f7594caa64b22fb6d Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 16:56:19 +0200 Subject: [PATCH 04/12] CHORE: implemented build db from peptide list --- crates/sage/src/database.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index 3ba1305..dd8be95 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -209,6 +209,10 @@ impl Parameters { // pub fn build(self) -> Result> { pub fn build(self, fasta: Fasta) -> IndexedDatabase { let target_decoys = self.digest(&fasta); + self.build_from_peptides(target_decoys) + } + + pub fn build_from_peptides(self, target_decoys: Vec) -> IndexedDatabase { log::trace!("generating fragments"); // Finally, perform in silico digest for our target sequences From 9a328e3fc2393d772d66c5429a1eb9f5f13dab71 Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 16:58:20 +0200 Subject: [PATCH 05/12] FIX: reorder_peptide function return --- crates/sage/src/database.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index dd8be95..11923f6 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -202,8 +202,6 @@ impl Parameters { target_decoys .par_iter_mut() .for_each(|peptide| peptide.proteins.sort_unstable()); - - target_decoys } // pub fn build(self) -> Result> { From 8fb9dd03a74a8baf9d9309d5cc9b59ee8af7d8a3 Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 16:59:12 +0200 Subject: [PATCH 06/12] FIX: ambiguous target/decoy peptides are now always target --- crates/sage/src/database.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index 11923f6..ff23a82 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -193,6 +193,9 @@ impl Parameters { && remove.cterm == keep.cterm { keep.proteins.extend(remove.proteins.iter().cloned()); + // When merging peptides from different Fastas, + // decoys in one fasta might be targets in another + keep.decoy &= remove.decoy; true } else { false From cc8d7c0fd790cc7ea3e40d3e54c2b80e1aa5db0d Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 16:59:55 +0200 Subject: [PATCH 07/12] FEAT: added fasta chunking params to db --- crates/sage/src/database.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index ff23a82..d54578e 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -84,6 +84,10 @@ pub struct Builder { pub generate_decoys: Option, /// Path to fasta database pub fasta: Option, + /// Number of sequences to handle simultaneously when pre-filtering the db + pub prefilter_chunk_size: Option, + /// Pre-filter the database to minimize memory usage + pub prefilter: Option, } impl Builder { @@ -102,6 +106,8 @@ impl Builder { max_variable_mods: self.max_variable_mods.map(|x| x.max(1)).unwrap_or(2), generate_decoys: self.generate_decoys.unwrap_or(true), fasta: self.fasta.expect("A fasta file must be provided!"), + prefilter_chunk_size: self.prefilter_chunk_size.unwrap_or(0), + prefilter: self.prefilter.unwrap_or(false), } } @@ -124,6 +130,8 @@ pub struct Parameters { pub decoy_tag: String, pub generate_decoys: bool, pub fasta: String, + pub prefilter_chunk_size: usize, + pub prefilter: bool, } impl Parameters { @@ -609,6 +617,8 @@ mod test { decoy_tag: "rev_".into(), generate_decoys: false, fasta: "none".into(), + prefilter: false, + prefilter_chunk_size: 0, }; let peptides = params.digest(&fasta); From 3a3889481dc5f619f50c03e2d10ef8458e307487 Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 17:03:03 +0200 Subject: [PATCH 08/12] CHORE: refactored process_chunk from runner --- crates/sage-cli/src/main.rs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/crates/sage-cli/src/main.rs b/crates/sage-cli/src/main.rs index a87d3bf..c0dcaa3 100644 --- a/crates/sage-cli/src/main.rs +++ b/crates/sage-cli/src/main.rs @@ -113,8 +113,8 @@ impl Runner { fn search_processed_spectra( &self, scorer: &Scorer, - spectra: Vec, - ) -> SageResults { + spectra: &Vec, + ) -> Vec { use std::sync::atomic::{AtomicUsize, Ordering}; let counter = AtomicUsize::new(0); let start = Instant::now(); @@ -139,7 +139,14 @@ impl Runner { let prev = counter.load(Ordering::Relaxed); let rate = prev * 1000 / (duration + 1); log::info!("- search: {:8} ms ({} spectra/s)", duration, rate); + features + } + fn complete_features( + &self, + spectra: Vec, + features: Vec, + ) -> SageResults { let quant = self .parameters .quant @@ -169,6 +176,17 @@ impl Runner { chunk_idx: usize, batch_size: usize, ) -> SageResults { + let spectra = self.read_processed_spectra(chunk, chunk_idx, batch_size); + let features = self.search_processed_spectra(scorer, &spectra); + self.complete_features(spectra, features) + } + + fn read_processed_spectra( + &self, + chunk: &[String], + chunk_idx: usize, + batch_size: usize, + ) -> Vec { // Read all of the spectra at once - this can help prevent memory over-consumption issues info!( "processing files {} .. {} ", @@ -240,7 +258,7 @@ impl Runner { let io_time = Instant::now() - start; info!("- file IO: {:8} ms", io_time.as_millis()); - self.search_processed_spectra(scorer, spectra) + spectra } pub fn batch_files(&self, scorer: &Scorer, batch_size: usize) -> SageResults { From caef6384dc4a2144f63cabaabbf351a78318a5ca Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 17:03:42 +0200 Subject: [PATCH 09/12] FEAT: added defaultto IndexedDatabase --- crates/sage/src/database.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index d54578e..c880c94 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -340,6 +340,7 @@ pub struct Theoretical { pub fragment_mz: f32, } +#[derive(Default)] pub struct IndexedDatabase { pub peptides: Vec, pub fragments: Vec, From e92c8b70c7eef9232bb53f8f1eafb0bb2292e7ab Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 17:29:37 +0200 Subject: [PATCH 10/12] FEAT: added quick_score option to quickly filter peptides --- crates/sage/src/scoring.rs | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/crates/sage/src/scoring.rs b/crates/sage/src/scoring.rs index e2d0c0e..807a7e8 100644 --- a/crates/sage/src/scoring.rs +++ b/crates/sage/src/scoring.rs @@ -209,6 +209,22 @@ fn max_fragment_charge(max_fragment_charge: Option, precursor_charge: u8) -> } impl<'db> Scorer<'db> { + pub fn quick_score(&self, query: &ProcessedSpectrum) -> Vec { + assert_eq!( + query.level, 2, + "internal bug, trying to score a non-MS2 scan!" + ); + let precursor = query.precursors.first().unwrap_or_else(|| { + panic!("missing MS1 precursor for {}", query.id); + }); + let hits = self.initial_hits(&query, precursor); + hits.preliminary + .iter() + .map(|x| x.peptide) + .filter(|&peptide| peptide != PeptideIx::default()) + .collect() + } + pub fn score(&self, query: &ProcessedSpectrum) -> Vec { assert_eq!( query.level, 2, @@ -331,8 +347,8 @@ impl<'db> Scorer<'db> { let mz = precursor.mz - PROTON; // Search in wide-window/DIA mode - if self.wide_window { - let mut hits = (self.min_precursor_charge..=self.max_precursor_charge).fold( + let mut hits = if self.wide_window { + (self.min_precursor_charge..=self.max_precursor_charge).fold( InitialHits::default(), |mut hits, precursor_charge| { let precursor_mass = mz * precursor_charge as f32; @@ -344,9 +360,7 @@ impl<'db> Scorer<'db> { self.matched_peaks(query, precursor_mass, precursor_charge, precursor_tol); hits }, - ); - self.trim_hits(&mut hits); - hits + ) } else if precursor.charge.is_some() && self.override_precursor_charge == false { let charge = precursor.charge.unwrap(); // Charge state is already annotated for this precusor, only search once @@ -356,7 +370,7 @@ impl<'db> Scorer<'db> { // Not all selected ion precursors have charge states annotated (or user has set // `override_precursor_charge`) // assume it could be z=2, z=3, z=4 and search all three - let mut hits = (self.min_precursor_charge..=self.max_precursor_charge).fold( + (self.min_precursor_charge..=self.max_precursor_charge).fold( InitialHits::default(), |mut hits, precursor_charge| { let precursor_mass = mz * precursor_charge as f32; @@ -368,10 +382,10 @@ impl<'db> Scorer<'db> { ); hits }, - ); - self.trim_hits(&mut hits); - hits - } + ) + }; + self.trim_hits(&mut hits); + hits } /// Score a single [`ProcessedSpectrum`] against the database From d966e733486c5e562d03188807041db9c64a7777 Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Fri, 6 Sep 2024 17:30:24 +0200 Subject: [PATCH 11/12] FEAT: added parsing of the prefilter fasta by chunk parameters --- crates/sage-cli/src/main.rs | 123 ++++++++++++++++++++++++++++++++++-- 1 file changed, 119 insertions(+), 4 deletions(-) diff --git a/crates/sage-cli/src/main.rs b/crates/sage-cli/src/main.rs index c0dcaa3..41f284e 100644 --- a/crates/sage-cli/src/main.rs +++ b/crates/sage-cli/src/main.rs @@ -4,11 +4,14 @@ use input::{Input, Search}; use log::info; use rayon::prelude::*; use sage_cloudpath::CloudPath; -use sage_core::database::IndexedDatabase; +use sage_core::database::{IndexedDatabase, Parameters, PeptideIx}; +use sage_core::fasta::Fasta; use sage_core::mass::Tolerance; +use sage_core::peptide::Peptide; use sage_core::scoring::{Feature, Scorer}; use sage_core::spectrum::{ProcessedSpectrum, SpectrumProcessor}; use sage_core::tmt::TmtQuant; +use std::collections::HashSet; use std::time::Instant; mod input; @@ -61,7 +64,7 @@ impl FromIterator for SageResults { } impl Runner { - pub fn new(parameters: Search) -> anyhow::Result { + pub fn new(parameters: Search, parallel: usize) -> anyhow::Result { let start = Instant::now(); let fasta = sage_cloudpath::util::read_fasta( ¶meters.database.fasta, @@ -75,7 +78,18 @@ impl Runner { ) })?; - let database = parameters.database.clone().build(fasta); + let database = match parameters.database.prefilter { + false => parameters.database.clone().build(fasta), + true => { + let mini_runner = Self { + database: IndexedDatabase::default(), + parameters: parameters.clone(), + start, + }; + let peptides = mini_runner.prefilter_peptides(parallel, fasta); + parameters.database.clone().build_from_peptides(peptides) + } + }; info!( "generated {} fragments, {} peptides in {}ms", database.fragments.len(), @@ -89,6 +103,105 @@ impl Runner { }) } + pub fn prefilter_peptides(self, parallel: usize, fasta: Fasta) -> Vec { + let spectra: Option> = + match parallel >= self.parameters.mzml_paths.len() { + true => Some(self.read_processed_spectra(&self.parameters.mzml_paths, 0, 0)), + false => None, + }; + let mut all_peptides: Vec = fasta + .iter_chunks(self.parameters.database.prefilter_chunk_size) + .enumerate() + .flat_map(|(chunk_id, fasta_chunk)| { + let start = Instant::now(); + info!("pre-filtering fasta chunk {}", chunk_id,); + let db = &self.parameters.database.clone().build(fasta_chunk); + info!( + "generated {} fragments, {} peptides in {}ms", + db.fragments.len(), + db.peptides.len(), + (Instant::now() - start).as_millis() + ); + let scorer = Scorer { + db, + precursor_tol: self.parameters.precursor_tol, + fragment_tol: self.parameters.fragment_tol, + min_matched_peaks: self.parameters.min_matched_peaks, + min_isotope_err: self.parameters.isotope_errors.0, + max_isotope_err: self.parameters.isotope_errors.1, + min_precursor_charge: self.parameters.precursor_charge.0, + max_precursor_charge: self.parameters.precursor_charge.1, + override_precursor_charge: self.parameters.override_precursor_charge, + max_fragment_charge: self.parameters.max_fragment_charge, + chimera: self.parameters.chimera, + report_psms: self.parameters.report_psms, + wide_window: self.parameters.wide_window, + annotate_matches: self.parameters.annotate_matches, + }; + let peptide_idxs: HashSet = match &spectra { + Some(spectra) => self.peptide_filter_processed_spectra(&scorer, spectra), + None => self + .parameters + .mzml_paths + .chunks(parallel) + .enumerate() + .flat_map(|(chunk_idx, chunk)| { + let spectra_chunk = + self.read_processed_spectra(chunk, chunk_idx, parallel); + self.peptide_filter_processed_spectra(&scorer, &spectra_chunk) + }) + .collect(), + } + .into_iter() + .collect(); + let peptides: Vec = peptide_idxs + .into_iter() + .map(|idx| db[idx].clone()) + .collect(); + info!( + "found {} pre-filtered peptides for fasta chunk {}", + peptides.len(), + chunk_id, + ); + peptides + }) + .collect(); + Parameters::reorder_peptides(&mut all_peptides); + all_peptides + } + + fn peptide_filter_processed_spectra( + &self, + scorer: &Scorer, + spectra: &Vec, + ) -> Vec { + use std::sync::atomic::{AtomicUsize, Ordering}; + let counter = AtomicUsize::new(0); + let start = Instant::now(); + + let peptide_idxs: Vec<_> = spectra + .par_iter() + .filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2) + .map(|x| { + let prev = counter.fetch_add(1, Ordering::Relaxed); + if prev > 0 && prev % 10_000 == 0 { + let duration = Instant::now().duration_since(start).as_millis() as usize; + + let rate = prev * 1000 / (duration + 1); + log::trace!("- searched {} spectra ({} spectra/s)", prev, rate); + } + x + }) + .flat_map(|spec| scorer.quick_score(spec)) + .collect(); + + let duration = Instant::now().duration_since(start).as_millis() as usize; + let prev = counter.load(Ordering::Relaxed); + let rate = prev * 1000 / (duration + 1); + log::info!("- search: {:8} ms ({} spectra/s)", duration, rate); + peptide_idxs + } + fn spectrum_fdr(&self, features: &mut [Feature]) -> usize { if sage_core::ml::linear_discriminant::score_psms(features, self.parameters.precursor_tol) .is_none() @@ -538,7 +651,9 @@ fn main() -> anyhow::Result<()> { let input = Input::from_arguments(matches)?; - let runner = input.build().and_then(Runner::new)?; + let runner = input + .build() + .and_then(|parameters| Runner::new(parameters, parallel))?; let tel = runner.run(parallel, parquet)?; From 4299a0d67ba15114ddde07686ecf21714c5ba5e8 Mon Sep 17 00:00:00 2001 From: Sander Willems Date: Mon, 9 Sep 2024 18:20:04 +0200 Subject: [PATCH 12/12] FEAT: added extra low memory option --- crates/sage-cli/src/main.rs | 6 ++++-- crates/sage/src/database.rs | 5 +++++ crates/sage/src/scoring.rs | 25 ++++++++++++++++++++++++- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/crates/sage-cli/src/main.rs b/crates/sage-cli/src/main.rs index 41f284e..842912b 100644 --- a/crates/sage-cli/src/main.rs +++ b/crates/sage-cli/src/main.rs @@ -134,7 +134,7 @@ impl Runner { override_precursor_charge: self.parameters.override_precursor_charge, max_fragment_charge: self.parameters.max_fragment_charge, chimera: self.parameters.chimera, - report_psms: self.parameters.report_psms, + report_psms: self.parameters.report_psms + 1, wide_window: self.parameters.wide_window, annotate_matches: self.parameters.annotate_matches, }; @@ -192,7 +192,9 @@ impl Runner { } x }) - .flat_map(|spec| scorer.quick_score(spec)) + .flat_map(|spec| { + scorer.quick_score(spec, self.parameters.database.prefilter_low_memory) + }) .collect(); let duration = Instant::now().duration_since(start).as_millis() as usize; diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index c880c94..2db0e4c 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -88,6 +88,8 @@ pub struct Builder { pub prefilter_chunk_size: Option, /// Pre-filter the database to minimize memory usage pub prefilter: Option, + /// Pre-filter the database with a minimal amount of memory at the cost of speed + pub prefilter_low_memory: Option, } impl Builder { @@ -108,6 +110,7 @@ impl Builder { fasta: self.fasta.expect("A fasta file must be provided!"), prefilter_chunk_size: self.prefilter_chunk_size.unwrap_or(0), prefilter: self.prefilter.unwrap_or(false), + prefilter_low_memory: self.prefilter_low_memory.unwrap_or(true), } } @@ -132,6 +135,7 @@ pub struct Parameters { pub fasta: String, pub prefilter_chunk_size: usize, pub prefilter: bool, + pub prefilter_low_memory: bool, } impl Parameters { @@ -620,6 +624,7 @@ mod test { fasta: "none".into(), prefilter: false, prefilter_chunk_size: 0, + prefilter_low_memory: true, }; let peptides = params.digest(&fasta); diff --git a/crates/sage/src/scoring.rs b/crates/sage/src/scoring.rs index 807a7e8..9515193 100644 --- a/crates/sage/src/scoring.rs +++ b/crates/sage/src/scoring.rs @@ -209,7 +209,11 @@ fn max_fragment_charge(max_fragment_charge: Option, precursor_charge: u8) -> } impl<'db> Scorer<'db> { - pub fn quick_score(&self, query: &ProcessedSpectrum) -> Vec { + pub fn quick_score( + &self, + query: &ProcessedSpectrum, + prefilter_low_memory: bool, + ) -> Vec { assert_eq!( query.level, 2, "internal bug, trying to score a non-MS2 scan!" @@ -218,11 +222,30 @@ impl<'db> Scorer<'db> { panic!("missing MS1 precursor for {}", query.id); }); let hits = self.initial_hits(&query, precursor); + + if prefilter_low_memory { + let mut score_vector = hits + .preliminary + .iter() + .filter(|score| score.peptide != PeptideIx::default()) + .map(|pre| self.score_candidate(query, pre)) + .filter(|s| (s.0.matched_b + s.0.matched_y) >= self.min_matched_peaks) + .collect::>(); + + score_vector.sort_by(|a, b| b.0.hyperscore.total_cmp(&a.0.hyperscore)); + score_vector + .iter() + .take(self.report_psms.min(score_vector.len())) + .map(|x| x.0.peptide) + .filter(|&peptide| peptide != PeptideIx::default()) + .collect() + } else { hits.preliminary .iter() .map(|x| x.peptide) .filter(|&peptide| peptide != PeptideIx::default()) .collect() + } } pub fn score(&self, query: &ProcessedSpectrum) -> Vec {