From 4d87c53789ca3694d1ec098d0f833f9a6df2ec41 Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Wed, 4 Dec 2024 15:19:58 +0000 Subject: [PATCH] Replace finished with is_terminated Signed-off-by: Alessandro Passaro --- mountpoint-s3-client/src/s3_crt_client.rs | 1 + .../src/s3_crt_client/get_object.rs | 20 ++++++++----------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index 2b71ffa18..1642c7895 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -952,6 +952,7 @@ impl<'a> S3Message<'a> { #[derive(Debug)] #[pin_project(PinnedDrop)] struct S3HttpRequest { + /// Receiver for the result of the `on_finish` callback. #[pin] receiver: Fuse>>, meta_request: MetaRequest, diff --git a/mountpoint-s3-client/src/s3_crt_client/get_object.rs b/mountpoint-s3-client/src/s3_crt_client/get_object.rs index 4191cd943..15d00426a 100644 --- a/mountpoint-s3-client/src/s3_crt_client/get_object.rs +++ b/mountpoint-s3-client/src/s3_crt_client/get_object.rs @@ -1,11 +1,12 @@ -use async_trait::async_trait; use std::future::Future; use std::ops::Deref; use std::os::unix::prelude::OsStrExt; use std::pin::Pin; use std::task::{Context, Poll}; +use async_trait::async_trait; use futures::channel::mpsc::UnboundedReceiver; +use futures::future::FusedFuture; use futures::{select_biased, Stream}; use mountpoint_s3_crt::http::request_response::{Header, Headers}; use mountpoint_s3_crt::s3::client::MetaRequestResult; @@ -123,10 +124,12 @@ impl S3CrtClient { } }; + // Guaranteed when select_biased! executes the headers branch. + assert!(!request.is_terminated()); + Ok(S3GetObjectResponse { request, part_receiver, - finished: false, requested_checksums, enable_backpressure: self.inner.enable_backpressure, headers, @@ -155,7 +158,6 @@ pub struct S3GetObjectResponse { request: S3HttpRequest<(), GetObjectError>, #[pin] part_receiver: UnboundedReceiver, - finished: bool, requested_checksums: bool, enable_backpressure: bool, headers: Headers, @@ -203,7 +205,7 @@ impl Stream for S3GetObjectResponse { type Item = ObjectClientResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if self.finished { + if self.request.is_terminated() { return Poll::Ready(None); } @@ -215,14 +217,8 @@ impl Stream for S3GetObjectResponse { } match this.request.poll(cx) { - Poll::Ready(Ok(_)) => { - *this.finished = true; - Poll::Ready(None) - } - Poll::Ready(Err(e)) => { - *this.finished = true; - Poll::Ready(Some(Err(e))) - } + Poll::Ready(Ok(_)) => Poll::Ready(None), + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), Poll::Pending => { // If the request is still not finished but the read window is not enough to poll // the next chunk we want to return error instead of keeping the request blocked.