Skip to content

Commit

Permalink
use pin_project_lite
Browse files Browse the repository at this point in the history
  • Loading branch information
yjhmelody committed Oct 26, 2019
2 parents f5a0a0b + 81e3cab commit c9e6d3a
Show file tree
Hide file tree
Showing 55 changed files with 2,430 additions and 871 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jobs:
build_and_test:
name: Build and test
runs-on: ${{ matrix.os }}
env:
RUSTFLAGS: -Dwarnings
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macOS-latest]
Expand All @@ -29,7 +31,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --all --benches --bins --examples --tests
args: --all --bins --examples

- name: check unstable
uses: actions-rs/cargo@v1
Expand All @@ -46,6 +48,8 @@ jobs:
check_fmt_and_docs:
name: Checking fmt and docs
runs-on: ubuntu-latest
env:
RUSTFLAGS: -Dwarnings
steps:
- uses: actions/checkout@master

Expand Down Expand Up @@ -77,6 +81,9 @@ jobs:
clippy_check:
name: Clippy check
runs-on: ubuntu-latest
# TODO: There is a lot of warnings
# env:
# RUSTFLAGS: -Dwarnings
steps:
- uses: actions/checkout@v1
- id: component
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ authors = [
edition = "2018"
license = "Apache-2.0/MIT"
repository = "https://github.com/async-rs/async-std"
homepage = "https://github.com/async-rs/async-std"
homepage = "https://async.rs"
documentation = "https://docs.rs/async-std"
description = "Async version of the Rust standard library"
keywords = ["async", "await", "future", "std", "task"]
Expand Down Expand Up @@ -43,9 +43,11 @@ pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"
kv-log-macro = "1.0.4"
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
pin-project-lite = "0.1"

[dev-dependencies]
femme = "1.2.0"
rand = "0.7.2"
# surf = "1.0.2"
tempdir = "0.3.7"
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
Expand Down
25 changes: 13 additions & 12 deletions src/future/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::pin::Pin;
use std::time::Duration;

use futures_timer::Delay;
use pin_project_lite::pin_project;

use crate::future::Future;
use crate::task::{Context, Poll};
Expand Down Expand Up @@ -39,24 +40,24 @@ where
f.await
}

/// A future that times out after a duration of time.
struct TimeoutFuture<F> {
future: F,
delay: Delay,
}

impl<F> TimeoutFuture<F> {
pin_utils::unsafe_pinned!(future: F);
pin_utils::unsafe_pinned!(delay: Delay);
pin_project! {
/// A future that times out after a duration of time.
struct TimeoutFuture<F> {
#[pin]
future: F,
#[pin]
delay: Delay,
}
}

impl<F: Future> Future for TimeoutFuture<F> {
type Output = Result<F::Output, TimeoutError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().future().poll(cx) {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.future.poll(cx) {
Poll::Ready(v) => Poll::Ready(Ok(v)),
Poll::Pending => match self.delay().poll(cx) {
Poll::Pending => match this.delay.poll(cx) {
Poll::Ready(_) => Poll::Ready(Err(TimeoutError { _private: () })),
Poll::Pending => Poll::Pending,
},
Expand Down
63 changes: 34 additions & 29 deletions src/io/buf_read/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,55 @@ use std::mem;
use std::pin::Pin;
use std::str;

use pin_project_lite::pin_project;

use super::read_until_internal;
use crate::io::{self, BufRead};
use crate::stream::Stream;
use crate::task::{Context, Poll};

/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
pub(crate) reader: R,
pub(crate) buf: String,
pub(crate) bytes: Vec<u8>,
pub(crate) read: usize,
pin_project! {
/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
#[pin]
pub(crate) reader: R,
pub(crate) buf: String,
pub(crate) bytes: Vec<u8>,
pub(crate) read: usize,
}
}

impl<R: BufRead> Stream for Lines<R> {
type Item = io::Result<String>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
bytes,
read,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
let this = self.project();
let n = futures_core::ready!(read_line_internal(
this.reader,
cx,
this.buf,
this.bytes,
this.read
))?;
if n == 0 && this.buf.is_empty() {
return Poll::Ready(None);
}
if buf.ends_with('\n') {
buf.pop();
if buf.ends_with('\r') {
buf.pop();
if this.buf.ends_with('\n') {
this.buf.pop();
if this.buf.ends_with('\r') {
this.buf.pop();
}
}
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
}
}

Expand Down
59 changes: 32 additions & 27 deletions src/io/buf_read/split.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,51 @@
use std::mem;
use std::pin::Pin;

use pin_project_lite::pin_project;

use super::read_until_internal;
use crate::io::{self, BufRead};
use crate::stream::Stream;
use crate::task::{Context, Poll};

/// A stream over the contents of an instance of [`BufRead`] split on a particular byte.
///
/// This stream is created by the [`split`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Split`].
///
/// [`split`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Split`]: https://doc.rust-lang.org/std/io/struct.Split.html
#[derive(Debug)]
pub struct Split<R> {
pub(crate) reader: R,
pub(crate) buf: Vec<u8>,
pub(crate) read: usize,
pub(crate) delim: u8,
pin_project! {
/// A stream over the contents of an instance of [`BufRead`] split on a particular byte.
///
/// This stream is created by the [`split`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Split`].
///
/// [`split`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Split`]: https://doc.rust-lang.org/std/io/struct.Split.html
#[derive(Debug)]
pub struct Split<R> {
#[pin]
pub(crate) reader: R,
pub(crate) buf: Vec<u8>,
pub(crate) read: usize,
pub(crate) delim: u8,
}
}

impl<R: BufRead> Stream for Split<R> {
type Item = io::Result<Vec<u8>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
read,
delim,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures_core::ready!(read_until_internal(reader, cx, *delim, buf, read))?;
if n == 0 && buf.is_empty() {
let this = self.project();
let n = futures_core::ready!(read_until_internal(
this.reader,
cx,
*this.delim,
this.buf,
this.read
))?;
if n == 0 && this.buf.is_empty() {
return Poll::Ready(None);
}
if buf[buf.len() - 1] == *delim {
buf.pop();
if this.buf[this.buf.len() - 1] == *this.delim {
this.buf.pop();
}
Poll::Ready(Some(Ok(mem::replace(buf, vec![]))))
Poll::Ready(Some(Ok(mem::replace(this.buf, vec![]))))
}
}
Loading

0 comments on commit c9e6d3a

Please sign in to comment.