Skip to content

Commit

Permalink
First revision - Added resume method
Browse files Browse the repository at this point in the history
  • Loading branch information
AWS authored and varunnvs92 committed Feb 28, 2018
1 parent 4b7f0cd commit de3b0e6
Show file tree
Hide file tree
Showing 29 changed files with 945 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ private AsyncPaginated(IntermediateModel model, OperationModel opModel) {

@Override
protected String appendToDescription() {
return opModel.isPaginated() ? paginationDocs.getDocsForAsyncOperation() : "";
return paginationDocs.getDocsForAsyncOperation();
}

@Override
protected void applyReturns(DocumentationBuilder docBuilder) {
docBuilder.returns("A custom publisher that can be subscribed to request a stream of response pages.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
public final class DocumentationBuilder {

// TODO This prefix is not suitable for paginated operations. Either remove it for paginated operations
// or change the statement to something generic
private static final String ASYNC_THROWS_PREFIX = "The CompletableFuture returned by this method can be completed " +
"exceptionally with the following exceptions.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.squareup.javapoet.TypeName;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.codegen.model.intermediate.IntermediateModel;
import software.amazon.awssdk.codegen.model.intermediate.OperationModel;
import software.amazon.awssdk.codegen.poet.PoetExtensions;
Expand Down Expand Up @@ -90,11 +91,12 @@ public String getDocsForAsyncOperation() {
operationModel.getMethodName(), requestType())
.add("<p>When the operation is called, an instance of this class is returned. At this point, "
+ "no service calls are made yet and so there is no guarantee that the request is valid. "
+ "The subscribe method should be called as a request to stream data. For more info, "
+ "see {@link $T#$L($T)}. If there are errors in your "
+ "request, you will see the failures only after you start streaming the data.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType())
.add(getAsyncCodeSnippets())
+ "If there are errors in your request, you will see the failures only after you start streaming "
+ "the data. The subscribe method should be called as a request to start streaming data. "
+ "For more info, see {@link $T#$L($T)}. Each call to the subscribe method will result in a new "
+ "{@link $T} i.e., a new contract to stream data from the starting request.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType(), getSubscriptionType())
.add(getAsyncCodeSnippets())
.build()
.toString();
}
Expand All @@ -112,10 +114,11 @@ clientInterface, getPaginatedMethodName(), requestType(), getPublisherType(),
syncResponsePageType())
.add("<p>When the operation is called, an instance of this class is returned. At this point, "
+ "no service calls are made yet and so there is no guarantee that the request is valid. "
+ "The subscribe method should be called as a request to stream data. For more info, "
+ "see {@link $T#$L($T)}. If there are errors in your "
+ "request, you will see the failures only after you start streaming the data.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType())
+ "If there are errors in your request, you will see the failures only after you start streaming "
+ "the data. The subscribe method should be called as a request to start streaming data. "
+ "For more info, see {@link $T#$L($T)}. Each call to the subscribe method will result in a new "
+ "{@link $T} i.e., a new contract to stream data from the starting request.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType(), getSubscriptionType())
.add(getAsyncCodeSnippets())
.build()
.toString();
Expand Down Expand Up @@ -160,7 +163,7 @@ private String getAsyncCodeSnippets() {

return CodeBlock.builder()
.add("\n\n<p>The following are few ways to use the response class:</p>")
.add("1) Using the forEach helper method. This uses @{@link $T} internally",
.add("1) Using the forEach helper method",
TypeName.get(SequentialSubscriber.class))
.add(buildCode(CodeBlock.builder()
.add(callOperationOnClient)
Expand Down Expand Up @@ -255,4 +258,11 @@ private ClassName getPublisherType() {
private ClassName getSubscriberType() {
return ClassName.get(Subscriber.class);
}

/**
* @return A Poet {@link ClassName} for the reactive streams {@link Subscription}.
*/
private ClassName getSubscriptionType() {
return ClassName.get(Subscription.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ private SyncPaginated(IntermediateModel model, OperationModel opModel) {

@Override
protected String appendToDescription() {
return opModel.isPaginated() ? paginationDocs.getDocsForSyncOperation() : "";
return paginationDocs.getDocsForSyncOperation();
}

@Override
protected void applyReturns(DocumentationBuilder docBuilder) {
docBuilder.returns("A custom iterable that can be used to iterate through all the response pages.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected boolean hasTasks() {
@Override
protected List<GeneratorTask> createTasks() throws Exception {
info("Emitting paginator classes");
return Stream.concat(createSyncTasks(), createASyncTasks())
return Stream.concat(createSyncTasks(), createAsyncTasks())
.collect(Collectors.toList());
}

Expand All @@ -63,7 +63,7 @@ private GeneratorTask createSyncTask(Map.Entry<String, PaginatorDefinition> entr
return new PoetGeneratorTask(paginatorsClassDir, model.getFileHeader(), classSpec);
}

private Stream<GeneratorTask> createASyncTasks() {
private Stream<GeneratorTask> createAsyncTasks() {
return model.getPaginators().entrySet().stream()
.filter(entry -> entry.getValue().isValid())
.map(safeFunction(this::createAsyncTask));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.codegen.poet.ClassSpec;
import software.amazon.awssdk.codegen.poet.PoetUtils;
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
import software.amazon.awssdk.core.pagination.async.EmptySubscription;
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
import software.amazon.awssdk.core.pagination.async.SdkPublisher;
Expand All @@ -48,6 +49,9 @@
public class AsyncResponseClassSpec extends PaginatorsClassSpec {

private static final String SUBSCRIBER = "subscriber";
private static final String SUBSCRIBE_METHOD = "subscribe";
private static final String LAST_PAGE_FIELD = "isLastPage";
private static final String LAST_PAGE_METHOD = "withLastPage";

public AsyncResponseClassSpec(IntermediateModel model, String c2jOperationName, PaginatorDefinition paginatorDefinition) {
super(model, c2jOperationName, paginatorDefinition);
Expand All @@ -56,16 +60,19 @@ public AsyncResponseClassSpec(IntermediateModel model, String c2jOperationName,
@Override
public TypeSpec poetSpec() {
TypeSpec.Builder specBuilder = TypeSpec.classBuilder(className())
.addModifiers(Modifier.PUBLIC, Modifier.FINAL)
.addModifiers(Modifier.PUBLIC)
.addAnnotation(PoetUtils.GENERATED)
.addSuperinterface(getAsyncResponseInterface())
.addFields(Stream.of(asyncClientInterfaceField(),
requestClassField(),
asyncPageFetcherField())
asyncPageFetcherField(),
lastPageField())
.collect(Collectors.toList()))
.addMethod(constructor())
.addMethod(subscribeMethod())
.addMethods(getMethodSpecsForResultKeyList())
.addMethod(resumeMethod())
.addMethod(lastPageMethod())
.addJavadoc(paginationDocs.getDocsForAsyncResponseClass(
getAsyncClientInterfaceName()))
.addType(nextPageFetcherClass());
Expand Down Expand Up @@ -100,6 +107,10 @@ private FieldSpec asyncPageFetcherField() {
return FieldSpec.builder(AsyncPageFetcher.class, NEXT_PAGE_FETCHER_MEMBER, Modifier.PRIVATE, Modifier.FINAL).build();
}

private FieldSpec lastPageField() {
return FieldSpec.builder(boolean.class, LAST_PAGE_FIELD, Modifier.PRIVATE).build();
}

private MethodSpec constructor() {
return MethodSpec.constructorBuilder()
.addModifiers(Modifier.PUBLIC)
Expand All @@ -115,7 +126,7 @@ private MethodSpec constructor() {
* A {@link MethodSpec} for the subscribe() method which is inherited from the interface.
*/
private MethodSpec subscribeMethod() {
return MethodSpec.methodBuilder("subscribe")
return MethodSpec.methodBuilder(SUBSCRIBE_METHOD)
.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.addParameter(ParameterizedTypeName.get(ClassName.get(Subscriber.class),
Expand Down Expand Up @@ -171,9 +182,10 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) {
resultKeyType)))
.addCode(getIteratorLambdaBlock(resultKey, resultKeyModel))
.addCode("\n")
.addStatement("return new $T(new $L(), getIterator)",
.addStatement("return new $T(new $L(), getIterator, $L)",
PaginatedItemsPublisher.class,
nextPageFetcherClassName())
nextPageFetcherClassName(),
LAST_PAGE_FIELD)
.addJavadoc(CodeBlock.builder()
.add("Returns a publisher that can be used to get a stream of data. You need to "
+ "subscribe to the publisher to request the stream of data. The publisher "
Expand All @@ -184,7 +196,7 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) {
.build();
}

/**
/**aW
* Generates a inner class that implements {@link AsyncPageFetcher}. This is a helper class that can be used
* to find if there are more pages in the response and to get the next page if exists.
*/
Expand All @@ -209,4 +221,37 @@ private TypeSpec nextPageFetcherClass() {
.build())
.build();
}

private MethodSpec resumeMethod() {
return resumeMethodBuilder().addCode(CodeBlock.builder()
.addStatement("return $L.$L(true)", anonymousClassWithEmptySubscription(),
LAST_PAGE_METHOD)
.build())
.build();
}

private TypeSpec anonymousClassWithEmptySubscription() {
return TypeSpec.anonymousClassBuilder("$L, $L", CLIENT_MEMBER, REQUEST_MEMBER)
.addSuperinterface(className())
.addMethod(MethodSpec.methodBuilder(SUBSCRIBE_METHOD)
.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.addParameter(ParameterizedTypeName.get(ClassName.get(Subscriber.class),
WildcardTypeName.supertypeOf(responseType())),
SUBSCRIBER)
.addStatement("$L.onSubscribe(new $T($L))", SUBSCRIBER,
TypeName.get(EmptySubscription.class), SUBSCRIBER)
.build())
.build();
}

private MethodSpec lastPageMethod() {
return MethodSpec.methodBuilder(LAST_PAGE_METHOD)
.returns(className())
.addParameter(boolean.class, LAST_PAGE_FIELD)
.addStatement("this.$L = $L", LAST_PAGE_FIELD, LAST_PAGE_FIELD)
.addStatement("return this")
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.CodeBlock;
import com.squareup.javapoet.FieldSpec;
import com.squareup.javapoet.MethodSpec;
import com.squareup.javapoet.TypeName;
import java.security.InvalidParameterException;
import java.util.Collections;
Expand All @@ -41,8 +42,10 @@ public abstract class PaginatorsClassSpec implements ClassSpec {
protected static final String NEXT_PAGE_FETCHER_MEMBER = "nextPageFetcher";
protected static final String HAS_NEXT_PAGE_METHOD = "hasNextPage";
protected static final String NEXT_PAGE_METHOD = "nextPage";
protected static final String RESUME_METHOD = "resume";
protected static final String PREVIOUS_PAGE_METHOD_ARGUMENT = "previousPage";
protected static final String RESPONSE_LITERAL = "response";
protected static final String LAST_SUCCESSFUL_PAGE_LITERAL = "lastSuccessfulPage";

protected final IntermediateModel model;
protected final String c2jOperationName;
Expand Down Expand Up @@ -90,6 +93,26 @@ protected String nextPageFetcherClassName() {
return operationModel.getReturnType().getReturnType() + "Fetcher";
}

protected MethodSpec.Builder resumeMethodBuilder() {
return MethodSpec.methodBuilder(RESUME_METHOD)
.addModifiers(Modifier.PUBLIC)
.addParameter(responseType(), LAST_SUCCESSFUL_PAGE_LITERAL, Modifier.FINAL)
.returns(className())
.addCode(CodeBlock.builder()
.beginControlFlow("if ($L.$L($L))", NEXT_PAGE_FETCHER_MEMBER,
HAS_NEXT_PAGE_METHOD, LAST_SUCCESSFUL_PAGE_LITERAL)
.addStatement("return new $T($L, $L)", className(), CLIENT_MEMBER,
constructRequestFromLastPage(LAST_SUCCESSFUL_PAGE_LITERAL))
.endControlFlow()
.build())
.addJavadoc(CodeBlock.builder()
.add("<p>A helper method to resume the pages in case of unexpected failures. "
+ "The method takes the last successful response page as input and returns an "
+ "instance of {@link $T} that can be used to retrieve the consecutive pages "
+ "that follows the input page.</p>", className())
.build());
}

/*
* Returns the {@link TypeName} for a value in the {@link PaginatorDefinition#getResultKey()} list.
*
Expand Down Expand Up @@ -188,19 +211,32 @@ protected CodeBlock nextPageMethodBody() {
*/
private String codeToGetNextPageIfOldResponseIsNotNull() {
StringBuilder sb = new StringBuilder();
sb.append(String.format("return %s.%s(%s)", CLIENT_MEMBER,
operationModel.getMethodName(),
constructRequestFromLastPage(PREVIOUS_PAGE_METHOD_ARGUMENT)));
return sb.toString();
}

sb.append(String.format("return %s.%s(%s.toBuilder()", CLIENT_MEMBER, operationModel.getMethodName(), REQUEST_MEMBER));
/**
* Generates the code to construct a request object from the last successful page
* by setting the fields required to get the next page.
*
* Sample code: if responsePage string is "response"
* firstRequest.toBuilder().exclusiveStartTableName(response.lastEvaluatedTableName()).build()
*/
protected String constructRequestFromLastPage(String responsePage) {
StringBuilder sb = new StringBuilder();
sb.append(String.format("%s.toBuilder()", REQUEST_MEMBER));

List<String> requestSetterNames = fluentSetterMethodNamesForInputToken();
List<String> responseGetterMethods = fluentGetterMethodsForOutputToken();

for (int i = 0; i < paginatorDefinition.getInputToken().size(); i++) {
sb.append(String.format(".%s(%s.%s)", requestSetterNames.get(i), PREVIOUS_PAGE_METHOD_ARGUMENT,
sb.append(String.format(".%s(%s.%s)", requestSetterNames.get(i), responsePage,
responseGetterMethods.get(i)));
}

sb.append(".build())");

sb.append(".build()");
return sb.toString();
}

Expand Down
Loading

0 comments on commit de3b0e6

Please sign in to comment.