Skip to content

Commit

Permalink
First revision - Added resume method
Browse files Browse the repository at this point in the history
  • Loading branch information
AWS authored and varunnvs92 committed Mar 2, 2018
1 parent 4b7f0cd commit 8558232
Show file tree
Hide file tree
Showing 31 changed files with 959 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ private AsyncPaginated(IntermediateModel model, OperationModel opModel) {

@Override
protected String appendToDescription() {
return opModel.isPaginated() ? paginationDocs.getDocsForAsyncOperation() : "";
return paginationDocs.getDocsForAsyncOperation();
}

@Override
protected void applyReturns(DocumentationBuilder docBuilder) {
docBuilder.returns("A custom publisher that can be subscribed to request a stream of response pages.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
public final class DocumentationBuilder {

// TODO This prefix is not suitable for paginated operations. Either remove it for paginated operations
// or change the statement to something generic
private static final String ASYNC_THROWS_PREFIX = "The CompletableFuture returned by this method can be completed " +
"exceptionally with the following exceptions.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.squareup.javapoet.TypeName;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.codegen.model.intermediate.IntermediateModel;
import software.amazon.awssdk.codegen.model.intermediate.OperationModel;
import software.amazon.awssdk.codegen.poet.PoetExtensions;
Expand Down Expand Up @@ -90,11 +91,12 @@ public String getDocsForAsyncOperation() {
operationModel.getMethodName(), requestType())
.add("<p>When the operation is called, an instance of this class is returned. At this point, "
+ "no service calls are made yet and so there is no guarantee that the request is valid. "
+ "The subscribe method should be called as a request to stream data. For more info, "
+ "see {@link $T#$L($T)}. If there are errors in your "
+ "request, you will see the failures only after you start streaming the data.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType())
.add(getAsyncCodeSnippets())
+ "If there are errors in your request, you will see the failures only after you start streaming "
+ "the data. The subscribe method should be called as a request to start streaming data. "
+ "For more info, see {@link $T#$L($T)}. Each call to the subscribe method will result in a new "
+ "{@link $T} i.e., a new contract to stream data from the starting request.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType(), getSubscriptionType())
.add(getAsyncCodeSnippets())
.build()
.toString();
}
Expand All @@ -112,10 +114,11 @@ clientInterface, getPaginatedMethodName(), requestType(), getPublisherType(),
syncResponsePageType())
.add("<p>When the operation is called, an instance of this class is returned. At this point, "
+ "no service calls are made yet and so there is no guarantee that the request is valid. "
+ "The subscribe method should be called as a request to stream data. For more info, "
+ "see {@link $T#$L($T)}. If there are errors in your "
+ "request, you will see the failures only after you start streaming the data.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType())
+ "If there are errors in your request, you will see the failures only after you start streaming "
+ "the data. The subscribe method should be called as a request to start streaming data. "
+ "For more info, see {@link $T#$L($T)}. Each call to the subscribe method will result in a new "
+ "{@link $T} i.e., a new contract to stream data from the starting request.</p>",
getPublisherType(), SUBSCRIBE_METHOD_NAME, getSubscriberType(), getSubscriptionType())
.add(getAsyncCodeSnippets())
.build()
.toString();
Expand Down Expand Up @@ -160,7 +163,7 @@ private String getAsyncCodeSnippets() {

return CodeBlock.builder()
.add("\n\n<p>The following are few ways to use the response class:</p>")
.add("1) Using the forEach helper method. This uses @{@link $T} internally",
.add("1) Using the forEach helper method",
TypeName.get(SequentialSubscriber.class))
.add(buildCode(CodeBlock.builder()
.add(callOperationOnClient)
Expand Down Expand Up @@ -255,4 +258,11 @@ private ClassName getPublisherType() {
private ClassName getSubscriberType() {
return ClassName.get(Subscriber.class);
}

/**
* @return A Poet {@link ClassName} for the reactive streams {@link Subscription}.
*/
private ClassName getSubscriptionType() {
return ClassName.get(Subscription.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ private SyncPaginated(IntermediateModel model, OperationModel opModel) {

@Override
protected String appendToDescription() {
return opModel.isPaginated() ? paginationDocs.getDocsForSyncOperation() : "";
return paginationDocs.getDocsForSyncOperation();
}

@Override
protected void applyReturns(DocumentationBuilder docBuilder) {
docBuilder.returns("A custom iterable that can be used to iterate through all the response pages.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static software.amazon.awssdk.utils.FunctionalUtils.safeFunction;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -46,15 +47,15 @@ protected boolean hasTasks() {

@Override
protected List<GeneratorTask> createTasks() throws Exception {
info("Emitting paginator classes");
return Stream.concat(createSyncTasks(), createASyncTasks())
.collect(Collectors.toList());
}

private Stream<GeneratorTask> createSyncTasks() {
return model.getPaginators().entrySet().stream()
.filter(entry -> entry.getValue().isValid())
.map(safeFunction(this::createSyncTask));
.flatMap(safeFunction(this::createSyncAndAsyncTasks))
.collect(Collectors.toList());
}

private Stream<GeneratorTask> createSyncAndAsyncTasks(Map.Entry<String, PaginatorDefinition> entry) throws IOException {
return Arrays.asList(createSyncTask(entry), createAsyncTask(entry))
.stream();
}

private GeneratorTask createSyncTask(Map.Entry<String, PaginatorDefinition> entry) throws IOException {
Expand All @@ -63,12 +64,6 @@ private GeneratorTask createSyncTask(Map.Entry<String, PaginatorDefinition> entr
return new PoetGeneratorTask(paginatorsClassDir, model.getFileHeader(), classSpec);
}

private Stream<GeneratorTask> createASyncTasks() {
return model.getPaginators().entrySet().stream()
.filter(entry -> entry.getValue().isValid())
.map(safeFunction(this::createAsyncTask));
}

private GeneratorTask createAsyncTask(Map.Entry<String, PaginatorDefinition> entry) throws IOException {
ClassSpec classSpec = new AsyncResponseClassSpec(model, entry.getKey(), entry.getValue());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation
}

@Override
protected MethodSpec.Builder paginatedTraditionalMethodBody(MethodSpec.Builder builder, OperationModel opModel) {
protected MethodSpec.Builder paginatedMethodBody(MethodSpec.Builder builder, OperationModel opModel) {
return builder.addModifiers(Modifier.PUBLIC)
.addStatement("return new $T(this, $L)",
poetExtensions.getResponseClassForPaginatedAsyncOperation(opModel.getOperationName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ private MethodSpec paginatedTraditionalMethod(OperationModel opModel) {
ClientType.ASYNC,
SimpleMethodOverload.PAGINATED));

return paginatedTraditionalMethodBody(builder, opModel).build();
return paginatedMethodBody(builder, opModel).build();
}

protected MethodSpec.Builder paginatedTraditionalMethodBody(MethodSpec.Builder builder, OperationModel operationModel) {
protected MethodSpec.Builder paginatedMethodBody(MethodSpec.Builder builder, OperationModel operationModel) {
return builder.addModifiers(Modifier.DEFAULT, Modifier.PUBLIC)
.addStatement("throw new $T()", UnsupportedOperationException.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.codegen.poet.ClassSpec;
import software.amazon.awssdk.codegen.poet.PoetUtils;
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
import software.amazon.awssdk.core.pagination.async.EmptySubscription;
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
import software.amazon.awssdk.core.pagination.async.SdkPublisher;
Expand All @@ -48,6 +49,9 @@
public class AsyncResponseClassSpec extends PaginatorsClassSpec {

private static final String SUBSCRIBER = "subscriber";
private static final String SUBSCRIBE_METHOD = "subscribe";
private static final String LAST_PAGE_FIELD = "isLastPage";
private static final String LAST_PAGE_METHOD = "withLastPage";

public AsyncResponseClassSpec(IntermediateModel model, String c2jOperationName, PaginatorDefinition paginatorDefinition) {
super(model, c2jOperationName, paginatorDefinition);
Expand All @@ -56,16 +60,19 @@ public AsyncResponseClassSpec(IntermediateModel model, String c2jOperationName,
@Override
public TypeSpec poetSpec() {
TypeSpec.Builder specBuilder = TypeSpec.classBuilder(className())
.addModifiers(Modifier.PUBLIC, Modifier.FINAL)
.addModifiers(Modifier.PUBLIC)
.addAnnotation(PoetUtils.GENERATED)
.addSuperinterface(getAsyncResponseInterface())
.addFields(Stream.of(asyncClientInterfaceField(),
requestClassField(),
asyncPageFetcherField())
asyncPageFetcherField(),
lastPageField())
.collect(Collectors.toList()))
.addMethod(constructor())
.addMethod(publicConstructor())
.addMethod(privateConstructor())
.addMethod(subscribeMethod())
.addMethods(getMethodSpecsForResultKeyList())
.addMethod(resumeMethod())
.addJavadoc(paginationDocs.getDocsForAsyncResponseClass(
getAsyncClientInterfaceName()))
.addType(nextPageFetcherClass());
Expand Down Expand Up @@ -100,13 +107,28 @@ private FieldSpec asyncPageFetcherField() {
return FieldSpec.builder(AsyncPageFetcher.class, NEXT_PAGE_FETCHER_MEMBER, Modifier.PRIVATE, Modifier.FINAL).build();
}

private MethodSpec constructor() {
private FieldSpec lastPageField() {
return FieldSpec.builder(boolean.class, LAST_PAGE_FIELD, Modifier.PRIVATE).build();
}

private MethodSpec publicConstructor() {
return MethodSpec.constructorBuilder()
.addModifiers(Modifier.PUBLIC)
.addParameter(getAsyncClientInterfaceName(), CLIENT_MEMBER, Modifier.FINAL)
.addParameter(requestType(), REQUEST_MEMBER, Modifier.FINAL)
.addStatement("this($L, $L, false)", CLIENT_MEMBER, REQUEST_MEMBER)
.build();
}

private MethodSpec privateConstructor() {
return MethodSpec.constructorBuilder()
.addModifiers(Modifier.PRIVATE)
.addParameter(getAsyncClientInterfaceName(), CLIENT_MEMBER, Modifier.FINAL)
.addParameter(requestType(), REQUEST_MEMBER, Modifier.FINAL)
.addParameter(boolean.class, LAST_PAGE_FIELD, Modifier.FINAL)
.addStatement("this.$L = $L", CLIENT_MEMBER, CLIENT_MEMBER)
.addStatement("this.$L = $L", REQUEST_MEMBER, REQUEST_MEMBER)
.addStatement("this.$L = $L", LAST_PAGE_FIELD, LAST_PAGE_FIELD)
.addStatement("this.$L = new $L()", NEXT_PAGE_FETCHER_MEMBER, nextPageFetcherClassName())
.build();
}
Expand All @@ -115,7 +137,7 @@ private MethodSpec constructor() {
* A {@link MethodSpec} for the subscribe() method which is inherited from the interface.
*/
private MethodSpec subscribeMethod() {
return MethodSpec.methodBuilder("subscribe")
return MethodSpec.methodBuilder(SUBSCRIBE_METHOD)
.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.addParameter(ParameterizedTypeName.get(ClassName.get(Subscriber.class),
Expand Down Expand Up @@ -171,9 +193,10 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) {
resultKeyType)))
.addCode(getIteratorLambdaBlock(resultKey, resultKeyModel))
.addCode("\n")
.addStatement("return new $T(new $L(), getIterator)",
.addStatement("return new $T(new $L(), getIterator, $L)",
PaginatedItemsPublisher.class,
nextPageFetcherClassName())
nextPageFetcherClassName(),
LAST_PAGE_FIELD)
.addJavadoc(CodeBlock.builder()
.add("Returns a publisher that can be used to get a stream of data. You need to "
+ "subscribe to the publisher to request the stream of data. The publisher "
Expand All @@ -184,7 +207,7 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) {
.build();
}

/**
/**aW
* Generates a inner class that implements {@link AsyncPageFetcher}. This is a helper class that can be used
* to find if there are more pages in the response and to get the next page if exists.
*/
Expand All @@ -209,4 +232,26 @@ private TypeSpec nextPageFetcherClass() {
.build())
.build();
}

private MethodSpec resumeMethod() {
return resumeMethodBuilder().addCode(CodeBlock.builder()
.addStatement("return $L", anonymousClassWithEmptySubscription())
.build())
.build();
}

private TypeSpec anonymousClassWithEmptySubscription() {
return TypeSpec.anonymousClassBuilder("$L, $L, true", CLIENT_MEMBER, REQUEST_MEMBER)
.addSuperinterface(className())
.addMethod(MethodSpec.methodBuilder(SUBSCRIBE_METHOD)
.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.addParameter(ParameterizedTypeName.get(ClassName.get(Subscriber.class),
WildcardTypeName.supertypeOf(responseType())),
SUBSCRIBER)
.addStatement("$L.onSubscribe(new $T($L))", SUBSCRIBER,
TypeName.get(EmptySubscription.class), SUBSCRIBER)
.build())
.build();
}
}
Loading

0 comments on commit 8558232

Please sign in to comment.