Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Return ObjectReader in Accessor::read #929

Merged
merged 1 commit into from
Nov 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::BytesReader;
use crate::ObjectIterator;
use crate::ObjectMetadata;
use crate::ObjectPart;
use crate::ObjectReader;
use crate::ObjectStreamer;
use crate::Scheme;

Expand Down Expand Up @@ -119,12 +120,12 @@ pub trait Accessor: Send + Sync + Debug + 'static {
}

/// Invoke the `read` operation on the specified path, returns a
/// [`BytesReader`][crate::BytesReader] if operate successful.
/// [`ObjectReader`][crate::ObjectReader] if operate successful.
///
/// # Behavior
///
/// - Input path MUST be file path, DON'T NEED to check object mode.
async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
match self.inner() {
Some(inner) => inner.read(path, args).await,
None => Err(new_unsupported_object_error(Operation::Read, path)),
Expand Down Expand Up @@ -366,7 +367,7 @@ impl<T: Accessor> Accessor for Arc<T> {
async fn create(&self, path: &str, args: OpCreate) -> Result<()> {
self.as_ref().create(path, args).await
}
async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
self.as_ref().read(path, args).await
}
async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result<u64> {
Expand Down
8 changes: 5 additions & 3 deletions src/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::BytesReader;
use crate::ObjectEntry;
use crate::ObjectMetadata;
use crate::ObjectMode;
use crate::ObjectReader;
use crate::ObjectStreamer;

/// Backend of kv service.
Expand Down Expand Up @@ -129,7 +130,7 @@ where
Ok(())
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
let p = build_rooted_abs_path(&self.root, path);
let inode = self.lookup(&p).await?;
let meta = self.get_inode(inode).await?;
Expand All @@ -150,7 +151,7 @@ where
if let Some(v) = args.size() {
let _ = buf.split_off(v as usize);
}
Ok(Box::new(futures::io::Cursor::new(buf)))
Ok(ObjectReader::new(Box::new(futures::io::Cursor::new(buf))))
}
};
}
Expand All @@ -160,7 +161,8 @@ where
args.size().unwrap_or_else(|| meta.content_length()),
);
let r = BlockReader::new(self.clone(), inode, blocks);
Ok(Box::new(r))

Ok(ObjectReader::new(Box::new(r)))
}

async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result<u64> {
Expand Down
3 changes: 2 additions & 1 deletion src/io_util/seekable_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::Accessor;
use crate::BytesReader;
use crate::Object;
use crate::ObjectMetadata;
use crate::ObjectReader;

/// Add seek support for object via internal lazy operation.
///
Expand Down Expand Up @@ -84,7 +85,7 @@ pub struct SeekableReader {

enum State {
Idle,
Sending(BoxFuture<'static, Result<BytesReader>>),
Sending(BoxFuture<'static, Result<ObjectReader>>),
Seeking(BoxFuture<'static, Result<ObjectMetadata>>),
Reading(BytesReader),
}
Expand Down
5 changes: 3 additions & 2 deletions src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::ObjectEntry;
use crate::ObjectIterator;
use crate::ObjectMetadata;
use crate::ObjectPart;
use crate::ObjectReader;
use crate::ObjectStreamer;

/// ConcurrentLimitLayer will add concurrent limit for OpenDAL.
Expand Down Expand Up @@ -110,7 +111,7 @@ impl Accessor for ConcurrentLimitAccessor {
self.inner.create(path, args).await
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
let permit = self
.semaphore
.clone()
Expand All @@ -121,7 +122,7 @@ impl Accessor for ConcurrentLimitAccessor {
self.inner
.read(path, args)
.await
.map(|r| Box::new(ConcurrentLimitReader::new(r, permit)) as BytesReader)
.map(|r| r.map_reader(|r| Box::new(ConcurrentLimitReader::new(r, permit))))
}

async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result<u64> {
Expand Down
7 changes: 5 additions & 2 deletions src/layers/content_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::BlockingBytesReader;
use crate::BytesReader;
use crate::Layer;
use crate::ObjectIterator;
use crate::ObjectReader;
use crate::ObjectStreamer;

/// ContentCacheLayer will add content data cache support for OpenDAL.
Expand Down Expand Up @@ -104,15 +105,17 @@ impl Accessor for ContentCacheAccessor {
self.inner.create(path, args).await
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
match self.cache.read(path, args.clone()).await {
Ok(r) => Ok(r),
Err(err) if err.kind() == ErrorKind::NotFound => {
let meta = self.inner.stat(path, OpStat::new()).await?;
let r = if meta.mode().is_file() {
let size = meta.content_length();
let reader = self.inner.read(path, OpRead::new(..)).await?;
self.cache.write(path, OpWrite::new(size), reader).await?;
self.cache
.write(path, OpWrite::new(size), reader.into_reader())
.await?;
self.cache.read(path, args).await?
} else {
self.inner.read(path, args).await?
Expand Down
14 changes: 11 additions & 3 deletions src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::ObjectEntry;
use crate::ObjectIterator;
use crate::ObjectMetadata;
use crate::ObjectPart;
use crate::ObjectReader;
use crate::ObjectStreamer;
use crate::Scheme;

Expand Down Expand Up @@ -160,7 +161,7 @@ impl Accessor for LoggingAccessor {
})
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
debug!(
target: "opendal::services",
"service={} operation={} path={} offset={:?} size={:?} -> started",
Expand All @@ -177,8 +178,15 @@ impl Accessor for LoggingAccessor {
self.scheme, Operation::Read, path,
args.offset(), args.size()
);
let r = LoggingReader::new(self.scheme, Operation::Read, path, args.size(), v);
Box::new(r) as BytesReader
v.map_reader(|r| {
Box::new(LoggingReader::new(
self.scheme,
Operation::Read,
path,
args.size(),
r,
))
})
})
.map_err(|err| {
if err.kind() == ErrorKind::Other {
Expand Down
21 changes: 12 additions & 9 deletions src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::Layer;
use crate::ObjectIterator;
use crate::ObjectMetadata;
use crate::ObjectPart;
use crate::ObjectReader;
use crate::ObjectStreamer;

static METRIC_REQUESTS_TOTAL: &str = "opendal_requests_total";
Expand Down Expand Up @@ -662,20 +663,22 @@ impl Accessor for MetricsAccessor {
})
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
self.handle.requests_total_read.increment(1);

let start = Instant::now();

let result = self.inner.read(path, args).await.map(|reader| {
Box::new(MetricReader::new(
reader,
self.handle.bytes_total_read.clone(),
self.handle.failures_total_read.clone(),
self.handle.errors_total_read.clone(),
self.handle.requests_duration_seconds_read.clone(),
Some(start),
)) as BytesReader
reader.map_reader(|r| {
Box::new(MetricReader::new(
r,
self.handle.bytes_total_read.clone(),
self.handle.failures_total_read.clone(),
self.handle.errors_total_read.clone(),
self.handle.requests_duration_seconds_read.clone(),
Some(start),
))
})
});

result.map_err(|e| {
Expand Down
19 changes: 9 additions & 10 deletions src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::Layer;
use crate::ObjectIterator;
use crate::ObjectMetadata;
use crate::ObjectPart;
use crate::ObjectReader;
use crate::ObjectStreamer;

/// RetryLayer will add retry for OpenDAL.
Expand Down Expand Up @@ -136,7 +137,7 @@ where
.map_err(convert_interrupted_error)
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
let r = { || self.inner.read(path, args.clone()) }
.retry(self.backoff.clone())
.when(|e| e.kind() == ErrorKind::Interrupted)
Expand All @@ -148,11 +149,8 @@ where
})
.await
.map_err(convert_interrupted_error)?;
Ok(Box::new(RetryReader::new(
r,
Operation::Read,
self.backoff.clone(),
)))

Ok(r.map_reader(|r| Box::new(RetryReader::new(r, Operation::Read, self.backoff.clone()))))
}

/// Return `Interrupted` Error even after retry.
Expand Down Expand Up @@ -591,6 +589,7 @@ mod tests {
use crate::ops::OpWrite;
use crate::Accessor;
use crate::BytesReader;
use crate::ObjectReader;
use crate::Operator;

#[derive(Debug, Clone, Default)]
Expand All @@ -600,7 +599,7 @@ mod tests {

#[async_trait]
impl Accessor for MockService {
async fn read(&self, path: &str, _: OpRead) -> io::Result<BytesReader> {
async fn read(&self, path: &str, _: OpRead) -> io::Result<ObjectReader> {
let mut attempt = self.attempt.lock().unwrap();
*attempt += 1;

Expand Down Expand Up @@ -700,10 +699,10 @@ mod tests {

#[async_trait]
impl Accessor for MockReadService {
async fn read(&self, _: &str, _: OpRead) -> io::Result<BytesReader> {
Ok(Box::new(MockReader {
async fn read(&self, _: &str, _: OpRead) -> io::Result<ObjectReader> {
Ok(ObjectReader::new(Box::new(MockReader {
attempt: self.attempt.clone(),
}))
})))
}

async fn write(&self, _: &str, args: OpWrite, mut r: BytesReader) -> io::Result<u64> {
Expand Down
3 changes: 2 additions & 1 deletion src/layers/subdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::ObjectEntry;
use crate::ObjectIterator;
use crate::ObjectMetadata;
use crate::ObjectPart;
use crate::ObjectReader;
use crate::ObjectStreamer;

/// SubdirLayer to switch to subdir for existing operator.
Expand Down Expand Up @@ -121,7 +122,7 @@ impl Accessor for SubdirAccessor {
self.inner.create(&path, args).await
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
let path = self.prepend_subdir(path);

self.inner.read(&path, args).await
Expand Down
5 changes: 3 additions & 2 deletions src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::ObjectEntry;
use crate::ObjectIterator;
use crate::ObjectMetadata;
use crate::ObjectPart;
use crate::ObjectReader;
use crate::ObjectStreamer;

/// TracingLayer will add tracing for OpenDAL.
Expand Down Expand Up @@ -93,11 +94,11 @@ impl Accessor for TracingAccessor {
}

#[tracing::instrument(level = "debug", skip(self))]
async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
self.inner
.read(path, args)
.await
.map(|r| Box::new(TracingReader::new(Span::current(), r)) as BytesReader)
.map(|r| r.map_reader(|r| Box::new(TracingReader::new(Span::current(), r))))
}

#[tracing::instrument(level = "debug", skip(self, r))]
Expand Down
21 changes: 19 additions & 2 deletions src/object/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ use crate::ObjectMode;
/// ObjectReader is a bytes reader that carries it's related metadata.
/// Users could fetch part of metadata that carried by read response.
pub struct ObjectReader {
inner: BytesReader,
meta: ObjectMetadata,
inner: BytesReader,
}

impl ObjectReader {
/// Create a new object reader.
pub fn new(inner: BytesReader) -> Self {
ObjectReader {
inner,
// the object meta must be file.
meta: ObjectMetadata::new(ObjectMode::FILE),
inner,
}
}

Expand All @@ -47,6 +47,23 @@ impl ObjectReader {
self
}

/// Replace the bytes reader with new one.
pub fn with_reader(mut self, inner: BytesReader) -> Self {
self.inner = inner;
self
}

/// Replace the bytes reader with new one.
pub fn map_reader(mut self, f: impl FnOnce(BytesReader) -> BytesReader) -> Self {
self.inner = f(self.inner);
self
}

/// Convert into a bytes reader to consume the reader.
pub fn into_reader(self) -> BytesReader {
self.inner
}

/// Content length of this object reader.
///
/// `Content-Length` is defined by [RFC 7230](https://httpwg.org/specs/rfc7230.html#header.content-length)
Expand Down
7 changes: 5 additions & 2 deletions src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crate::path::normalize_root;
use crate::Accessor;
use crate::BytesReader;
use crate::ObjectMode;
use crate::ObjectReader;
use crate::ObjectStreamer;
use crate::Scheme;

Expand Down Expand Up @@ -286,15 +287,17 @@ impl Accessor for Backend {
}
}

async fn read(&self, path: &str, args: OpRead) -> Result<BytesReader> {
async fn read(&self, path: &str, args: OpRead) -> Result<ObjectReader> {
let resp = self
.azblob_get_blob(path, args.offset(), args.size())
.await?;

let status = resp.status();

match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(resp.into_body().reader()),
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
Ok(ObjectReader::new(resp.into_body().reader()))
}
_ => {
let er = parse_error_response(resp).await?;
let err = parse_error(Operation::Read, path, er);
Expand Down
Loading