Skip to content

Commit

Permalink
refactor: Make write return BytesSink instead (#194)
Browse files Browse the repository at this point in the history
* Save work

Signed-off-by: Xuanwo <[email protected]>

* Implement body sinker

Signed-off-by: Xuanwo <[email protected]>

* Add benchmark

Signed-off-by: Xuanwo <[email protected]>

* Implement write2 for fs

Signed-off-by: Xuanwo <[email protected]>

* Implement write2 for memory

Signed-off-by: Xuanwo <[email protected]>

* Implement write2 for azblob

Signed-off-by: Xuanwo <[email protected]>

* Save work

Signed-off-by: Xuanwo <[email protected]>

* Rename to write

Signed-off-by: Xuanwo <[email protected]>

* Implement IntoWrite

Signed-off-by: Xuanwo <[email protected]>

* Make clippy happy

Signed-off-by: Xuanwo <[email protected]>

* Revert bench

Signed-off-by: Xuanwo <[email protected]>

* Code cleanup

Signed-off-by: Xuanwo <[email protected]>

* Remove not used code

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Mar 31, 2022
1 parent 0883e6e commit 8571262
Show file tree
Hide file tree
Showing 12 changed files with 433 additions and 124 deletions.
11 changes: 5 additions & 6 deletions src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use std::sync::Arc;
use async_trait::async_trait;

use crate::error::Result;
use crate::io::BytesSink;
use crate::io::BytesStream;
use crate::ops::OpDelete;
use crate::ops::OpList;
use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::BoxedAsyncReader;
use crate::BoxedObjectStream;
use crate::Metadata;

Expand All @@ -40,9 +40,8 @@ pub trait Accessor: Send + Sync + Debug {
let _ = args;
unimplemented!()
}
/// Write data from input reader to the underlying storage.
async fn write(&self, r: BoxedAsyncReader, args: &OpWrite) -> Result<usize> {
let (_, _) = (r, args);
async fn write(&self, args: &OpWrite) -> Result<BytesSink> {
let _ = args;
unimplemented!()
}
/// Invoke the `stat` operation on the specified path.
Expand Down Expand Up @@ -81,8 +80,8 @@ impl<T: Accessor> Accessor for Arc<T> {
async fn read(&self, args: &OpRead) -> Result<BytesStream> {
self.as_ref().read(args).await
}
async fn write(&self, r: BoxedAsyncReader, args: &OpWrite) -> Result<usize> {
self.as_ref().write(r, args).await
async fn write(&self, args: &OpWrite) -> Result<BytesSink> {
self.as_ref().write(args).await
}
async fn stat(&self, args: &OpStat) -> Result<Metadata> {
self.as_ref().stat(args).await
Expand Down
119 changes: 119 additions & 0 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use anyhow::anyhow;
use bytes::Bytes;
use futures::channel::mpsc::Sender;
use futures::channel::mpsc::{self};
use futures::ready;
use futures::Sink;
use futures::StreamExt;
use http::Response;
use hyper::client::ResponseFuture;
use hyper::Body;

use crate::error::Error;
use crate::error::Result;
use crate::ops::OpWrite;

pub fn new_channel() -> (Sender<Bytes>, Body) {
let (tx, rx) = mpsc::channel(0);

(tx, Body::wrap_stream(rx.map(Ok::<_, Error>)))
}

pub struct BodySinker {
op: OpWrite,
tx: Sender<Bytes>,
fut: ResponseFuture,
handle: fn(&OpWrite, Response<Body>) -> Result<()>,
}

impl BodySinker {
pub fn new(
op: &OpWrite,
tx: Sender<Bytes>,
fut: ResponseFuture,
handle: fn(&OpWrite, Response<Body>) -> Result<()>,
) -> BodySinker {
BodySinker {
op: op.clone(),
tx,
fut,
handle,
}
}

fn poll_response(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Error>> {
match Pin::new(&mut self.fut).poll(cx) {
Poll::Ready(Ok(resp)) => Poll::Ready((self.handle)(&self.op, resp)),
// TODO: we need better error output.
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Unexpected(anyhow!(e)))),
Poll::Pending => Poll::Pending,
}
}
}

impl Sink<Bytes> for BodySinker {
type Error = Error;

fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
let this = &mut *self;

if let Poll::Ready(v) = Pin::new(this).poll_response(cx) {
unreachable!("response returned too early: {:?}", v)
}

self.tx
.poll_ready(cx)
.map_err(|e| Error::Unexpected(anyhow!(e)))
}

fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> std::result::Result<(), Self::Error> {
let this = &mut *self;

this.tx
.start_send(item)
.map_err(|e| Error::Unexpected(anyhow!(e)))
}

fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
if let Err(e) = ready!(Pin::new(&mut self.tx).poll_close(cx)) {
return Poll::Ready(Err(Error::Unexpected(anyhow!(e))));
}

self.poll_response(cx)
}
}
119 changes: 115 additions & 4 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use anyhow::anyhow;
use bytes::Buf;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::ready;
use futures::AsyncRead;
use futures::AsyncSeek;
use futures::AsyncWrite;
use futures::Sink;
use futures::SinkExt;
use futures::Stream;
use futures::TryStreamExt;

use crate::error::Error;
use crate::error::Result;
use crate::ops::OpRead;
use crate::ops::OpStat;
Expand All @@ -39,6 +45,8 @@ use crate::Metadata;
pub type BoxedAsyncReader = Box<dyn AsyncRead + Unpin + Send>;
/// BytesStream represents a stream of bytes.
pub type BytesStream = Box<dyn Stream<Item = Result<Bytes>> + Unpin + Send>;
/// BytesSink represents a sink of bytes.
pub type BytesSink = Box<dyn Sink<Bytes, Error = Error> + Unpin + Send>;

/// Reader is used for reading data from underlying backend.
///
Expand Down Expand Up @@ -193,16 +201,119 @@ impl Writer {
path: self.path.clone(),
size: bs.len() as u64,
};
let r = Box::new(futures::io::Cursor::new(bs));
let mut s = self.acc.write(op).await?;
s.feed(Bytes::from(bs)).await?;
s.close().await?;

self.acc.write(r, op).await
Ok(op.size as usize)
}
pub async fn write_reader(self, r: BoxedAsyncReader, size: u64) -> Result<usize> {
pub async fn write_reader(self, mut r: BoxedAsyncReader, size: u64) -> Result<usize> {
let op = &OpWrite {
path: self.path.clone(),
size,
};
let s = self.acc.write(op).await?;
let mut w = into_write(s);
Ok(futures::io::copy(&mut r, &mut w)
.await
.map_err(|e| Error::Unexpected(anyhow!(e)))? as usize)
}
}

pub fn into_sink<W: AsyncWrite + Send + Unpin>(w: W) -> IntoSink<W> {
IntoSink {
w,
b: bytes::Bytes::new(),
}
}

pub struct IntoSink<W: AsyncWrite + Send + Unpin> {
w: W,
b: bytes::Bytes,
}

impl<W> Sink<Bytes> for IntoSink<W>
where
W: AsyncWrite + Send + Unpin,
{
type Error = Error;

fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
while !self.b.is_empty() {
let b = &self.b.clone();
let n = ready!(Pin::new(&mut self.w).poll_write(cx, b))
.map_err(|e| Error::Unexpected(anyhow!(e)))?;
self.b.advance(n);
}

Poll::Ready(Ok(()))
}

fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> std::result::Result<(), Self::Error> {
self.b = item;
Ok(())
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
while !self.b.is_empty() {
let b = &self.b.clone();
let n = ready!(Pin::new(&mut self.w).poll_write(cx, b))
.map_err(|e| Error::Unexpected(anyhow!(e)))?;
self.b.advance(n);
}

Pin::new(&mut self.w)
.poll_flush(cx)
.map_err(|e| Error::Unexpected(anyhow!(e)))
}

fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
self.poll_flush(cx)
}
}

pub fn into_write<S: Sink<Bytes, Error = Error> + Send + Unpin>(s: S) -> IntoWrite<S> {
IntoWrite { s }
}

pub struct IntoWrite<S: Sink<Bytes, Error = Error> + Send + Unpin> {
s: S,
}

impl<S> AsyncWrite for IntoWrite<S>
where
S: Sink<Bytes, Error = Error> + Send + Unpin,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
ready!(Pin::new(&mut self.s).poll_ready(cx))?;

let size = buf.len();
Pin::new(&mut self.s).start_send(Bytes::copy_from_slice(buf))?;
Poll::Ready(Ok(size))
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.s)
.poll_flush(cx)
.map_err(io::Error::from)
}

self.acc.write(r, op).await
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.s)
.poll_close(cx)
.map_err(io::Error::from)
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub use scheme::Scheme;

pub mod credential;
pub mod error;
pub mod http;
pub mod readers;

pub mod ops;
Expand Down
11 changes: 11 additions & 0 deletions src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ use futures::ready;

use crate::error::Kind;
use crate::error::Result;
use crate::io::BytesSink;
use crate::io::BytesStream;
use crate::ops::OpDelete;
use crate::ops::OpList;
use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::Accessor;
use crate::Reader;
use crate::Writer;
Expand Down Expand Up @@ -64,6 +66,15 @@ impl Object {
.await
}

pub async fn sink(&self, size: u64) -> Result<BytesSink> {
self.acc
.write(&OpWrite {
path: self.meta.path().to_string(),
size,
})
.await
}

/// Create a new reader which can read the whole object.
///
/// # Example
Expand Down
Loading

0 comments on commit 8571262

Please sign in to comment.