diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/docs/AsyncOperationDocProvider.java b/codegen/src/main/java/software/amazon/awssdk/codegen/docs/AsyncOperationDocProvider.java index ccb9b51dc224..80be3fd23f31 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/docs/AsyncOperationDocProvider.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/docs/AsyncOperationDocProvider.java @@ -166,7 +166,12 @@ private AsyncPaginated(IntermediateModel model, OperationModel opModel) { @Override protected String appendToDescription() { - return opModel.isPaginated() ? paginationDocs.getDocsForAsyncOperation() : ""; + return paginationDocs.getDocsForAsyncOperation(); + } + + @Override + protected void applyReturns(DocumentationBuilder docBuilder) { + docBuilder.returns("A custom publisher that can be subscribed to request a stream of response pages."); } } diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/docs/DocumentationBuilder.java b/codegen/src/main/java/software/amazon/awssdk/codegen/docs/DocumentationBuilder.java index a68f391d6444..fcf62d4d7c5c 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/docs/DocumentationBuilder.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/docs/DocumentationBuilder.java @@ -30,6 +30,8 @@ */ public final class DocumentationBuilder { + // TODO This prefix is not suitable for paginated operations. Either remove it for paginated operations + // or change the statement to something generic private static final String ASYNC_THROWS_PREFIX = "The CompletableFuture returned by this method can be completed " + "exceptionally with the following exceptions."; diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/docs/PaginationDocs.java b/codegen/src/main/java/software/amazon/awssdk/codegen/docs/PaginationDocs.java index 87596d25e4c9..86a3d49f2166 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/docs/PaginationDocs.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/docs/PaginationDocs.java @@ -20,6 +20,7 @@ import com.squareup.javapoet.TypeName; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.codegen.model.intermediate.IntermediateModel; import software.amazon.awssdk.codegen.model.intermediate.OperationModel; import software.amazon.awssdk.codegen.poet.PoetExtensions; @@ -90,11 +91,12 @@ public String getDocsForAsyncOperation() { operationModel.getMethodName(), requestType()) .add("
When the operation is called, an instance of this class is returned. At this point, " + "no service calls are made yet and so there is no guarantee that the request is valid. " - + "The subscribe method should be called as a request to stream data. For more info, " - + "see {@link $T#$L($T)}. If there are errors in your " - + "request, you will see the failures only after you start streaming the data.
", - getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType()) - .add(getAsyncCodeSnippets()) + + "If there are errors in your request, you will see the failures only after you start streaming " + + "the data. The subscribe method should be called as a request to start streaming data. " + + "For more info, see {@link $T#$L($T)}. Each call to the subscribe method will result in a new " + + "{@link $T} i.e., a new contract to stream data from the starting request.", + getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType(), getSubscriptionType()) + .add(getAsyncCodeSnippets()) .build() .toString(); } @@ -112,10 +114,11 @@ clientInterface, getPaginatedMethodName(), requestType(), getPublisherType(), syncResponsePageType()) .add("When the operation is called, an instance of this class is returned. At this point, " + "no service calls are made yet and so there is no guarantee that the request is valid. " - + "The subscribe method should be called as a request to stream data. For more info, " - + "see {@link $T#$L($T)}. If there are errors in your " - + "request, you will see the failures only after you start streaming the data.
", - getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType()) + + "If there are errors in your request, you will see the failures only after you start streaming " + + "the data. The subscribe method should be called as a request to start streaming data. " + + "For more info, see {@link $T#$L($T)}. Each call to the subscribe method will result in a new " + + "{@link $T} i.e., a new contract to stream data from the starting request.", + getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType(), getSubscriptionType()) .add(getAsyncCodeSnippets()) .build() .toString(); @@ -160,7 +163,7 @@ private String getAsyncCodeSnippets() { return CodeBlock.builder() .add("\n\nThe following are few ways to use the response class:
") - .add("1) Using the forEach helper method. This uses @{@link $T} internally", + .add("1) Using the forEach helper method", TypeName.get(SequentialSubscriber.class)) .add(buildCode(CodeBlock.builder() .add(callOperationOnClient) @@ -255,4 +258,11 @@ private ClassName getPublisherType() { private ClassName getSubscriberType() { return ClassName.get(Subscriber.class); } + + /** + * @return A Poet {@link ClassName} for the reactive streams {@link Subscription}. + */ + private ClassName getSubscriptionType() { + return ClassName.get(Subscription.class); + } } diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/docs/SyncOperationDocProvider.java b/codegen/src/main/java/software/amazon/awssdk/codegen/docs/SyncOperationDocProvider.java index 489fe1a795f0..7bd82ecdb44f 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/docs/SyncOperationDocProvider.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/docs/SyncOperationDocProvider.java @@ -214,7 +214,12 @@ private SyncPaginated(IntermediateModel model, OperationModel opModel) { @Override protected String appendToDescription() { - return opModel.isPaginated() ? paginationDocs.getDocsForSyncOperation() : ""; + return paginationDocs.getDocsForSyncOperation(); + } + + @Override + protected void applyReturns(DocumentationBuilder docBuilder) { + docBuilder.returns("A custom iterable that can be used to iterate through all the response pages."); } } diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/emitters/tasks/PaginatorsGeneratorTasks.java b/codegen/src/main/java/software/amazon/awssdk/codegen/emitters/tasks/PaginatorsGeneratorTasks.java index 3c8778ea1336..eadc4dfb6d25 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/emitters/tasks/PaginatorsGeneratorTasks.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/emitters/tasks/PaginatorsGeneratorTasks.java @@ -47,7 +47,7 @@ protected boolean hasTasks() { @Override protected ListA helper method to resume the pages in case of unexpected failures. " + + "The method takes the last successful response page as input and returns an " + + "instance of {@link $T} that can be used to retrieve the consecutive pages " + + "that follows the input page.
", className()) + .build()); + } + /* * Returns the {@link TypeName} for a value in the {@link PaginatorDefinition#getResultKey()} list. * @@ -188,19 +211,32 @@ protected CodeBlock nextPageMethodBody() { */ private String codeToGetNextPageIfOldResponseIsNotNull() { StringBuilder sb = new StringBuilder(); + sb.append(String.format("return %s.%s(%s)", CLIENT_MEMBER, + operationModel.getMethodName(), + constructRequestFromLastPage(PREVIOUS_PAGE_METHOD_ARGUMENT))); + return sb.toString(); + } - sb.append(String.format("return %s.%s(%s.toBuilder()", CLIENT_MEMBER, operationModel.getMethodName(), REQUEST_MEMBER)); + /** + * Generates the code to construct a request object from the last successful page + * by setting the fields required to get the next page. + * + * Sample code: if responsePage string is "response" + * firstRequest.toBuilder().exclusiveStartTableName(response.lastEvaluatedTableName()).build() + */ + protected String constructRequestFromLastPage(String responsePage) { + StringBuilder sb = new StringBuilder(); + sb.append(String.format("%s.toBuilder()", REQUEST_MEMBER)); List* When the operation is called, an instance of this class is returned. At this point, no service calls are made yet - * and so there is no guarantee that the request is valid. The subscribe method should be called as a request to - * stream data. For more info, see {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. - * If there are errors in your request, you will see the failures only after you start streaming the data. + * and so there is no guarantee that the request is valid. If there are errors in your request, you will see the + * failures only after you start streaming the data. The subscribe method should be called as a request to start + * streaming data. For more info, see + * {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. Each call to the subscribe + * method will result in a new {@link org.reactivestreams.Subscription} i.e., a new contract to stream data from the + * starting request. *
* ** The following are few ways to use the response class: *
- * 1) Using the forEach helper method. This uses @ - * {@link software.amazon.awssdk.core.pagination.async.SequentialSubscriber} internally + * 1) Using the forEach helper method * ** {@code @@ -395,8 +397,7 @@ publicCompletableFuture streamingOutputOperation( * * * @param paginatedOperationWithResultKeyRequest - * @return A Java Future containing the result of the PaginatedOperationWithResultKey operation returned by the - * service.
+ * @return A custom publisher that can be subscribed to request a stream of response pages.
* The CompletableFuture returned by this method can be completed exceptionally with the following * exceptions. *@@ -426,16 +427,18 @@ public PaginatedOperationWithResultKeyPublisher paginatedOperationWithResultKeyP * *
* When the operation is called, an instance of this class is returned. At this point, no service calls are made yet - * and so there is no guarantee that the request is valid. The subscribe method should be called as a request to - * stream data. For more info, see {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. - * If there are errors in your request, you will see the failures only after you start streaming the data. + * and so there is no guarantee that the request is valid. If there are errors in your request, you will see the + * failures only after you start streaming the data. The subscribe method should be called as a request to start + * streaming data. For more info, see + * {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. Each call to the subscribe + * method will result in a new {@link org.reactivestreams.Subscription} i.e., a new contract to stream data from the + * starting request. *
* ** The following are few ways to use the response class: *
- * 1) Using the forEach helper method. This uses @ - * {@link software.amazon.awssdk.core.pagination.async.SequentialSubscriber} internally + * 1) Using the forEach helper method * ** {@code @@ -467,8 +470,7 @@ public PaginatedOperationWithResultKeyPublisher paginatedOperationWithResultKeyP * * * @param paginatedOperationWithoutResultKeyRequest - * @return A Java Future containing the result of the PaginatedOperationWithoutResultKey operation returned by the - * service.
+ * @return A custom publisher that can be subscribed to request a stream of response pages.
* The CompletableFuture returned by this method can be completed exceptionally with the following * exceptions. *diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-async-client-interface.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-async-client-interface.java index 3c8a432b0179..97a265cad9d6 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-async-client-interface.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-async-client-interface.java @@ -281,6 +281,29 @@ default CompletableFuture
paginatedOper throw new UnsupportedOperationException(); } + /** + * Some paginated operation with result_key in paginators.json file + * + * @return A Java Future containing the result of the PaginatedOperationWithResultKey operation returned by the + * service.
+ * The CompletableFuture returned by this method can be completed exceptionally with the following + * exceptions. + *+ *
+ * @sample JsonAsyncClient.PaginatedOperationWithResultKey + * @see AWS API Documentation + */ + default CompletableFuture- SdkException Base class for all exceptions that can be thrown by the SDK (both service and client). + * Can be used for catch all scenarios.
+ *- SdkClientException If any client side error occurs such as an IO related failure, failure to get + * credentials, etc.
+ *- JsonException Base class for all service exceptions. Unknown exceptions will be thrown as an instance + * of this type.
+ *paginatedOperationWithResultKey() { + return paginatedOperationWithResultKey(PaginatedOperationWithResultKeyRequest.builder().build()); + } + /** * Some paginated operation with result_key in paginators.json file
* This is a convenience which creates an instance of the {@link PaginatedOperationWithResultKeyRequest.Builder} @@ -320,16 +343,18 @@ default CompletableFuturepaginatedOper * * * When the operation is called, an instance of this class is returned. At this point, no service calls are made yet - * and so there is no guarantee that the request is valid. The subscribe method should be called as a request to - * stream data. For more info, see {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. - * If there are errors in your request, you will see the failures only after you start streaming the data. + * and so there is no guarantee that the request is valid. If there are errors in your request, you will see the + * failures only after you start streaming the data. The subscribe method should be called as a request to start + * streaming data. For more info, see + * {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. Each call to the subscribe + * method will result in a new {@link org.reactivestreams.Subscription} i.e., a new contract to stream data from the + * starting request. *
* ** The following are few ways to use the response class: *
- * 1) Using the forEach helper method. This uses @ - * {@link software.amazon.awssdk.core.pagination.async.SequentialSubscriber} internally + * 1) Using the forEach helper method * ** {@code @@ -361,8 +386,7 @@ default CompletableFuturepaginatedOper * * * @param paginatedOperationWithResultKeyRequest - * @return A Java Future containing the result of the PaginatedOperationWithResultKey operation returned by the - * service.
+ * @return A custom publisher that can be subscribed to request a stream of response pages.
* The CompletableFuture returned by this method can be completed exceptionally with the following * exceptions. *@@ -382,6 +406,77 @@ default PaginatedOperationWithResultKeyPublisher paginatedOperationWithResultKey throw new UnsupportedOperationException(); } + /** + * Some paginated operation with result_key in paginators.json file
+ *+ * This is a variant of + * {@link #paginatedOperationWithResultKey(software.amazon.awssdk.services.json.model.PaginatedOperationWithResultKeyRequest)} + * operation. The return type is a custom publisher that can be subscribed to request a stream of response pages. + * SDK will internally handle making service calls for you. + *
+ *+ * When the operation is called, an instance of this class is returned. At this point, no service calls are made yet + * and so there is no guarantee that the request is valid. If there are errors in your request, you will see the + * failures only after you start streaming the data. The subscribe method should be called as a request to start + * streaming data. For more info, see + * {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. Each call to the subscribe + * method will result in a new {@link org.reactivestreams.Subscription} i.e., a new contract to stream data from the + * starting request. + *
+ * + *+ * The following are few ways to use the response class: + *
+ * 1) Using the forEach helper method + * + *+ * {@code + * software.amazon.awssdk.services.json.paginators.PaginatedOperationWithResultKeyPublisher publisher = client.paginatedOperationWithResultKeyPaginator(request); + * CompletableFuture+ * + * 2) Using a custom subscriber + * + *future = publisher.forEach(res -> { // Do something with the response }); + * future.get(); + * } + * + * {@code + * software.amazon.awssdk.services.json.paginators.PaginatedOperationWithResultKeyPublisher publisher = client.paginatedOperationWithResultKeyPaginator(request); + * publisher.subscribe(new Subscriber+ * + * As the response is a publisher, it can work well with third party reactive streams implementations like RxJava2. + *() { + * + * public void onSubscribe(org.reactivestreams.Subscriber subscription) { //... }; + * + * + * public void onNext(software.amazon.awssdk.services.json.model.PaginatedOperationWithResultKeyResponse response) { //... }; + * });} + * + * Note: If you prefer to have control on service calls, use the + * {@link #paginatedOperationWithResultKey(software.amazon.awssdk.services.json.model.PaginatedOperationWithResultKeyRequest)} + * operation. + *
+ * + * @return A custom publisher that can be subscribed to request a stream of response pages.
+ * The CompletableFuture returned by this method can be completed exceptionally with the following + * exceptions. + *+ *
+ * @sample JsonAsyncClient.PaginatedOperationWithResultKey + * @see AWS API Documentation + */ + default PaginatedOperationWithResultKeyPublisher paginatedOperationWithResultKeyPaginator() { + return paginatedOperationWithResultKeyPaginator(PaginatedOperationWithResultKeyRequest.builder().build()); + } + /** * Some paginated operation without result_key in paginators.json file * @@ -446,16 +541,18 @@ default CompletableFuture- SdkException Base class for all exceptions that can be thrown by the SDK (both service and client). + * Can be used for catch all scenarios.
+ *- SdkClientException If any client side error occurs such as an IO related failure, failure to get + * credentials, etc.
+ *- JsonException Base class for all service exceptions. Unknown exceptions will be thrown as an instance + * of this type.
+ *paginatedO * * * When the operation is called, an instance of this class is returned. At this point, no service calls are made yet - * and so there is no guarantee that the request is valid. The subscribe method should be called as a request to - * stream data. For more info, see {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. - * If there are errors in your request, you will see the failures only after you start streaming the data. + * and so there is no guarantee that the request is valid. If there are errors in your request, you will see the + * failures only after you start streaming the data. The subscribe method should be called as a request to start + * streaming data. For more info, see + * {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. Each call to the subscribe + * method will result in a new {@link org.reactivestreams.Subscription} i.e., a new contract to stream data from the + * starting request. *
* ** The following are few ways to use the response class: *
- * 1) Using the forEach helper method. This uses @ - * {@link software.amazon.awssdk.core.pagination.async.SequentialSubscriber} internally + * 1) Using the forEach helper method * ** {@code @@ -487,8 +584,7 @@ default CompletableFuturepaginatedO * * * @param paginatedOperationWithoutResultKeyRequest - * @return A Java Future containing the result of the PaginatedOperationWithoutResultKey operation returned by the - * service.
+ * @return A custom publisher that can be subscribed to request a stream of response pages.
* The CompletableFuture returned by this method can be completed exceptionally with the following * exceptions. *diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-client-class.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-client-class.java index f030b90dbd34..149f0edd4cfb 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-client-class.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-client-class.java @@ -276,7 +276,7 @@ public PaginatedOperationWithResultKeyResponse paginatedOperationWithResultKey( * * * @param paginatedOperationWithResultKeyRequest - * @return Result of the PaginatedOperationWithResultKey operation returned by the service. + * @return A custom iterable that can be used to iterate through all the response pages. * @throws SdkException * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for * catch all scenarios. @@ -384,7 +384,7 @@ public PaginatedOperationWithoutResultKeyResponse paginatedOperationWithoutResul * * * @param paginatedOperationWithoutResultKeyRequest - * @return Result of the PaginatedOperationWithoutResultKey operation returned by the service. + * @return A custom iterable that can be used to iterate through all the response pages. * @throws SdkException * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for * catch all scenarios. diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-client-interface.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-client-interface.java index f14edf764684..fce4db7ac788 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-client-interface.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-json-client-interface.java @@ -236,6 +236,27 @@ default GetWithoutRequiredMembersResponse getWithoutRequiredMembers( .build()); } + /** + * Some paginated operation with result_key in paginators.json file + * + * @return Result of the PaginatedOperationWithResultKey operation returned by the service. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws JsonException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample JsonClient.PaginatedOperationWithResultKey + * @see #paginatedOperationWithResultKey(PaginatedOperationWithResultKeyRequest) + * @see AWS API Documentation + */ + default PaginatedOperationWithResultKeyResponse paginatedOperationWithResultKey() throws SdkServiceException, + SdkClientException, JsonException { + return paginatedOperationWithResultKey(PaginatedOperationWithResultKeyRequest.builder().build()); + } + /** * Some paginated operation with result_key in paginators.json file * @@ -281,6 +302,78 @@ default PaginatedOperationWithResultKeyResponse paginatedOperationWithResultKey( .apply(paginatedOperationWithResultKeyRequest).build()); } + /** + * Some paginated operation with result_key in paginators.json file
+ *+ * This is a variant of + * {@link #paginatedOperationWithResultKey(software.amazon.awssdk.services.json.model.PaginatedOperationWithResultKeyRequest)} + * operation. The return type is a custom iterable that can be used to iterate through all the pages. SDK will + * internally handle making service calls for you. + *
+ *+ * When this operation is called, a custom iterable is returned but no service calls are made yet. So there is no + * guarantee that the request is valid. As you iterate through the iterable, SDK will start lazily loading response + * pages by making service calls until there are no pages left or your iteration stops. If there are errors in your + * request, you will see the failures only after you start iterating through the iterable. + *
+ * + *+ * The following are few ways to iterate through the response pages: + *
+ * 1) Using a Stream + * + *+ * {@code + * software.amazon.awssdk.services.json.paginators.PaginatedOperationWithResultKeyIterable responses = client.paginatedOperationWithResultKeyPaginator(request); + * responses.stream().forEach(....); + * } + *+ * + * 2) Using For loop + * + *+ * { + * @code + * software.amazon.awssdk.services.json.paginators.PaginatedOperationWithResultKeyIterable responses = client + * .paginatedOperationWithResultKeyPaginator(request); + * for (software.amazon.awssdk.services.json.model.PaginatedOperationWithResultKeyResponse response : responses) { + * // do something; + * } + * } + *+ * + * 3) Use iterator directly + * + *+ * {@code + * software.amazon.awssdk.services.json.paginators.PaginatedOperationWithResultKeyIterable responses = client.paginatedOperationWithResultKeyPaginator(request); + * responses.iterator().forEachRemaining(....); + * } + *+ *+ * Note: If you prefer to have control on service calls, use the + * {@link #paginatedOperationWithResultKey(software.amazon.awssdk.services.json.model.PaginatedOperationWithResultKeyRequest)} + * operation. + *
+ * + * @return A custom iterable that can be used to iterate through all the response pages. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws JsonException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample JsonClient.PaginatedOperationWithResultKey + * @see #paginatedOperationWithResultKeyPaginator(PaginatedOperationWithResultKeyRequest) + * @see AWS API Documentation + */ + default PaginatedOperationWithResultKeyIterable paginatedOperationWithResultKeyPaginator() throws SdkServiceException, + SdkClientException, JsonException { + return paginatedOperationWithResultKeyPaginator(PaginatedOperationWithResultKeyRequest.builder().build()); + } + /** * Some paginated operation with result_key in paginators.json file
*@@ -336,7 +429,7 @@ default PaginatedOperationWithResultKeyResponse paginatedOperationWithResultKey( *
* * @param paginatedOperationWithResultKeyRequest - * @return Result of the PaginatedOperationWithResultKey operation returned by the service. + * @return A custom iterable that can be used to iterate through all the response pages. * @throws SdkException * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for * catch all scenarios. @@ -454,7 +547,7 @@ default PaginatedOperationWithoutResultKeyResponse paginatedOperationWithoutResu * * * @param paginatedOperationWithoutResultKeyRequest - * @return Result of the PaginatedOperationWithoutResultKey operation returned by the service. + * @return A custom iterable that can be used to iterate through all the response pages. * @throws SdkException * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for * catch all scenarios. diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyIterable.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyIterable.java index fb9a4be80a72..7d8d89d6975f 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyIterable.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyIterable.java @@ -68,7 +68,7 @@ * */ @Generated("software.amazon.awssdk:codegen") -public final class PaginatedOperationWithResultKeyIterable implements SdkIterable{ +public class PaginatedOperationWithResultKeyIterable implements SdkIterable { private final JsonProtocolTestsClient client; private final PaginatedOperationWithResultKeyRequest firstRequest; @@ -106,6 +106,26 @@ public SdkIterable items() { return new PaginatedItemsIterable(this, getIterator); } + /** + * + * A helper method to resume the pages in case of unexpected failures. The method takes the last successful response + * page as input and returns an instance of {@link PaginatedOperationWithResultKeyIterable} that can be used to + * retrieve the consecutive pages that follows the input page. + *
+ */ + public PaginatedOperationWithResultKeyIterable resume(final PaginatedOperationWithResultKeyResponse lastSuccessfulPage) { + if (nextPageFetcher.hasNextPage(lastSuccessfulPage)) { + return new PaginatedOperationWithResultKeyIterable(client, firstRequest.toBuilder() + .nextToken(lastSuccessfulPage.nextToken()).build()); + } + return new PaginatedOperationWithResultKeyIterable(client, firstRequest) { + @Override + public Iteratoriterator() { + return Collections.emptyIterator(); + } + }; + } + private class PaginatedOperationWithResultKeyResponseFetcher implements SyncPageFetcher { @Override diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java index a48fd2ac0be2..ea387ddbc64a 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java @@ -7,6 +7,7 @@ import javax.annotation.Generated; import org.reactivestreams.Subscriber; import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher; +import software.amazon.awssdk.core.pagination.async.EmptySubscription; import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher; import software.amazon.awssdk.core.pagination.async.ResponsesSubscription; import software.amazon.awssdk.core.pagination.async.SdkPublisher; @@ -26,16 +27,17 @@ * * * When the operation is called, an instance of this class is returned. At this point, no service calls are made yet and - * so there is no guarantee that the request is valid. The subscribe method should be called as a request to stream - * data. For more info, see {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. If there - * are errors in your request, you will see the failures only after you start streaming the data. + * so there is no guarantee that the request is valid. If there are errors in your request, you will see the failures + * only after you start streaming the data. The subscribe method should be called as a request to start streaming data. + * For more info, see {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. Each call to the + * subscribe method will result in a new {@link org.reactivestreams.Subscription} i.e., a new contract to stream data + * from the starting request. *
* ** The following are few ways to use the response class: *
- * 1) Using the forEach helper method. This uses @ - * {@link software.amazon.awssdk.core.pagination.async.SequentialSubscriber} internally + * 1) Using the forEach helper method * ** {@code @@ -67,13 +69,15 @@ * */ @Generated("software.amazon.awssdk:codegen") -public final class PaginatedOperationWithResultKeyPublisher implements SdkPublisher{ +public class PaginatedOperationWithResultKeyPublisher implements SdkPublisher { private final JsonProtocolTestsAsyncClient client; private final PaginatedOperationWithResultKeyRequest firstRequest; private final AsyncPageFetcher nextPageFetcher; + private boolean isLastPage; + public PaginatedOperationWithResultKeyPublisher(final JsonProtocolTestsAsyncClient client, final PaginatedOperationWithResultKeyRequest firstRequest) { this.client = client; @@ -98,7 +102,32 @@ public SdkPublisher items() { } return Collections.emptyIterator(); }; - return new PaginatedItemsPublisher(new PaginatedOperationWithResultKeyResponseFetcher(), getIterator); + return new PaginatedItemsPublisher(new PaginatedOperationWithResultKeyResponseFetcher(), getIterator, isLastPage); + } + + /** + * + * A helper method to resume the pages in case of unexpected failures. The method takes the last successful response + * page as input and returns an instance of {@link PaginatedOperationWithResultKeyPublisher} that can be used to + * retrieve the consecutive pages that follows the input page. + *
+ */ + public PaginatedOperationWithResultKeyPublisher resume(final PaginatedOperationWithResultKeyResponse lastSuccessfulPage) { + if (nextPageFetcher.hasNextPage(lastSuccessfulPage)) { + return new PaginatedOperationWithResultKeyPublisher(client, firstRequest.toBuilder() + .nextToken(lastSuccessfulPage.nextToken()).build()); + } + return new PaginatedOperationWithResultKeyPublisher(client, firstRequest) { + @Override + public void subscribe(Subscriber super PaginatedOperationWithResultKeyResponse> subscriber) { + subscriber.onSubscribe(new EmptySubscription(subscriber)); + } + }.withLastPage(true); + } + + PaginatedOperationWithResultKeyPublisher withLastPage(boolean isLastPage) { + this.isLastPage = isLastPage; + return this; } private class PaginatedOperationWithResultKeyResponseFetcher implements diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithoutResultKeyIterable.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithoutResultKeyIterable.java index 557a3eabeeaa..675fad2aac45 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithoutResultKeyIterable.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithoutResultKeyIterable.java @@ -1,5 +1,6 @@ package software.amazon.awssdk.services.jsonprotocoltests.paginators; +import java.util.Collections; import java.util.Iterator; import javax.annotation.Generated; import software.amazon.awssdk.core.pagination.PaginatedResponsesIterator; @@ -64,7 +65,7 @@ * */ @Generated("software.amazon.awssdk:codegen") -public final class PaginatedOperationWithoutResultKeyIterable implements SdkIterable{ +public class PaginatedOperationWithoutResultKeyIterable implements SdkIterable { private final JsonProtocolTestsClient client; private final PaginatedOperationWithoutResultKeyRequest firstRequest; @@ -83,6 +84,26 @@ public Iterator iterator() { return new PaginatedResponsesIterator(nextPageFetcher); } + /** + * + * A helper method to resume the pages in case of unexpected failures. The method takes the last successful response + * page as input and returns an instance of {@link PaginatedOperationWithoutResultKeyIterable} that can be used to + * retrieve the consecutive pages that follows the input page. + *
+ */ + public PaginatedOperationWithoutResultKeyIterable resume(final PaginatedOperationWithoutResultKeyResponse lastSuccessfulPage) { + if (nextPageFetcher.hasNextPage(lastSuccessfulPage)) { + return new PaginatedOperationWithoutResultKeyIterable(client, firstRequest.toBuilder() + .nextToken(lastSuccessfulPage.nextToken()).build()); + } + return new PaginatedOperationWithoutResultKeyIterable(client, firstRequest) { + @Override + public Iteratoriterator() { + return Collections.emptyIterator(); + } + }; + } + private class PaginatedOperationWithoutResultKeyResponseFetcher implements SyncPageFetcher { @Override diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithoutResultKeyPublisher.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithoutResultKeyPublisher.java index 29dfdec7d4d5..ac86f1aae79d 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithoutResultKeyPublisher.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithoutResultKeyPublisher.java @@ -4,6 +4,7 @@ import javax.annotation.Generated; import org.reactivestreams.Subscriber; import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher; +import software.amazon.awssdk.core.pagination.async.EmptySubscription; import software.amazon.awssdk.core.pagination.async.ResponsesSubscription; import software.amazon.awssdk.core.pagination.async.SdkPublisher; import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient; @@ -21,16 +22,17 @@ * * * When the operation is called, an instance of this class is returned. At this point, no service calls are made yet and - * so there is no guarantee that the request is valid. The subscribe method should be called as a request to stream - * data. For more info, see {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. If there - * are errors in your request, you will see the failures only after you start streaming the data. + * so there is no guarantee that the request is valid. If there are errors in your request, you will see the failures + * only after you start streaming the data. The subscribe method should be called as a request to start streaming data. + * For more info, see {@link org.reactivestreams.Publisher#subscribe(org.reactivestreams.Subscriber)}. Each call to the + * subscribe method will result in a new {@link org.reactivestreams.Subscription} i.e., a new contract to stream data + * from the starting request. *
* ** The following are few ways to use the response class: *
- * 1) Using the forEach helper method. This uses @ - * {@link software.amazon.awssdk.core.pagination.async.SequentialSubscriber} internally + * 1) Using the forEach helper method * ** {@code @@ -62,14 +64,15 @@ * */ @Generated("software.amazon.awssdk:codegen") -public final class PaginatedOperationWithoutResultKeyPublisher implements - SdkPublisher{ +public class PaginatedOperationWithoutResultKeyPublisher implements SdkPublisher { private final JsonProtocolTestsAsyncClient client; private final PaginatedOperationWithoutResultKeyRequest firstRequest; private final AsyncPageFetcher nextPageFetcher; + private boolean isLastPage; + public PaginatedOperationWithoutResultKeyPublisher(final JsonProtocolTestsAsyncClient client, final PaginatedOperationWithoutResultKeyRequest firstRequest) { this.client = client; @@ -82,6 +85,31 @@ public void subscribe(Subscriber super PaginatedOperationWithoutResultKeyRespo subscriber.onSubscribe(new ResponsesSubscription(subscriber, nextPageFetcher)); } + /** + * + * A helper method to resume the pages in case of unexpected failures. The method takes the last successful response + * page as input and returns an instance of {@link PaginatedOperationWithoutResultKeyPublisher} that can be used to + * retrieve the consecutive pages that follows the input page. + *
+ */ + public PaginatedOperationWithoutResultKeyPublisher resume(final PaginatedOperationWithoutResultKeyResponse lastSuccessfulPage) { + if (nextPageFetcher.hasNextPage(lastSuccessfulPage)) { + return new PaginatedOperationWithoutResultKeyPublisher(client, firstRequest.toBuilder() + .nextToken(lastSuccessfulPage.nextToken()).build()); + } + return new PaginatedOperationWithoutResultKeyPublisher(client, firstRequest) { + @Override + public void subscribe(Subscriber super PaginatedOperationWithoutResultKeyResponse> subscriber) { + subscriber.onSubscribe(new EmptySubscription(subscriber)); + } + }.withLastPage(true); + } + + PaginatedOperationWithoutResultKeyPublisher withLastPage(boolean isLastPage) { + this.isLastPage = isLastPage; + return this; + } + private class PaginatedOperationWithoutResultKeyResponseFetcher implements AsyncPageFetcher{ @Override diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customization.config b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customization.config index c90352b1dd80..53d1e1898170 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customization.config +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customization.config @@ -3,5 +3,6 @@ "allTypes", "nestedContainers", "operationWithNoInputOrOutput" - ] + ], + "verifiedSimpleMethods" : ["paginatedOperationWithResultKey"] } \ No newline at end of file diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/service-2.json b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/service-2.json index 8dff2991dccb..49e9ca1caa8b 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/service-2.json +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/service-2.json @@ -294,9 +294,6 @@ "Timestamp":{"type":"timestamp"}, "PaginatedOperationWithResultKeyRequest": { "type": "structure", - "required": [ - "NextToken" - ], "members": { "NextToken": { "shape": "String", diff --git a/core/src/main/java/software/amazon/awssdk/core/pagination/PaginatedItemsIterable.java b/core/src/main/java/software/amazon/awssdk/core/pagination/PaginatedItemsIterable.java index 1b241bfaa261..98931eb027d3 100644 --- a/core/src/main/java/software/amazon/awssdk/core/pagination/PaginatedItemsIterable.java +++ b/core/src/main/java/software/amazon/awssdk/core/pagination/PaginatedItemsIterable.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.core.pagination; +import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.function.Function; @@ -51,17 +52,17 @@ private class ItemsIterator implements Iterator { ItemsIterator(final Iterator pagesIterator) { this.pagesIterator = pagesIterator; - this.singlePageItemsIterator = getItemIterator.apply(pagesIterator.next()); + this.singlePageItemsIterator = pagesIterator.hasNext() ? getItemIterator.apply(pagesIterator.next()) + : Collections.emptyIterator(); } @Override public boolean hasNext() { - while ((singlePageItemsIterator == null || !singlePageItemsIterator.hasNext()) - && pagesIterator.hasNext()) { + while (!hasMoreItems() && pagesIterator.hasNext()) { singlePageItemsIterator = getItemIterator.apply(pagesIterator.next()); } - if (singlePageItemsIterator != null && singlePageItemsIterator.hasNext()) { + if (hasMoreItems()) { return true; } @@ -76,6 +77,10 @@ public ItemT next() { return singlePageItemsIterator.next(); } + + private boolean hasMoreItems() { + return singlePageItemsIterator.hasNext(); + } } } diff --git a/core/src/main/java/software/amazon/awssdk/core/pagination/async/EmptySubscription.java b/core/src/main/java/software/amazon/awssdk/core/pagination/async/EmptySubscription.java new file mode 100644 index 000000000000..923fb0ad7307 --- /dev/null +++ b/core/src/main/java/software/amazon/awssdk/core/pagination/async/EmptySubscription.java @@ -0,0 +1,62 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.pagination.async; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * A NoOp implementation of {@link Subscription} interface. + * + * This subscription calls {@link Subscriber#onComplete()} on first request for data and then terminates the subscription. + */ +public class EmptySubscription implements Subscription { + + private final AtomicBoolean isTerminated = new AtomicBoolean(false); + private final Subscriber subscriber; + + public EmptySubscription(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + if (isTerminated()) { + return; + } + + if (n <= 0) { + throw new IllegalArgumentException("Non-positive request signals are illegal"); + } + + subscriber.onComplete(); + terminate(); + } + + @Override + public void cancel() { + terminate(); + } + + private void terminate() { + isTerminated.compareAndSet(false, true); + } + + private boolean isTerminated() { + return isTerminated.get(); + } +} diff --git a/core/src/main/java/software/amazon/awssdk/core/pagination/async/ItemsSubscription.java b/core/src/main/java/software/amazon/awssdk/core/pagination/async/ItemsSubscription.java new file mode 100644 index 000000000000..4765e3952574 --- /dev/null +++ b/core/src/main/java/software/amazon/awssdk/core/pagination/async/ItemsSubscription.java @@ -0,0 +1,96 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.pagination.async; + +import java.util.Iterator; +import java.util.function.Function; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * An implementation of the {@link Subscription} interface that can be used to signal and cancel demand for + * paginated items across pages + * + * @param The type of a single response page + * @param The type of paginated member in a response page + */ +public class ItemsSubscription extends PaginationSubscription { + private final Function > getIteratorFunction; + private volatile Iterator singlePageItemsIterator; + + public ItemsSubscription(Subscriber subscriber, + AsyncPageFetcher nextPageFetcher, + Function > getIteratorFunction) { + super(subscriber, nextPageFetcher); + this.getIteratorFunction = getIteratorFunction; + } + + + protected void handleRequests() { + if (!hasMoreItems() && !hasNextPage()) { + completeSubscription(); + return; + } + + if (outstandingRequests.get() > 0 && !isTerminated()) { + /** + * Current page is null only the first time the method is called. + * Once initialized, current page will never be null + */ + if (currentPage == null || (!hasMoreItems() && hasNextPage())) { + fetchNextPage(); + + } else if (hasMoreItems()) { + sendNextElement(); + + // All valid cases are covered above. Throw an exception if any combination is missed + } else { + throw new IllegalStateException("Execution should have not reached here"); + } + } + } + + private void fetchNextPage() { + nextPageFetcher.nextPage(currentPage) + .whenComplete(((response, error) -> { + if (response != null) { + currentPage = response; + singlePageItemsIterator = getIteratorFunction.apply(response); + sendNextElement(); + } + if (error != null) { + subscriber.onError(error); + cleanup(); + } + })); + } + + /** + * Calls onNext and calls the recursive method. + */ + private void sendNextElement() { + if (singlePageItemsIterator.hasNext()) { + subscriber.onNext(singlePageItemsIterator.next()); + outstandingRequests.getAndDecrement(); + } + + handleRequests(); + } + + private boolean hasMoreItems() { + return singlePageItemsIterator != null && singlePageItemsIterator.hasNext(); + } +} diff --git a/core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java b/core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java index 734cf2b06b62..c69d251882f1 100644 --- a/core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java +++ b/core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java @@ -16,9 +16,6 @@ package software.amazon.awssdk.core.pagination.async; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import org.reactivestreams.Subscriber; @@ -35,112 +32,19 @@ public class PaginatedItemsPublisher implements SdkPublisher> getIteratorFunction; + private final boolean isLastPage; + public PaginatedItemsPublisher(AsyncPageFetcher nextPageFetcher, - Function > getIteratorFunction) { + Function > getIteratorFunction, + boolean isLastPage) { this.nextPageFetcher = nextPageFetcher; this.getIteratorFunction = getIteratorFunction; + this.isLastPage = isLastPage; } @Override public void subscribe(Subscriber super ItemT> subscriber) { - subscriber.onSubscribe(new ItemsSubscription(subscriber)); - } - - private class ItemsSubscription implements org.reactivestreams.Subscription { - - private AtomicLong outstandingRequests = new AtomicLong(0); - - private AtomicBoolean isTaskRunning = new AtomicBoolean(false); - - private final Subscriber subscriber; - - private volatile Iterator singlePageItemsIterator; - - private volatile ResponseT currentPage; - - ItemsSubscription(Subscriber subscriber) { - this.subscriber = subscriber; - } - - @Override - public void request(long n) { - if (n <= 0) { - throw new IllegalArgumentException("Non-positive request signals are illegal"); - } - - outstandingRequests.addAndGet(n); - - synchronized (this) { - if (!isTaskRunning.get()) { - isTaskRunning.set(true); - handleRequestsRecursively(); - } - } - } - - private void handleRequestsRecursively() { - if (currentPage != null && !nextPageFetcher.hasNextPage(currentPage) && - singlePageItemsIterator != null && !singlePageItemsIterator.hasNext()) { - subscriber.onComplete(); - isTaskRunning.set(false); - return; - } - - if (outstandingRequests.get() <= 0) { - isTaskRunning.set(false); - return; - } - - /** - * Current page is null only the first time the method is called. - * Once initialized, current page will never be null - */ - if (currentPage == null && singlePageItemsIterator == null) { - fetchNextPage(); - - } else if (singlePageItemsIterator != null && singlePageItemsIterator.hasNext()) { - sendNextElement(); - - } else if (singlePageItemsIterator != null && !singlePageItemsIterator.hasNext() && - currentPage != null && nextPageFetcher.hasNextPage(currentPage)) { - fetchNextPage(); - - // All valid cases are covered above. Throw an exception if any combination is missed - } else { - throw new IllegalStateException("Execution should have not reached here"); - } - } - - private void fetchNextPage() { - CompletableFuture future = nextPageFetcher.nextPage(currentPage); - future.whenComplete(((response, error) -> { - if (response != null) { - currentPage = response; - singlePageItemsIterator = getIteratorFunction.apply(response); - sendNextElement(); - } - if (error != null) { - subscriber.onError(error); - } - })); - } - - /** - * Calls onNext and calls the recursive method. - */ - private void sendNextElement() { - if (singlePageItemsIterator.hasNext()) { - subscriber.onNext(singlePageItemsIterator.next()); - outstandingRequests.getAndDecrement(); - } - - handleRequestsRecursively(); - } - - @Override - public void cancel() { - outstandingRequests.set(0); - isTaskRunning.set(false); - } + subscriber.onSubscribe(isLastPage ? new EmptySubscription(subscriber) + : new ItemsSubscription(subscriber, nextPageFetcher, getIteratorFunction)); } } diff --git a/core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginationSubscription.java b/core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginationSubscription.java new file mode 100644 index 000000000000..7bd4c0005ace --- /dev/null +++ b/core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginationSubscription.java @@ -0,0 +1,99 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.pagination.async; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public abstract class PaginationSubscription implements Subscription { + + protected AtomicLong outstandingRequests = new AtomicLong(0); + protected final Subscriber subscriber; + protected final AsyncPageFetcher nextPageFetcher; + protected volatile ResponseT currentPage; + + // boolean indicating whether subscription is terminated + private AtomicBoolean isTerminated = new AtomicBoolean(false); + + // boolean indicating whether task to handle requests is running + private AtomicBoolean isTaskRunning = new AtomicBoolean(false); + + public PaginationSubscription(Subscriber subscriber, AsyncPageFetcher nextPageFetcher) { + this.subscriber = subscriber; + this.nextPageFetcher = nextPageFetcher; + } + + @Override + public void request(long n) { + if (isTerminated()) { + return; + } + + if (n <= 0) { + throw new IllegalArgumentException("Non-positive request signals are illegal"); + } + + outstandingRequests.addAndGet(n); + if (startTask()) { + handleRequests(); + } + } + + /** + * Recursive method to deal with requests until there are no outstandingRequests or + * no more pages. + */ + protected abstract void handleRequests(); + + @Override + public void cancel() { + cleanup(); + } + + protected boolean hasNextPage() { + return currentPage == null || nextPageFetcher.hasNextPage(currentPage); + } + + protected void completeSubscription() { + if (!isTerminated()) { + subscriber.onComplete(); + cleanup(); + } + } + + private void terminate() { + isTerminated.compareAndSet(false, true); + } + + protected boolean isTerminated() { + return isTerminated.get(); + } + + private void stopTask() { + isTaskRunning.set(false); + } + + private synchronized boolean startTask() { + return !isTerminated() && isTaskRunning.compareAndSet(false, true); + } + + protected synchronized void cleanup() { + terminate(); + stopTask(); + } +} diff --git a/core/src/main/java/software/amazon/awssdk/core/pagination/async/ResponsesSubscription.java b/core/src/main/java/software/amazon/awssdk/core/pagination/async/ResponsesSubscription.java index 503f124fda3e..c21a9cfd62e0 100644 --- a/core/src/main/java/software/amazon/awssdk/core/pagination/async/ResponsesSubscription.java +++ b/core/src/main/java/software/amazon/awssdk/core/pagination/async/ResponsesSubscription.java @@ -15,85 +15,41 @@ package software.amazon.awssdk.core.pagination.async; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; /** - * A publisher to request for a stream of response pages. The class can be used to request data until - * there are no more pages. + * An implementation of the {@link Subscription} interface that can be used to signal and cancel demand for + * paginated response pages. * * @param The type of a single response page */ -public class ResponsesSubscription implements Subscription { - private final Subscriber subscriber; - - private AtomicLong outstandingRequests = new AtomicLong(0); - - private AsyncPageFetcher nextPageFetcher; - - private volatile ResponseT currentPage; - - private AtomicBoolean isTaskRunning = new AtomicBoolean(false); +public class ResponsesSubscription extends PaginationSubscription { public ResponsesSubscription(Subscriber subscriber, AsyncPageFetcher nextPageFetcher) { - this.subscriber = subscriber; - this.nextPageFetcher = nextPageFetcher; - } - - @Override - public void request(long n) { - if (n <= 0) { - throw new IllegalArgumentException("Non-positive request signals are illegal"); - } - - outstandingRequests.addAndGet(n); - - synchronized (this) { - if (!isTaskRunning.get()) { - isTaskRunning.set(true); - handleRequest(); - } - } + super(subscriber, nextPageFetcher); } - /** - * Recursive method to deal with requests until there are no outstandingRequests or - * no more pages. - */ - private synchronized void handleRequest() { - if (currentPage != null && !nextPageFetcher.hasNextPage(currentPage)) { - subscriber.onComplete(); - isTaskRunning.set(false); + protected void handleRequests() { + if (!hasNextPage()) { + completeSubscription(); return; } - if (outstandingRequests.get() <= 0) { - isTaskRunning.set(false); - return; + if (outstandingRequests.get() > 0 && !isTerminated()) { + outstandingRequests.getAndDecrement(); + nextPageFetcher.nextPage(currentPage) + .whenComplete(((response, error) -> { + if (response != null) { + currentPage = response; + subscriber.onNext(response); + handleRequests(); + } + if (error != null) { + subscriber.onError(error); + cleanup(); + } + })); } - - outstandingRequests.getAndDecrement(); - - CompletableFuture future = nextPageFetcher.nextPage(currentPage); - future.whenComplete((response, error) -> { - if (response != null) { - currentPage = response; - subscriber.onNext(response); - handleRequest(); - } - - if (error != null) { - subscriber.onError(error); - } - }); - } - - @Override - public void cancel() { - outstandingRequests.set(0); - isTaskRunning.set(false); } } diff --git a/core/src/test/java/software/amazon/awssdk/core/pagination/PaginatedItemsIterableTest.java b/core/src/test/java/software/amazon/awssdk/core/pagination/PaginatedItemsIterableTest.java index ebf4ed184d9c..d65477018aa4 100644 --- a/core/src/test/java/software/amazon/awssdk/core/pagination/PaginatedItemsIterableTest.java +++ b/core/src/test/java/software/amazon/awssdk/core/pagination/PaginatedItemsIterableTest.java @@ -67,14 +67,18 @@ public void hasNext_ReturnsFalse_WhenItemsAndPagesIteratorHasNoNextElement() { @Test public void hasNext_ReturnsTrue_WhenItemsIteratorHasNextElement() { + when(pagesIterator.hasNext()).thenReturn(true); + when(getItemIteratorFunction.apply(any())).thenReturn(singlePageItemsIterator); when(singlePageItemsIterator.hasNext()).thenReturn(true); - when(pagesIterator.hasNext()).thenReturn(false); assertTrue(itemsIterable.iterator().hasNext()); } @Test public void hasNextMethodDoesNotRetrieveNextPage_WhenItemsIteratorHasAnElement() { + when(pagesIterator.hasNext()).thenReturn(true); + when(getItemIteratorFunction.apply(any())).thenReturn(singlePageItemsIterator); + when(singlePageItemsIterator.hasNext()).thenReturn(true); Iterator itemsIterator = itemsIterable.iterator(); @@ -102,12 +106,12 @@ public void hasNextMethodGetsNextPage_WhenCurrentItemsIteratorHasNoElements() { } @Test - public void hasNextMethodGetsNextPage_WhenCurrentItemsIteratorIsNull() { + public void hasNextMethodGetsNextPage_WhenCurrentItemsIteratorIsEmpty() { when(pagesIterator.hasNext()).thenReturn(true); - when(getItemIteratorFunction.apply(any())).thenReturn(null, singlePageItemsIterator); + when(getItemIteratorFunction.apply(any())).thenReturn(singlePageItemsIterator); - when(singlePageItemsIterator.hasNext()).thenReturn(true); + when(singlePageItemsIterator.hasNext()).thenReturn(false, true); itemsIterable.iterator().hasNext(); diff --git a/services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/ScanPaginatorIntegrationTest.java b/services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/ScanPaginatorIntegrationTest.java index d9e355849e0d..0ab7419ff0e3 100644 --- a/services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/ScanPaginatorIntegrationTest.java +++ b/services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/ScanPaginatorIntegrationTest.java @@ -15,12 +15,14 @@ package software.amazon.awssdk.services.dynamodb; +import static org.junit.Assert.assertEquals; + import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.stream.Stream; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import software.amazon.awssdk.core.pagination.SdkIterable; @@ -86,14 +88,14 @@ public void test_MultipleIteration_On_Responses_Iterable() { for (ScanResponse response : scanResponses) { count += response.count(); } - Assert.assertEquals(ITEM_COUNT, count); + assertEquals(ITEM_COUNT, count); // Iterate second time count = 0; for (ScanResponse response : scanResponses) { count += response.count(); } - Assert.assertEquals(ITEM_COUNT, count); + assertEquals(ITEM_COUNT, count); } @Test @@ -107,14 +109,14 @@ public void test_MultipleIteration_On_PaginatedMember_Iterable() { for (Map item : items) { count++; } - Assert.assertEquals(ITEM_COUNT, count); + assertEquals(ITEM_COUNT, count); // Iterate second time count = 0; for (Map item : items) { count++; } - Assert.assertEquals(ITEM_COUNT, count); + assertEquals(ITEM_COUNT, count); } @Test @@ -124,17 +126,17 @@ public void test_MultipleIteration_On_Responses_Stream() { ScanIterable scanResponses = dynamo.scanPaginator(request); // Iterate once - Assert.assertEquals(ITEM_COUNT, scanResponses.stream() + assertEquals(ITEM_COUNT, scanResponses.stream() .mapToInt(response -> response.count()) .sum()); // Iterate second time - Assert.assertEquals(ITEM_COUNT, scanResponses.stream() + assertEquals(ITEM_COUNT, scanResponses.stream() .mapToInt(response -> response.count()) .sum()); // Number of pages - Assert.assertEquals(Math.ceil((double) ITEM_COUNT/results_per_page), scanResponses.stream().count(), 0); + assertEquals(Math.ceil((double) ITEM_COUNT/results_per_page), scanResponses.stream().count(), 0); } @Test @@ -144,10 +146,10 @@ public void test_MultipleIteration_On_PaginatedMember_Stream() { SdkIterable