diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..89e8eebf --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +tests/tsv/example/*.db/** filter=lfs diff=lfs merge=lfs -text diff --git a/.gitignore b/.gitignore index aeb027da..10f29268 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.vscode + /target /Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index c8ffde09..4e5feba6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ byteorder = "1.4.3" log = { version = "0.4", optional = true} rocksdb = { version = "0.21.0", features = ["multi-threaded-cf"] } serde = { version = "1.0.163", features = ["derive"] } -serde_json = "1.0.96" +serde_json = { version = "1.0.96", features=["preserve_order"] } thiserror = "1.0" tracing = "0.1" tracing-subscriber = {version = "0.3", optional = true} diff --git a/src/common/cli.rs b/src/common/cli.rs index cf207d17..6b3d5f08 100644 --- a/src/common/cli.rs +++ b/src/common/cli.rs @@ -8,6 +8,14 @@ pub struct Args { pub verbose: clap_verbosity_flag::Verbosity, } +/// Output format to write. +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, clap::ValueEnum, strum::Display)] +#[strum(serialize_all = "lowercase")] +pub enum OutputFormat { + /// JSONL format. + Jsonl, +} + /// Local genome release for command line arguments. #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, clap::ValueEnum, strum::Display)] #[strum(serialize_all = "lowercase")] diff --git a/src/common/keys.rs b/src/common/keys.rs index 815e4cd5..0ff2c83c 100644 --- a/src/common/keys.rs +++ b/src/common/keys.rs @@ -10,6 +10,11 @@ pub struct Pos { } impl Pos { + /// Create new position. + pub fn new(chrom: String, pos: i32) -> Self { + Self { chrom, pos } + } + /// Create from the given chrom/pos pair. pub fn from(chrom: &str, pos: i32) -> Self { Pos { @@ -30,6 +35,20 @@ impl From<Pos> for Vec<u8> { } } +impl From<&[u8]> for Pos { + fn from(value: &[u8]) -> Self { + let chrom = chrom_key_to_name(&value[0..2]); + let pos = i32::from_be_bytes(value[2..6].try_into().unwrap()); + Self { chrom, pos } + } +} + +impl From<super::spdi::Pos> for Pos { + fn from(other: super::spdi::Pos) -> Self { + Self::new(other.sequence, other.position) + } +} + /// A chromosomal change `CHROM-POS-REF-ALT`. #[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct Var { @@ -44,6 +63,16 @@ pub struct Var { } impl Var { + /// Create new VCF-style variant. + pub fn new(chrom: String, pos: i32, reference: String, alternative: String) -> Self { + Self { + chrom, + pos, + reference, + alternative, + } + } + /// Create from the given VCF-style variant. pub fn from(chrom: &str, pos: i32, reference: &str, alternative: &str) -> Self { Self { @@ -68,6 +97,17 @@ impl From<Var> for Vec<u8> { } } +impl From<super::spdi::Var> for Var { + fn from(other: super::spdi::Var) -> Self { + Self::new( + other.sequence, + other.position, + other.deletion, + other.insertion, + ) + } +} + /// Convert chromosome to key in RocksDB. pub fn chrom_name_to_key(name: &str) -> String { let chrom = if let Some(stripped) = name.strip_prefix("chr") { @@ -92,12 +132,16 @@ pub fn chrom_name_to_key(name: &str) -> String { } /// Convert from RocksDB chromosome key part to chromosome name.
-pub fn chrom_key_to_name(key: &str) -> String { +pub fn chrom_key_to_name(key: &[u8]) -> String { assert!(key.len() == 2); - if key.starts_with('0') || key.starts_with(' ') { - key[1..].to_string() + if key.starts_with(b"0") || key.starts_with(b" ") { + std::str::from_utf8(&key[1..]) + .expect("could not decode UTF-8") + .to_string() } else { - key.to_string() + std::str::from_utf8(&key) + .expect("could not decode UTF-8") + .to_string() } } @@ -147,10 +191,10 @@ mod test { #[test] fn test_chrom_key_to_name() { - assert_eq!(chrom_key_to_name("01"), "1"); - assert_eq!(chrom_key_to_name("21"), "21"); - assert_eq!(chrom_key_to_name(" X"), "X"); - assert_eq!(chrom_key_to_name(" Y"), "Y"); - assert_eq!(chrom_key_to_name("MT"), "MT"); + assert_eq!(chrom_key_to_name(b"01"), "1"); + assert_eq!(chrom_key_to_name(b"21"), "21"); + assert_eq!(chrom_key_to_name(b" X"), "X"); + assert_eq!(chrom_key_to_name(b" Y"), "Y"); + assert_eq!(chrom_key_to_name(b"MT"), "MT"); } } diff --git a/src/common/mod.rs b/src/common/mod.rs index 92f0b4ff..1055cd40 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -3,221 +3,5 @@ #[cfg(feature = "cli")] pub mod cli; pub mod keys; - -/// Utility code for rocksdb access. -pub mod rocks_utils { - use std::{path::Path, time::Instant}; - - use crate::error; - - /// Tune `RocksDB` options for bulk insertion. - /// - /// Example: - /// - /// ``` - /// use annonars::common::rocks_utils::tune_options; - /// - /// let options = tune_options(rocksdb::Options::default(), None); - /// ``` - /// - /// # Arguments - /// - /// * `options` - `RocksDB` options to tune. - /// * `wal_dir` - Optional directory for write-ahead log files. - /// - /// # Returns - /// - /// Tuned `RocksDB` options. - pub fn tune_options(options: rocksdb::Options, wal_dir: Option<&str>) -> rocksdb::Options { - let mut options = options; - - options.create_if_missing(true); - options.create_missing_column_families(true); - - options.prepare_for_bulk_load(); - - // compress all files with Zstandard - options.set_compression_per_level(&[]); - options.set_compression_type(rocksdb::DBCompressionType::Zstd); - // We only want to set level to 2 but have to set the rest as well using the - // Rust interface. The (default) values for the other levels were taken from - // the output of a RocksDB output folder created with default settings. - options.set_compression_options(-14, 2, 0, 0); - // options.set_zstd_max_train_bytes(100 * 1024); - - options.set_max_background_jobs(16); - options.set_max_subcompactions(8); - options.increase_parallelism(8); - options.optimize_level_style_compaction(1 << 30); - options.set_min_write_buffer_number(1); - options.set_min_write_buffer_number_to_merge(1); - options.set_write_buffer_size(1 << 30); - options.set_target_file_size_base(1 << 30); - options.set_compaction_style(rocksdb::DBCompactionStyle::Universal); - - if let Some(wal_dir) = wal_dir { - options.set_wal_dir(wal_dir); - } - - options.set_bottommost_compression_options(-14, 3, 0, 1 << 14, true); - options.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); - options.set_bottommost_zstd_max_train_bytes(1 << 22, true); - - options.set_disable_auto_compactions(true); - options.optimize_for_point_lookup(1 << 26); - - options - } - - /// Force manual compaction of all column families at the given path. - /// - /// This function will enumerate all column families and start a compaction of all of them. - /// It will then wait for the completion of all such compactions. 
- /// - /// # Arguments - /// - /// * `path` - Path to the `RocksDB` database. - /// * `options` - `RocksDB` options to use for opening database and column families. - /// * `wait_msg_prefix` - Optional prefix for the wait message. - pub fn force_compaction<P>( - path: P, - options: &rocksdb::Options, - wait_msg_prefix: Option<&str>, - ) -> Result<(), error::Error> - where - P: AsRef<Path>, - { - let cf_names = rocksdb::DB::list_cf(options, path.as_ref()) - .map_err(|e| error::Error::RocksDBOpen(path.as_ref().to_owned(), e))?; - let cfs = cf_names - .iter() - .map(|s| (s, options.clone())) - .collect::<Vec<_>>(); - let db = rocksdb::DB::open_cf_with_opts(options, path.as_ref(), cfs) - .map_err(|e| error::Error::RocksDBOpen(path.as_ref().to_owned(), e))?; - - let cf_names_str = cf_names - .iter() - .map(std::string::String::as_str) - .collect::<Vec<_>>(); - force_compaction_cf(&db, cf_names_str, wait_msg_prefix) - } - - /// Force manual compaction of the given column families in the given database. - /// - /// The function will enforce compaction of the bottommost level of all column families. - /// The compression will depend on the options that the database was opened with. Using the - /// `tune_options` function is recommended to optimize the resulting database. - /// - /// # Arguments - /// - /// * `db` - `RocksDB` database to compact. - /// * `cf_names` - Names of the column families to compact. - /// * `wait_msg_prefix` - Optional prefix for the wait message. - pub fn force_compaction_cf<I, N>( - db: &rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>, - cf_names: I, - wait_msg_prefix: Option<&str>, - ) -> Result<(), error::Error> - where - I: IntoIterator<Item = N>, - N: AsRef<str>, - { - // Collect column families to run compaction for. - let cfs = cf_names - .into_iter() - .map(|cf| db.cf_handle(cf.as_ref()).unwrap()) - .collect::<Vec<_>>(); - - // Create compaction options and enforce bottommost level compaction. - let mut compact_opt = rocksdb::CompactOptions::default(); - compact_opt.set_exclusive_manual_compaction(true); - compact_opt.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force); - - // Start the compaction for each column family. - cfs.iter() - .for_each(|cf| db.compact_range_cf_opt(cf, None::<&[u8]>, None::<&[u8]>, &compact_opt)); - let compaction_start = Instant::now(); - let mut last_logged = compaction_start; - - // Wait until all compactions are done. - while db - .property_int_value(rocksdb::properties::COMPACTION_PENDING) - .map_err(error::Error::RocksDBProperty)? - .unwrap() - > 0 - || db - .property_int_value(rocksdb::properties::NUM_RUNNING_COMPACTIONS) - .map_err(error::Error::RocksDBProperty)? - .unwrap() - > 0 - { - std::thread::sleep(std::time::Duration::from_millis(100)); - // Log to info every second that compaction is still running. - if let Some(wait_msg_prefix) = wait_msg_prefix { - if last_logged.elapsed() > std::time::Duration::from_millis(1000) { - tracing::info!( - "{}still waiting for RocksDB compaction (since {:?})", - wait_msg_prefix, - compaction_start.elapsed() - ); - last_logged = Instant::now(); - } - } - } - - Ok(()) - } -} - -#[cfg(test)] -mod test { - use temp_testdir::TempDir; - - use super::rocks_utils; - - /// Smoke test for the `rocks_utils::tune_options` function. - #[test] - fn smoke_test_tune_options() -> Result<(), anyhow::Error> { - let options = rocksdb::Options::default(); - let _tuned = rocks_utils::tune_options(options, None); - - Ok(()) - } - - /// Smoke test for the `rocks_utils::force_compaction` function.
- #[test] - fn smoke_test_force_compaction() -> Result<(), anyhow::Error> { - let temp = TempDir::default(); - let path_db = temp.join("rocksdb"); - - let mut options = rocksdb::Options::default(); - options.create_if_missing(true); - options.create_missing_column_families(true); - { - let cf_names = &["foo", "bar"]; - let _db = rocksdb::DB::open_cf(&options, &path_db, cf_names)?; - } - - rocks_utils::force_compaction(&path_db, &options, Some("msg"))?; - - Ok(()) - } - - /// Smoke test for the `rocks_utils::force_compaction` function. - #[test] - fn force_compaction_cf() -> Result<(), anyhow::Error> { - let temp = TempDir::default(); - let path_db = temp.join("rocksdb"); - - let mut options = rocksdb::Options::default(); - options.create_if_missing(true); - options.create_missing_column_families(true); - let cf_names = &["foo", "bar"]; - let db = rocksdb::DB::open_cf(&options, path_db, cf_names)?; - - rocks_utils::force_compaction_cf(&db, cf_names, Some("msg"))?; - - Ok(()) - } -} +pub mod rocks_utils; +pub mod spdi; diff --git a/src/common/rocks_utils.rs b/src/common/rocks_utils.rs new file mode 100644 index 00000000..901212d6 --- /dev/null +++ b/src/common/rocks_utils.rs @@ -0,0 +1,216 @@ +//! Utility code for rocksdb access. + +use std::{path::Path, time::Instant}; + +use crate::error; + +/// Tune `RocksDB` options for bulk insertion. +/// +/// Example: +/// +/// ``` +/// use annonars::common::rocks_utils::tune_options; +/// +/// let options = tune_options(rocksdb::Options::default(), None); +/// ``` +/// +/// # Arguments +/// +/// * `options` - `RocksDB` options to tune. +/// * `wal_dir` - Optional directory for write-ahead log files. +/// +/// # Returns +/// +/// Tuned `RocksDB` options. +pub fn tune_options(options: rocksdb::Options, wal_dir: Option<&str>) -> rocksdb::Options { + let mut options = options; + + options.create_if_missing(true); + options.create_missing_column_families(true); + + options.prepare_for_bulk_load(); + + // compress all files with Zstandard + options.set_compression_per_level(&[]); + options.set_compression_type(rocksdb::DBCompressionType::Zstd); + // We only want to set level to 2 but have to set the rest as well using the + // Rust interface. The (default) values for the other levels were taken from + // the output of a RocksDB output folder created with default settings. + options.set_compression_options(-14, 2, 0, 0); + // options.set_zstd_max_train_bytes(100 * 1024); + + options.set_max_background_jobs(16); + options.set_max_subcompactions(8); + options.increase_parallelism(8); + options.optimize_level_style_compaction(1 << 30); + options.set_min_write_buffer_number(1); + options.set_min_write_buffer_number_to_merge(1); + options.set_write_buffer_size(1 << 30); + options.set_target_file_size_base(1 << 30); + options.set_compaction_style(rocksdb::DBCompactionStyle::Universal); + + if let Some(wal_dir) = wal_dir { + options.set_wal_dir(wal_dir); + } + + options.set_bottommost_compression_options(-14, 3, 0, 1 << 14, true); + options.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); + options.set_bottommost_zstd_max_train_bytes(1 << 22, true); + + options.set_disable_auto_compactions(true); + options.optimize_for_point_lookup(1 << 26); + + options +} + +/// Force manual compaction of all column families at the given path. +/// +/// This function will enumerate all column families and start a compaction of all of them. +/// It will then wait for the completion of all such compactions. 
+/// +/// # Arguments +/// +/// * `path` - Path to the `RocksDB` database. +/// * `options` - `RocksDB` options to use for opening database and column families. +/// * `wait_msg_prefix` - Optional prefix for the wait message. +pub fn force_compaction<P>( + path: P, + options: &rocksdb::Options, + wait_msg_prefix: Option<&str>, +) -> Result<(), error::Error> +where + P: AsRef<Path>, +{ + let cf_names = rocksdb::DB::list_cf(options, path.as_ref()) + .map_err(|e| error::Error::RocksDBOpen(path.as_ref().to_owned(), e))?; + let cfs = cf_names + .iter() + .map(|s| (s, options.clone())) + .collect::<Vec<_>>(); + let db = rocksdb::DB::open_cf_with_opts(options, path.as_ref(), cfs) + .map_err(|e| error::Error::RocksDBOpen(path.as_ref().to_owned(), e))?; + + let cf_names_str = cf_names + .iter() + .map(std::string::String::as_str) + .collect::<Vec<_>>(); + force_compaction_cf(&db, cf_names_str, wait_msg_prefix) +} + +/// Force manual compaction of the given column families in the given database. +/// +/// The function will enforce compaction of the bottommost level of all column families. +/// The compression will depend on the options that the database was opened with. Using the +/// `tune_options` function is recommended to optimize the resulting database. +/// +/// # Arguments +/// +/// * `db` - `RocksDB` database to compact. +/// * `cf_names` - Names of the column families to compact. +/// * `wait_msg_prefix` - Optional prefix for the wait message. +pub fn force_compaction_cf<I, N>( + db: &rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>, + cf_names: I, + wait_msg_prefix: Option<&str>, +) -> Result<(), error::Error> +where + I: IntoIterator<Item = N>, + N: AsRef<str>, +{ + // Collect column families to run compaction for. + let cfs = cf_names + .into_iter() + .map(|cf| db.cf_handle(cf.as_ref()).unwrap()) + .collect::<Vec<_>>(); + + // Create compaction options and enforce bottommost level compaction. + let mut compact_opt = rocksdb::CompactOptions::default(); + compact_opt.set_exclusive_manual_compaction(true); + compact_opt.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force); + + // Start the compaction for each column family. + cfs.iter() + .for_each(|cf| db.compact_range_cf_opt(cf, None::<&[u8]>, None::<&[u8]>, &compact_opt)); + let compaction_start = Instant::now(); + let mut last_logged = compaction_start; + + // Wait until all compactions are done. + while db + .property_int_value(rocksdb::properties::COMPACTION_PENDING) + .map_err(error::Error::RocksDBProperty)? + .unwrap() + > 0 + || db + .property_int_value(rocksdb::properties::NUM_RUNNING_COMPACTIONS) + .map_err(error::Error::RocksDBProperty)? + .unwrap() + > 0 + { + std::thread::sleep(std::time::Duration::from_millis(100)); + // Log to info every second that compaction is still running. + if let Some(wait_msg_prefix) = wait_msg_prefix { + if last_logged.elapsed() > std::time::Duration::from_millis(1000) { + tracing::info!( + "{}still waiting for RocksDB compaction (since {:?})", + wait_msg_prefix, + compaction_start.elapsed() + ); + last_logged = Instant::now(); + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod test { + use temp_testdir::TempDir; + + use super::*; + + /// Smoke test for the `tune_options` function. + #[test] + fn smoke_test_tune_options() -> Result<(), anyhow::Error> { + let options = rocksdb::Options::default(); + let _tuned = tune_options(options, None); + + Ok(()) + } + + /// Smoke test for the `force_compaction` function.
+ #[test] + fn smoke_test_force_compaction() -> Result<(), anyhow::Error> { + let temp = TempDir::default(); + let path_db = temp.join("rocksdb"); + + let mut options = rocksdb::Options::default(); + options.create_if_missing(true); + options.create_missing_column_families(true); + { + let cf_names = &["foo", "bar"]; + let _db = rocksdb::DB::open_cf(&options, &path_db, cf_names)?; + } + + force_compaction(&path_db, &options, Some("msg"))?; + + Ok(()) + } + + /// Smoke test for the `force_compaction_cf` function. + #[test] + fn smoke_test_force_compaction_cf() -> Result<(), anyhow::Error> { + let temp = TempDir::default(); + let path_db = temp.join("rocksdb"); + + let mut options = rocksdb::Options::default(); + options.create_if_missing(true); + options.create_missing_column_families(true); + let cf_names = &["foo", "bar"]; + let db = rocksdb::DB::open_cf(&options, path_db, cf_names)?; + + force_compaction_cf(&db, cf_names, Some("msg"))?; + + Ok(()) + } +} diff --git a/src/common/spdi.rs b/src/common/spdi.rs new file mode 100644 index 00000000..3b4ae92b --- /dev/null +++ b/src/common/spdi.rs @@ -0,0 +1,284 @@ +//! Variants and coordinates in SPDI format. +//! +//! Also see: +//! +//! - Holmes JB, Moyer E, Phan L, Maglott D, Kattman B. [SPDI: data model for variants +//! and applications at NCBI](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7523648/). +//! Bioinformatics. 2020 Mar 1;36(6):1902-1907. + +use std::{fmt::Display, str::FromStr}; + +/// A variant in SPDI format. +/// +/// The SPDI format is described in [Holmes et al. +/// 2020](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7523648/). +/// +/// Note that the format uses 1-based positions and VCF-style allele strings. +/// +/// # Example +/// +/// ``` +/// use std::str::FromStr; +/// use annonars::common::spdi::Var; +/// +/// let var = Var::from_str("NC_000001.11:1000:G:A").unwrap(); +/// assert_eq!(format!("{}", &var), "NC_000001.11:1000:G:A"); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Var { + /// Sequence identifier. + pub sequence: String, + /// Position information. + pub position: i32, + /// Deletion base string. + pub deletion: String, + /// Insertion base string. + pub insertion: String, +} + +impl Var { + /// Create a new variant. + pub fn new(sequence: String, position: i32, deletion: String, insertion: String) -> Self { + Self { + sequence, + position, + deletion, + insertion, + } + } +} + +impl FromStr for Var { + type Err = anyhow::Error; + + fn from_str(spdi: &str) -> Result<Self, Self::Err> { + let mut parts = spdi.rsplitn(4, ':'); + let insertion = parts.next().unwrap().to_string(); + let deletion = parts.next().unwrap().to_string(); + let position = parts + .next() + .unwrap() + .parse::<i32>() + .map_err(|e| anyhow::anyhow!("Could not parse position: {}", e))?; + let sequence = parts.next().unwrap().to_string(); + Ok(Self { + sequence, + position, + deletion, + insertion, + }) + } +} + +impl Display for Var { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{}:{}:{}", + self.sequence, self.position, self.deletion, self.insertion + ) + } +} + +/// A SPDI-style position. +/// +/// # Example +/// +/// ``` +/// use std::str::FromStr; +/// use annonars::common::spdi::Pos; +/// +/// let pos = Pos::from_str("NC_000001.11:1000").unwrap(); +/// assert_eq!(format!("{}", &pos), "NC_000001.11:1000"); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Pos { + /// Sequence identifier. + pub sequence: String, + /// Position information.
+ pub position: i32, +} + +impl Pos { + /// Create a new position. + pub fn new(sequence: String, position: i32) -> Self { + Self { sequence, position } + } +} + +impl FromStr for Pos { + type Err = anyhow::Error; + + fn from_str(spdi: &str) -> Result<Self, Self::Err> { + let mut parts = spdi.rsplitn(2, ':'); + let position = parts + .next() + .unwrap() + .parse::<i32>() + .map_err(|e| anyhow::anyhow!("Could not parse position: {}", e))?; + let sequence = parts.next().unwrap().to_string(); + Ok(Self { sequence, position }) + } +} + +impl Display for Pos { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{}", self.sequence, self.position) + } +} + +/// A SPDI-style range. +/// +/// The range is inclusive of the 1-based start and end positions. +/// +/// # Example +/// +/// ``` +/// use std::str::FromStr; +/// use annonars::common::spdi::Range; +/// +/// let range = Range::from_str("NC_000001.11:1000:2000").unwrap(); +/// assert_eq!(format!("{}", &range), "NC_000001.11:1000:2000"); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Range { + /// Sequence identifier. + pub sequence: String, + /// Start position. + pub start: i32, + /// End position. + pub end: i32, +} + +impl Range { + /// Create a new range. + pub fn new(sequence: String, start: i32, end: i32) -> Self { + Self { + sequence, + start, + end, + } + } +} + +impl FromStr for Range { + type Err = anyhow::Error; + + fn from_str(spdi: &str) -> Result<Self, Self::Err> { + let mut parts = spdi.rsplitn(3, ':'); + let end = parts + .next() + .unwrap() + .parse::<i32>() + .map_err(|e| anyhow::anyhow!("Could not parse end position: {}", e))?; + let start = parts + .next() + .unwrap() + .parse::<i32>() + .map_err(|e| anyhow::anyhow!("Could not parse start position: {}", e))?; + let sequence = parts.next().unwrap().to_string(); + Ok(Self { + sequence, + start, + end, + }) + } +} + +impl Display for Range { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{}:{}", self.sequence, self.start, self.end) + } +} + +impl Into<(Pos, Pos)> for Range { + fn into(self) -> (Pos, Pos) { + ( + Pos::new(self.sequence.clone(), self.start), + Pos::new(self.sequence, self.end), + ) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use pretty_assertions::assert_eq; + + #[test] + fn var_new() { + let var = Var::new( + String::from("NC_000001.11"), + 123, + String::from("A"), + String::from("T"), + ); + assert_eq!(var.sequence, "NC_000001.11"); + assert_eq!(var.position, 123); + assert_eq!(var.deletion, "A"); + assert_eq!(var.insertion, "T"); + } + + #[test] + fn var_from_str() { + let var = Var::from_str("NC_000001.11:123:A:T").unwrap(); + assert_eq!(var.sequence, "NC_000001.11"); + assert_eq!(var.position, 123); + assert_eq!(var.deletion, "A"); + assert_eq!(var.insertion, "T"); + } + + #[test] + fn var_display() { + let var = Var::new( + String::from("NC_000001.11"), + 123, + String::from("A"), + String::from("T"), + ); + assert_eq!(var.to_string(), "NC_000001.11:123:A:T"); + } + + #[test] + fn pos_new() { + let pos = Pos::new(String::from("NC_000001.11"), 123); + assert_eq!(pos.sequence, "NC_000001.11"); + assert_eq!(pos.position, 123); + } + + #[test] + fn pos_from_str() { + let pos = Pos::from_str("NC_000001.11:123").unwrap(); + assert_eq!(pos.sequence, "NC_000001.11"); + assert_eq!(pos.position, 123); + } + + #[test] + fn pos_display() { + let pos = Pos::new(String::from("NC_000001.11"), 123); + assert_eq!(pos.to_string(), "NC_000001.11:123"); + } + + #[test] + fn range_new() { + let range =
Range::new(String::from("NC_000001.11"), 123, 456); + assert_eq!(range.sequence, "NC_000001.11"); + assert_eq!(range.start, 123); + assert_eq!(range.end, 456); + } + + #[test] + fn range_from_str() { + let range = Range::from_str("NC_000001.11:123:456").unwrap(); + assert_eq!(range.sequence, "NC_000001.11"); + assert_eq!(range.start, 123); + assert_eq!(range.end, 456); + } + + #[test] + fn range_display() { + let range = Range::new(String::from("NC_000001.11"), 123, 456); + assert_eq!(range.to_string(), "NC_000001.11:123:456"); + } +} diff --git a/src/error.rs b/src/error.rs index a55fe28c..e4d420f2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -37,7 +37,10 @@ pub enum Error { /// Mismatching column counts. #[error("mismatching number of columns: {0} != {1}")] ColumnCount(usize, usize), - /// Mismatching column names.. + /// Mismatching null values. + #[error("mismatching null values: {0} != {1}")] + NullValues(String, String), + /// Mismatching column names. #[error("mismatching number of column names: {0} != {1}")] ColumnName(String, String), /// Problem opening RocksDB. diff --git a/src/main.rs b/src/main.rs index b58b310d..ce7e4a21 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,6 +40,8 @@ struct Tsv { enum TsvCommands { /// "import" sub command Import(tsv::cli::import::Args), + /// "query" sub command + Query(tsv::cli::query::Args), } pub fn main() -> Result<(), anyhow::Error> { @@ -65,6 +67,7 @@ pub fn main() -> Result<(), anyhow::Error> { match &cli.command { Commands::Tsv(args) => match &args.command { TsvCommands::Import(args) => tsv::cli::import::run(&cli.common, args)?, + TsvCommands::Query(args) => tsv::cli::query::run(&cli.common, args)?, }, } @@ -75,19 +78,3 @@ pub fn main() -> Result<(), anyhow::Error> { Ok(()) } - -// -// Copyright 2023 annonars Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// diff --git a/src/tsv/cli/import/mod.rs b/src/tsv/cli/import/mod.rs index 6c987698..9b52d113 100644 --- a/src/tsv/cli/import/mod.rs +++ b/src/tsv/cli/import/mod.rs @@ -84,9 +84,16 @@ pub fn process_tsv_line( let values = values.iter().collect::<Vec<_>>(); let var = ctx.values_to_var(&values)?; - let key: Vec<u8> = var.into(); + let key: Vec<u8> = var.clone().into(); let value = ctx.encode_values(&values)?; + tracing::trace!( + "putting for var = {:?}, key = {:?}, value = {:?}", + &var, + &key, + &value + ); + db.put_cf(cf_data, key, value)?; Ok(()) @@ -175,6 +182,15 @@ pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error> db.put_cf(&cf_meta, "db-name", &args.db_name)?; db.put_cf(&cf_meta, "db-version", &args.db_version)?; db.put_cf(&cf_meta, "db-schema", serde_json::to_string(&schema)?)?; + db.put_cf( + &cf_meta, + "db-infer-config", + serde_json::to_string(&infer_config)?, + )?; + tracing::info!( + " putting infer config: {}", + serde_json::to_string(&infer_config)? + ); tracing::info!(" putting schema: {}", serde_json::to_string(&schema)?); tracing::info!( "...
done opening RocksDB for writing in {:?}", diff --git a/src/tsv/cli/import/no_tbi.rs b/src/tsv/cli/import/no_tbi.rs index b813aef4..24eafc14 100644 --- a/src/tsv/cli/import/no_tbi.rs +++ b/src/tsv/cli/import/no_tbi.rs @@ -32,7 +32,7 @@ pub fn tsv_import( Box::new(BufReader::new(std::fs::File::open(path_in_tsv)?)) }; - let ctx = tsv::coding::Context::from(config.clone(), schema.clone()); + let ctx = tsv::coding::Context::new(config.clone(), schema.clone()); // Read the file line by line, decode the values, extract position, and insert into RocksDB // instance. diff --git a/src/tsv/cli/import/par_tbi.rs b/src/tsv/cli/import/par_tbi.rs index 0e21b9e8..aa2c3b79 100644 --- a/src/tsv/cli/import/par_tbi.rs +++ b/src/tsv/cli/import/par_tbi.rs @@ -98,7 +98,7 @@ pub fn tsv_import_window( let query = noodles_csi::io::Query::new(&mut reader, chunks); // Read through the overlapping lines. - let ctx = tsv::coding::Context::from(config.clone(), schema.clone()); + let ctx = tsv::coding::Context::new(config.clone(), schema.clone()); for result in query.lines() { let line = result?; diff --git a/src/tsv/cli/mod.rs b/src/tsv/cli/mod.rs index 0f28dbbd..5e954b47 100644 --- a/src/tsv/cli/mod.rs +++ b/src/tsv/cli/mod.rs @@ -1,3 +1,4 @@ //! Command line interface `tsv *` subcommands. pub mod import; +pub mod query; diff --git a/src/tsv/cli/query.rs b/src/tsv/cli/query.rs new file mode 100644 index 00000000..f9aa40c5 --- /dev/null +++ b/src/tsv/cli/query.rs @@ -0,0 +1,470 @@ +//! Query for variants, positions, or ranges. + +use std::sync::Arc; + +use crate::{ + common::{self, keys, spdi}, + tsv::{coding, schema}, +}; + +/// Command line arguments for `tsv query` sub command. +#[derive(clap::Parser, Debug, Clone)] +#[command(about = "query tsv data in rocksdb", long_about = None)] +pub struct Args { + /// Path to RocksDB directory with data. + #[arg(long)] + pub path_rocksdb: String, + /// Name of the column family to query from. + #[arg(long, default_value = "tsv_data")] + pub cf_name: String, + /// Output file (default is stdout == "-"). + #[arg(long, default_value = "-")] + pub out_file: String, + /// Output format. + #[arg(long, default_value = "jsonl")] + pub out_format: common::cli::OutputFormat, + + /// Variant or position to query for. + #[command(flatten)] + pub query: ArgsQuery, +} + +/// Argument group for specifying one of variant, position, or range. +#[derive(clap::Args, Debug, Clone, Default)] +#[group(required = true, multiple = false)] +pub struct ArgsQuery { + /// Specify variant to query for. + #[arg(long, group = "query")] + pub variant: Option<spdi::Var>, + /// Specify position to query for. + #[arg(long, group = "query")] + pub position: Option<spdi::Pos>, + /// Specify range to query for. + #[arg(long, group = "query")] + pub range: Option<spdi::Range>, + /// Query for all variants. + #[arg(long, group = "query")] + pub all: bool, +} + +/// Meta information as read from database. +#[derive(Debug)] +struct Meta { + /// Genome release of data in database. + pub genome_release: String, + /// Name of the database. + pub db_name: String, + /// Version of the database. + pub db_version: String, + /// Schema of the database. + pub db_schema: schema::FileSchema, + /// Inference configuration. + pub db_infer_config: schema::infer::Config, +} + +/// Open RocksDB database.
+fn open_rocksdb( + args: &Args, +) -> Result<(Arc<rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>>, Meta), anyhow::Error> { + tracing::info!("Opening RocksDB database ..."); + let before_open = std::time::Instant::now(); + let cf_names = &["meta", &args.cf_name]; + let db = Arc::new(rocksdb::DB::open_cf_for_read_only( + &rocksdb::Options::default(), + &args.path_rocksdb, + cf_names, + true, + )?); + tracing::info!(" reading meta information"); + let meta = { + let cf_meta = db.cf_handle("meta").unwrap(); + let meta_db_name = String::from_utf8( + db.get_cf(&cf_meta, "db-name")? + .ok_or_else(|| anyhow::anyhow!("missing value meta:db-name"))?, + )?; + let meta_genome_release = String::from_utf8( + db.get_cf(&cf_meta, "genome-release")? + .ok_or_else(|| anyhow::anyhow!("missing value meta:genome-release"))?, + )?; + let meta_db_version = String::from_utf8( + db.get_cf(&cf_meta, "db-version")? + .ok_or_else(|| anyhow::anyhow!("missing value meta:db-version"))?, + )?; + let meta_db_schema = String::from_utf8( + db.get_cf(&cf_meta, "db-schema")? + .ok_or_else(|| anyhow::anyhow!("missing value meta:db-schema"))?, + )?; + let meta_db_infer_config = String::from_utf8( + db.get_cf(&cf_meta, "db-infer-config")? + .ok_or_else(|| anyhow::anyhow!("missing value meta:db-infer-config"))?, + )?; + Meta { + genome_release: meta_genome_release, + db_name: meta_db_name, + db_version: meta_db_version, + db_schema: serde_json::from_str(&meta_db_schema)?, + db_infer_config: serde_json::from_str(&meta_db_infer_config)?, + } + }; + + tracing::info!(" meta:db-name = {}", &meta.db_name); + tracing::info!(" meta:genome-release = {}", &meta.genome_release); + tracing::info!(" meta:db-version = {}", &meta.db_version); + tracing::info!( + " meta:db-schema = {}", + &serde_json::to_string(&meta.db_schema)? + ); + tracing::info!( + " meta:db-infer-config = {}", + &serde_json::to_string(&meta.db_infer_config)? + ); + tracing::info!( + "... opening RocksDB database took {:?}", + before_open.elapsed() + ); + + Ok((db, meta)) +} + +/// Get chromosome from the SPDI variant. +/// +/// If the optional genome release was given then it is compared to the one specified +/// in `meta` and stripped (comparison is case-insensitive). +fn extract_chrom_var(variant: &spdi::Var, meta: &Meta) -> Result<String, anyhow::Error> { + if variant.sequence.contains(":") { + let mut iter = variant.sequence.rsplitn(2, ":"); + let chromosome = iter.next().unwrap(); + if let Some(genome_release) = iter.next() { + if genome_release.to_lowercase() != meta.genome_release.to_lowercase() { + return Err(anyhow::anyhow!( + "genome release mismatch (lowercase): expected {}, got {}", + meta.genome_release, + genome_release + )); + } + } + Ok(chromosome.to_owned()) + } else { + Ok(variant.sequence.clone()) + } +} + +/// Get chromosome from the SPDI position. +/// +/// See `extract_chrom_var` for details. +fn extract_chrom_pos(pos: &spdi::Pos, meta: &Meta) -> Result<String, anyhow::Error> { + if pos.sequence.contains(":") { + let mut iter = pos.sequence.rsplitn(2, ":"); + let chromosome = iter.next().unwrap(); + if let Some(genome_release) = iter.next() { + if genome_release.to_lowercase() != meta.genome_release.to_lowercase() { + return Err(anyhow::anyhow!( + "genome release mismatch (lowercase): expected {}, got {}", + meta.genome_release, + genome_release + )); + } + } + Ok(chromosome.to_owned()) + } else { + Ok(pos.sequence.clone()) + } +} + +/// Get chromosome from the SPDI range. +/// +/// See `extract_chrom_var` for details.
+fn extract_chrom_range(range: &spdi::Range, meta: &Meta) -> Result<String, anyhow::Error> { + if range.sequence.contains(":") { + let mut iter = range.sequence.rsplitn(2, ":"); + let chromosome = iter.next().unwrap(); + if let Some(genome_release) = iter.next() { + if genome_release.to_lowercase() != meta.genome_release.to_lowercase() { + return Err(anyhow::anyhow!( + "genome release mismatch (lowercase): expected {}, got {}", + meta.genome_release, + genome_release + )); + } + } + Ok(chromosome.to_owned()) + } else { + Ok(range.sequence.clone()) + } +} + +/// Print values to stdout. +fn print_values( + out_writer: &mut Box<dyn std::io::Write>, + output_format: common::cli::OutputFormat, + meta: &Meta, + values: Vec<serde_json::Value>, +) -> Result<(), anyhow::Error> { + match output_format { + common::cli::OutputFormat::Jsonl => { + let mut map = serde_json::Map::new(); + for (col, value) in meta.db_schema.columns.iter().zip(values.iter()) { + if !value.is_null() { + map.insert(col.name.clone(), value.clone()); + } + } + writeln!( + out_writer, + "{}", + serde_json::to_string(&serde_json::Value::Object(map))? + )?; + } + } + + Ok(()) +} + +/// Perform query for variant. +fn query_for_variant( + variant: &spdi::Var, + meta: &Meta, + db: &Arc<rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>>, + cf_data: Arc<rocksdb::BoundColumnFamily>, + ctx: coding::Context, +) -> Result<Vec<serde_json::Value>, anyhow::Error> { + // Split off the genome release (checked) and convert to key as used in database. + let query = spdi::Var { + sequence: extract_chrom_var(variant, meta)?, + ..variant.clone() + }; + tracing::debug!("query = {:?}", &query); + let var: keys::Var = query.into(); + let key: Vec<u8> = var.into(); + let raw_value = db + .get_cf(&cf_data, &key)? + .ok_or_else(|| anyhow::anyhow!("could not find variant in database"))?; + let values = ctx.decode_values(&raw_value)?; + + Ok(values) +} + +/// Implementation of `tsv query` sub command. +pub fn run(common: &common::cli::Args, args: &Args) -> Result<(), anyhow::Error> { + tracing::info!("Starting 'tsv query' command"); + tracing::info!("common = {:#?}", &common); + tracing::info!("args = {:#?}", &args); + + let (db, meta) = open_rocksdb(args)?; + let cf_data = db.cf_handle(&args.cf_name).unwrap(); + let ctx = coding::Context::new(meta.db_infer_config.clone(), meta.db_schema.clone()); + + // Obtain writer to output. + let mut out_writer = match args.out_file.as_ref() { + "-" => Box::new(std::io::stdout()) as Box<dyn std::io::Write>, + out_file => { + let path = std::path::Path::new(out_file); + Box::new(std::fs::File::create(&path).unwrap()) as Box<dyn std::io::Write> + } + }; + + tracing::info!("Running query..."); + let before_query = std::time::Instant::now(); + if let Some(variant) = args.query.variant.as_ref() { + print_values( + &mut out_writer, + args.out_format, + &meta, + query_for_variant(variant, &meta, &db, cf_data, ctx)?, + )?; + } else { + let (start, stop) = if let Some(position) = args.query.position.as_ref() { + let position = spdi::Pos { + sequence: extract_chrom_pos(position, &meta)?, + ..position.clone() + }; + (Some(position.clone()), Some(position.clone())) + } else if let Some(range) = args.query.range.as_ref() { + let range = spdi::Range { + sequence: extract_chrom_range(range, &meta)?, + ..range.clone() + }; + let (start, stop) = range.clone().into(); + (Some(start), Some(stop)) + } else if args.query.all { + (None, None) + } else { + unreachable!() + }; + + tracing::debug!("start = {:?}, stop = {:?}", &start, &stop); + + // Obtain iterator and seek to start.
+ let mut iter = db.raw_iterator_cf(&cf_data); + if let Some(start) = start { + let pos: keys::Pos = start.into(); + let key: Vec<u8> = pos.into(); + tracing::debug!("seeking to key {:?}", &key); + iter.seek(&key); + } else { + iter.seek(&b"") + } + + // Cast stop to `keys::Pos`. + let stop = stop.map(|stop| -> keys::Pos { stop.into() }); + if let Some(stop) = stop.as_ref() { + let stop: Vec<u8> = stop.clone().into(); + tracing::debug!("stop = {:?}", &stop); + } + + // Iterate over all variants until we are behind stop. + while iter.valid() { + if let Some(value) = iter.value() { + tracing::trace!("iterator at {:?} => {:?}", &iter.key(), &value); + if let Some(stop) = stop.as_ref() { + let iter_key = iter.key().unwrap(); + let iter_pos: keys::Pos = iter_key.into(); + + if &iter_pos > stop { + break; + } + } + + let values = ctx.decode_values(&value)?; + print_values(&mut out_writer, args.out_format, &meta, values)?; + iter.next(); + } else { + break; + } + } + } + tracing::info!("... done querying in {:?}", before_query.elapsed()); + + tracing::info!("All done. Have a nice day!"); + Ok(()) +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use super::*; + + use temp_testdir::TempDir; + + fn args(query: ArgsQuery) -> (common::cli::Args, Args, TempDir) { + let temp = TempDir::default(); + let common = common::cli::Args { + verbose: clap_verbosity_flag::Verbosity::new(1, 0), + }; + let args = Args { + path_rocksdb: String::from("tests/tsv/example/data.tsv.gz.db"), + cf_name: String::from("tsv_data"), + out_file: temp.join("out").to_string_lossy().to_string(), + out_format: common::cli::OutputFormat::Jsonl, + query, + }; + + (common, args, temp) + } + + #[test] + fn smoke_query_all() -> Result<(), anyhow::Error> { + let (common, args, _temp) = args(ArgsQuery { + all: true, + ..Default::default() + }); + run(&common, &args)?; + let out_data = std::fs::read_to_string(&args.out_file)?; + insta::assert_snapshot!(&out_data); + + Ok(()) + } + + #[test] + fn smoke_query_var() -> Result<(), anyhow::Error> { + let (common, args, _temp) = args(ArgsQuery { + variant: Some(spdi::Var::from_str("GRCh37:1:1000:A:T")?), + ..Default::default() + }); + run(&common, &args)?; + let out_data = std::fs::read_to_string(&args.out_file)?; + insta::assert_snapshot!(&out_data); + + Ok(()) + } + + #[test] + fn smoke_query_pos() -> Result<(), anyhow::Error> { + let (common, args, _temp) = args(ArgsQuery { + position: Some(spdi::Pos::from_str("GRCh37:1:1000")?), + ..Default::default() + }); + run(&common, &args)?; + let out_data = std::fs::read_to_string(&args.out_file)?; + insta::assert_snapshot!(&out_data); + + Ok(()) + } + + #[test] + fn smoke_query_range_find_all() -> Result<(), anyhow::Error> { + let (common, args, _temp) = args(ArgsQuery { + range: Some(spdi::Range::from_str("GRCh37:1:1000:1001")?), + ..Default::default() + }); + run(&common, &args)?; + let out_data = std::fs::read_to_string(&args.out_file)?; + insta::assert_snapshot!(&out_data); + + Ok(()) + } + + #[test] + fn smoke_query_range_find_first() -> Result<(), anyhow::Error> { + let (common, args, _temp) = args(ArgsQuery { + range: Some(spdi::Range::from_str("GRCh37:1:1000:1000")?), + ..Default::default() + }); + run(&common, &args)?; + let out_data = std::fs::read_to_string(&args.out_file)?; + insta::assert_snapshot!(&out_data); + + Ok(()) + } + + #[test] + fn smoke_query_range_find_second() -> Result<(), anyhow::Error> { + let (common, args, _temp) = args(ArgsQuery { + range: Some(spdi::Range::from_str("GRCh37:1:1001:1001")?), + ..Default::default() +
}); + run(&common, &args)?; + let out_data = std::fs::read_to_string(&args.out_file)?; + insta::assert_snapshot!(&out_data); + + Ok(()) + } + + #[test] + fn smoke_query_range_find_none_smaller() -> Result<(), anyhow::Error> { + let (common, args, _temp) = args(ArgsQuery { + range: Some(spdi::Range::from_str("GRCh37:1:1:999")?), + ..Default::default() + }); + run(&common, &args)?; + let out_data = std::fs::read_to_string(&args.out_file)?; + insta::assert_snapshot!(&out_data); + + Ok(()) + } + + #[test] + fn smoke_query_range_find_none_larger() -> Result<(), anyhow::Error> { + let (common, args, _temp) = args(ArgsQuery { + range: Some(spdi::Range::from_str("GRCh37:1:1002:2000")?), + ..Default::default() + }); + run(&common, &args)?; + let out_data = std::fs::read_to_string(&args.out_file)?; + insta::assert_snapshot!(&out_data); + + Ok(()) + } +} diff --git a/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_all.snap b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_all.snap new file mode 100644 index 00000000..b4761d2d --- /dev/null +++ b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_all.snap @@ -0,0 +1,7 @@ +--- +source: src/tsv/cli/query.rs +expression: "&out_data" +--- +{"CHROM":"chr1","POS":1000,"REF":"A","ALT":"T","payload":0.1} +{"CHROM":"chr1","POS":1001,"REF":"A","ALT":"T","payload":0.2} + diff --git a/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_pos.snap b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_pos.snap new file mode 100644 index 00000000..21e989e0 --- /dev/null +++ b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_pos.snap @@ -0,0 +1,6 @@ +--- +source: src/tsv/cli/query.rs +expression: "&out_data" +--- +{"CHROM":"chr1","POS":1000,"REF":"A","ALT":"T","payload":0.1} + diff --git a/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_all.snap b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_all.snap new file mode 100644 index 00000000..b4761d2d --- /dev/null +++ b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_all.snap @@ -0,0 +1,7 @@ +--- +source: src/tsv/cli/query.rs +expression: "&out_data" +--- +{"CHROM":"chr1","POS":1000,"REF":"A","ALT":"T","payload":0.1} +{"CHROM":"chr1","POS":1001,"REF":"A","ALT":"T","payload":0.2} + diff --git a/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_first.snap b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_first.snap new file mode 100644 index 00000000..ab307694 --- /dev/null +++ b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_first.snap @@ -0,0 +1,7 @@ +--- +source: src/tsv/cli/query.rs +assertion_line: 427 +expression: "&out_data" +--- +{"CHROM":"chr1","POS":1000,"REF":"A","ALT":"T","payload":0.1} + diff --git a/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_none_larger.snap b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_none_larger.snap new file mode 100644 index 00000000..dabbaa44 --- /dev/null +++ b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_none_larger.snap @@ -0,0 +1,5 @@ +--- +source: src/tsv/cli/query.rs +expression: "&out_data" +--- + diff --git a/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_none_smaller.snap b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_none_smaller.snap new 
file mode 100644 index 00000000..dabbaa44 --- /dev/null +++ b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_none_smaller.snap @@ -0,0 +1,5 @@ +--- +source: src/tsv/cli/query.rs +expression: "&out_data" +--- + diff --git a/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_second.snap b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_second.snap new file mode 100644 index 00000000..3034dc31 --- /dev/null +++ b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_range_find_second.snap @@ -0,0 +1,6 @@ +--- +source: src/tsv/cli/query.rs +expression: "&out_data" +--- +{"CHROM":"chr1","POS":1001,"REF":"A","ALT":"T","payload":0.2} + diff --git a/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_var.snap b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_var.snap new file mode 100644 index 00000000..21e989e0 --- /dev/null +++ b/src/tsv/cli/snapshots/annonars__tsv__cli__query__test__smoke_query_var.snap @@ -0,0 +1,6 @@ +--- +source: src/tsv/cli/query.rs +expression: "&out_data" +--- +{"CHROM":"chr1","POS":1000,"REF":"A","ALT":"T","payload":0.1} + diff --git a/src/tsv/coding.rs b/src/tsv/coding.rs index bd04f588..2f4e6d14 100644 --- a/src/tsv/coding.rs +++ b/src/tsv/coding.rs @@ -22,7 +22,7 @@ impl Context { } /// Create a new context for coding and decoding. - pub fn from(config: schema::infer::Config, schema: schema::FileSchema) -> Self { + pub fn new(config: schema::infer::Config, schema: schema::FileSchema) -> Self { Self { config, schema } } @@ -188,6 +188,7 @@ impl Context { val.push(bytes[offset]); offset += 1; } + offset += 1; let val = String::from_utf8(val).map_err(error::Error::InvalidUtf8)?; res.push(val.into()); } @@ -277,13 +278,16 @@ mod test { null_values: vec![String::from("NA")], ..schema::infer::Config::default() }; - let schema = schema::FileSchema::from(vec![ - schema::ColumnSchema::from("a", schema::ColumnType::String), - schema::ColumnSchema::from("b", schema::ColumnType::Integer), - schema::ColumnSchema::from("c", schema::ColumnType::Float), - schema::ColumnSchema::from("d", schema::ColumnType::String), - ]); - Context::from(config, schema) + let schema = schema::FileSchema::from( + vec![ + schema::ColumnSchema::from("a", schema::ColumnType::String), + schema::ColumnSchema::from("b", schema::ColumnType::Integer), + schema::ColumnSchema::from("c", schema::ColumnType::Float), + schema::ColumnSchema::from("d", schema::ColumnType::String), + ], + vec![String::from(".")], + ); + Context::new(config, schema) } fn example_values() -> Vec<serde_json::Value> { diff --git a/src/tsv/schema.rs b/src/tsv/schema.rs index 1e3228b4..536510a7 100644 --- a/src/tsv/schema.rs +++ b/src/tsv/schema.rs @@ -88,7 +88,7 @@ impl ColumnSchema { } impl FileSchema { - /// Ensure that all column names are the same and return an error if not. + /// Ensure that all null values and column names are the same and return an error if not. /// Otherwise, perform a column-wise extension of the column type. pub fn merge(&self, other: &FileSchema) -> Result<FileSchema, error::Error> { // Check that the column names are the same and in the same order. @@ -106,6 +106,14 @@ )); } } + // Check that the null values are the same. + if self.null_values != other.null_values { + return Err(error::Error::NullValues( + self.null_values.join(","), + other.null_values.join(","), + )); + } + // Now merge the column types.
let columns = self .columns @@ -117,7 +125,10 @@ }) .collect(); - Ok(FileSchema { columns }) + Ok(FileSchema { + columns, + null_values: self.null_values.clone(), + }) } } @@ -126,12 +137,17 @@ pub struct FileSchema { /// The columns. pub columns: Vec<ColumnSchema>, + /// The null values. + pub null_values: Vec<String>, } impl FileSchema { /// Create a new schema from the given columns. - pub fn from(columns: Vec<ColumnSchema>) -> Self { - Self { columns } + pub fn from(columns: Vec<ColumnSchema>, null_values: Vec<String>) -> Self { + Self { + columns, + null_values, + } } } @@ -147,7 +163,7 @@ pub mod infer { /// /// The `Default` trait provides appropriate defaults that can be used with /// VCF-style headers. - #[derive(Debug, Clone, PartialEq, Eq)] + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Config { /// Field delimiter to use. pub field_delimiter: char, @@ -316,9 +332,13 @@ pub mod infer { typ: self.default_column_config(&c.name), }) .collect(), + null_values: self.config.null_values.clone(), }) } else { - Ok(FileSchema { columns }) + Ok(FileSchema { + columns, + null_values: self.config.null_values.clone(), + }) } } else { Err(error::Error::HeaderMissing) @@ -365,6 +385,7 @@ mod test { typ: ColumnType::String, }, ], + null_values: vec![String::from(".")], }; let schema2 = FileSchema { columns: vec![ @@ -377,6 +398,7 @@ mod test { typ: ColumnType::Integer, }, ], + null_values: vec![String::from(".")], }; let merged = schema1.merge(&schema2)?; @@ -394,6 +416,7 @@ mod test { typ: ColumnType::String, }, ], + null_values: vec![String::from(".")], } ); diff --git a/src/tsv/snapshots/annonars__tsv__schema__test__infer_schema_header.snap b/src/tsv/snapshots/annonars__tsv__schema__test__infer_schema_header.snap index 335f43e5..2f01cabe 100644 --- a/src/tsv/snapshots/annonars__tsv__schema__test__infer_schema_header.snap +++ b/src/tsv/snapshots/annonars__tsv__schema__test__infer_schema_header.snap @@ -21,4 +21,7 @@ FileSchema { typ: Unknown, }, ], + null_values: [ + ".", + ], } diff --git a/src/tsv/snapshots/annonars__tsv__schema__test__infer_schema_values.snap b/src/tsv/snapshots/annonars__tsv__schema__test__infer_schema_values.snap index 7fe53fb6..1edcc112 100644 --- a/src/tsv/snapshots/annonars__tsv__schema__test__infer_schema_values.snap +++ b/src/tsv/snapshots/annonars__tsv__schema__test__infer_schema_values.snap @@ -21,4 +21,7 @@ FileSchema { typ: Integer, }, ], + null_values: [ + ".", + ], } diff --git a/tests/tsv/example/bootstrap.sh b/tests/tsv/example/bootstrap.sh new file mode 100644 index 00000000..278ec434 --- /dev/null +++ b/tests/tsv/example/bootstrap.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +set -euo pipefail +set -x + +rm -rf tests/tsv/example/data.tsv.gz.db +cargo run --all-features -- \ + tsv import \ + --add-default-null-values \ + --genome-release grch37 \ + --path-in-tsv tests/tsv/example/data.tsv.gz \ + --path-out-rocksdb tests/tsv/example/data.tsv.gz.db \ + --db-name example-tsv \ + --db-version 1.0 \ + --col-chrom CHROM \ + --col-start POS \ + --col-ref REF \ + --col-alt ALT +rm -f tests/tsv/example/data.tsv.gz.db/*.log diff --git a/tests/tsv/example/data.tsv b/tests/tsv/example/data.tsv index 2542f9b5..4216cb95 100644 --- a/tests/tsv/example/data.tsv +++ b/tests/tsv/example/data.tsv @@ -1,3 +1,3 @@ CHROM POS REF ALT payload -1 1000 A T 0.1 -1 1001 A T 0.2 +chr1 1000 A T 0.1 +chr1 1001 A T 0.2 diff --git a/tests/tsv/example/data.tsv.gz.db/000014.sst b/tests/tsv/example/data.tsv.gz.db/000014.sst new file mode 100644 index 00000000..f5d2fafb
--- /dev/null +++ b/tests/tsv/example/data.tsv.gz.db/000014.sst @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:921f0cba893131f55458962b3e5c1d63bd6310bf13e7a5278ef7e29b1f0b9876 +size 1491 diff --git a/tests/tsv/example/data.tsv.gz.db/000016.sst b/tests/tsv/example/data.tsv.gz.db/000016.sst new file mode 100644 index 00000000..65d02afc --- /dev/null +++ b/tests/tsv/example/data.tsv.gz.db/000016.sst @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6b0f6c71619ac026fc6e08943c886f2893ef91063c2866ca1f97f5cc5c7a5f1a +size 1213 diff --git a/tests/tsv/example/data.tsv.gz.db/CURRENT b/tests/tsv/example/data.tsv.gz.db/CURRENT new file mode 100644 index 00000000..f8d50486 --- /dev/null +++ b/tests/tsv/example/data.tsv.gz.db/CURRENT @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9c283f6e81028b9eb0760d918ee4bc0aa256ed3b926393c1734c760c4bd724fd +size 16 diff --git a/tests/tsv/example/data.tsv.gz.db/IDENTITY b/tests/tsv/example/data.tsv.gz.db/IDENTITY new file mode 100644 index 00000000..20c4ca66 --- /dev/null +++ b/tests/tsv/example/data.tsv.gz.db/IDENTITY @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:80dc4cfd38f3b671e57311a136418eaa9efd36c0d736d421693dd8f1df335fe7 +size 36 diff --git a/tests/tsv/example/data.tsv.gz.db/LOCK b/tests/tsv/example/data.tsv.gz.db/LOCK new file mode 100644 index 00000000..e69de29b diff --git a/tests/tsv/example/data.tsv.gz.db/LOG b/tests/tsv/example/data.tsv.gz.db/LOG new file mode 100644 index 00000000..6e376cd0 --- /dev/null +++ b/tests/tsv/example/data.tsv.gz.db/LOG @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5b41c8c796e3d35d8146c5135edcdcd420ce504e35d26a73518f8d43c7b055d3 +size 61922 diff --git a/tests/tsv/example/data.tsv.gz.db/MANIFEST-000005 b/tests/tsv/example/data.tsv.gz.db/MANIFEST-000005 new file mode 100644 index 00000000..fb58f017 --- /dev/null +++ b/tests/tsv/example/data.tsv.gz.db/MANIFEST-000005 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5603615b42dec440148a740fec019aa1ce494f35dab305c0f22a5a04a7e5e3c4 +size 658 diff --git a/tests/tsv/example/data.tsv.gz.db/OPTIONS-000009 b/tests/tsv/example/data.tsv.gz.db/OPTIONS-000009 new file mode 100644 index 00000000..cc5acc38 --- /dev/null +++ b/tests/tsv/example/data.tsv.gz.db/OPTIONS-000009 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a4be9b1220e95144635b1e9799869b4b1ba1e4c7b9e89dd798b46c0d4db8982e +size 15443 diff --git a/tests/tsv/example/data.tsv.gz.db/OPTIONS-000011 b/tests/tsv/example/data.tsv.gz.db/OPTIONS-000011 new file mode 100644 index 00000000..cc5acc38 --- /dev/null +++ b/tests/tsv/example/data.tsv.gz.db/OPTIONS-000011 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a4be9b1220e95144635b1e9799869b4b1ba1e4c7b9e89dd798b46c0d4db8982e +size 15443
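Reviewer note, appended after the diff rather than inside it: the interplay of the new `spdi` types, the `From` conversions in `common::keys`, and the key encoding is what the seek-then-scan loop in `tsv query` relies on, so a minimal sketch may help. This is an illustration under assumptions, not part of the change set: it assumes the crate is consumed as a library (as the doc-tests above already do) and uses a bare sequence name so that the genome-release handling in `extract_chrom_*` stays out of the picture.

```rust
use std::str::FromStr;

use annonars::common::{keys, spdi};

fn main() -> Result<(), anyhow::Error> {
    // Parse a SPDI-style range as accepted by `tsv query --range`.
    let range = spdi::Range::from_str("1:1000:1001")?;

    // A range decomposes into its inclusive start and stop positions ...
    let (start, stop): (spdi::Pos, spdi::Pos) = range.into();

    // ... which convert to `keys::Pos` and then into the RocksDB key layout
    // (two-byte chromosome key followed by the big-endian i32 position).
    let start_key: Vec<u8> = keys::Pos::from(start).into();
    let stop_key: Vec<u8> = keys::Pos::from(stop).into();

    // Big-endian positions make byte-wise key order agree with position
    // order per chromosome, which is why seeking to `start` and breaking
    // once the iterator key compares greater than `stop` visits exactly
    // the requested range.
    assert!(start_key < stop_key);

    Ok(())
}
```

On the command line, mirroring `bootstrap.sh`, the equivalent query would be along the lines of `cargo run --all-features -- tsv query --path-rocksdb tests/tsv/example/data.tsv.gz.db --range GRCh37:1:1000:1001` (flag names as defined in `Args` above).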