Skip to content

Commit

Permalink
Replace finished with is_terminated
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro committed Dec 5, 2024
1 parent f5512a4 commit 4d87c53
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
1 change: 1 addition & 0 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ impl<'a> S3Message<'a> {
#[derive(Debug)]
#[pin_project(PinnedDrop)]
struct S3HttpRequest<T, E> {
/// Receiver for the result of the `on_finish` callback.
#[pin]
receiver: Fuse<oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>>,
meta_request: MetaRequest,
Expand Down
20 changes: 8 additions & 12 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use async_trait::async_trait;
use std::future::Future;
use std::ops::Deref;
use std::os::unix::prelude::OsStrExt;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_trait::async_trait;
use futures::channel::mpsc::UnboundedReceiver;
use futures::future::FusedFuture;
use futures::{select_biased, Stream};
use mountpoint_s3_crt::http::request_response::{Header, Headers};
use mountpoint_s3_crt::s3::client::MetaRequestResult;
Expand Down Expand Up @@ -123,10 +124,12 @@ impl S3CrtClient {
}
};

// Guaranteed when select_biased! executes the headers branch.
assert!(!request.is_terminated());

Ok(S3GetObjectResponse {
request,
part_receiver,
finished: false,
requested_checksums,
enable_backpressure: self.inner.enable_backpressure,
headers,
Expand Down Expand Up @@ -155,7 +158,6 @@ pub struct S3GetObjectResponse {
request: S3HttpRequest<(), GetObjectError>,
#[pin]
part_receiver: UnboundedReceiver<GetBodyPart>,
finished: bool,
requested_checksums: bool,
enable_backpressure: bool,
headers: Headers,
Expand Down Expand Up @@ -203,7 +205,7 @@ impl Stream for S3GetObjectResponse {
type Item = ObjectClientResult<GetBodyPart, GetObjectError, S3RequestError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.finished {
if self.request.is_terminated() {
return Poll::Ready(None);
}

Expand All @@ -215,14 +217,8 @@ impl Stream for S3GetObjectResponse {
}

match this.request.poll(cx) {
Poll::Ready(Ok(_)) => {
*this.finished = true;
Poll::Ready(None)
}
Poll::Ready(Err(e)) => {
*this.finished = true;
Poll::Ready(Some(Err(e)))
}
Poll::Ready(Ok(_)) => Poll::Ready(None),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => {
// If the request is still not finished but the read window is not enough to poll
// the next chunk we want to return error instead of keeping the request blocked.
Expand Down

0 comments on commit 4d87c53

Please sign in to comment.