Skip to content

Commit

Permalink
feat: querying of TSV files via CLI (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe committed May 16, 2023
1 parent e0a2402 commit 983a0e8
Show file tree
Hide file tree
Showing 38 changed files with 1,205 additions and 264 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tests/tsv/example/*.db/** filter=lfs diff=lfs merge=lfs -text
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.vscode

/target
/Cargo.lock

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ byteorder = "1.4.3"
log = { version = "0.4", optional = true}
rocksdb = { version = "0.21.0", features = ["multi-threaded-cf"] }
serde = { version = "1.0.163", features = ["derive"] }
serde_json = "1.0.96"
serde_json = { version = "1.0.96", features=["preserve_order"] }
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = {version = "0.3", optional = true}
Expand Down
8 changes: 8 additions & 0 deletions src/common/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ pub struct Args {
pub verbose: clap_verbosity_flag::Verbosity<clap_verbosity_flag::InfoLevel>,
}

/// Output format to write.
///
/// Used as a `clap::ValueEnum`, so variants can be selected on the command
/// line; the derived `strum::Display` renders variant names in lowercase
/// (e.g., `jsonl`).
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, clap::ValueEnum, strum::Display)]
#[strum(serialize_all = "lowercase")]
pub enum OutputFormat {
/// JSONL format (one JSON document per line).
Jsonl,
}

/// Local genome release for command line arguments.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, clap::ValueEnum, strum::Display)]
#[strum(serialize_all = "lowercase")]
Expand Down
62 changes: 53 additions & 9 deletions src/common/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ pub struct Pos {
}

impl Pos {
/// Create new position.
pub fn new(chrom: String, pos: i32) -> Self {
Self { chrom, pos }
}

/// Create from the given chrom/pos pair.
pub fn from(chrom: &str, pos: i32) -> Self {
Pos {
Expand All @@ -30,6 +35,20 @@ impl From<Pos> for Vec<u8> {
}
}

impl From<&[u8]> for Pos {
    /// Decode a `Pos` from its RocksDB key encoding: a two-byte chromosome
    /// key followed by a big-endian `i32` position.
    fn from(value: &[u8]) -> Self {
        let (chrom_key, pos_bytes) = (&value[0..2], &value[2..6]);
        Self {
            chrom: chrom_key_to_name(chrom_key),
            pos: i32::from_be_bytes(pos_bytes.try_into().unwrap()),
        }
    }
}

impl From<super::spdi::Pos> for Pos {
    /// Convert an SPDI-style position into a key `Pos`.
    fn from(other: super::spdi::Pos) -> Self {
        Self {
            chrom: other.sequence,
            pos: other.position,
        }
    }
}

/// A chromosomal change `CHROM-POS-REF-ALT`.
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct Var {
Expand All @@ -44,6 +63,16 @@ pub struct Var {
}

impl Var {
/// Create new VCF-style variant.
pub fn new(chrom: String, pos: i32, reference: String, alternative: String) -> Self {
Self {
chrom,
pos,
reference,
alternative,
}
}

/// Create from the given VCF-style variant.
pub fn from(chrom: &str, pos: i32, reference: &str, alternative: &str) -> Self {
Self {
Expand All @@ -68,6 +97,17 @@ impl From<Var> for Vec<u8> {
}
}

impl From<super::spdi::Var> for Var {
    /// Convert an SPDI-style variant into a VCF-style key `Var`.
    ///
    /// SPDI `deletion`/`insertion` map to VCF `reference`/`alternative`.
    fn from(other: super::spdi::Var) -> Self {
        Self {
            chrom: other.sequence,
            pos: other.position,
            reference: other.deletion,
            alternative: other.insertion,
        }
    }
}

/// Convert chromosome to key in RocksDB.
pub fn chrom_name_to_key(name: &str) -> String {
let chrom = if let Some(stripped) = name.strip_prefix("chr") {
Expand All @@ -92,12 +132,16 @@ pub fn chrom_name_to_key(name: &str) -> String {
}

/// Convert from RocksDB chromosome key part to chromosome name.
///
/// This is the inverse of `chrom_name_to_key`: two-byte keys are either a
/// padded single-character name (`'0'`- or `' '`-prefixed, e.g., `b"01"`,
/// `b" X"`) or a two-character name (e.g., `b"21"`, `b"MT"`).
///
/// # Arguments
///
/// * `key` - Two-byte RocksDB key part to convert.
///
/// # Returns
///
/// The chromosome name as a `String`.
///
/// # Panics
///
/// Panics if `key` is not exactly two bytes long or is not valid UTF-8.
pub fn chrom_key_to_name(key: &[u8]) -> String {
    assert!(key.len() == 2);
    // Strip the padding byte for single-character chromosome names; the
    // UTF-8 decoding and conversion to `String` are shared by both cases.
    let name = if key[0] == b'0' || key[0] == b' ' {
        &key[1..]
    } else {
        key
    };
    std::str::from_utf8(name)
        .expect("could not decode UTF-8")
        .to_string()
}

Expand Down Expand Up @@ -147,10 +191,10 @@ mod test {

/// Test `chrom_key_to_name` for padded and unpadded two-byte keys.
#[test]
fn test_chrom_key_to_name() {
    assert_eq!(chrom_key_to_name(b"01"), "1");
    assert_eq!(chrom_key_to_name(b"21"), "21");
    assert_eq!(chrom_key_to_name(b" X"), "X");
    assert_eq!(chrom_key_to_name(b" Y"), "Y");
    assert_eq!(chrom_key_to_name(b"MT"), "MT");
}
}
220 changes: 2 additions & 218 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,221 +3,5 @@
#[cfg(feature = "cli")]
pub mod cli;
pub mod keys;

/// Utility code for rocksdb access.
pub mod rocks_utils {
use std::{path::Path, time::Instant};

use crate::error;

/// Tune `RocksDB` options for bulk insertion.
///
/// Example:
///
/// ```
/// use annonars::common::rocks_utils::tune_options;
///
/// let options = tune_options(rocksdb::Options::default(), None);
/// ```
///
/// # Arguments
///
/// * `options` - `RocksDB` options to tune.
/// * `wal_dir` - Optional directory for write-ahead log files.
///
/// # Returns
///
/// Tuned `RocksDB` options.
pub fn tune_options(options: rocksdb::Options, wal_dir: Option<&str>) -> rocksdb::Options {
// Re-bind as mutable so the passed-in options can be modified in place.
let mut options = options;

options.create_if_missing(true);
options.create_missing_column_families(true);

// Start from RocksDB's own bulk-load preset, then refine below.
options.prepare_for_bulk_load();

// compress all files with Zstandard
options.set_compression_per_level(&[]);
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
// We only want to set level to 2 but have to set the rest as well using the
// Rust interface. The (default) values for the other levels were taken from
// the output of a RocksDB output folder created with default settings.
options.set_compression_options(-14, 2, 0, 0);
// options.set_zstd_max_train_bytes(100 * 1024);

// Generous background parallelism for ingestion.
options.set_max_background_jobs(16);
options.set_max_subcompactions(8);
options.increase_parallelism(8);
options.optimize_level_style_compaction(1 << 30);
options.set_min_write_buffer_number(1);
options.set_min_write_buffer_number_to_merge(1);
// Large (1 GiB) write buffer and target file size to reduce the number of
// flushes/files during bulk load.
options.set_write_buffer_size(1 << 30);
options.set_target_file_size_base(1 << 30);
options.set_compaction_style(rocksdb::DBCompactionStyle::Universal);

// Optionally place the write-ahead log in a separate directory.
if let Some(wal_dir) = wal_dir {
options.set_wal_dir(wal_dir);
}

// Stronger Zstd settings (level 3, with dictionary training) for the
// bottommost level, where the data eventually settles.
options.set_bottommost_compression_options(-14, 3, 0, 1 << 14, true);
options.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
options.set_bottommost_zstd_max_train_bytes(1 << 22, true);

// Disable automatic compactions during bulk load; callers are expected to
// run `force_compaction`/`force_compaction_cf` afterwards.
options.set_disable_auto_compactions(true);
options.optimize_for_point_lookup(1 << 26);

options
}

/// Force manual compaction of all column families at the given path.
///
/// This function will enumerate all column families and start a compaction of all of them.
/// It will then wait for the completion of all such compactions.
///
/// # Arguments
///
/// * `path` - Path to the `RocksDB` database.
/// * `options` - `RocksDB` options to use for opening database and column families.
/// * `wait_msg_prefix` - Optional prefix for the wait message.
///
/// # Errors
///
/// Returns an error if the database or its column families cannot be opened,
/// or if waiting on the compaction fails.
pub fn force_compaction<P>(
path: P,
options: &rocksdb::Options,
wait_msg_prefix: Option<&str>,
) -> Result<(), error::Error>
where
P: AsRef<Path>,
{
// Enumerate all column families of the database at `path`.
let cf_names = rocksdb::DB::list_cf(options, path.as_ref())
.map_err(|e| error::Error::RocksDBOpen(path.as_ref().to_owned(), e))?;
// Open the database with every column family, re-using `options` for each.
let cfs = cf_names
.iter()
.map(|s| (s, options.clone()))
.collect::<Vec<_>>();
let db = rocksdb::DB::open_cf_with_opts(options, path.as_ref(), cfs)
.map_err(|e| error::Error::RocksDBOpen(path.as_ref().to_owned(), e))?;

// Delegate the actual compaction and waiting to `force_compaction_cf`.
let cf_names_str = cf_names
.iter()
.map(std::string::String::as_str)
.collect::<Vec<_>>();
force_compaction_cf(&db, cf_names_str, wait_msg_prefix)
}

/// Force manual compaction of the given column families in the given database.
///
/// The function will enforce compaction of the bottommost level of all column families.
/// The compression will depend on the options that the database was opened with. Using the
/// `tune_options` function is recommended to optimize the resulting database.
///
/// # Arguments
///
/// * `db` - `RocksDB` database to compact.
/// * `cf_names` - Names of the column families to compact.
/// * `wait_msg_prefix` - Optional prefix for the wait message.
///
/// # Errors
///
/// Returns an error if querying the RocksDB compaction properties fails.
///
/// # Panics
///
/// Panics if one of the given column family names does not exist in `db`.
pub fn force_compaction_cf<I, N>(
db: &rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
cf_names: I,
wait_msg_prefix: Option<&str>,
) -> Result<(), error::Error>
where
I: IntoIterator<Item = N>,
N: AsRef<str>,
{
// Collect columns families to run compaction for.
let cfs = cf_names
.into_iter()
.map(|cf| db.cf_handle(cf.as_ref()).unwrap())
.collect::<Vec<_>>();

// Create compaction options and enforce bottommost level compaction.
let mut compact_opt = rocksdb::CompactOptions::default();
compact_opt.set_exclusive_manual_compaction(true);
compact_opt.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force);

// Start the compaction for each column family.
cfs.iter()
.for_each(|cf| db.compact_range_cf_opt(cf, None::<&[u8]>, None::<&[u8]>, &compact_opt));
let compaction_start = Instant::now();
let mut last_logged = compaction_start;

// Wait until all compactions are done, polling the RocksDB "compaction
// pending" and "running compactions" properties every 100 ms.
while db
.property_int_value(rocksdb::properties::COMPACTION_PENDING)
.map_err(error::Error::RocksDBProperty)?
.unwrap()
> 0
|| db
.property_int_value(rocksdb::properties::NUM_RUNNING_COMPACTIONS)
.map_err(error::Error::RocksDBProperty)?
.unwrap()
> 0
{
std::thread::sleep(std::time::Duration::from_millis(100));
// Log to info every second that compaction is still running.
if let Some(wait_msg_prefix) = wait_msg_prefix {
if last_logged.elapsed() > std::time::Duration::from_millis(1000) {
tracing::info!(
"{}still waiting for RocksDB compaction (since {:?})",
wait_msg_prefix,
compaction_start.elapsed()
);
last_logged = Instant::now();
}
}
}

Ok(())
}
}

#[cfg(test)]
mod test {
use temp_testdir::TempDir;

use super::rocks_utils;

/// Smoke test for the `rocks_utils::tune_options` function.
#[test]
fn smoke_test_tune_options() -> Result<(), anyhow::Error> {
let options = rocksdb::Options::default();
let _tuned = rocks_utils::tune_options(options, None);

Ok(())
}

/// Smoke test for the `rocks_utils::force_compaction` function.
#[test]
fn smoke_test_force_compaction() -> Result<(), anyhow::Error> {
let temp = TempDir::default();
let path_db = temp.join("rocksdb");

let mut options = rocksdb::Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
// Create the database with two column families, then close it again by
// letting the handle drop at the end of the scope; `force_compaction`
// re-opens the database itself.
{
let cf_names = &["foo", "bar"];
let _db = rocksdb::DB::open_cf(&options, &path_db, cf_names)?;
}

rocks_utils::force_compaction(&path_db, &options, Some("msg"))?;

Ok(())
}

/// Smoke test for the `rocks_utils::force_compaction_cf` function.
#[test]
fn force_compaction_cf() -> Result<(), anyhow::Error> {
let temp = TempDir::default();
let path_db = temp.join("rocksdb");

let mut options = rocksdb::Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
// Keep the database open here: `force_compaction_cf` takes an open handle.
let cf_names = &["foo", "bar"];
let db = rocksdb::DB::open_cf(&options, path_db, cf_names)?;

rocks_utils::force_compaction_cf(&db, cf_names, Some("msg"))?;

Ok(())
}
}
pub mod rocks_utils;
pub mod spdi;
Loading

0 comments on commit 983a0e8

Please sign in to comment.