Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: cache mappers, avoid allocations, codons are arrays #172

Merged
merged 15 commits into from
Jun 10, 2024
Merged
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ postgres = { version = "0.19", features = ["with-chrono-0_4"] }
quick_cache = "0.5"
regex = "1.7"
rustc-hash = "1.1"
seqrepo = { version = "0.10", features = ["cached"] }
seqrepo = { version = "0.10.2", features = ["cached"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
indexmap = { version = "2", features = ["serde"] }
biocommons-bioutils = "0.1.0"
ahash = "0.8.11"
cached = "0.50.0"

[dev-dependencies]
anyhow = "1.0"
Expand Down
10 changes: 3 additions & 7 deletions src/data/cdot/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,13 +766,9 @@ impl TxProvider {
.as_ref()
.expect("cannot happen; genes without map_location are not imported")
.clone(),
descr: gene
.description
.as_ref()
.map(Clone::clone)
.unwrap_or_default(),
summary: gene.summary.as_ref().map(Clone::clone).unwrap_or_default(),
aliases: gene.aliases.as_ref().map(Clone::clone).unwrap_or_default(),
descr: gene.description.clone().unwrap_or_default(),
summary: gene.summary.clone().unwrap_or_default(),
aliases: gene.aliases.clone().unwrap_or_default(),
added: NaiveDateTime::default(),
})
}
Expand Down
5 changes: 3 additions & 2 deletions src/data/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Error type definition.

use std::sync::Arc;
use thiserror::Error;

/// Error type for data.
#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
pub enum Error {
#[error("UTA Postgres access error")]
UtaPostgresError(#[from] postgres::Error),
UtaPostgresError(#[from] Arc<postgres::Error>),
#[error("sequence operation failed")]
SequenceOperationFailed(#[from] crate::sequences::Error),
#[error("problem with seqrepo access")]
Expand Down
146 changes: 80 additions & 66 deletions src/data/uta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use indexmap::IndexMap;
use postgres::{Client, NoTls, Row};
use quick_cache::sync::Cache;
use std::fmt::Debug;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};

use crate::sequences::{seq_md5, TranslationTable};
use biocommons_bioutils::assemblies::{Assembly, ASSEMBLY_INFOS};
Expand Down Expand Up @@ -42,15 +42,15 @@ impl TryFrom<Row> for GeneInfoRecord {
type Error = Error;

fn try_from(row: Row) -> Result<Self, Self::Error> {
let aliases: String = row.try_get("aliases")?;
let aliases: String = row.try_get("aliases").map_err(Arc::new)?;
let aliases = aliases.split(',').map(|s| s.to_owned()).collect::<Vec<_>>();
Ok(Self {
hgnc: row.try_get("hgnc")?,
maploc: row.try_get("maploc")?,
descr: row.try_get("descr")?,
summary: row.try_get("summary")?,
hgnc: row.try_get("hgnc").map_err(Arc::new)?,
maploc: row.try_get("maploc").map_err(Arc::new)?,
descr: row.try_get("descr").map_err(Arc::new)?,
summary: row.try_get("summary").map_err(Arc::new)?,
aliases,
added: row.try_get("added")?,
added: row.try_get("added").map_err(Arc::new)?,
})
}
}
Expand All @@ -60,8 +60,8 @@ impl TryFrom<Row> for TxSimilarityRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
tx_ac1: row.try_get("tx_ac1")?,
tx_ac2: row.try_get("tx_ac2")?,
tx_ac1: row.try_get("tx_ac1").map_err(Arc::new)?,
tx_ac2: row.try_get("tx_ac2").map_err(Arc::new)?,
hgnc_eq: row.try_get("hgnc_eq").unwrap_or(false),
cds_eq: row.try_get("cds_eq").unwrap_or(false),
es_fp_eq: row.try_get("es_fp_eq").unwrap_or(false),
Expand All @@ -76,24 +76,24 @@ impl TryFrom<Row> for TxExonsRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
hgnc: row.try_get("hgnc")?,
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_aln_method: row.try_get("alt_aln_method")?,
alt_strand: row.try_get("alt_strand")?,
ord: row.try_get("ord")?,
tx_start_i: row.try_get("tx_start_i")?,
tx_end_i: row.try_get("tx_end_i")?,
alt_start_i: row.try_get("alt_start_i")?,
alt_end_i: row.try_get("alt_end_i")?,
cigar: row.try_get("cigar")?,
tx_aseq: row.try_get("tx_aseq")?,
alt_aseq: row.try_get("alt_aseq")?,
tx_exon_set_id: row.try_get("tx_exon_set_id")?,
alt_exon_set_id: row.try_get("alt_exon_set_id")?,
tx_exon_id: row.try_get("tx_exon_id")?,
alt_exon_id: row.try_get("alt_exon_id")?,
exon_aln_id: row.try_get("exon_aln_id")?,
hgnc: row.try_get("hgnc").map_err(Arc::new)?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
alt_strand: row.try_get("alt_strand").map_err(Arc::new)?,
ord: row.try_get("ord").map_err(Arc::new)?,
tx_start_i: row.try_get("tx_start_i").map_err(Arc::new)?,
tx_end_i: row.try_get("tx_end_i").map_err(Arc::new)?,
alt_start_i: row.try_get("alt_start_i").map_err(Arc::new)?,
alt_end_i: row.try_get("alt_end_i").map_err(Arc::new)?,
cigar: row.try_get("cigar").map_err(Arc::new)?,
tx_aseq: row.try_get("tx_aseq").map_err(Arc::new)?,
alt_aseq: row.try_get("alt_aseq").map_err(Arc::new)?,
tx_exon_set_id: row.try_get("tx_exon_set_id").map_err(Arc::new)?,
alt_exon_set_id: row.try_get("alt_exon_set_id").map_err(Arc::new)?,
tx_exon_id: row.try_get("tx_exon_id").map_err(Arc::new)?,
alt_exon_id: row.try_get("alt_exon_id").map_err(Arc::new)?,
exon_aln_id: row.try_get("exon_aln_id").map_err(Arc::new)?,
})
}
}
Expand All @@ -103,12 +103,12 @@ impl TryFrom<Row> for TxForRegionRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_strand: row.try_get("alt_strand")?,
alt_aln_method: row.try_get("alt_aln_method")?,
start_i: row.try_get("start_i")?,
end_i: row.try_get("end_i")?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_strand: row.try_get("alt_strand").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
start_i: row.try_get("start_i").map_err(Arc::new)?,
end_i: row.try_get("end_i").map_err(Arc::new)?,
})
}
}
Expand All @@ -126,15 +126,15 @@ impl TryFrom<Row> for TxIdentityInfo {
type Error = Error;

fn try_from(row: Row) -> Result<Self, Self::Error> {
let hgnc = row.try_get("hgnc")?;
let hgnc = row.try_get("hgnc").map_err(Arc::new)?;
let is_selenoprotein = SELENOPROTEIN_SYMBOLS.contains(&hgnc);
Ok(Self {
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_aln_method: row.try_get("alt_aln_method")?,
cds_start_i: row.try_get("cds_start_i")?,
cds_end_i: row.try_get("cds_end_i")?,
lengths: row.try_get("lengths")?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
cds_start_i: row.try_get("cds_start_i").map_err(Arc::new)?,
cds_end_i: row.try_get("cds_end_i").map_err(Arc::new)?,
lengths: row.try_get("lengths").map_err(Arc::new)?,
hgnc: hgnc.to_string(),
// UTA database does not support selenoproteins (yet).
translation_table: if is_selenoprotein {
Expand All @@ -151,12 +151,12 @@ impl TryFrom<Row> for TxInfoRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
hgnc: row.try_get("hgnc")?,
cds_start_i: row.try_get("cds_start_i")?,
cds_end_i: row.try_get("cds_end_i")?,
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_aln_method: row.try_get("alt_aln_method")?,
hgnc: row.try_get("hgnc").map_err(Arc::new)?,
cds_start_i: row.try_get("cds_start_i").map_err(Arc::new)?,
cds_end_i: row.try_get("cds_end_i").map_err(Arc::new)?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
})
}
}
Expand All @@ -166,9 +166,9 @@ impl TryFrom<Row> for TxMappingOptionsRecord {

fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(Self {
tx_ac: row.try_get("tx_ac")?,
alt_ac: row.try_get("alt_ac")?,
alt_aln_method: row.try_get("alt_aln_method")?,
tx_ac: row.try_get("tx_ac").map_err(Arc::new)?,
alt_ac: row.try_get("alt_ac").map_err(Arc::new)?,
alt_aln_method: row.try_get("alt_aln_method").map_err(Arc::new)?,
})
}
}
Expand Down Expand Up @@ -231,7 +231,7 @@ impl Debug for Provider {
impl Provider {
pub fn with_config(config: &Config) -> Result<Self, Error> {
let config = config.clone();
let conn = Mutex::new(Client::connect(&config.db_url, NoTls)?);
let conn = Mutex::new(Client::connect(&config.db_url, NoTls).map_err(Arc::new)?);
let schema_version = Self::fetch_schema_version(
&mut conn.lock().expect("cannot obtain connection lock"),
&config.db_schema,
Expand All @@ -246,7 +246,7 @@ impl Provider {

fn fetch_schema_version(conn: &mut Client, db_schema: &str) -> Result<String, Error> {
let sql = format!("select key, value from {db_schema}.meta where key = 'schema_version'");
let row = conn.query_one(&sql, &[])?;
let row = conn.query_one(&sql, &[]).map_err(Arc::new)?;
Ok(row.get("value"))
}
}
Expand Down Expand Up @@ -282,7 +282,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&hgnc])?
.query_one(&sql, &[&hgnc])
.map_err(Arc::new)?
.try_into()?;

self.caches
Expand All @@ -305,11 +306,12 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&tx_ac])?)
.query(&sql, &[&tx_ac])
.map_err(Arc::new)?)
.into_iter()
.next()
{
let result = Some(row.try_get("pro_ac")?);
let result = Some(row.try_get("pro_ac").map_err(Arc::new)?);
self.caches
.get_pro_ac_for_tx_ac
.insert(tx_ac.to_string(), None);
Expand Down Expand Up @@ -337,8 +339,10 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&ac])?
.try_get("seq_id")?;
.query_one(&sql, &[&ac])
.map_err(Arc::new)?
.try_get("seq_id")
.map_err(Arc::new)?;

let sql = format!(
"SELECT seq FROM {}.seq WHERE seq_id = $1",
Expand All @@ -348,8 +352,10 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&seq_id])?
.try_get("seq")?;
.query_one(&sql, &[&seq_id])
.map_err(Arc::new)?
.try_get("seq")
.map_err(Arc::new)?;

let begin = begin.unwrap_or_default();
let end = end
Expand All @@ -373,7 +379,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&md5])?
.query(&sql, &[&md5])
.map_err(Arc::new)?
{
result.push(row.get(0));
}
Expand Down Expand Up @@ -402,7 +409,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&tx_ac])?
.query(&sql, &[&tx_ac])
.map_err(Arc::new)?
{
result.push(row.try_into()?);
}
Expand Down Expand Up @@ -439,7 +447,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&tx_ac, &alt_ac, &alt_aln_method])?
.query(&sql, &[&tx_ac, &alt_ac, &alt_aln_method])
.map_err(Arc::new)?
{
result.push(row.try_into()?);
}
Expand Down Expand Up @@ -473,7 +482,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&gene])?
.query(&sql, &[&gene])
.map_err(Arc::new)?
{
result.push(row.try_into()?);
}
Expand Down Expand Up @@ -517,7 +527,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&alt_ac, &start_i, &end_i])?
.query(&sql, &[&alt_ac, &start_i, &end_i])
.map_err(Arc::new)?
{
let record: TxForRegionRecord = row.try_into()?;
// NB: The original Python code did not use alt_aln_method in the query either.
Expand Down Expand Up @@ -547,7 +558,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&tx_ac])?
.query_one(&sql, &[&tx_ac])
.map_err(Arc::new)?
.try_into()?;

self.caches
Expand Down Expand Up @@ -583,7 +595,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query_one(&sql, &[&tx_ac, &alt_ac, &alt_aln_method])?
.query_one(&sql, &[&tx_ac, &alt_ac, &alt_aln_method])
.map_err(Arc::new)?
.try_into()?;

self.caches.get_tx_info.insert(key, result.clone());
Expand All @@ -607,7 +620,8 @@ impl interface::Provider for Provider {
.conn
.lock()
.expect("cannot obtain connection lock")
.query(&sql, &[&tx_ac])?
.query(&sql, &[&tx_ac])
.map_err(Arc::new)?
{
result.push(row.try_into()?);
}
Expand Down
Loading
Loading