Skip to content

Commit

Permalink
Update to the new async support of the ogg crate
Browse files Browse the repository at this point in the history
  • Loading branch information
est31 committed Jun 8, 2017
1 parent 768ca68 commit 1be5946
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 170 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ readme = "README.md"

[features]
default = ["ogg"]
async_ogg = ["ogg", "ogg/async"]
async_ogg = ["ogg", "ogg/async", "futures", "tokio-io"]
nightly = []

[dependencies]
byteorder = "1.0"
ogg = { version = "0.5", optional = true }
ieee754 = { version = "0.2", optional = true }
tokio-io = { version = "0.1", optional = true }
futures = { version = "0.1", optional = true }

[dev-dependencies]
ogg = "0.5"
Expand Down
97 changes: 0 additions & 97 deletions examples/async_decoder.rs

This file was deleted.

173 changes: 101 additions & 72 deletions src/inside_ogg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,98 +181,127 @@ impl<T: Read + Seek> OggStreamReader<T> {
}
}


#[cfg(feature = "async_ogg")]
mod async_utils {
/**
Support for async I/O
This module provides support for asyncronous I/O.
*/
pub mod async {

use ogg::{AdvanceAndSeekBack, Packet};
use ::inside_ogg::OggStreamReader;
use std::io::{Read, Seek};
use header::*;
use VorbisError;
use ogg::PacketReader;
use ::audio::PreviousWindowRight;
use super::*;
use ogg::reading::async::PacketReader;
use futures::stream::Stream;
use tokio_io::AsyncRead;
use futures::{Async, Future, Poll};
use std::io::{Error, ErrorKind};
use std::mem::replace;

/// Async ready creator utility to read headers out of an
/// ogg stream.
///
/// This struct is async ready, meaning that it keeps its
/// internal state consistent even if some calls to underlying
/// read result with non fatal errors like the `WouldBlock` error
/// kind.
///
/// This allows trivial wrapping with your favourite async framework.
///
/// All functions this struct has are ready to be used for operation with async I/O.
pub struct HeadersReader<T: Read + Seek + AdvanceAndSeekBack> {
rdr :PacketReader<T>,

pub struct HeadersReader<T: AsyncRead> {
pck_rd :PacketReader<T>,
ident_hdr :Option<IdentHeader>,
comment_hdr :Option<CommentHeader>,
setup_hdr :Option<SetupHeader>,
}

impl <T: Read + Seek + AdvanceAndSeekBack> HeadersReader<T> {
pub fn new(rdr :PacketReader<T>) -> Self {
return HeadersReader {
rdr : rdr,
impl<T: AsyncRead> HeadersReader<T> {
pub fn new(inner :T) -> Self {
HeadersReader::from_packet_reader(PacketReader::new(inner))
}
pub fn from_packet_reader(pck_rd :PacketReader<T>) -> Self {
HeadersReader {
pck_rd,
ident_hdr : None,
comment_hdr : None,
setup_hdr : None,
};
}
/// Tries to advance the header read process
///
/// Call this function to try to advance the header read process.
/// Once it returns `Ok(())`, the header reading is done. After that
/// you may call the into_ functions.
///
/// This function is async-ready, meaning that it will keep the internal
/// state consistent, and pass through any WouldBlock error kind errors.
pub fn try_read_headers(&mut self) -> Result<(), VorbisError> {
if self.ident_hdr.is_none() {
let pck :Packet = try!(self.rdr.read_packet());
self.ident_hdr = Some(try!(read_header_ident(&pck.data)));
}
if self.comment_hdr.is_none() {
let pck :Packet = try!(self.rdr.read_packet());
self.comment_hdr = Some(try!(read_header_comment(&pck.data)));
}
if self.setup_hdr.is_none() {
let pck :Packet = try!(self.rdr.read_packet());
let ident_hdr = self.ident_hdr.as_ref().unwrap();
self.setup_hdr = Some(try!(read_header_setup(&pck.data,
ident_hdr.audio_channels, (ident_hdr.blocksize_0, ident_hdr.blocksize_1))));
}
}
impl<T: AsyncRead> Future for HeadersReader<T> {
type Item = HeaderSet;
type Error = VorbisError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(pck) = try_ready!(self.pck_rd.poll()) {
let setup_hdr = if let Some(ref ident) = self.ident_hdr {
if self.comment_hdr.is_some() {
try!(read_header_setup(&pck.data,
ident.audio_channels,
(ident.blocksize_0, ident.blocksize_1)))
} else {
self.comment_hdr =
Some(try!(read_header_comment(&pck.data)));
return Ok(Async::NotReady);
}
} else {
self.ident_hdr =
Some(try!(read_header_ident(&pck.data)));
return Ok(Async::NotReady);
};
let ident_hdr = replace(&mut self.ident_hdr, None).unwrap();
let comment_hdr = replace(&mut self.comment_hdr, None).unwrap();
Ok(Async::Ready(((ident_hdr, comment_hdr, setup_hdr))))
} else {
try!(Err(Error::new(ErrorKind::UnexpectedEof,
"Expected header packet but found end of stream")))
}
return Ok(());
}
}
/// Reading ogg/vorbis files or streams
///
/// This is a small helper struct to help reading ogg/vorbis files
/// or streams in that format.
///
/// It only supports the main use case of pure audio ogg files streams.
/// Reading a file where vorbis is only one of multiple streams, like
/// in the case of ogv, is not supported.
///
/// If you need support for this, you need to use the lower level methods
/// instead.
pub struct OggStreamReader<T :AsyncRead> {
pck_rd :PacketReader<T>,
pwr :PreviousWindowRight,

pub ident_hdr :IdentHeader,
pub comment_hdr :CommentHeader,
pub setup_hdr :SetupHeader,

absgp_of_last_read :Option<u64>,
}

/// Initializes an OggStreamReader with the headers that have been read
///
/// Panics if the header reading process is not finished yet.
pub fn into_ogg_stream_reader(self) -> OggStreamReader<T> {
return OggStreamReader {
rdr : self.rdr,
impl<T :AsyncRead> OggStreamReader<T> {
/// Creates a new OggStreamReader from the given parameters
pub fn new(hdr_rdr :HeadersReader<T>, hdrs :HeaderSet) -> Self {
OggStreamReader::from_pck_rdr(hdr_rdr.pck_rd, hdrs)
}
/// Creates a new OggStreamReader from the given parameters
pub fn from_pck_rdr(pck_rd :PacketReader<T>, hdrs :HeaderSet) -> Self {
OggStreamReader {
pck_rd,
pwr : PreviousWindowRight::new(),
ident_hdr : self.ident_hdr.unwrap(),
comment_hdr : self.comment_hdr.unwrap(),
setup_hdr : self.setup_hdr.unwrap(),

ident_hdr : hdrs.0,
comment_hdr : hdrs.1,
setup_hdr : hdrs.2,

absgp_of_last_read : None,
};
}
/// Returns the headers that have been read
///
/// Panics if the header reading process is not finished yet.
pub fn into_header_triple(self)
-> (IdentHeader, CommentHeader, SetupHeader) {
return (self.ident_hdr.unwrap(), self.comment_hdr.unwrap(), self.setup_hdr.unwrap());
}
}
}

pub fn into_inner(self) -> PacketReader<T> {
return self.rdr;
impl<T :AsyncRead> Stream for OggStreamReader<T> {
type Item = Vec<Vec<i16>>;
type Error = VorbisError;

fn poll(&mut self) -> Poll<Option<Vec<Vec<i16>>>, VorbisError> {
let pck = match try_ready!(self.pck_rd.poll()) {
Some(p) => p,
None => return Ok(Async::Ready(None)),
};
let decoded_pck = try!(read_audio_packet(&self.ident_hdr,
&self.setup_hdr, &pck.data, &mut self.pwr));
self.absgp_of_last_read = Some(pck.absgp_page);
Ok(Async::Ready(Some(decoded_pck)))
}
}
}

#[cfg(feature = "async_ogg")]
pub use self::async_utils::HeadersReader as HeadersReader;
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ extern crate byteorder;
extern crate ogg;
#[cfg(feature = "ieee754")]
extern crate ieee754;
#[cfg(feature = "async_ogg")]
#[macro_use]
extern crate futures;
#[cfg(feature = "async_ogg")]
extern crate tokio_io;
/*
// This little thing is very useful.
macro_rules! try {
Expand Down

0 comments on commit 1be5946

Please sign in to comment.