From ebf28d567156da4970c1e0c412f76b478e0446ec Mon Sep 17 00:00:00 2001 From: Michael Macias Date: Tue, 17 Dec 2024 16:03:05 -0600 Subject: [PATCH] bam/async/io/reader/header: Add SAM header reader --- noodles-bam/Cargo.toml | 3 +- noodles-bam/src/async/io/reader/header.rs | 30 +---- .../src/async/io/reader/header/sam_header.rs | 119 ++++++++++++++++++ 3 files changed, 125 insertions(+), 27 deletions(-) create mode 100644 noodles-bam/src/async/io/reader/header/sam_header.rs diff --git a/noodles-bam/Cargo.toml b/noodles-bam/Cargo.toml index a8dcb8cb4..9210568b8 100644 --- a/noodles-bam/Cargo.toml +++ b/noodles-bam/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://github.com/zaeleus/noodles" documentation = "https://docs.rs/noodles-bam" [features] -async = ["dep:futures", "dep:tokio", "noodles-bgzf/async"] +async = ["dep:futures", "dep:pin-project-lite", "dep:tokio", "noodles-bgzf/async"] [dependencies] bstr.workspace = true @@ -19,6 +19,7 @@ byteorder.workspace = true futures = { workspace = true, optional = true, features = ["std"] } indexmap.workspace = true memchr.workspace = true +pin-project-lite = { workspace = true, optional = true } tokio = { workspace = true, optional = true, features = ["fs", "io-util"] } noodles-bgzf = { path = "../noodles-bgzf", version = "0.34.0" } diff --git a/noodles-bam/src/async/io/reader/header.rs b/noodles-bam/src/async/io/reader/header.rs index 82363735e..ae17d9b5e 100644 --- a/noodles-bam/src/async/io/reader/header.rs +++ b/noodles-bam/src/async/io/reader/header.rs @@ -1,8 +1,9 @@ mod magic_number; mod reference_sequences; +mod sam_header; use noodles_sam as sam; -use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; +use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt}; use self::{magic_number::read_magic_number, reference_sequences::read_reference_sequences}; use crate::io::reader::header::reference_sequences_eq; @@ -40,7 +41,7 @@ where let mut parser = sam::header::Parser::default(); - let mut header_reader = BufReader::new(reader.take(l_text)); + let mut header_reader = sam_header::Reader::new(reader, l_text); let mut buf = Vec::new(); while read_line(&mut header_reader, &mut buf).await? != 0 { @@ -49,7 +50,7 @@ where .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; } - discard_padding(&mut header_reader).await?; + header_reader.discard_to_end().await?; Ok(parser.finish()) } @@ -58,18 +59,11 @@ async fn read_line(reader: &mut R, dst: &mut Vec) -> io::Result where R: AsyncBufRead + Unpin, { - const NUL: u8 = 0x00; const LINE_FEED: u8 = b'\n'; const CARRIAGE_RETURN: u8 = b'\r'; dst.clear(); - let src = reader.fill_buf().await?; - - if src.is_empty() || src[0] == NUL { - return Ok(0); - } - match reader.read_until(LINE_FEED, dst).await? { 0 => Ok(0), n => { @@ -86,22 +80,6 @@ where } } -async fn discard_padding(reader: &mut R) -> io::Result<()> -where - R: AsyncBufRead + Unpin, -{ - loop { - let src = reader.fill_buf().await?; - - if src.is_empty() { - return Ok(()); - } - - let len = src.len(); - reader.consume(len); - } -} - #[cfg(test)] mod tests { use std::num::NonZeroUsize; diff --git a/noodles-bam/src/async/io/reader/header/sam_header.rs b/noodles-bam/src/async/io/reader/header/sam_header.rs new file mode 100644 index 000000000..e846fbe9a --- /dev/null +++ b/noodles-bam/src/async/io/reader/header/sam_header.rs @@ -0,0 +1,119 @@ +use bstr::ByteSlice; +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::io::{ + self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader, ReadBuf, Take, +}; + +pin_project! { + pub(super) struct Reader { + #[pin] + inner: BufReader>, + is_eol: bool, + } +} + +impl Reader +where + R: AsyncRead + Unpin, +{ + pub(super) fn new(inner: R, len: u64) -> Self { + Self { + inner: BufReader::new(inner.take(len)), + is_eol: true, + } + } + + pub(super) async fn discard_to_end(&mut self) -> io::Result { + let mut n = 0; + + loop { + let src = self.inner.fill_buf().await?; + + if src.is_empty() { + return Ok(n); + } + + let len = src.len(); + + self.inner.consume(len); + + n += len; + } + } +} + +impl AsyncRead for Reader +where + R: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let src = ready!(self.as_mut().poll_fill_buf(cx))?; + + let amt = src.len().min(buf.remaining()); + buf.put_slice(&src[..amt]); + + self.consume(amt); + + Poll::Ready(Ok(())) + } +} + +impl AsyncBufRead for Reader +where + R: AsyncRead + Unpin, +{ + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + const NUL: u8 = 0x00; + const LINE_FEED: u8 = b'\n'; + + let this = self.project(); + let src = ready!(this.inner.poll_fill_buf(cx))?; + + let buf = if *this.is_eol && src.first().map(|&b| b == NUL).unwrap_or(true) { + &[] + } else if let Some(i) = src.as_bstr().find_byte(LINE_FEED) { + *this.is_eol = true; + &src[..=i] + } else { + *this.is_eol = false; + src + }; + + Poll::Ready(Ok(buf)) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.as_mut().inner.consume(amt); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_read_with_trailing_nul_padding() -> io::Result<()> { + const DATA: &[u8] = b"@HD\tVN:1.6\n"; + + let mut buf = DATA.to_vec(); + buf.resize(1 << 10, 0); + + let mut src = &buf[..]; + let mut reader = Reader::new(&mut src, 1 << 10); + + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await?; + + assert_eq!(buf, DATA); + + Ok(()) + } +}