Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Paging to Use a Predicate to Determine Continuation #21124

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.core.http.rest;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.paging.ContinuablePagedFluxCore;
import com.azure.core.util.paging.PageRetriever;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -74,10 +75,11 @@ public PagedFluxBase(Supplier<Mono<P>> firstPageRetriever, Function<String, Mono
* Create PagedFlux backed by Page Retriever Function Supplier.
*
* @param provider the Page Retrieval Provider
* @param ignored ignored
* @param ignored An additional ignored parameter as generic types are erased and this would conflict with
* {@link #PagedFluxBase(Supplier)} without it.
*/
PagedFluxBase(Supplier<PageRetriever<String, P>> provider, boolean ignored) {
super(provider);
super(provider, null, token -> !CoreUtils.isNullOrEmpty(token));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

import reactor.core.publisher.Flux;

import java.util.Objects;
import java.util.function.Predicate;

/**
* This class is a {@link Flux} implementation that provides the ability to operate on pages of type
* {@link ContinuablePage} and individual items in such pages. This type supports user-provided continuation tokens,
* allowing for restarting from a previously-retrieved continuation token.
* This class is a {@link Flux} implementation that provides the ability to operate on pages of type {@link
* ContinuablePage} and individual items in such pages. This type supports user-provided continuation tokens, allowing
* for restarting from a previously-retrieved continuation token.
*
* @param <C> Type of the continuation token.
* @param <T> Type of the elements in the page.
Expand All @@ -17,6 +20,29 @@
* @see ContinuablePage
*/
public abstract class ContinuablePagedFlux<C, T, P extends ContinuablePage<C, T>> extends Flux<T> {
final Predicate<C> continuationPredicate;
alzimmermsft marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates an instance of ContinuablePagedFlux.
* <p>
* Continuation completes when the last returned continuation token is null.
*/
protected ContinuablePagedFlux() {
this(Objects::nonNull);
}

/**
* Creates an instance of ContinuablePagedFlux.
* <p>
* If {@code continuationPredicate} is null then the predicate will only check if the continuation token is
* non-null.
*
* @param continuationPredicate A predicate which determines if paging should continue.
*/
protected ContinuablePagedFlux(Predicate<C> continuationPredicate) {
alzimmermsft marked this conversation as resolved.
Show resolved Hide resolved
this.continuationPredicate = (continuationPredicate == null) ? Objects::nonNull : continuationPredicate;
alzimmermsft marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Gets a {@link Flux} of {@link ContinuablePage} starting at the first page.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import reactor.core.publisher.Mono;

import java.util.Objects;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -44,25 +45,40 @@ public abstract class ContinuablePagedFluxCore<C, T, P extends ContinuablePage<C
* Creates an instance of {@link ContinuablePagedFluxCore}.
*
* @param pageRetrieverProvider a provider that returns {@link PageRetriever}.
* @throws NullPointerException If {@code pageRetrieverProvider} is null.
*/
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider) {
this.pageRetrieverProvider = Objects.requireNonNull(pageRetrieverProvider,
"'pageRetrieverProvider' function cannot be null.");
this.defaultPageSize = null;
this(pageRetrieverProvider, null, null);
}

/**
* Creates an instance of {@link ContinuablePagedFluxCore}.
*
* @param pageRetrieverProvider a provider that returns {@link PageRetriever}.
* @param pageSize the preferred page size
* @throws IllegalArgumentException if defaultPageSize is not greater than zero
* @throws NullPointerException If {@code pageRetrieverProvider} is null.
* @throws IllegalArgumentException If {@code pageSize} is less than or equal to zero.
*/
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, int pageSize) {
this(pageRetrieverProvider, pageSize, null);
}

/**
* Creates an instance of {@link ContinuablePagedFluxCore}.
*
* @param pageRetrieverProvider A provider that returns {@link PageRetriever}.
* @param pageSize The preferred page size.
* @param continuationPredicate A predicate which determines if paging should continue.
* @throws NullPointerException If {@code pageRetrieverProvider} is null.
* @throws IllegalArgumentException If {@code pageSize} is not null and is less than or equal to zero.
*/
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, Integer pageSize,
Predicate<C> continuationPredicate) {
super(continuationPredicate);
this.pageRetrieverProvider = Objects.requireNonNull(pageRetrieverProvider,
"'pageRetrieverProvider' function cannot be null.");
if (pageSize <= 0) {
throw new IllegalArgumentException("pageSize > 0 required but provided: " + pageSize);
if (pageSize != null && pageSize <= 0) {
throw new IllegalArgumentException("'pageSize' must be greater than 0 required but provided: " + pageSize);
alzimmermsft marked this conversation as resolved.
Show resolved Hide resolved
}
this.defaultPageSize = pageSize;
}
Expand Down Expand Up @@ -163,7 +179,7 @@ private Flux<P> retrievePages(ContinuationState<C> state, PageRetriever<C, P> pa
*/
return retrievePage(state, pageRetriever, pageSize)
.expand(page -> {
state.setLastContinuationToken(page.getContinuationToken());
state.setLastContinuationToken(page.getContinuationToken(), t -> !continuationPredicate.test(t));
return Flux.defer(() -> retrievePage(state, pageRetriever, pageSize));
}, 4);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.azure.core.util.paging;

import java.util.function.Predicate;

/**
* Maintains the continuation state for a {@link ContinuablePagedFlux} or {@link ContinuablePagedIterable}.
*
Expand All @@ -17,32 +19,48 @@ class ContinuationState<C> {
/**
* Creates ContinuationState.
*
* @param token the token to start with
* @param token An optional continuation token for the beginning state.
*/
ContinuationState(C token) {
this.lastContinuationToken = token;
}

/**
* Store the last seen continuation token.
* <p>
* Determination for continuation being done is checking if the continuation token is null.
*
* @param token the token
* @param token The continuation token.
*/
void setLastContinuationToken(C token) {
this.isDone = (token == null);
this.lastContinuationToken = token;
}

/**
* @return the last seen token
* Store the last seen continuation token and apply the predicate to determine if continuation is done.
*
* @param token The continuation token.
* @param isDonePredicate The predicate that tests if continuation is done.
*/
void setLastContinuationToken(C token, Predicate<C> isDonePredicate) {
this.isDone = isDonePredicate.test(token);
this.lastContinuationToken = token;
}

/**
* Gets the last continuation token that has been seen.
*
* @return The last continuation token.
*/
C getLastContinuationToken() {
return this.lastContinuationToken;
}

/**
* @return true if the PageRetrieval Function needs to be invoked
* for next set of pages.
* Gets whether continuation is done.
*
* @return A flag determining if continuation is done.
*/
boolean isDone() {
return this.isDone;
Expand Down