From 38ff7eb5efb03044cbcae93528e4610da70d5aa0 Mon Sep 17 00:00:00 2001 From: Till Hartmann Date: Mon, 2 Dec 2024 18:23:13 +0100 Subject: [PATCH] feat: refactor db subset CLI, add subset by VCF and subset by TxId options --- src/annotate/seqvars/provider.rs | 42 +++++ src/db/subset/mod.rs | 299 +++++++++++++++++++++++-------- 2 files changed, 265 insertions(+), 76 deletions(-) diff --git a/src/annotate/seqvars/provider.rs b/src/annotate/seqvars/provider.rs index baf8c574..831b06f8 100644 --- a/src/annotate/seqvars/provider.rs +++ b/src/annotate/seqvars/provider.rs @@ -97,6 +97,48 @@ impl TxIntervalTrees { (contig_to_idx, trees) } + + pub fn get_tx_for_region( + &self, + tx_seq_db: &TxSeqDatabase, + alt_ac: &str, + _alt_aln_method: &str, + start_i: i32, + end_i: i32, + ) -> Result, Error> { + let contig_idx = *self + .contig_to_idx + .get(alt_ac) + .ok_or(Error::NoTranscriptFound(alt_ac.to_string()))?; + let query = start_i..end_i; + let tx_idxs = self.trees[contig_idx].find(query); + + Ok(tx_idxs + .iter() + .map(|entry| { + let tx = &tx_seq_db.tx_db.as_ref().expect("no tx_db?").transcripts + [*entry.data() as usize]; + assert_eq!( + tx.genome_alignments.len(), + 1, + "Can only have one alignment in Mehari" + ); + let alt_strand = tx.genome_alignments.first().unwrap().strand; + TxForRegionRecord { + tx_ac: tx.id.clone(), + alt_ac: alt_ac.to_string(), + alt_strand: match Strand::try_from(alt_strand).expect("invalid strand") { + Strand::Plus => 1, + Strand::Minus => -1, + _ => unreachable!("invalid strand {}", alt_strand), + }, + alt_aln_method: ALT_ALN_METHOD.to_string(), + start_i, + end_i, + } + }) + .collect()) + } } /// Configuration for constructing the `Provider`. diff --git a/src/db/subset/mod.rs b/src/db/subset/mod.rs index 87fa2d4b..78b4edb2 100644 --- a/src/db/subset/mod.rs +++ b/src/db/subset/mod.rs @@ -1,12 +1,16 @@ //! Subset transcript database. -use std::{io::Write, path::PathBuf}; - +use crate::annotate::seqvars::load_tx_db; +use crate::annotate::seqvars::provider::TxIntervalTrees; +use crate::db::TranscriptDatabase; +use crate::pbs::txs::{TranscriptDb, TxSeqDatabase}; +use anyhow::{Error, Result}; +use biocommons_bioutils::assemblies::{Assembly, ASSEMBLY_INFOS}; use clap::Parser; use indexmap::{IndexMap, IndexSet}; +use noodles::vcf::variant::Record; use prost::Message as _; - -use crate::annotate::seqvars::load_tx_db; +use std::{io::Write, path::PathBuf}; /// Command line arguments for `db subset` sub command. #[derive(Parser, Debug)] @@ -15,44 +19,102 @@ pub struct Args { /// Path to database file to read from. #[arg(long)] pub path_in: PathBuf, - // Path to output file to write to. + + /// Path to output file to write to. #[arg(long)] pub path_out: PathBuf, - /// Limit transcript database to the following HGNC symbols. + + #[command(flatten)] + pub selection: Selection, +} + +#[derive(Debug, Default, Clone, clap::Args)] +#[group(required = true, multiple = false)] +pub struct Selection { + /// Limit transcript database to the transcripts affected by the variants described in the specified VCF file. + #[arg(long)] + vcf: Option, + + /// Limit transcript database to the specified HGNC ID. Can be specified multiple times. #[arg(long)] - pub gene_symbols: Vec, + hgnc_id: Option>, + + /// Limit transcript database to the specified transcript ID. Can be specified multiple times. + #[arg(long)] + transcript_id: Option>, +} + +/// Main entry point. +pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), Error> { + tracing::info!("Opening transcript database"); + let tx_db = load_tx_db(format!("{}", args.path_in.display()))?; + + tracing::info!("Subsetting ..."); + let tx_db = subset_tx_db(&tx_db, &args.selection)?; + + tracing::info!("... and encoding ..."); + let mut buf = Vec::with_capacity(tx_db.encoded_len()); + tx_db + .encode(&mut buf) + .map_err(|e| anyhow::anyhow!("failed to encode: {}", e))?; + + tracing::info!("... and writing ..."); + // Open file and if necessary, wrap in a decompressor. + let file = std::fs::File::create(&args.path_out) + .map_err(|e| anyhow::anyhow!("failed to create file {}: {}", args.path_out.display(), e))?; + let ext = args.path_out.extension().map(|s| s.to_str()); + let mut writer: Box = if ext == Some(Some("gz")) { + Box::new(flate2::write::GzEncoder::new( + file, + flate2::Compression::default(), + )) + } else if ext == Some(Some("zst")) { + Box::new( + zstd::Encoder::new(file, 0) + .map_err(|e| { + anyhow::anyhow!( + "failed to open zstd encoder for {}: {}", + args.path_out.display(), + e + ) + })? + .auto_finish(), + ) + } else { + Box::new(file) + }; + writer + .write_all(&buf) + .map_err(|e| anyhow::anyhow!("failed to write to {}: {}", args.path_out.display(), e))?; + + tracing::info!("... done"); + Ok(()) } -fn subset_tx_db( - container: &crate::pbs::txs::TxSeqDatabase, - gene_symbols: &[String], -) -> Result { +fn subset_tx_db(container: &TxSeqDatabase, selection: &Selection) -> Result { let container_tx_db = container.tx_db.as_ref().expect("no tx_db"); - let (tx_idxs, tx_ids, gene_ids) = { - let gene_symbols = IndexSet::::from_iter(gene_symbols.iter().cloned()); - let mut tx_idxs = Vec::new(); - let mut tx_ids = Vec::new(); - let mut gene_ids = IndexSet::new(); - for (idx, tx) in container_tx_db.transcripts.iter().enumerate() { - tracing::debug!("considering transcript: {:?}", &tx); - if gene_symbols.contains(&tx.gene_symbol) - || gene_symbols.contains(&tx.gene_id) - || gene_symbols.contains(&tx.gene_id.replace("HGNC:", "")) - { - tracing::debug!("keeping transcript: {}", tx.id); - tx_idxs.push(idx); - tx_ids.push(tx.id.clone()); - gene_ids.insert(tx.gene_id.clone()); - } + + let Selection { + vcf, + hgnc_id: hgnc_ids, + transcript_id: transcript_ids, + } = selection; + + let (tx_idxs, tx_ids, gene_ids) = match (hgnc_ids, transcript_ids, vcf) { + (Some(hgnc_ids), _, _) => _extract_transcripts_by_hgnc_id(container_tx_db, hgnc_ids)?, + (_, Some(transcript_ids), _) => { + _extract_transcripts_by_txid(container_tx_db, transcript_ids) } - ( - IndexSet::::from_iter(tx_idxs), - IndexSet::::from_iter(tx_ids), - gene_ids, - ) + (_, _, Some(vcf)) => _extract_transcripts_by_region(container, vcf)?, + _ => unreachable!(), }; + tracing::info!( + "Keeping {} transcripts (of {} gene(s))", + tx_idxs.len(), + gene_ids.len() + ); - let tx_db = Some(crate::pbs::txs::TranscriptDb { + let tx_db = Some(TranscriptDb { transcripts: container_tx_db .transcripts .iter() @@ -112,7 +174,7 @@ fn subset_tx_db( }) }; - Ok(crate::pbs::txs::TxSeqDatabase { + Ok(TxSeqDatabase { tx_db, seq_db, version: container.version.clone(), @@ -120,55 +182,137 @@ fn subset_tx_db( }) } -/// Main entry point. -pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> { - tracing::info!("Opening transcript database"); - let tx_db = load_tx_db(format!("{}", args.path_in.display()))?; +fn _extract_transcripts_by_region( + container: &TxSeqDatabase, + vcf: &PathBuf, +) -> Result<(IndexSet, IndexSet, IndexSet)> { + let container_tx_db = container.tx_db.as_ref().expect("no tx_db"); + let trees = TxIntervalTrees::new(container); - tracing::info!("Subsetting ..."); - let tx_db = subset_tx_db(&tx_db, &args.gene_symbols)?; + let chrom_to_acc = _chrom_to_acc(container.assembly()); - tracing::info!("... and encoding ..."); - let mut buf = Vec::with_capacity(tx_db.encoded_len()); - tx_db - .encode(&mut buf) - .map_err(|e| anyhow::anyhow!("failed to encode: {}", e))?; + let mut transcript_ids = IndexSet::::new(); + let mut reader = noodles::vcf::io::reader::Builder::default().build_from_path(vcf)?; + let header = reader.read_header()?; + for result in reader.records() { + let record = result?; + if let (Some(Ok(start)), Ok(end)) = (record.variant_start(), record.variant_end(&header)) { + let chrom = record.reference_sequence_name().to_string(); + let accession = chrom_to_acc.get(&chrom).unwrap_or(&chrom); - tracing::info!("... and writing ..."); - // Open file and if necessary, wrap in a decompressor. - let file = std::fs::File::create(&args.path_out) - .map_err(|e| anyhow::anyhow!("failed to create file {}: {}", args.path_out.display(), e))?; - let ext = args.path_out.extension().map(|s| s.to_str()); - let mut writer: Box = if ext == Some(Some("gz")) { - Box::new(flate2::write::GzEncoder::new( - file, - flate2::Compression::default(), - )) - } else if ext == Some(Some("zst")) { - Box::new( - zstd::Encoder::new(file, 0) - .map_err(|e| { - anyhow::anyhow!( - "failed to open zstd encoder for {}: {}", - args.path_out.display(), - e - ) - })? - .auto_finish(), - ) - } else { - Box::new(file) - }; - writer - .write_all(&buf) - .map_err(|e| anyhow::anyhow!("failed to write to {}: {}", args.path_out.display(), e))?; + let txs = trees.get_tx_for_region( + container, + accession, + "splign", + usize::from(start).try_into()?, + usize::from(end).try_into()?, + )?; + transcript_ids.extend(txs.into_iter().map(|tx| tx.tx_ac)); + } + } + if transcript_ids.is_empty() { + return Err(anyhow::anyhow!( + "no transcripts overlapping VCF variants found in transcript database" + )); + } + let (tx_idxs, gene_ids) = __extract_transcripts_from_db(container_tx_db, &transcript_ids); + Ok(( + IndexSet::::from_iter(tx_idxs), + transcript_ids, + gene_ids, + )) +} - tracing::info!("... done"); - Ok(()) +fn _extract_transcripts_by_txid( + container_tx_db: &TranscriptDb, + transcript_ids: &Vec, +) -> (IndexSet, IndexSet, IndexSet) { + let transcript_ids = IndexSet::::from_iter(transcript_ids.iter().cloned()); + let (tx_idxs, gene_ids) = __extract_transcripts_from_db(container_tx_db, &transcript_ids); + ( + IndexSet::::from_iter(tx_idxs), + transcript_ids, + gene_ids, + ) +} + +fn _extract_transcripts_by_hgnc_id( + container_tx_db: &TranscriptDb, + gene_symbols: &Vec, +) -> Result<(IndexSet, IndexSet, IndexSet)> { + let hgnc_ids = IndexSet::::from_iter(gene_symbols.iter().cloned()); + let mut tx_idxs = Vec::new(); + let mut tx_ids = Vec::new(); + let mut gene_ids = IndexSet::new(); + for (idx, tx) in container_tx_db.transcripts.iter().enumerate() { + tracing::debug!("considering transcript: {:?}", &tx); + if hgnc_ids.contains(&tx.gene_symbol) + || hgnc_ids.contains(&tx.gene_id) + || hgnc_ids.contains(&tx.gene_id.replace("HGNC:", "")) + { + tracing::debug!("keeping transcript: {}", tx.id); + tx_idxs.push(idx); + tx_ids.push(tx.id.clone()); + gene_ids.insert(tx.gene_id.clone()); + } + } + if tx_ids.is_empty() { + return Err(anyhow::anyhow!( + "no transcripts found for HGNC IDs: {:?}", + hgnc_ids + )); + } + Ok(( + IndexSet::::from_iter(tx_idxs), + IndexSet::::from_iter(tx_ids), + gene_ids, + )) +} + +fn __extract_transcripts_from_db( + container_tx_db: &TranscriptDb, + transcript_ids: &IndexSet, +) -> (Vec, IndexSet) { + let mut tx_idxs = Vec::new(); + let mut gene_ids = IndexSet::new(); + for (idx, tx) in container_tx_db.transcripts.iter().enumerate() { + tracing::debug!("considering transcript: {:?}", &tx); + if transcript_ids.contains(&tx.id) { + tracing::debug!("keeping transcript: {}", tx.id); + tx_idxs.push(idx); + gene_ids.insert(tx.gene_id.clone()); + } + } + (tx_idxs, gene_ids) +} + +fn _acc_to_chrom(assembly: Assembly) -> IndexMap { + IndexMap::from_iter( + ASSEMBLY_INFOS[assembly] + .sequences + .iter() + .map(|record| (record.refseq_ac.clone(), record.name.clone())), + ) +} + +fn _chrom_to_acc(assembly: Assembly) -> IndexMap { + let acc_to_chrom = _acc_to_chrom(assembly); + let mut chrom_to_acc = IndexMap::new(); + for (acc, chrom) in &acc_to_chrom { + let chrom = if chrom.starts_with("chr") { + chrom.strip_prefix("chr").unwrap() + } else { + chrom + }; + chrom_to_acc.insert(chrom.to_string(), acc.clone()); + chrom_to_acc.insert(format!("chr{}", chrom), acc.clone()); + } + chrom_to_acc } #[cfg(test)] mod tests { + use crate::db::subset::Selection; use temp_testdir::TempDir; #[tracing_test::traced_test] @@ -182,7 +326,10 @@ mod tests { &super::Args { path_in: "tests/data/annotate/db/grch37/txs.bin.zst".into(), path_out: path_out.clone(), - gene_symbols: vec!["HGNC:12403".into()], + selection: Selection { + hgnc_id: Some(vec!["HGNC:12403".into()]), + ..Default::default() + }, }, )?;