Skip to content

Commit

Permalink
bam/async/io/reader/header: Add SAM header reader
Browse files Browse the repository at this point in the history
  • Loading branch information
zaeleus committed Dec 17, 2024
1 parent c7425ba commit ebf28d5
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 27 deletions.
3 changes: 2 additions & 1 deletion noodles-bam/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ repository = "https://github.com/zaeleus/noodles"
documentation = "https://docs.rs/noodles-bam"

[features]
async = ["dep:futures", "dep:tokio", "noodles-bgzf/async"]
async = ["dep:futures", "dep:pin-project-lite", "dep:tokio", "noodles-bgzf/async"]

[dependencies]
bstr.workspace = true
byteorder.workspace = true
futures = { workspace = true, optional = true, features = ["std"] }
indexmap.workspace = true
memchr.workspace = true
pin-project-lite = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["fs", "io-util"] }

noodles-bgzf = { path = "../noodles-bgzf", version = "0.34.0" }
Expand Down
30 changes: 4 additions & 26 deletions noodles-bam/src/async/io/reader/header.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
mod magic_number;
mod reference_sequences;
mod sam_header;

use noodles_sam as sam;
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt};

use self::{magic_number::read_magic_number, reference_sequences::read_reference_sequences};
use crate::io::reader::header::reference_sequences_eq;
Expand Down Expand Up @@ -40,7 +41,7 @@ where

let mut parser = sam::header::Parser::default();

let mut header_reader = BufReader::new(reader.take(l_text));
let mut header_reader = sam_header::Reader::new(reader, l_text);
let mut buf = Vec::new();

while read_line(&mut header_reader, &mut buf).await? != 0 {
Expand All @@ -49,7 +50,7 @@ where
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
}

discard_padding(&mut header_reader).await?;
header_reader.discard_to_end().await?;

Ok(parser.finish())
}
Expand All @@ -58,18 +59,11 @@ async fn read_line<R>(reader: &mut R, dst: &mut Vec<u8>) -> io::Result<usize>
where
R: AsyncBufRead + Unpin,
{
const NUL: u8 = 0x00;
const LINE_FEED: u8 = b'\n';
const CARRIAGE_RETURN: u8 = b'\r';

dst.clear();

let src = reader.fill_buf().await?;

if src.is_empty() || src[0] == NUL {
return Ok(0);
}

match reader.read_until(LINE_FEED, dst).await? {
0 => Ok(0),
n => {
Expand All @@ -86,22 +80,6 @@ where
}
}

async fn discard_padding<R>(reader: &mut R) -> io::Result<()>
where
R: AsyncBufRead + Unpin,
{
loop {
let src = reader.fill_buf().await?;

if src.is_empty() {
return Ok(());
}

let len = src.len();
reader.consume(len);
}
}

#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;
Expand Down
119 changes: 119 additions & 0 deletions noodles-bam/src/async/io/reader/header/sam_header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use bstr::ByteSlice;
use pin_project_lite::pin_project;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::io::{
self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader, ReadBuf, Take,
};

pin_project! {
pub(super) struct Reader<R> {
#[pin]
inner: BufReader<Take<R>>,
is_eol: bool,
}
}

impl<R> Reader<R>
where
R: AsyncRead + Unpin,
{
pub(super) fn new(inner: R, len: u64) -> Self {
Self {
inner: BufReader::new(inner.take(len)),
is_eol: true,
}
}

pub(super) async fn discard_to_end(&mut self) -> io::Result<usize> {
let mut n = 0;

loop {
let src = self.inner.fill_buf().await?;

if src.is_empty() {
return Ok(n);
}

let len = src.len();

self.inner.consume(len);

n += len;
}
}
}

impl<R> AsyncRead for Reader<R>
where
R: AsyncRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let src = ready!(self.as_mut().poll_fill_buf(cx))?;

let amt = src.len().min(buf.remaining());
buf.put_slice(&src[..amt]);

self.consume(amt);

Poll::Ready(Ok(()))
}
}

impl<R> AsyncBufRead for Reader<R>
where
R: AsyncRead + Unpin,
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
const NUL: u8 = 0x00;
const LINE_FEED: u8 = b'\n';

let this = self.project();
let src = ready!(this.inner.poll_fill_buf(cx))?;

let buf = if *this.is_eol && src.first().map(|&b| b == NUL).unwrap_or(true) {
&[]
} else if let Some(i) = src.as_bstr().find_byte(LINE_FEED) {
*this.is_eol = true;
&src[..=i]
} else {
*this.is_eol = false;
src
};

Poll::Ready(Ok(buf))
}

fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.as_mut().inner.consume(amt);
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_read_with_trailing_nul_padding() -> io::Result<()> {
const DATA: &[u8] = b"@HD\tVN:1.6\n";

let mut buf = DATA.to_vec();
buf.resize(1 << 10, 0);

let mut src = &buf[..];
let mut reader = Reader::new(&mut src, 1 << 10);

let mut buf = Vec::new();
reader.read_to_end(&mut buf).await?;

assert_eq!(buf, DATA);

Ok(())
}
}

0 comments on commit ebf28d5

Please sign in to comment.