Skip to content

Commit

Permalink
Enable Paging to Use a Predicate to Determine Continuation (#21124)
Browse files Browse the repository at this point in the history
Enable Paging to Use a Predicate to Determine Continuation
  • Loading branch information
alzimmermsft authored May 14, 2021
1 parent fab105b commit 0ee26bb
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 107 deletions.
10 changes: 10 additions & 0 deletions eng/code-quality-reports/src/main/resources/revapi/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,16 @@
"new": "@interface com.azure.core.annotation.ServiceMethod",
"justification": "Updated the retention policy from SOURCE to CLASS which is not considered a public API change"
},
{
"code": "java.method.visibilityReduced",
"new": "method java.util.function.Predicate<C> com.azure.core.util.paging.ContinuablePagedFlux<C, T, P extends com.azure.core.util.paging.ContinuablePage<C extends java.lang.Object, T extends java.lang.Object>>::getContinuationPredicate()",
"justification": "Adding a final method to an abstract class that is rarely extended. New final method uses an unlikely name that wouldn't be found in sub-types."
},
{
"code": "java.method.finalMethodAddedToNonFinalClass",
"new": "method java.util.function.Predicate<C> com.azure.core.util.paging.ContinuablePagedFlux<C, T, P extends com.azure.core.util.paging.ContinuablePage<C extends java.lang.Object, T extends java.lang.Object>>::getContinuationPredicate()",
"justification": "Adding a final method to an abstract class that is rarely extended. New final method uses an unlikely name that wouldn't be found in sub-types."
},
{
"regex": true,
"code": "java\\.annotation\\.added",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.core.http.rest;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.paging.ContinuablePagedFluxCore;
import com.azure.core.util.paging.PageRetriever;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -74,10 +75,11 @@ public PagedFluxBase(Supplier<Mono<P>> firstPageRetriever, Function<String, Mono
* Create PagedFlux backed by Page Retriever Function Supplier.
*
* @param provider the Page Retrieval Provider
* @param ignored ignored
* @param ignored An additional ignored parameter as generic types are erased and this would conflict with
* {@link #PagedFluxBase(Supplier)} without it.
*/
PagedFluxBase(Supplier<PageRetriever<String, P>> provider, boolean ignored) {
super(provider);
super(provider, null, token -> !CoreUtils.isNullOrEmpty(token));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

import reactor.core.publisher.Flux;

import java.util.Objects;
import java.util.function.Predicate;

/**
* This class is a {@link Flux} implementation that provides the ability to operate on pages of type
* {@link ContinuablePage} and individual items in such pages. This type supports user-provided continuation tokens,
* allowing for restarting from a previously-retrieved continuation token.
* This class is a {@link Flux} implementation that provides the ability to operate on pages of type {@link
* ContinuablePage} and individual items in such pages. This type supports user-provided continuation tokens, allowing
* for restarting from a previously-retrieved continuation token.
*
* @param <C> Type of the continuation token.
* @param <T> Type of the elements in the page.
Expand All @@ -17,6 +20,30 @@
* @see ContinuablePage
*/
public abstract class ContinuablePagedFlux<C, T, P extends ContinuablePage<C, T>> extends Flux<T> {
private final Predicate<C> continuationPredicate;

/**
* Creates an instance of ContinuablePagedFlux.
* <p>
* Continuation completes when the last returned continuation token is null.
*/
// This is public as previously there was no empty constructor, so there was an implicit public empty constructor.
public ContinuablePagedFlux() {
this(Objects::nonNull);
}

/**
* Creates an instance of ContinuablePagedFlux.
* <p>
* If {@code continuationPredicate} is null then the predicate will only check if the continuation token is
* non-null.
*
* @param continuationPredicate A predicate which determines if paging should continue.
*/
protected ContinuablePagedFlux(Predicate<C> continuationPredicate) {
this.continuationPredicate = (continuationPredicate == null) ? Objects::nonNull : continuationPredicate;
}

/**
* Gets a {@link Flux} of {@link ContinuablePage} starting at the first page.
*
Expand Down Expand Up @@ -56,4 +83,13 @@ public abstract class ContinuablePagedFlux<C, T, P extends ContinuablePage<C, T>
* @return A {@link Flux} of {@link ContinuablePage}.
*/
public abstract Flux<P> byPage(C continuationToken, int preferredPageSize);

/**
* Gets the {@link Predicate} that determines if paging should continue.
*
* @return The {@link Predicate} that determines if paging should continue.
*/
protected final Predicate<C> getContinuationPredicate() {
return continuationPredicate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
package com.azure.core.util.paging;

import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Objects;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -37,32 +39,50 @@
*/
public abstract class ContinuablePagedFluxCore<C, T, P extends ContinuablePage<C, T>>
extends ContinuablePagedFlux<C, T, P> {
private final ClientLogger logger = new ClientLogger(ContinuablePagedFluxCore.class);

final Supplier<PageRetriever<C, P>> pageRetrieverProvider;
final Integer defaultPageSize;

/**
* Creates an instance of {@link ContinuablePagedFluxCore}.
*
* @param pageRetrieverProvider a provider that returns {@link PageRetriever}.
* @throws NullPointerException If {@code pageRetrieverProvider} is null.
*/
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider) {
this.pageRetrieverProvider = Objects.requireNonNull(pageRetrieverProvider,
"'pageRetrieverProvider' function cannot be null.");
this.defaultPageSize = null;
this(pageRetrieverProvider, null, null);
}

/**
* Creates an instance of {@link ContinuablePagedFluxCore}.
*
* @param pageRetrieverProvider a provider that returns {@link PageRetriever}.
* @param pageSize the preferred page size
* @throws IllegalArgumentException if defaultPageSize is not greater than zero
* @throws NullPointerException If {@code pageRetrieverProvider} is null.
* @throws IllegalArgumentException If {@code pageSize} is less than or equal to zero.
*/
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, int pageSize) {
this(pageRetrieverProvider, pageSize, null);
}

/**
* Creates an instance of {@link ContinuablePagedFluxCore}.
*
* @param pageRetrieverProvider A provider that returns {@link PageRetriever}.
* @param pageSize The preferred page size.
* @param continuationPredicate A predicate which determines if paging should continue.
* @throws NullPointerException If {@code pageRetrieverProvider} is null.
* @throws IllegalArgumentException If {@code pageSize} is not null and is less than or equal to zero.
*/
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, Integer pageSize,
Predicate<C> continuationPredicate) {
super(continuationPredicate);
this.pageRetrieverProvider = Objects.requireNonNull(pageRetrieverProvider,
"'pageRetrieverProvider' function cannot be null.");
if (pageSize <= 0) {
throw new IllegalArgumentException("pageSize > 0 required but provided: " + pageSize);
if (pageSize != null && pageSize <= 0) {
throw logger.logExceptionAsError(
new IllegalArgumentException("'pageSize' must be greater than 0 required but provided: " + pageSize));
}
this.defaultPageSize = pageSize;
}
Expand Down Expand Up @@ -163,7 +183,7 @@ private Flux<P> retrievePages(ContinuationState<C> state, PageRetriever<C, P> pa
*/
return retrievePage(state, pageRetriever, pageSize)
.expand(page -> {
state.setLastContinuationToken(page.getContinuationToken());
state.setLastContinuationToken(page.getContinuationToken(), t -> !getContinuationPredicate().test(t));
return Flux.defer(() -> retrievePage(state, pageRetriever, pageSize));
}, 4);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.azure.core.util.paging;

import java.util.function.Predicate;

/**
* Maintains the continuation state for a {@link ContinuablePagedFlux} or {@link ContinuablePagedIterable}.
*
Expand All @@ -17,32 +19,48 @@ class ContinuationState<C> {
/**
* Creates ContinuationState.
*
* @param token the token to start with
* @param token An optional continuation token for the beginning state.
*/
ContinuationState(C token) {
this.lastContinuationToken = token;
}

/**
* Store the last seen continuation token.
* <p>
* Determination for continuation being done is checking if the continuation token is null.
*
* @param token the token
* @param token The continuation token.
*/
void setLastContinuationToken(C token) {
this.isDone = (token == null);
this.lastContinuationToken = token;
}

/**
* @return the last seen token
* Store the last seen continuation token and apply the predicate to determine if continuation is done.
*
* @param token The continuation token.
* @param isDonePredicate The predicate that tests if continuation is done.
*/
void setLastContinuationToken(C token, Predicate<C> isDonePredicate) {
this.isDone = isDonePredicate.test(token);
this.lastContinuationToken = token;
}

/**
* Gets the last continuation token that has been seen.
*
* @return The last continuation token.
*/
C getLastContinuationToken() {
return this.lastContinuationToken;
}

/**
* @return true if the PageRetrieval Function needs to be invoked
* for next set of pages.
* Gets whether continuation is done.
*
* @return A flag determining if continuation is done.
*/
boolean isDone() {
return this.isDone;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,47 @@

import com.azure.core.util.IterableStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;


public class IterableResponseTest {


@BeforeEach
public void init(TestInfo testInfo) {
System.out.println("-------------- Running " + testInfo.getDisplayName() + " -----------------------------");
}
import static org.junit.jupiter.api.Assertions.assertTrue;


/*Ensure that if we call stream multiple times, it always returns same values and they are same as original list of values.*/
public class IterableResponseTest {
/**
* Ensure that if we call stream multiple times, it always returns same values and they are same as original list of
* values.
*/
@Test
public void testIterableResponseStreamFromStart() {
public void testIterableResponseStreamFromStart() {
IterableStream<Integer> iterableResponse = getIntegerIterableResponse(2, 5);
Assertions.assertEquals(iterableResponse.stream().collect(Collectors.toList()).size(), iterableResponse.stream().collect(Collectors.toList()).size());
Assertions.assertEquals(iterableResponse.stream().count(), iterableResponse.stream().count());

// ensure original list of values are same after calling iterator()
List<Integer> originalIntegerList = Arrays.asList(2, 3, 4, 5, 6);
iterableResponse.stream().forEach(number -> Assertions.assertTrue(originalIntegerList.contains(number)));
List<Integer> originalIntegerList = Arrays.asList(2, 3, 4, 5, 6);
iterableResponse.stream().forEach(number -> assertTrue(originalIntegerList.contains(number)));
}

/*Ensure that if we call iterator multiple times, it always returns same values and they are same as original list of values.*/
/**
* Ensure that if we call iterator multiple times, it always returns same values and they are same as original list
* of values.
*/
@Test
public void testIterableResponseIteratorFromStart() {
public void testIterableResponseIteratorFromStart() {
IterableStream<Integer> iterableResponse = getIntegerIterableResponse(2, 5);
List<Integer> actualNumberValues1 = new ArrayList<>();
List<Integer> actualNumberValues2 = new ArrayList<>();
iterableResponse.iterator().forEachRemaining(number -> actualNumberValues1.add(number));
iterableResponse.iterator().forEachRemaining(number -> actualNumberValues2.add(number));
iterableResponse.iterator().forEachRemaining(actualNumberValues1::add);
iterableResponse.iterator().forEachRemaining(actualNumberValues2::add);
Assertions.assertArrayEquals(actualNumberValues1.toArray(), actualNumberValues2.toArray());

// ensure original list of values are same after calling iterator()
List<Integer> originalIntegerList = Arrays.asList(2, 3, 4, 5, 6);
iterableResponse.iterator().forEachRemaining(number -> Assertions.assertTrue(originalIntegerList.contains(number)));
List<Integer> originalIntegerList = Arrays.asList(2, 3, 4, 5, 6);
iterableResponse.iterator().forEachRemaining(number -> assertTrue(originalIntegerList.contains(number)));
}

private IterableStream<Integer> getIntegerIterableResponse(int startNumber, int noOfValues) {
Expand Down
Loading

0 comments on commit 0ee26bb

Please sign in to comment.