Convert the `view` subcommand to async I/O.

Files touched by the change set below:
    src/view/bam.rs, src/view/cram.rs, src/view/sam.rs
        `view` becomes `async fn`; records are pulled with `try_next().await` and
        written through `sam::AsyncWriter` wrapping `tokio::io::stdout()`.
    src/view/command.rs
        builds a current-thread Tokio runtime and drives each format-specific
        `view` with `block_on`.

Review notes:
    NOTE(review): every `writer.finish(..)` call is removed and no flush/shutdown of
    the async writer is added; confirm output is fully flushed before the runtime
    is dropped.
    NOTE(review): the runtime is built with `.unwrap()` inside a function returning
    `anyhow::Result`; propagating the error with `?` would avoid a panic.
    NOTE(review): the new CRAM error message says "converting to/from" although this
    is the `view` command, and the doc comment in cram.rs still reads "BAM viewing".

diff --git a/src/view/bam.rs b/src/view/bam.rs
index 29f63ae..3c1cc90 100644
--- a/src/view/bam.rs
+++ b/src/view/bam.rs
@@ -1,36 +1,37 @@
 //! BAM viewing
 
-use std::io;
-use std::io::Write;
 use std::path::PathBuf;
 
 use anyhow::Context;
+use futures::TryStreamExt;
 use noodles::bam::bai;
 use noodles::sam;
-use noodles::sam::AlignmentWriter;
+use tokio::io::{self, AsyncWriteExt};
 
-use crate::utils::formats::bam::ParsedBAMFile;
+use crate::utils::formats::bam::ParsedAsyncBAMFile;
 use crate::utils::formats::utils::IndexCheck;
 use crate::view::command::Mode;
 
 /// Main method for BAM viewing.
-pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<()> {
+pub async fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<()> {
     // (1) Opens and parses the BAM file.
-    let ParsedBAMFile {
+    let ParsedAsyncBAMFile {
         mut reader,
         header,
         index_path: bai_path,
         ..
-    } = crate::utils::formats::bam::open_and_parse(&src, IndexCheck::HeaderOnly)?;
+    } = crate::utils::formats::bam::open_and_parse_async(&src, IndexCheck::HeaderOnly).await?;
 
     // (2) Determine the handle with which to write the output. TODO: for now, we just
     // default to stdout, but in the future we will support writing to another path.
-    let stdout = io::stdout();
-    let mut handle = stdout.lock();
+    let mut handle = io::stdout();
 
     // (3) If the user specified to output the header, output the header.
     if mode == Mode::Full || mode == Mode::HeaderOnly {
-        write!(handle, "{}", header.raw).with_context(|| "writing header to stream")?;
+        handle
+            .write_all(header.raw.to_string().as_bytes())
+            .await
+            .with_context(|| "writing header to stream")?;
     }
 
     // (4) If the mode is header only, nothing left to do, so return.
@@ -39,29 +40,31 @@ pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<(
     }
 
     // (5) Writes the records to the output stream.
-    let mut writer = sam::Writer::new(handle);
+    let mut writer = sam::AsyncWriter::new(handle);
 
     if let Some(query) = query {
         // (a) If a query is specified, print just the records that fall within the query.
         let index = bai::read(bai_path).with_context(|| "reading BAM index")?;
         let region = query.parse().with_context(|| "parsing query")?;
 
-        let records = reader
+        let mut records = reader
             .query(&header.parsed, &index, &region)
             .with_context(|| "querying BAM file")?;
 
-        for result in records {
-            let record = result?;
-            writer.write_alignment_record(&header.parsed, &record)?;
+        while let Some(record) = records.try_next().await? {
+            writer
+                .write_alignment_record(&header.parsed, &record)
+                .await?;
         }
     } else {
         // (b) Else, print all of the records in the file.
-        for result in reader.records(&header.parsed) {
-            let record = result?;
-            writer.write_alignment_record(&header.parsed, &record)?;
+        let mut records = reader.records(&header.parsed);
+        while let Some(record) = records.try_next().await? {
+            writer
+                .write_alignment_record(&header.parsed, &record)
+                .await?;
         }
     }
 
-    writer.finish(&header.parsed)?;
     Ok(())
 }
diff --git a/src/view/command.rs b/src/view/command.rs
index f206b19..e44fcda 100644
--- a/src/view/command.rs
+++ b/src/view/command.rs
@@ -55,14 +55,21 @@ pub fn view(args: ViewArgs) -> anyhow::Result<()> {
     let reference_fasta = args.reference_fasta;
     let mode = args.mode;
 
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
     match BioinformaticsFileFormat::try_detect(&src) {
-        Some(BioinformaticsFileFormat::SAM) => super::sam::view(src, query, mode),
-        Some(BioinformaticsFileFormat::BAM) => super::bam::view(src, query, mode),
+        Some(BioinformaticsFileFormat::SAM) => rt.block_on(super::sam::view(src, query, mode)),
+        Some(BioinformaticsFileFormat::BAM) => rt.block_on(super::bam::view(src, query, mode)),
         Some(BioinformaticsFileFormat::CRAM) => {
             if let Some(reference_fasta) = reference_fasta {
-                super::cram::view(src, query, reference_fasta, mode)
+                rt.block_on(super::cram::view(src, query, reference_fasta, mode))
             } else {
-                bail!("Reference FASTA is required to view a CRAM file.")
+                bail!(
+                    "--reference-fasta is a required argument when converting to/from a CRAM file"
+                )
             }
         }
         Some(BioinformaticsFileFormat::GFF) | Some(BioinformaticsFileFormat::GFF_GZ) => {
diff --git a/src/view/cram.rs b/src/view/cram.rs
index a7389d7..21237d8 100644
--- a/src/view/cram.rs
+++ b/src/view/cram.rs
@@ -1,18 +1,17 @@
 //! CRAM viewing
 
-use std::fs::File;
-use std::io;
-use std::io::Write;
 use std::path::PathBuf;
 
 use anyhow::bail;
 use anyhow::Context;
+use futures::TryStreamExt;
 use noodles::cram;
 use noodles::cram::crai;
 use noodles::fasta;
 use noodles::fasta::repository::adapters::IndexedReader;
 use noodles::sam;
-use noodles::sam::AlignmentWriter;
+use tokio::io;
+use tokio::io::AsyncWriteExt;
 use tracing::debug;
 
 use crate::utils::formats::sam::parse_header;
@@ -20,7 +19,7 @@ use crate::utils::pathbuf::AppendExtension;
 use crate::view::command::Mode;
 
 /// Main method for BAM viewing.
-pub fn view(
+pub async fn view(
     src: PathBuf,
     query: Option<String>,
     reference_fasta: PathBuf,
@@ -28,13 +27,13 @@ pub fn view(
 ) -> anyhow::Result<()> {
     // (1) Reads the file from disk.
     debug!("reading CRAM file from disk");
-    let mut reader = File::open(&src)
-        .map(cram::Reader::new)
+    let mut reader = tokio::fs::File::open(&src)
+        .await
+        .map(cram::AsyncReader::new)
         .with_context(|| "opening src file")?;
 
     // (2) Determine the handle with which to write the output.
-    let stdout = io::stdout();
-    let mut handle = stdout.lock();
+    let mut handle = io::stdout();
 
     // (3) Build the FASTA repository and associated index.
     let repository = fasta::indexed_reader::Builder::default()
@@ -56,16 +55,20 @@ pub fn view(
     }
 
     // (4) Read the file's definition.
-    reader.read_file_definition()?;
+    reader.read_file_definition().await?;
 
     // (5) If the user specified to output the header, output the raw header (before
     // applying any corrections).
     let ht = reader
         .read_file_header()
+        .await
         .with_context(|| "reading CRAM header")?;
 
     if mode == Mode::Full || mode == Mode::HeaderOnly {
-        write!(handle, "{}", ht).with_context(|| "writing header to stream")?;
+        handle
+            .write_all(ht.as_bytes())
+            .await
+            .with_context(|| "writing header to stream")?;
     }
 
     // (6) If the mode is header-only, nothing left to do, so return.
@@ -73,38 +76,40 @@ pub fn view(
         return Ok(());
     }
 
-    // (7) Parse the header and apply corrections.
+    // (7) Parses the header text.
     let header = parse_header(ht).with_context(|| "parsing header")?;
 
     // (8) Writes the records to the output stream.
-    let mut writer = sam::Writer::new(handle);
+    let mut writer = sam::AsyncWriter::new(handle);
 
     if let Some(query) = query {
         // (a) If a query is specified, print just the records that fall within the query.
-        let index =
-            crai::read(src.with_extension("cram.crai")).with_context(|| "reading CRAM index")?;
+        let index = crai::r#async::read(src.with_extension("cram.crai"))
+            .await
+            .with_context(|| "reading CRAM index")?;
         let region = query.parse().with_context(|| "parsing query")?;
 
-        let records = reader
+        let mut records = reader
             .query(&repository, &header, &index, &region)
             .with_context(|| "querying CRAM file")?;
 
-        for result in records {
-            let record = result
-                .and_then(|record| record.try_into_alignment_record(&header))
+        while let Some(record) = records.try_next().await? {
+            let record = record
+                .try_into_alignment_record(&header)
                 .with_context(|| "reading record")?;
-            writer.write_alignment_record(&header, &record)?;
+            writer.write_alignment_record(&header, &record).await?;
         }
     } else {
         // (b) Else, print all of the records in the file.
-        for result in reader.records(&repository, &header) {
-            let record = result
-                .and_then(|record| record.try_into_alignment_record(&header))
+        let mut records = reader.records(&repository, &header);
+
+        while let Some(record) = records.try_next().await? {
+            let record = record
+                .try_into_alignment_record(&header)
                 .with_context(|| "reading record")?;
-            writer.write_alignment_record(&header, &record)?;
+            writer.write_alignment_record(&header, &record).await?;
         }
     }
 
-    writer.finish(&header)?;
     Ok(())
 }
diff --git a/src/view/sam.rs b/src/view/sam.rs
index 7078064..a882fa4 100644
--- a/src/view/sam.rs
+++ b/src/view/sam.rs
@@ -1,19 +1,19 @@
 //! SAM viewing
 
-use std::io;
-use std::io::Write;
 use std::path::PathBuf;
 
 use anyhow::bail;
 use anyhow::Context;
+use futures::TryStreamExt;
 use noodles::sam;
-use noodles::sam::AlignmentWriter;
+use tokio::io;
+use tokio::io::AsyncWriteExt;
 
-use crate::utils::formats::sam::ParsedSAMFile;
+use crate::utils::formats::sam::ParsedAsyncSAMFile;
 use crate::view::command::Mode;
 
 /// Main method for SAM viewing.
-pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<()> {
+pub async fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<()> {
     // (1) Check if the user provided a query. If they did, we do not support any sort
     // tabix-like indexing and we would highly recommend the user take advantage of
     // BAM/CRAM files. If anyone stumbles across this comment and sees a reason we
@@ -27,18 +27,21 @@ pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<(
     }
 
     // (2) Opens and parses the SAM file.
-    let ParsedSAMFile {
+    let ParsedAsyncSAMFile {
         mut reader, header, ..
-    } = crate::utils::formats::sam::open_and_parse(&src)?;
+    } = crate::utils::formats::sam::open_and_parse_async(&src).await?;
 
     // (3) Determine the handle with which to write the output. TODO: for now, we just
-    // default to stdout, but in the future we will support writing to another path.
-    let stdout = io::stdout();
-    let mut handle = stdout.lock();
+    // default to stdout, but in the future we will support writing to another
+    // path.
+    let mut handle = io::stdout();
 
     // (4) If the user specified to output the header, output the header.
     if mode == Mode::Full || mode == Mode::HeaderOnly {
-        write!(handle, "{}", header.raw).with_context(|| "writing header to stream")?;
+        handle
+            .write_all(header.raw.to_string().as_bytes())
+            .await
+            .with_context(|| "writing header to stream")?;
     }
 
     // (5) If the mode is header only, nothing left to do, so return.
@@ -47,13 +50,13 @@ pub fn view(src: PathBuf, query: Option<String>, mode: Mode) -> anyhow::Result<(
     }
 
     // (6) Writes the records to the output stream.
-    let mut writer = sam::Writer::new(handle);
-
-    for result in reader.records(&header.parsed) {
-        let record = result?;
-        writer.write_alignment_record(&header.parsed, &record)?;
+    let mut writer = sam::AsyncWriter::new(handle);
+    let mut records = reader.records(&header.parsed);
+    while let Some(record) = records.try_next().await? {
+        writer
+            .write_alignment_record(&header.parsed, &record)
+            .await?;
     }
 
-    writer.finish(&header.parsed)?;
     Ok(())
 }