SdkBody callbacks for progress reporting #2092

Open
lsr0 opened this issue Dec 13, 2022 · 3 comments
Labels
documentation Improvements or additions to documentation

Comments

Contributor

lsr0 commented Dec 13, 2022

Responding to the removal of SdkBody callbacks in #2065.

I'm currently using with_body_callback() for one thing only: progress reporting for upload operations (e.g. S3's put_object). For retryability these uploads use ByteStream::from_path(), which leaves no other place to hook in progress reporting. For data going in the other direction (e.g. get_object), progress reporting fits naturally into stream consumption, so no hook is needed there.

There seem to be enough escape hatches to implement this manually in an API consumer, e.g. SdkBody::retryable(), but it's less clear what the recommended (and ideally minimal and future-proof) way of doing so would be.

Contributor

Velfi commented Dec 13, 2022

The way to accomplish this going forward is with "body wrappers."

I've put an issue on the example writer's backlog to add an example of how to do this.

Depending on your familiarity with Rust, you may be able to accomplish this on your own. You'd need to create a struct that looks similar to this:

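use aws_smithy_http::body::SdkBody;
use bytes::Bytes;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {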
    pub struct ProgressBody<InnerBody> {
        #[pin]
        inner: InnerBody,
        bytes_written: u64,
        content_length: u64,
    }
}

impl ProgressBody<SdkBody> {
    /// Given an `SdkBody` and its total content length in bytes, create a new
    /// `ProgressBody<SdkBody>`.
    pub fn new(
        body: SdkBody,
        content_length: u64,
    ) -> Self {
        Self {
            inner: body,
            bytes_written: 0,
            content_length,
        }
    }

    fn poll_inner(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Bytes, aws_smithy_http::body::Error>>> {
        use http_body::Body;

        let this = self.project();

        match this.inner.poll_data(cx) {
            Poll::Ready(Some(Ok(data))) => {
                *this.bytes_written += data.len() as u64;
                let progress = *this.bytes_written as f64 / *this.content_length as f64;
                tracing::trace!(
                    "Read {} bytes, progress: {:.2}%",
                    data.len(),
                    progress * 100.0
                );

                Poll::Ready(Some(Ok(data)))
            }
            // The inner body has no more data, so the upload body is complete.
            Poll::Ready(None) => {
                tracing::trace!("done");
                Poll::Ready(None)
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Pending => Poll::Pending,
        }
    }
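}

// `BoxBody::new` needs the wrapper to implement `http_body::Body`.
// Assumes http-body 0.4, the version the SDK depended on at the time.
impl http_body::Body for ProgressBody<SdkBody> {
    type Data = Bytes;
    type Error = aws_smithy_http::body::Error;

    fn poll_data(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        self.poll_inner(cx)
    }

    fn poll_trailers(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
        http_body::Body::poll_trailers(self.project().inner, cx)
    }

    fn size_hint(&self) -> http_body::SizeHint {
        http_body::Body::size_hint(&self.inner)
    }
}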

(I didn't compile this to see if it actually works, but it should get you very close to what you need.)
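For anyone who wants to try the wrapper idea without pulling in the SDK crates, here is a minimal sketch that uses only the standard library. It wraps a `std::io::Read` rather than an `SdkBody`, so it is an analogue of the body wrapper above and not SDK code. `ProgressReader` and `drain_with_progress` are hypothetical names invented for this sketch.

```rust
use std::io::{self, Read};

/// Wraps any reader and reports the cumulative byte count to a callback.
pub struct ProgressReader<R, F> {
    inner: R,
    bytes_read: u64,
    total: u64,
    on_progress: F,
}

impl<R: Read, F: FnMut(u64, u64)> ProgressReader<R, F> {
    pub fn new(inner: R, total: u64, on_progress: F) -> Self {
        Self { inner, bytes_read: 0, total, on_progress }
    }
}

impl<R: Read, F: FnMut(u64, u64)> Read for ProgressReader<R, F> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Delegate to the inner reader, then record how much passed through.
        let n = self.inner.read(buf)?;
        if n > 0 {
            self.bytes_read += n as u64;
            (self.on_progress)(self.bytes_read, self.total);
        }
        Ok(n)
    }
}

/// Reads `data` in `chunk_size` pieces and returns every cumulative
/// byte count that the callback observed.
pub fn drain_with_progress(data: &[u8], chunk_size: usize) -> io::Result<Vec<u64>> {
    let mut seen = Vec::new();
    let mut reader =
        ProgressReader::new(data, data.len() as u64, |done, _total| seen.push(done));
    let mut buf = vec![0u8; chunk_size];
    while reader.read(&mut buf)? > 0 {}
    drop(reader); // release the callback's borrow of `seen`
    Ok(seen)
}
```

The shape is the same as `ProgressBody`: the wrapper owns the inner source, forwards each read, and updates its counter only when data actually arrives.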

And then call SdkBody::map() to wrap the body in your ProgressBody:

let body = ByteStream::from(SdkBody::from(<your file>).map(move |body| {
    let body = ProgressBody::new(body, <body content length>);
    aws_smithy_http::body::SdkBody::from_dyn(aws_smithy_http::body::BoxBody::new(body))
}));
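One way to fill in the `<body content length>` placeholder for a file on disk is to read the size from filesystem metadata before building the body. The sketch below uses only the standard library. `file_content_length` and `progress_percent` are hypothetical helper names, and treating an empty body as 100% complete is an assumption made here to avoid the `0.0 / 0.0` (NaN) that the plain division would produce.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Returns the size in bytes of the file at `path`, taken from its metadata.
pub fn file_content_length(path: &Path) -> io::Result<u64> {
    Ok(fs::metadata(path)?.len())
}

/// Percentage of the body sent so far.
/// An empty body is reported as complete instead of dividing zero by zero.
pub fn progress_percent(bytes_written: u64, content_length: u64) -> f64 {
    if content_length == 0 {
        return 100.0;
    }
    bytes_written as f64 / content_length as f64 * 100.0
}
```

Note that the metadata length is a snapshot: if the file changes between this call and the upload, the reported percentage will be off.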

Velfi added the documentation label Dec 13, 2022
Contributor

Velfi commented Dec 13, 2022

I'll leave this issue open until we have an example.

Contributor Author

lsr0 commented Dec 19, 2022

Understood, and thank you! Is there a recommended, concise way to write SdkBody::from(<your file>)? As far as I can see, the smithy implementation (PathBody) isn't exported and is only indirectly available via FsBuilder, which provides a ByteStream, not an SdkBody.

I had a stab at doing it via hyper::Body::wrap_stream, which seems to provide everything except size_hint, but I'm unsure whether I'm losing anything else this way. Setting the length on the put operation itself removes the need for size_hint.

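use std::path::PathBuf;

use aws_smithy_http::body::SdkBody;
use aws_smithy_http::byte_stream::ByteStream;
use futures::future::TryFutureExt; // for try_flatten_stream
use futures::stream::TryStreamExt; // for inspect_ok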

fn path_to_sdk_body(path: PathBuf, progress: cli::ProgressFn) -> SdkBody
{
    let open_fut = async move {
        let file = tokio::fs::File::open(path).await?;
        // Name the error type explicitly so `?` above has a concrete target.
        Ok::<_, std::io::Error>(tokio_util::io::ReaderStream::new(file))
    };
    let flattened = open_fut.try_flatten_stream();
    let inspected = flattened.inspect_ok(move |bytes| progress(cli::Update::StateProgress(bytes.len())));
    let hyper_body = hyper::body::Body::wrap_stream(inspected);
    SdkBody::from(hyper_body)
}

fn path_to_bytestream(path: PathBuf, progress: cli::ProgressFn) -> ByteStream
{
    let retryable = SdkBody::retryable(move || {
        progress(cli::Update::StateRetried);
        path_to_sdk_body(path.clone(), progress.clone())
    });
    ByteStream::from(retryable)
}
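To show what a progress consumer sees when a body is rebuilt, here is a standard-library-only simulation of the factory pattern above. It assumes the closure passed to SdkBody::retryable() runs once per attempt, including the first. `send_with_retries` is a hypothetical stand-in for the SDK's retry loop, and `Update` only loosely mirrors the `cli::Update` variants.

```rust
use std::cell::RefCell;
use std::io::{Cursor, Read};

#[derive(Debug, PartialEq)]
pub enum Update {
    BodyBuilt,       // the factory produced a fresh body
    Progress(usize), // this many bytes were just read from the body
}

/// Calls `make_body` once per attempt and abandons the first
/// `failing_attempts` bodies after a single chunk (a simulated failure).
pub fn send_with_retries<R: Read>(
    make_body: impl Fn() -> R,
    failing_attempts: usize,
    chunk: usize,
    on_bytes: impl Fn(usize),
) {
    for attempt in 0..=failing_attempts {
        let mut body = make_body();
        let mut buf = vec![0u8; chunk];
        loop {
            let n = body.read(&mut buf).expect("in-memory reads do not fail");
            if n == 0 {
                break; // body fully consumed
            }
            on_bytes(n);
            if attempt < failing_attempts {
                break; // pretend the connection dropped mid-body
            }
        }
    }
}

/// Runs one failed attempt followed by one successful attempt over
/// ten bytes of data and returns the events in the order they occurred.
pub fn demo() -> Vec<Update> {
    let log = RefCell::new(Vec::new());
    let data = b"0123456789".to_vec();
    send_with_retries(
        || {
            log.borrow_mut().push(Update::BodyBuilt);
            Cursor::new(data.clone())
        },
        1,
        4,
        |n| log.borrow_mut().push(Update::Progress(n)),
    );
    log.into_inner()
}
```

Under that assumption, two things follow. The "built" event also fires for the first attempt, so it is not strictly a retry signal. And bytes from the failed attempt are replayed, so the consumer should reset its running total whenever a new body is built.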
