From a9860b365e1e458f8b7b7b4f89a5dba898f1aa8c Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Thu, 6 Sep 2018 17:53:35 -0400 Subject: [PATCH] Bigtable: improve list tables spooler (#3639) * Bigtable: improve list tables spooler Avoid blocking the event loop. Previously the first page would be fetched asynchronously, but all of the other pages would be fetched synchronously which would block the grpc event loop. The new implementation uses future chaining. * update async test as well * reformat --- .../admin/v2/BigtableTableAdminClient.java | 63 ++++++++++++++++--- .../v2/BigtableTableAdminClientTest.java | 59 +++++++++++------ 2 files changed, 93 insertions(+), 29 deletions(-) diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java index 06f593c0fee1..b5e3977998fd 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.admin.v2; +import com.google.api.core.ApiAsyncFunction; import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -27,6 +28,7 @@ import com.google.bigtable.admin.v2.InstanceName; import com.google.bigtable.admin.v2.ListTablesRequest; import com.google.bigtable.admin.v2.TableName; +import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage; import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse; import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; @@ -433,22 +435,65 @@ public ApiFuture> listTablesAsync() { ListTablesRequest request = ListTablesRequest.newBuilder().setParent(instanceName.toString()) .build(); - ApiFuture listResp = - this.stub.listTablesPagedCallable().futureCall(request); + // TODO(igorbernstein2): try to upstream pagination spooling or figure out a way to expose the + // paginated responses while maintaining the wrapper facade. - return ApiFutures.transform( - listResp, - new ApiFunction>() { + // Fetch the first page. + ApiFuture firstPageFuture = ApiFutures.transform( + stub.listTablesPagedCallable().futureCall(request), + new ApiFunction() { @Override - public List apply(ListTablesPagedResponse response) { - List results = Lists.newArrayList(); - for (com.google.bigtable.admin.v2.Table proto : response.iterateAll()) { + public ListTablesPage apply(ListTablesPagedResponse response) { + return response.getPage(); + } + }, + MoreExecutors.directExecutor() + ); + + // Fetch the rest of the pages by chaining the futures. + ApiFuture> allProtos = ApiFutures + .transformAsync( + firstPageFuture, + new ApiAsyncFunction>() { + List responseAccumulator = Lists + .newArrayList(); + + @Override + public ApiFuture> apply( + ListTablesPage page) { + // Add all entries from the page + responseAccumulator.addAll(Lists.newArrayList(page.getValues())); + + // If this is the last page, just return the accumulated responses. + if (!page.hasNextPage()) { + return ApiFutures.immediateFuture(responseAccumulator); + } + + // Otherwise fetch the next page. + return ApiFutures.transformAsync( + page.getNextPageAsync(), + this, + MoreExecutors.directExecutor() + ); + } + }, + MoreExecutors.directExecutor() + ); + + // Wrap all of the accumulated protos. + return ApiFutures.transform(allProtos, + new ApiFunction, List>() { + @Override + public List apply(List protos) { + List results = Lists.newArrayListWithCapacity(protos.size()); + for (com.google.bigtable.admin.v2.Table proto : protos) { results.add(TableName.parse(proto.getName())); } return results; } }, - MoreExecutors.directExecutor()); + MoreExecutors.directExecutor() + ); } /** diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java index 85eb60ec898c..1494638091db 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java @@ -33,6 +33,7 @@ import com.google.bigtable.admin.v2.ListTablesRequest; import com.google.bigtable.admin.v2.ModifyColumnFamiliesRequest.Modification; import com.google.bigtable.admin.v2.TableName; +import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage; import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse; import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; @@ -216,34 +217,52 @@ public void testGetTable() { @Test public void testListTables() { // Setup - ListTablesRequest expectedRequest = ListTablesRequest.newBuilder() - .setParent(INSTANCE_NAME.toString()) - .build(); - - ListTablesPagedResponse expectedResponseWrapper = Mockito.mock(ListTablesPagedResponse.class); + com.google.bigtable.admin.v2.ListTablesRequest expectedRequest = + com.google.bigtable.admin.v2.ListTablesRequest.newBuilder() + .setParent(INSTANCE_NAME.toString()) + .build(); - Iterable expectedResults = Lists.newArrayList( - com.google.bigtable.admin.v2.Table.newBuilder() - .setName(TABLE_NAME.toString() + "1") - .build(), - com.google.bigtable.admin.v2.Table.newBuilder() - .setName(TABLE_NAME.toString() + "2") - .build()); + // 3 Tables spread across 2 pages + List expectedProtos = Lists.newArrayList(); + for (int i = 0; i < 3; i++) { + expectedProtos.add( + com.google.bigtable.admin.v2.Table.newBuilder() + .setName(TABLE_NAME.toString() + i) + .build()); + } + // 2 on the first page + ListTablesPage page0 = Mockito.mock(ListTablesPage.class); + Mockito.when(page0.getValues()).thenReturn(expectedProtos.subList(0, 2)); + Mockito.when(page0.getNextPageToken()).thenReturn("next-page"); + Mockito.when(page0.hasNextPage()).thenReturn(true); + + // 1 on the last page + ListTablesPage page1 = Mockito.mock(ListTablesPage.class); + Mockito.when(page1.getValues()).thenReturn(expectedProtos.subList(2, 3)); + + // Link page0 to page1 + Mockito.when(page0.getNextPageAsync()).thenReturn( + ApiFutures.immediateFuture(page1) + ); - Mockito.when(mockListTableCallable.futureCall(expectedRequest)) - .thenReturn(ApiFutures.immediateFuture(expectedResponseWrapper)); + // Link page to the response + ListTablesPagedResponse response0 = Mockito.mock(ListTablesPagedResponse.class); + Mockito.when(response0.getPage()).thenReturn(page0); - Mockito.when(expectedResponseWrapper.iterateAll()) - .thenReturn(expectedResults); + Mockito.when(mockListTableCallable.futureCall(expectedRequest)).thenReturn( + ApiFutures.immediateFuture(response0) + ); // Execute List actualResults = adminClient.listTables(); // Verify - assertThat(actualResults).containsExactly( - TableName.parse(TABLE_NAME.toString() + "1"), - TableName.parse(TABLE_NAME.toString() + "2") - ); + List expectedResults = Lists.newArrayList(); + for (com.google.bigtable.admin.v2.Table expectedProto : expectedProtos) { + expectedResults.add(TableName.parse(expectedProto.getName())); + } + + assertThat(actualResults).containsExactlyElementsIn(expectedResults); } @Test