Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Async pagination #419

Merged
merged 1 commit into from
Mar 5, 2018
Merged

Support Async pagination #419

merged 1 commit into from
Mar 5, 2018

Conversation

varunnvs92
Copy link
Contributor

@varunnvs92 varunnvs92 commented Feb 2, 2018

Add support for Async Pagination
#185

Description

Generates extra methods in client and interface for paginated methods. These methods return a custom publisher that can be used to request a stream of data without the customer explicitly making service calls.
Added RxJava2 as test dependency to show a test case on using it. I am not fully satisfied with the async user experience using the pub-sub interfaces. Using third party reactive streams implementations will help in making experience better as there are lot of helper methods.

Edit1

  • Added resume method in the response that takes in last successful page and returns a new instance of the response class. The new instance can be used to iterate/request data after the last successful page

Motivation and Context

#185

Testing

Added unit tests
See ListObjectsV2PaginatorsIntegrationTest.java for integration test and how customer uses

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)

Checklist

  • I have read the CONTRIBUTING document
  • Local run of mvn install succeeds
  • My code follows the code style of this project
  • My change requires a change to the Javadoc documentation
  • I have updated the Javadoc documentation accordingly
  • I have read the README document
  • I have added tests to cover my changes
  • All new and existing tests passed
  • A short description of the change has been added to the CHANGELOG

License

  • I confirm that this pull request can be released under the Apache 2 license


@Override
protected String appendToDescription() {
return opModel.isPaginated() ? paginationDocs.getDocsForAsyncOperation() : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return null will cause a Null Dereference risk on fortify, we can probably return Optional here

@varunnvs92 varunnvs92 force-pushed the varunkn/AsyncPagination branch 3 times, most recently from f7c140d to 6beb521 Compare February 16, 2018 01:02
@varunnvs92 varunnvs92 force-pushed the varunkn/AsyncPagination branch from 6beb521 to 4b7f0cd Compare February 22, 2018 20:11
+ "see {@link $T#$L($T)}. If there are errors in your "
+ "request, you will see the failures only after you start streaming the data.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType())
.add(getAsyncCodeSnippets())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra space here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

/**
* @return A Poet {@link ClassName} for the reactive streams {@link Publisher}.
*/
private ClassName getPublisherType() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this and the one below in separate methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are used 2-3 places in the class. I thought it would be better to have them in separate method rather than calling the ClassName.get() method.

return new PoetGeneratorTask(paginatorsClassDir, model.getFileHeader(), classSpec);
}

private Stream<GeneratorTask> createASyncTasks() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ASync -> Async

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

*/
@ReviewBeforeRelease("Naming of response shape for paginated APIs")
public ClassName getResponseClassForPaginatedSyncOperation(String operationName) {
return ClassName.get(model.getMetadata().getFullPaginatorsPackageName(), operationName + "Paginator");
return ClassName.get(model.getMetadata().getFullPaginatorsPackageName(), operationName + "Iterable");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the change in name here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method ends with "Paginator", so didn't want same name for response type. For example,
ListTablesIterable listTablesPaginator(ListTablesRequest);
over
ListTablesPaginator listTablesPaginator(ListTablesRequest);

This also conveys the return type is a Iterable. It also matches the convention for async method.
// Async method declaration
ListTablesPublisher listTablesPaginator(ListTablesRequest);

return paginatedTraditionalMethodBody(builder, opModel).build();
}

protected MethodSpec.Builder paginatedTraditionalMethodBody(MethodSpec.Builder builder, OperationModel operationModel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a separate method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is overriden in AsyncClientClass

* 1) Using the forEach helper method. This uses @
* {@link software.amazon.awssdk.core.pagination.async.SequentialSubscriber} internally
*
* <pre>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this all look good in javadoc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting is good. But classes are displayed with FQCN. So it looks little verbose.

subscriber.onSubscribe(new ItemsSubscription(subscriber));
}

private class ItemsSubscription implements org.reactivestreams.Subscription {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using the full fqcn import?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -247,6 +247,11 @@
<artifactId>netty-reactive-streams-http</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the size of this dependency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Size is 2.1MB. I added it to show that customer can use 3rd party implementations for additional helper methods. I can remove it if we decide so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a test dependency?

Copy link
Contributor Author

@varunnvs92 varunnvs92 Feb 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have marked the scope as test in services pom. Do we need it here too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No just where it's used is fine. Just making sure it's a test dependency.

Copy link
Contributor

@shorea shorea left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still looking at it, will review more later.


@Override
protected String appendToDescription() {
return opModel.isPaginated() ? paginationDocs.getDocsForAsyncOperation() : "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to check isPaginated? Would this be invoked if the operation isn't paginated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, removed the check.

.add("<p>When the operation is called, an instance of this class is returned. At this point, "
+ "no service calls are made yet and so there is no guarantee that the request is valid. "
+ "The subscribe method should be called as a request to stream data. For more info, "
+ "see {@link $T#$L($T)}. If there are errors in your "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a question for both sync and async pagination. How do we handle retryable/transient errors while paginating? Is the iterable/publisher considered terminated at that point and the customer has to make a new request? Is there a way they can get the request object to reconnect (i.e. without getting the last response tokens manually)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, added the resume() method in the Response.

syncResponsePageType())
.add("<p>When the operation is called, an instance of this class is returned. At this point, "
+ "no service calls are made yet and so there is no guarantee that the request is valid. "
+ "The subscribe method should be called as a request to stream data. For more info, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some documentation on what multiple subscribes means? I.E. each one is a separate iteration from the starting request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


return CodeBlock.builder()
.add("\n\n<p>The following are few ways to use the response class:</p>")
.add("1) Using the forEach helper method. This uses @{@link $T} internally",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do they need to care what it uses internally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, removed.

.add(buildCode(CodeBlock.builder()
.add(callOperationOnClient)
.add(CodeBlock.builder()
.addStatement("CompletableFuture<Void> future = publisher"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the future here be fulfilled with the last response? Would that be useful or confusing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where would you do it? Can you show me sample code

.collect(Collectors.toList());
}

private Stream<GeneratorTask> createSyncTasks() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to reduce a little duplication here and create a method that takes in this::createSyncTask this::createAsyncTask respectively.

@@ -128,6 +129,14 @@ private MethodSpec closeMethod() {

}

@Override
protected MethodSpec.Builder paginatedTraditionalMethodBody(MethodSpec.Builder builder, OperationModel opModel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does traditional mean in this context? Non-simple?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, a method that takes in a request and returns a response.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to use "traditional" here (vs nothing) and in other cases? Are there any cases beyond paginated methods and paginated simple methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. There is no need for the "traditional" part as its the only method body for paginated operations. Will remove.

* {@link #paginatedOperationWithResultKey(software.amazon.awssdk.services.json.model.PaginatedOperationWithResultKeyRequest)}
* operation.</b>
* </p>
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why no onError and onComplete in the custom Subscriber example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the body of the methods are empty, didn't think it would be useful to add all of them and clutter the docs. Anyways when customers tries to add code like this, compiler will give errors about the missing methods

* </p>
*
* @param paginatedOperationWithResultKeyRequest
* @return A Java Future containing the result of the PaginatedOperationWithResultKey operation returned by the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect

Copy link
Contributor Author

@varunnvs92 varunnvs92 Feb 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the return statement.
Needs some more changes to fix the next line (prefix for async throws statement)

* @see <a href="http://docs.aws.amazon.com/goto/WebAPI/json-service-2010-05-08/PaginatedOperationWithResultKey"
* target="_top">AWS API Documentation</a>
*/
default PaginatedOperationWithResultKeyPublisher paginatedOperationWithResultKeyPaginator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test that exercises the no arg simple method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add


outstandingRequests.addAndGet(n);

synchronized (this) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use compareAndSet here and lose the lock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

private void handleRequestsRecursively() {
if (currentPage != null && !nextPageFetcher.hasNextPage(currentPage) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This expression is a bit hard to read. Some methods may help?

if(!(hasNextPage() || hasMoreItems())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

private void fetchNextPage() {
CompletableFuture<ResponseT> future = nextPageFetcher.nextPage(currentPage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inline future

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

sendNextElement();
}
if (error != null) {
subscriber.onError(error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there something that tells the publisher to stop publishing items?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a isTerminated flag


@Override
public void cancel() {
outstandingRequests.set(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would future calls to request(n) re-activate the subscription? Should we guard against that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a isTerminated flag

* @param <ResponseT> The type of a single response page
* @param <ItemT> The type of paginated member in a response page
*/
public class PaginatedItemsPublisher<ResponseT, ItemT> implements SdkPublisher<ItemT> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of duplicated code from the Response publisher. Is there a way to consolidate?

This code from Rx might be relevant for making this more abstract

https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't use the above link. But abstracted out the common code.


outstandingRequests.getAndDecrement();

CompletableFuture<ResponseT> future = nextPageFetcher.nextPage(currentPage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inline

return;
}

if (outstandingRequests.get() <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking there's a race condition between this and the code that initiates a new task.

Evaluation order
If(outstandingRequests.get() <= 0) evalues to true
customer requests more demand
outstandingRequests incremented
we check if task is still running, see that it is, and bail
we set isTaskRunning to false here and now we've dropped a request

I think we would need to acquire the same lock to prevent this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified the code a bit. Still looking for race conditions

@varunnvs92 varunnvs92 force-pushed the varunkn/AsyncPagination branch 2 times, most recently from de3b0e6 to cdcba96 Compare March 1, 2018 20:38
@@ -128,6 +129,14 @@ private MethodSpec closeMethod() {

}

@Override
protected MethodSpec.Builder paginatedTraditionalMethodBody(MethodSpec.Builder builder, OperationModel opModel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to use "traditional" here (vs nothing) and in other cases? Are there any cases beyond paginated methods and paginated simple methods?

return new PaginatedOperationWithResultKeyIterable(client, firstRequest.toBuilder()
.nextToken(lastSuccessfulPage.nextToken()).build());
}
return new PaginatedOperationWithResultKeyIterable(client, firstRequest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the behavior here? To return an empty list if no additional pages are available?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If customer tries to resume from last page (for some reason), the iterator method would return empty iterator as there would be no more pages.


private final AsyncPageFetcher nextPageFetcher;

private boolean isLastPage;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this used for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In resume method, we return EmptySubscription (EmptyIterator for sync) on subscribe if there are no more pages. This works when customer subscribes to get pages. If you try to subscribe to get Items (PaginatedItemsPublisher class), there is currently no way to recognize if input is last page and return EmptySubscription. The two options I see are:

  1. Pass a value to PaginatedItemsPublisher to indicate its last page (current approach)
  2. Override all the items methods in resume method to use EmptySubscription (if there are no more pages). I felt this will bloat all the publisher classes and chose option 1.
    The resume method in option 2 will look like:
    public PaginatedOperationWithResultKeyPublisher resume(final PaginatedOperationWithResultKeyResponse lastSuccessfulPage) {
        if (nextPageFetcher.hasNextPage(lastSuccessfulPage)) {
            return new PaginatedOperationWithResultKeyPublisher(client, firstRequest.toBuilder()
                                                                                    .nextToken(lastSuccessfulPage.nextToken()).build());
        }
        return new PaginatedOperationWithResultKeyPublisher(client, firstRequest) {
            @Override
            public void subscribe(Subscriber<? super PaginatedOperationWithResultKeyResponse> subscriber) {
                subscriber.onSubscribe(new EmptySubscription(subscriber));
            }

            @Override
            public SdkPublisher<SimpleStruct> items() {
                return new EmptyPublisher<>(); // A new publisher that creates EmptySubscription when subscribed to
            }
            
            //More overriden methods if there are multiple result keys
        };
    }

@varunnvs92 varunnvs92 force-pushed the varunkn/AsyncPagination branch from cdcba96 to bc78fb0 Compare March 1, 2018 22:55
Copy link
Contributor

@shorea shorea left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll finish looking at this tonight.

throw new IllegalArgumentException("Non-positive request signals are illegal");
}

subscriber.onComplete();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition here, if I queue up two requests then I can get two completes.

This could should fix it.

if(isTerminated.compareAndSet(false, true)) {
   subscriber.onComplete();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

}.withLastPage(true);
}

PaginatedOperationWithoutResultKeyPublisher withLastPage(boolean isLastPage) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need a setter for this? Can't you just set it in the constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be set in constructor too. But having it in setter might hide it better constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually thinking again, having a private constructor is better than the setter. Will fix it.

@varunnvs92 varunnvs92 force-pushed the varunkn/AsyncPagination branch from bc78fb0 to 8558232 Compare March 2, 2018 03:53
@varunnvs92 varunnvs92 force-pushed the varunkn/AsyncPagination branch from 60e8177 to 813a7c2 Compare March 5, 2018 19:51
@varunnvs92 varunnvs92 force-pushed the varunkn/AsyncPagination branch from 813a7c2 to 3b12e24 Compare March 5, 2018 19:55
@varunnvs92 varunnvs92 merged commit 7544074 into master Mar 5, 2018
@varunnvs92 varunnvs92 deleted the varunkn/AsyncPagination branch May 15, 2018 17:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants