Skip to content

Commit

Permalink
feat: adding by-accession column family for clinvar-minimal (#289) (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe authored Nov 16, 2023
1 parent c1ebece commit 807abaf
Show file tree
Hide file tree
Showing 17 changed files with 222 additions and 20 deletions.
101 changes: 101 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ prost-build = "0.12"
insta = { version = "1.33", features = ["yaml"] }
log = "0.4"
pretty_assertions = "1.4"
rstest = "0.18.2"
temp_testdir = "0.2"
test-log = "0.2"
tracing-subscriber = "0.3"
Expand Down
12 changes: 10 additions & 2 deletions src/clinvar_minimal/cli/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub struct Args {
/// Name of the column family to import into.
#[arg(long, default_value = "clinvar")]
pub cf_name: String,
/// Name of the column family for accession lookup.
#[arg(long, default_value = "clinvar_by_accession")]
pub cf_name_by_accession: String,
/// Optional path to RocksDB WAL directory.
#[arg(long)]
pub path_wal_dir: Option<String>,
Expand All @@ -39,6 +42,7 @@ fn jsonl_import(
args: &Args,
) -> Result<(), anyhow::Error> {
let cf_data = db.cf_handle(&args.cf_name).unwrap();
let cf_by_accession = db.cf_handle(&args.cf_name_by_accession).unwrap();

// Open reader, possibly decompressing gzipped files.
let reader: Box<dyn std::io::Read> = if args.path_in_jsonl.ends_with(".gz") {
Expand Down Expand Up @@ -103,6 +107,9 @@ fn jsonl_import(
continue;
}
Ok(data) => {
db.put_cf(&cf_by_accession, rcv.as_bytes(), &key)?;
db.put_cf(&cf_by_accession, vcv.as_bytes(), &key)?;

let record = if let Some(data) = data {
let mut record =
crate::pbs::annonars::clinvar::v1::minimal::Record::decode(&data[..])?;
Expand Down Expand Up @@ -136,7 +143,7 @@ fn jsonl_import(
}
};
let buf = record.encode_to_vec();
db.put_cf(&cf_data, key, buf)?;
db.put_cf(&cf_data, &key, &buf)?;
}
}
}
Expand All @@ -158,7 +165,7 @@ pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error>
rocksdb::Options::default(),
args.path_wal_dir.as_ref().map(|s| s.as_ref()),
);
let cf_names = &["meta", &args.cf_name];
let cf_names = &["meta", &args.cf_name, &args.cf_name_by_accession];
let db = Arc::new(rocksdb::DB::open_cf_with_opts(
&options,
common::readlink_f(&args.path_out_rocksdb)?,
Expand Down Expand Up @@ -219,6 +226,7 @@ mod test {
path_in_jsonl: String::from("tests/clinvar-minimal/clinvar-seqvars-grch37-tgds.jsonl"),
path_out_rocksdb: format!("{}", tmp_dir.join("out-rocksdb").display()),
cf_name: String::from("clinvar"),
cf_name_by_accession: String::from("clinvar_by_accession"),
path_wal_dir: None,
};

Expand Down
71 changes: 68 additions & 3 deletions src/clinvar_minimal/cli/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct Args {
/// Name of the column family to import into.
#[arg(long, default_value = "clinvar")]
pub cf_name: String,
/// Name of the column family for accession lookup.
#[arg(long, default_value = "clinvar_by_accession")]
pub cf_name_by_accession: String,
/// Output file (default is stdout == "-").
#[arg(long, default_value = "-")]
pub out_file: String,
Expand All @@ -43,10 +46,11 @@ pub fn open_rocksdb<P: AsRef<std::path::Path>>(
path_rocksdb: P,
cf_data: &str,
cf_meta: &str,
cf_by_accession: &str,
) -> Result<(Arc<rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>>, Meta), anyhow::Error> {
tracing::info!("Opening RocksDB database ...");
let before_open = std::time::Instant::now();
let cf_names = &[cf_meta, cf_data];
let cf_names = &[cf_meta, cf_data, cf_by_accession];
let db = Arc::new(rocksdb::DB::open_cf_for_read_only(
&rocksdb::Options::default(),
common::readlink_f(&path_rocksdb)?,
Expand Down Expand Up @@ -78,7 +82,12 @@ pub fn open_rocksdb<P: AsRef<std::path::Path>>(
pub fn open_rocksdb_from_args(
args: &Args,
) -> Result<(Arc<rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>>, Meta), anyhow::Error> {
open_rocksdb(&args.path_rocksdb, &args.cf_name, "meta")
open_rocksdb(
&args.path_rocksdb,
&args.cf_name,
"meta",
&args.cf_name_by_accession,
)
}

fn print_record(
Expand Down Expand Up @@ -125,6 +134,36 @@ pub fn query_for_variant(
.transpose()
}

/// Query for a single variant by accession.
pub fn query_for_accession(
accession: &str,
db: &rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
cf_data: &Arc<rocksdb::BoundColumnFamily>,
cf_data_by_rsid: &Arc<rocksdb::BoundColumnFamily>,
) -> Result<Option<crate::pbs::annonars::clinvar::v1::minimal::Record>, anyhow::Error> {
let accession = accession.to_uppercase(); // VCV*, RCV*

// First, lookup accession.
let var_key = db
.get_cf(cf_data_by_rsid, accession.clone())
.map_err(|e| anyhow::anyhow!("error while querying for accession {}: {}", &accession, e))?
.ok_or_else(|| anyhow::anyhow!("no record found for accession {}", &accession))?;

// Execute query for key.
let raw_value = db
.get_cf(cf_data, var_key.clone())
.map_err(|e| anyhow::anyhow!("error while querying for variant {:?}: {}", &var_key, e))?;
raw_value
.map(|raw_value| {
// Decode via prost.
crate::pbs::annonars::clinvar::v1::minimal::Record::decode(&mut std::io::Cursor::new(
&raw_value,
))
.map_err(|e| anyhow::anyhow!("failed to decode record: {}", e))
})
.transpose()
}

/// Implementation of `tsv query` sub command.
pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error> {
tracing::info!("Starting 'gnomad-mtdna query' command");
Expand All @@ -133,6 +172,7 @@ pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error>

let (db, meta) = open_rocksdb_from_args(args)?;
let cf_data = db.cf_handle(&args.cf_name).unwrap();
let cf_by_accession = db.cf_handle(&args.cf_name_by_accession).unwrap();

// Obtain writer to output.
let mut out_writer = match args.out_file.as_ref() {
Expand All @@ -145,7 +185,13 @@ pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error>

tracing::info!("Running query...");
let before_query = std::time::Instant::now();
if let Some(variant) = args.query.variant.as_ref() {
if let Some(accession) = args.query.accession.as_ref() {
if let Some(record) = query_for_accession(accession, &db, &cf_data, &cf_by_accession)? {
print_record(&mut out_writer, args.out_format, &record)?;
} else {
tracing::info!("no record found for accession {}", accession);
}
} else if let Some(variant) = args.query.variant.as_ref() {
if let Some(record) = query_for_variant(variant, &meta, &db, &cf_data)? {
print_record(&mut out_writer, args.out_format, &record)?;
} else {
Expand Down Expand Up @@ -237,6 +283,7 @@ mod test {
let args = Args {
path_rocksdb: String::from("tests/clinvar-minimal/clinvar-seqvars-grch37-tgds.tsv.db"),
cf_name: String::from("clinvar"),
cf_name_by_accession: String::from("clinvar_by_accession"),
out_file: temp.join("out").to_string_lossy().to_string(),
out_format: common::cli::OutputFormat::Jsonl,
query,
Expand Down Expand Up @@ -348,4 +395,22 @@ mod test {

Ok(())
}

/// Smoke test for querying by accession, parameterized over one RCV and one VCV accession.
///
/// Runs the query command with only the accession set and compares the contents of the
/// output file against the stored snapshot.
#[rstest::rstest]
#[test]
#[case("RCV001679107")]
#[case("VCV001307216")]
fn smoke_query_by_accession(#[case] accession: &str) -> Result<(), anyhow::Error> {
// Use the accession as snapshot suffix so the two cases do not share a snapshot.
crate::common::set_snapshot_suffix!("{}", &accession);

// Build the arguments with only the accession query set; all other query fields
// take their defaults.
// NOTE(review): `_temp` presumably keeps the temporary output directory alive until
// the end of the test -- confirm against the `args` helper.
let (common, args, _temp) = args(ArgsQuery {
accession: Some(accession.to_string()),
..Default::default()
});
run(&common, &args)?;
// Read back what the command wrote to the output file and compare with the snapshot.
let out_data = std::fs::read_to_string(&args.out_file)?;
insta::assert_snapshot!(&out_data);

Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
source: src/clinvar_minimal/cli/query.rs
expression: "&out_data"
---
{"release":"GRCh37","chromosome":"13","start":95235557,"stop":95235557,"reference":"A","alternative":"G","vcv":"VCV001273284","reference_assertions":[{"rcv":"RCV001679107","title":"NM_014305.4(TGDS):c.314-67T>C AND not provided","clinical_significance":4,"review_status":3}]}

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
source: src/clinvar_minimal/cli/query.rs
expression: "&out_data"
---
{"release":"GRCh37","chromosome":"13","start":95232181,"stop":95232181,"reference":"A","alternative":"C","vcv":"VCV001307216","reference_assertions":[{"rcv":"RCV001760634","title":"NM_014305.4(TGDS):c.582T>G (p.Ser194Arg) AND not provided","clinical_significance":2,"review_status":3}]}

12 changes: 12 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,15 @@ where
{
_readlink_f(path.as_ref(), 20)
}

/// Sets the `insta` snapshot suffix for the scope in which the macro is invoked.
///
/// Accepts `format!`-style arguments: a format string followed by comma-separated
/// expressions.  The formatted string is installed as the snapshot suffix on a copy of the
/// current `insta` settings, which is then bound to the scope.  This is useful for
/// parameterized (e.g., `rstest`) tests where every case needs its own snapshot.
///
/// The guard returned by `bind_to_scope()` is stored in a local of the expansion, so the
/// settings stay bound until the end of the block that invoked the macro.
#[macro_export]
macro_rules! set_snapshot_suffix {
    ($($arg:expr),*) => {
        // Format the suffix first, then install it on a copy of the current settings.
        let snapshot_suffix = format!($($arg,)*);
        let mut snapshot_settings = insta::Settings::clone_current();
        snapshot_settings.set_snapshot_suffix(snapshot_suffix);
        // Keep the guard alive for the remainder of the invoking scope.
        let _snapshot_guard = snapshot_settings.bind_to_scope();
    }
}

pub use set_snapshot_suffix;

This file was deleted.

Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Loading

0 comments on commit 807abaf

Please sign in to comment.