-
Notifications
You must be signed in to change notification settings - Fork 856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Automatic Async Depagination #185
Comments
Here is my proposed design. We can use the reactive streams Publisher interface like we do for streaming operations. We would add a forEach method to the interface for simple cases where a customer would just want to process all the responses/items and firstPage for customers who just want a single response. Since it's using reactive streams it interops fully with libraries like RxJava which allow you to do a lot fancier stuff with the publisher. /**
* Publisher with a convenience forEach method to consume all items.
*
* @param <T> Type of item being published.
*/
public interface SdkPublisher<T> extends Publisher<T> {
default CompletableFuture<Void> forEach(Consumer<T> consumer) {
// Implementation omitted for simplicity
}
}
/**
* Publisher for a paginated operation.
*
* @param <ResponseT> POJO response type.
* @param <ItemT> Inner item of interest.
*/
public interface PaginatedPublisher<ResponseT, ItemT> extends SdkPublisher<ResponseT> {
/**
* @return CompletableFuture to the first page of results.
*/
default CompletableFuture<ResponseT> firstPage() {
// Implementation omitted for simplicity
}
/**
* @return Publisher of inner item of interest.
*/
SdkPublisher<ItemT> allItems();
}
public interface DynamoDBAsyncClient {
PaginatedPublisher<ListTablesResponse, String> listTables();
}
public class Examples {
public static void main(String[] args) throws Exception {
DynamoDBAsyncClient client = DynamoDBAsyncClient.create();
// If you only want the first page, returns CompletableFuture of response type
ListTablesResponse firstResponse = client.listTables().firstPage().get()
// If you want to consume pages in a simple way. Returns future you can use for completion or error.
CompletableFuture<Void> pageFuture = client.listTables().forEach(r -> System.out.println(r.lastEvaluatedTableName()));
// If you want to consume items in a simple way. Returns future you can use for completion or error.
CompletableFuture<Void> itemFuture = client.listTables().allItems().forEach(System.out::println);
// Advanced users can manually subscribe to the publisher and do whatever they want
client.listTables().subscribe(new Subscriber<ListTablesResponse>() {
@Override
public void onSubscribe(Subscription subscription) {
}
@Override
public void onNext(ListTablesResponse listTablesResponse) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
// Since we return a Publisher it interops with things like RxJava very well
Flowable.fromPublisher(client.listTables())
.flatMapIterable(ListTablesResponse::tableNames)
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
}
} |
Should that be using The proposal looks fine to me. I like the general consistency with the sync version. |
Yes good catch. I'll update. |
The feature is released in "2.0.0-preview-9" version. Here is a blog post about this feature with code samples. Please try out the feature and provide us your feedback. Thank you. |
Same as #26 but for the Async programming model.
The text was updated successfully, but these errors were encountered: