Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client paginated _stream() methods are confused about limit #755

Closed
jgallagher opened this issue Apr 1, 2024 · 0 comments · Fixed by #756
Closed

Client paginated _stream() methods are confused about limit #755

jgallagher opened this issue Apr 1, 2024 · 0 comments · Fixed by #756

Comments

@jgallagher
Copy link
Contributor

@bnaecker and I ran into this while investigating oxidecomputer/omicron#5364. Looking at this macro-expanded code from progenitor::generate_api!:

    /**List all metric producers assigned to an oximeter collector as a Stream

Sends repeated `GET` requests to `/metrics/collectors/{collector_id}/producers` until there are no more results.

Arguments:
- `collector_id`: The ID of the oximeter collector.
- `limit`: Maximum number of items returned by a single call
- `sort_by`
*/
    pub fn cpapi_assigned_producers_list_stream<'a>(
        &'a self,
        collector_id: &'a uuid::Uuid,
        limit: Option<std::num::NonZeroU32>,
        sort_by: Option<types::IdSortMode>,
    ) -> impl futures::Stream<
        Item = Result<types::ProducerEndpoint, Error<types::Error>>,
    > + Unpin + '_ {
        use futures::StreamExt;
        use futures::TryFutureExt;
        use futures::TryStreamExt;
        let final_stream_limit = limit
            .clone()
            .and_then(|x| std::num::NonZeroUsize::try_from(x).ok())
            .map(std::num::NonZeroUsize::get)
            .unwrap_or(usize::MAX);
        self.cpapi_assigned_producers_list(collector_id, limit, None, sort_by)
            .map_ok(move |page| {
                let page = page.into_inner();
                let first = futures::stream::iter(page.items).map(Ok);
                let rest = futures::stream::try_unfold(
                        page.next_page,
                        move |state| async move {
                            if state.is_none() {
                                Ok(None)
                            } else {
                                self.cpapi_assigned_producers_list(
                                        collector_id,
                                        None,
                                        state.as_deref(),
                                        None,
                                    )
                                    .map_ok(|page| {
                                        let page = page.into_inner();
                                        Some((
                                            futures::stream::iter(page.items).map(Ok),
                                            page.next_page,
                                        ))
                                    })
                                    .await
                            }
                        },
                    )
                    .try_flatten();
                first.chain(rest)
            })
            .try_flatten_stream()
            .take(final_stream_limit)
            .boxed()
    }

it looks like there is internal confusion about what limit is supposed to be:

  1. It is converted into final_stream_limit and then applied to the returned stream via .take(final_stream_limit)
  2. It is passed to the first call to cpapi_assigned_producers_list as the page limit, but...
  3. ...it is not passed to all subsequent calls to cpapi_assigned_producers_list

I think because of 1, this function does not do what it claims; it will only fetch at most limit items. That seems wrong, and also final_stream_limit seems superfluous? Clients can call .take(_) on the returned stream themselves if they want to apply a final limit.

I'll start working on a test for this and a change that will (a) remove final_stream_limit and (b) pass limit to every invocation of the inner _list() call.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant