Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Db splitting #154

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/sage-cli/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sage_core::{
};
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
#[derive(Serialize, Clone)]
/// Actual search parameters - may include overrides or default values not set by user
pub struct Search {
pub version: String,
Expand Down Expand Up @@ -142,7 +142,7 @@ pub struct QuantOptions {
pub lfq_options: Option<LfqOptions>,
}

#[derive(Serialize, Default)]
#[derive(Serialize, Default, Clone)]
pub struct QuantSettings {
pub tmt: Option<Isobaric>,
pub tmt_settings: TmtSettings,
Expand Down
149 changes: 142 additions & 7 deletions crates/sage-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use input::{Input, Search};
use log::info;
use rayon::prelude::*;
use sage_cloudpath::CloudPath;
use sage_core::database::IndexedDatabase;
use sage_core::database::{IndexedDatabase, Parameters, PeptideIx};
use sage_core::fasta::Fasta;
use sage_core::mass::Tolerance;
use sage_core::peptide::Peptide;
use sage_core::scoring::{Feature, Scorer};
use sage_core::spectrum::{ProcessedSpectrum, SpectrumProcessor};
use sage_core::tmt::TmtQuant;
use std::collections::HashSet;
use std::time::Instant;

mod input;
Expand Down Expand Up @@ -61,7 +64,7 @@ impl FromIterator<SageResults> for SageResults {
}

impl Runner {
pub fn new(parameters: Search) -> anyhow::Result<Self> {
pub fn new(parameters: Search, parallel: usize) -> anyhow::Result<Self> {
let start = Instant::now();
let fasta = sage_cloudpath::util::read_fasta(
&parameters.database.fasta,
Expand All @@ -75,7 +78,18 @@ impl Runner {
)
})?;

let database = parameters.database.clone().build(fasta);
let database = match parameters.database.prefilter {
false => parameters.database.clone().build(fasta),
true => {
let mini_runner = Self {
database: IndexedDatabase::default(),
parameters: parameters.clone(),
start,
};
let peptides = mini_runner.prefilter_peptides(parallel, fasta);
parameters.database.clone().build_from_peptides(peptides)
}
};
info!(
"generated {} fragments, {} peptides in {}ms",
database.fragments.len(),
Expand All @@ -89,6 +103,107 @@ impl Runner {
})
}

/// Pre-filter the full fasta down to the peptides that plausibly match at
/// least one acquired spectrum, so the final indexed database only needs to
/// hold those peptides.
///
/// The fasta is digested in chunks of `prefilter_chunk_size` sequences; each
/// chunk is built into a temporary fragment index, quick-scored against the
/// spectra, and only matched peptides survive. `parallel` is the number of
/// mzML files read per batch; when it covers all files, spectra are read once
/// up front and reused for every fasta chunk.
pub fn prefilter_peptides(self, parallel: usize, fasta: Fasta) -> Vec<Peptide> {
    // If every mzML file fits in a single batch, read all spectra once and
    // share them across fasta chunks; otherwise re-read per chunk (inside the
    // flat_map below) to keep peak memory bounded.
    let spectra: Option<Vec<ProcessedSpectrum>> =
        match parallel >= self.parameters.mzml_paths.len() {
            true => Some(self.read_processed_spectra(&self.parameters.mzml_paths, 0, 0)),
            false => None,
        };
    let mut all_peptides: Vec<Peptide> = fasta
        .iter_chunks(self.parameters.database.prefilter_chunk_size)
        .enumerate()
        .flat_map(|(chunk_id, fasta_chunk)| {
            let start = Instant::now();
            info!("pre-filtering fasta chunk {}", chunk_id,);
            // Build a temporary indexed database for just this fasta chunk.
            let db = &self.parameters.database.clone().build(fasta_chunk);
            info!(
                "generated {} fragments, {} peptides in {}ms",
                db.fragments.len(),
                db.peptides.len(),
                (Instant::now() - start).as_millis()
            );
            // Scorer configured like the main search, except one extra PSM
            // per spectrum is requested (report_psms + 1) — presumably to
            // keep a runner-up candidate during pre-filtering; TODO confirm.
            let scorer = Scorer {
                db,
                precursor_tol: self.parameters.precursor_tol,
                fragment_tol: self.parameters.fragment_tol,
                min_matched_peaks: self.parameters.min_matched_peaks,
                min_isotope_err: self.parameters.isotope_errors.0,
                max_isotope_err: self.parameters.isotope_errors.1,
                min_precursor_charge: self.parameters.precursor_charge.0,
                max_precursor_charge: self.parameters.precursor_charge.1,
                override_precursor_charge: self.parameters.override_precursor_charge,
                max_fragment_charge: self.parameters.max_fragment_charge,
                chimera: self.parameters.chimera,
                report_psms: self.parameters.report_psms + 1,
                wide_window: self.parameters.wide_window,
                annotate_matches: self.parameters.annotate_matches,
            };
            // Indices of peptides in this chunk's database that matched any
            // spectrum; the HashSet deduplicates hits across spectra.
            let peptide_idxs: HashSet<PeptideIx> = match &spectra {
                Some(spectra) => self.peptide_filter_processed_spectra(&scorer, spectra),
                None => self
                    .parameters
                    .mzml_paths
                    .chunks(parallel)
                    .enumerate()
                    .flat_map(|(chunk_idx, chunk)| {
                        let spectra_chunk =
                            self.read_processed_spectra(chunk, chunk_idx, parallel);
                        self.peptide_filter_processed_spectra(&scorer, &spectra_chunk)
                    })
                    .collect(),
            }
            .into_iter()
            .collect();
            // Materialize the surviving peptides out of this chunk's database.
            let peptides: Vec<Peptide> = peptide_idxs
                .into_iter()
                .map(|idx| db[idx].clone())
                .collect();
            info!(
                "found {} pre-filtered peptides for fasta chunk {}",
                peptides.len(),
                chunk_id,
            );
            peptides
        })
        .collect();
    // Survivors from different chunks must be globally re-sorted and
    // deduplicated (a decoy in one chunk may be a target in another).
    Parameters::reorder_peptides(&mut all_peptides);
    all_peptides
}

/// Quick-score a batch of processed spectra against `scorer`'s chunk database
/// and return the index of every peptide that matched at least one spectrum
/// (duplicates possible; the caller deduplicates).
///
/// Only MS2 spectra with at least `min_peaks` peaks are scored. Progress is
/// logged at trace level every 10,000 spectra, and a summary rate at info
/// level on completion.
fn peptide_filter_processed_spectra(
    &self,
    scorer: &Scorer,
    // `&[T]` rather than `&Vec<T>`: more general, and `&Vec` coerces at
    // existing call sites.
    spectra: &[ProcessedSpectrum],
) -> Vec<PeptideIx> {
    use std::sync::atomic::{AtomicUsize, Ordering};
    // Shared progress counter across rayon worker threads.
    let counter = AtomicUsize::new(0);
    let start = Instant::now();

    let peptide_idxs: Vec<_> = spectra
        .par_iter()
        .filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2)
        .map(|x| {
            let prev = counter.fetch_add(1, Ordering::Relaxed);
            if prev > 0 && prev % 10_000 == 0 {
                let duration = Instant::now().duration_since(start).as_millis() as usize;

                // +1 avoids division by zero on sub-millisecond batches.
                let rate = prev * 1000 / (duration + 1);
                log::trace!("- searched {} spectra ({} spectra/s)", prev, rate);
            }
            x
        })
        .flat_map(|spec| {
            scorer.quick_score(spec, self.parameters.database.prefilter_low_memory)
        })
        .collect();

    let duration = Instant::now().duration_since(start).as_millis() as usize;
    let prev = counter.load(Ordering::Relaxed);
    let rate = prev * 1000 / (duration + 1);
    log::info!("- search: {:8} ms ({} spectra/s)", duration, rate);
    peptide_idxs
}

fn spectrum_fdr(&self, features: &mut [Feature]) -> usize {
if sage_core::ml::linear_discriminant::score_psms(features, self.parameters.precursor_tol)
.is_none()
Expand All @@ -113,8 +228,8 @@ impl Runner {
fn search_processed_spectra(
&self,
scorer: &Scorer,
spectra: Vec<ProcessedSpectrum>,
) -> SageResults {
spectra: &Vec<ProcessedSpectrum>,
) -> Vec<Feature> {
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = AtomicUsize::new(0);
let start = Instant::now();
Expand All @@ -139,7 +254,14 @@ impl Runner {
let prev = counter.load(Ordering::Relaxed);
let rate = prev * 1000 / (duration + 1);
log::info!("- search: {:8} ms ({} spectra/s)", duration, rate);
features
}

fn complete_features(
&self,
spectra: Vec<ProcessedSpectrum>,
features: Vec<Feature>,
) -> SageResults {
let quant = self
.parameters
.quant
Expand Down Expand Up @@ -169,6 +291,17 @@ impl Runner {
chunk_idx: usize,
batch_size: usize,
) -> SageResults {
let spectra = self.read_processed_spectra(chunk, chunk_idx, batch_size);
let features = self.search_processed_spectra(scorer, &spectra);
self.complete_features(spectra, features)
}

fn read_processed_spectra(
&self,
chunk: &[String],
chunk_idx: usize,
batch_size: usize,
) -> Vec<ProcessedSpectrum> {
// Read all of the spectra at once - this can help prevent memory over-consumption issues
info!(
"processing files {} .. {} ",
Expand Down Expand Up @@ -240,7 +373,7 @@ impl Runner {
let io_time = Instant::now() - start;
info!("- file IO: {:8} ms", io_time.as_millis());

self.search_processed_spectra(scorer, spectra)
spectra
}

pub fn batch_files(&self, scorer: &Scorer, batch_size: usize) -> SageResults {
Expand Down Expand Up @@ -520,7 +653,9 @@ fn main() -> anyhow::Result<()> {

let input = Input::from_arguments(matches)?;

let runner = input.build().and_then(Runner::new)?;
let runner = input
.build()
.and_then(|parameters| Runner::new(parameters, parallel))?;

let tel = runner.run(parallel, parquet)?;

Expand Down
31 changes: 29 additions & 2 deletions crates/sage/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ pub struct Builder {
pub generate_decoys: Option<bool>,
/// Path to fasta database
pub fasta: Option<String>,
/// Number of sequences to handle simultaneously when pre-filtering the db
pub prefilter_chunk_size: Option<usize>,
/// Pre-filter the database to minimize memory usage
pub prefilter: Option<bool>,
/// Pre-filter the database with a minimal amount of memory at the cost of speed
pub prefilter_low_memory: Option<bool>,
}

impl Builder {
Expand All @@ -102,6 +108,9 @@ impl Builder {
max_variable_mods: self.max_variable_mods.map(|x| x.max(1)).unwrap_or(2),
generate_decoys: self.generate_decoys.unwrap_or(true),
fasta: self.fasta.expect("A fasta file must be provided!"),
prefilter_chunk_size: self.prefilter_chunk_size.unwrap_or(0),
prefilter: self.prefilter.unwrap_or(false),
prefilter_low_memory: self.prefilter_low_memory.unwrap_or(true),
}
}

Expand All @@ -124,6 +133,9 @@ pub struct Parameters {
pub decoy_tag: String,
pub generate_decoys: bool,
pub fasta: String,
pub prefilter_chunk_size: usize,
pub prefilter: bool,
pub prefilter_low_memory: bool,
}

impl Parameters {
Expand Down Expand Up @@ -172,6 +184,12 @@ impl Parameters {
})
.collect::<Vec<_>>();

Self::reorder_peptides(&mut target_decoys);

target_decoys
}

pub fn reorder_peptides(target_decoys: &mut Vec<Peptide>) {
log::trace!("sorting and deduplicating peptides");

// This is equivalent to a stable sort
Expand All @@ -187,6 +205,9 @@ impl Parameters {
&& remove.cterm == keep.cterm
{
keep.proteins.extend(remove.proteins.iter().cloned());
// When merging peptides from different Fastas,
// decoys in one fasta might be targets in another
keep.decoy &= remove.decoy;
true
} else {
false
Expand All @@ -196,13 +217,15 @@ impl Parameters {
target_decoys
.par_iter_mut()
.for_each(|peptide| peptide.proteins.sort_unstable());

target_decoys
}

// pub fn build(self) -> Result<IndexedDatabase, Box<dyn std::error::Error + Send + Sync + 'static>> {
pub fn build(self, fasta: Fasta) -> IndexedDatabase {
let target_decoys = self.digest(&fasta);
self.build_from_peptides(target_decoys)
}

pub fn build_from_peptides(self, target_decoys: Vec<Peptide>) -> IndexedDatabase {
log::trace!("generating fragments");

// Finally, perform in silico digest for our target sequences
Expand Down Expand Up @@ -321,6 +344,7 @@ pub struct Theoretical {
pub fragment_mz: f32,
}

#[derive(Default)]
pub struct IndexedDatabase {
pub peptides: Vec<Peptide>,
pub fragments: Vec<Theoretical>,
Expand Down Expand Up @@ -598,6 +622,9 @@ mod test {
decoy_tag: "rev_".into(),
generate_decoys: false,
fasta: "none".into(),
prefilter: false,
prefilter_chunk_size: 0,
prefilter_low_memory: true,
};

let peptides = params.digest(&fasta);
Expand Down
10 changes: 10 additions & 0 deletions crates/sage/src/fasta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@ impl Fasta {
})
.collect()
}

/// Split this fasta into owned chunks of at most `chunk_size` target
/// sequences, each carrying the same decoy settings, so the database can be
/// built and scored piece-by-piece.
///
/// A `chunk_size` of 0 — the `prefilter_chunk_size` builder default when the
/// user does not set it — would panic inside `slice::chunks`; it is treated
/// here as "everything in one chunk".
pub fn iter_chunks(&self, chunk_size: usize) -> impl Iterator<Item = Self> + '_ {
    // slice::chunks panics when chunk_size == 0, and 0 is the unset default
    // for prefilter_chunk_size; fall back to a single all-targets chunk.
    // The .max(1) keeps the argument nonzero even for an empty fasta.
    let chunk_size = if chunk_size == 0 {
        self.targets.len().max(1)
    } else {
        chunk_size
    };
    self.targets
        .chunks(chunk_size)
        .map(move |target_chunk| Self {
            targets: target_chunk.to_vec(),
            decoy_tag: self.decoy_tag.clone(),
            generate_decoys: self.generate_decoys,
        })
}
}
Loading
Loading