Skip to content

Commit

Permalink
Merge pull request #24 from stjude-rust-labs/cmcleod/async-views
Browse files Browse the repository at this point in the history
revise: changes `view` subcommand to use async readers and writers
  • Loading branch information
claymcleod authored Mar 24, 2023
2 parents 569272b + ef7f937 commit c0510b9
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 65 deletions.
41 changes: 22 additions & 19 deletions src/view/bam.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
//! BAM viewing
use std::io;
use std::io::Write;
use std::path::PathBuf;

use anyhow::Context;
use futures::TryStreamExt;
use noodles::bam::bai;
use noodles::sam;
use noodles::sam::AlignmentWriter;
use tokio::io::{self, AsyncWriteExt};

use crate::utils::formats::bam::ParsedBAMFile;
use crate::utils::formats::bam::ParsedAsyncBAMFile;
use crate::utils::formats::utils::IndexCheck;
use crate::view::command::Mode;

/// Main method for BAM viewing.
pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<()> {
pub async fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<()> {
// (1) Opens and parses the BAM file.
let ParsedBAMFile {
let ParsedAsyncBAMFile {
mut reader,
header,
index_path: bai_path,
..
} = crate::utils::formats::bam::open_and_parse(&src, IndexCheck::HeaderOnly)?;
} = crate::utils::formats::bam::open_and_parse_async(&src, IndexCheck::HeaderOnly).await?;

// (2) Determine the handle with which to write the output. TODO: for now, we just
// default to stdout, but in the future we will support writing to another path.
let stdout = io::stdout();
let mut handle = stdout.lock();
let mut handle = io::stdout();

// (3) If the user specified to output the header, output the header.
if mode == Mode::Full || mode == Mode::HeaderOnly {
write!(handle, "{}", header.raw).with_context(|| "writing header to stream")?;
handle
.write_all(header.raw.to_string().as_bytes())
.await
.with_context(|| "writing header to stream")?;
}

// (4) If the mode is header only, nothing left to do, so return.
Expand All @@ -39,29 +40,31 @@ pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<(
}

// (5) Writes the records to the output stream.
let mut writer = sam::Writer::new(handle);
let mut writer = sam::AsyncWriter::new(handle);

if let Some(query) = query {
// (a) If a query is specified, print just the records that fall within the query.
let index = bai::read(bai_path).with_context(|| "reading BAM index")?;
let region = query.parse().with_context(|| "parsing query")?;

let records = reader
let mut records = reader
.query(&header.parsed, &index, &region)
.with_context(|| "querying BAM file")?;

for result in records {
let record = result?;
writer.write_alignment_record(&header.parsed, &record)?;
while let Some(record) = records.try_next().await? {
writer
.write_alignment_record(&header.parsed, &record)
.await?;
}
} else {
// (b) Else, print all of the records in the file.
for result in reader.records(&header.parsed) {
let record = result?;
writer.write_alignment_record(&header.parsed, &record)?;
let mut records = reader.records(&header.parsed);
while let Some(record) = records.try_next().await? {
writer
.write_alignment_record(&header.parsed, &record)
.await?;
}
}

writer.finish(&header.parsed)?;
Ok(())
}
15 changes: 11 additions & 4 deletions src/view/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,21 @@ pub fn view(args: ViewArgs) -> anyhow::Result<()> {
let reference_fasta = args.reference_fasta;
let mode = args.mode;

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

match BioinformaticsFileFormat::try_detect(&src) {
Some(BioinformaticsFileFormat::SAM) => super::sam::view(src, query, mode),
Some(BioinformaticsFileFormat::BAM) => super::bam::view(src, query, mode),
Some(BioinformaticsFileFormat::SAM) => rt.block_on(super::sam::view(src, query, mode)),
Some(BioinformaticsFileFormat::BAM) => rt.block_on(super::bam::view(src, query, mode)),
Some(BioinformaticsFileFormat::CRAM) => {
if let Some(reference_fasta) = reference_fasta {
super::cram::view(src, query, reference_fasta, mode)
rt.block_on(super::cram::view(src, query, reference_fasta, mode))
} else {
bail!("Reference FASTA is required to view a CRAM file.")
bail!(
"--reference-fasta is a required argument when converting to/from a CRAM file"
)
}
}
Some(BioinformaticsFileFormat::GFF) | Some(BioinformaticsFileFormat::GFF_GZ) => {
Expand Down
55 changes: 30 additions & 25 deletions src/view/cram.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,39 @@
//! CRAM viewing
use std::fs::File;
use std::io;
use std::io::Write;
use std::path::PathBuf;

use anyhow::bail;
use anyhow::Context;
use futures::TryStreamExt;
use noodles::cram;
use noodles::cram::crai;
use noodles::fasta;
use noodles::fasta::repository::adapters::IndexedReader;
use noodles::sam;
use noodles::sam::AlignmentWriter;
use tokio::io;
use tokio::io::AsyncWriteExt;
use tracing::debug;

use crate::utils::formats::sam::parse_header;
use crate::utils::pathbuf::AppendExtension;
use crate::view::command::Mode;

/// Main method for CRAM viewing.
pub fn view(
pub async fn view(
src: PathBuf,
query: Option<String>,
reference_fasta: PathBuf,
mode: Mode,
) -> anyhow::Result<()> {
// (1) Reads the file from disk.
debug!("reading CRAM file from disk");
let mut reader = File::open(&src)
.map(cram::Reader::new)
let mut reader = tokio::fs::File::open(&src)
.await
.map(cram::AsyncReader::new)
.with_context(|| "opening src file")?;

// (2) Determine the handle with which to write the output.
let stdout = io::stdout();
let mut handle = stdout.lock();
let mut handle = io::stdout();

// (3) Build the FASTA repository and associated index.
let repository = fasta::indexed_reader::Builder::default()
Expand All @@ -56,55 +55,61 @@ pub fn view(
}

// (4) Read the file's definition.
reader.read_file_definition()?;
reader.read_file_definition().await?;

// (5) If the user specified to output the header, output the raw header (before
// applying any corrections).
let ht = reader
.read_file_header()
.await
.with_context(|| "reading CRAM header")?;

if mode == Mode::Full || mode == Mode::HeaderOnly {
write!(handle, "{}", ht).with_context(|| "writing header to stream")?;
handle
.write_all(ht.as_bytes())
.await
.with_context(|| "writing header to stream")?;
}

// (6) If the mode is header-only, nothing left to do, so return.
if mode == Mode::HeaderOnly {
return Ok(());
}

// (7) Parse the header and apply corrections.
// (7) Parses the header text.
let header = parse_header(ht).with_context(|| "parsing header")?;

// (8) Writes the records to the output stream.
let mut writer = sam::Writer::new(handle);
let mut writer = sam::AsyncWriter::new(handle);

if let Some(query) = query {
// (a) If a query is specified, print just the records that fall within the query.
let index =
crai::read(src.with_extension("cram.crai")).with_context(|| "reading CRAM index")?;
let index = crai::r#async::read(src.with_extension("cram.crai"))
.await
.with_context(|| "reading CRAM index")?;
let region = query.parse().with_context(|| "parsing query")?;

let records = reader
let mut records = reader
.query(&repository, &header, &index, &region)
.with_context(|| "querying CRAM file")?;

for result in records {
let record = result
.and_then(|record| record.try_into_alignment_record(&header))
while let Some(record) = records.try_next().await? {
let record = record
.try_into_alignment_record(&header)
.with_context(|| "reading record")?;
writer.write_alignment_record(&header, &record)?;
writer.write_alignment_record(&header, &record).await?;
}
} else {
// (b) Else, print all of the records in the file.
for result in reader.records(&repository, &header) {
let record = result
.and_then(|record| record.try_into_alignment_record(&header))
let mut records = reader.records(&repository, &header);

while let Some(record) = records.try_next().await? {
let record = record
.try_into_alignment_record(&header)
.with_context(|| "reading record")?;
writer.write_alignment_record(&header, &record)?;
writer.write_alignment_record(&header, &record).await?;
}
}

writer.finish(&header)?;
Ok(())
}
37 changes: 20 additions & 17 deletions src/view/sam.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
//! SAM viewing
use std::io;
use std::io::Write;
use std::path::PathBuf;

use anyhow::bail;
use anyhow::Context;
use futures::TryStreamExt;
use noodles::sam;
use noodles::sam::AlignmentWriter;
use tokio::io;
use tokio::io::AsyncWriteExt;

use crate::utils::formats::sam::ParsedSAMFile;
use crate::utils::formats::sam::ParsedAsyncSAMFile;
use crate::view::command::Mode;

/// Main method for SAM viewing.
pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<()> {
pub async fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<()> {
// (1) Check if the user provided a query. If they did, we do not support any sort
// of tabix-like indexing and we would highly recommend the user take advantage of
// BAM/CRAM files. If anyone stumbles across this comment and sees a reason we
Expand All @@ -27,18 +27,21 @@ pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<(
}

// (2) Opens and parses the SAM file.
let ParsedSAMFile {
let ParsedAsyncSAMFile {
mut reader, header, ..
} = crate::utils::formats::sam::open_and_parse(&src)?;
} = crate::utils::formats::sam::open_and_parse_async(&src).await?;

// (3) Determine the handle with which to write the output. TODO: for now, we just
// default to stdout, but in the future we will support writing to another path.
let stdout = io::stdout();
let mut handle = stdout.lock();
// default to stdout, but in the future we will support writing to another
// path.
let mut handle = io::stdout();

// (4) If the user specified to output the header, output the header.
if mode == Mode::Full || mode == Mode::HeaderOnly {
write!(handle, "{}", header.raw).with_context(|| "writing header to stream")?;
handle
.write_all(header.raw.to_string().as_bytes())
.await
.with_context(|| "writing header to stream")?;
}

// (5) If the mode is header only, nothing left to do, so return.
Expand All @@ -47,13 +50,13 @@ pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<(
}

// (6) Writes the records to the output stream.
let mut writer = sam::Writer::new(handle);

for result in reader.records(&header.parsed) {
let record = result?;
writer.write_alignment_record(&header.parsed, &record)?;
let mut writer = sam::AsyncWriter::new(handle);
let mut records = reader.records(&header.parsed);
while let Some(record) = records.try_next().await? {
writer
.write_alignment_record(&header.parsed, &record)
.await?;
}

writer.finish(&header.parsed)?;
Ok(())
}

0 comments on commit c0510b9

Please sign in to comment.