Skip to content

Commit

Permalink
Bigtable: improve list tables spooler (#3639)
Browse files Browse the repository at this point in the history
* Bigtable: improve list tables spooler

Avoid blocking the event loop. Previously the first page would be fetched
asynchronously, but all of the other pages would be fetched synchronously
which would block the grpc event loop. The new implementation uses future
chaining.

* update async test as well

* reformat
  • Loading branch information
igorbernstein2 authored and pongad committed Sep 6, 2018
1 parent 01bec2b commit a9860b3
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.admin.v2;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand All @@ -27,6 +28,7 @@
import com.google.bigtable.admin.v2.InstanceName;
import com.google.bigtable.admin.v2.ListTablesRequest;
import com.google.bigtable.admin.v2.TableName;
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage;
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
Expand Down Expand Up @@ -433,22 +435,65 @@ public ApiFuture<List<TableName>> listTablesAsync() {
ListTablesRequest request = ListTablesRequest.newBuilder().setParent(instanceName.toString())
.build();

ApiFuture<ListTablesPagedResponse> listResp =
this.stub.listTablesPagedCallable().futureCall(request);
// TODO(igorbernstein2): try to upstream pagination spooling or figure out a way to expose the
// paginated responses while maintaining the wrapper facade.

return ApiFutures.transform(
listResp,
new ApiFunction<ListTablesPagedResponse, List<TableName>>() {
// Fetch the first page.
ApiFuture<ListTablesPage> firstPageFuture = ApiFutures.transform(
stub.listTablesPagedCallable().futureCall(request),
new ApiFunction<ListTablesPagedResponse, ListTablesPage>() {
@Override
public List<TableName> apply(ListTablesPagedResponse response) {
List<TableName> results = Lists.newArrayList();
for (com.google.bigtable.admin.v2.Table proto : response.iterateAll()) {
public ListTablesPage apply(ListTablesPagedResponse response) {
return response.getPage();
}
},
MoreExecutors.directExecutor()
);

// Fetch the rest of the pages by chaining the futures.
ApiFuture<List<com.google.bigtable.admin.v2.Table>> allProtos = ApiFutures
.transformAsync(
firstPageFuture,
new ApiAsyncFunction<ListTablesPage, List<com.google.bigtable.admin.v2.Table>>() {
List<com.google.bigtable.admin.v2.Table> responseAccumulator = Lists
.newArrayList();

@Override
public ApiFuture<List<com.google.bigtable.admin.v2.Table>> apply(
ListTablesPage page) {
// Add all entries from the page
responseAccumulator.addAll(Lists.newArrayList(page.getValues()));

// If this is the last page, just return the accumulated responses.
if (!page.hasNextPage()) {
return ApiFutures.immediateFuture(responseAccumulator);
}

// Otherwise fetch the next page.
return ApiFutures.transformAsync(
page.getNextPageAsync(),
this,
MoreExecutors.directExecutor()
);
}
},
MoreExecutors.directExecutor()
);

// Wrap all of the accumulated protos.
return ApiFutures.transform(allProtos,
new ApiFunction<List<com.google.bigtable.admin.v2.Table>, List<TableName>>() {
@Override
public List<TableName> apply(List<com.google.bigtable.admin.v2.Table> protos) {
List<TableName> results = Lists.newArrayListWithCapacity(protos.size());
for (com.google.bigtable.admin.v2.Table proto : protos) {
results.add(TableName.parse(proto.getName()));
}
return results;
}
},
MoreExecutors.directExecutor());
MoreExecutors.directExecutor()
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.bigtable.admin.v2.ListTablesRequest;
import com.google.bigtable.admin.v2.ModifyColumnFamiliesRequest.Modification;
import com.google.bigtable.admin.v2.TableName;
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage;
import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
Expand Down Expand Up @@ -216,34 +217,52 @@ public void testGetTable() {
@Test
public void testListTables() {
// Setup
ListTablesRequest expectedRequest = ListTablesRequest.newBuilder()
.setParent(INSTANCE_NAME.toString())
.build();

ListTablesPagedResponse expectedResponseWrapper = Mockito.mock(ListTablesPagedResponse.class);
com.google.bigtable.admin.v2.ListTablesRequest expectedRequest =
com.google.bigtable.admin.v2.ListTablesRequest.newBuilder()
.setParent(INSTANCE_NAME.toString())
.build();

Iterable<com.google.bigtable.admin.v2.Table> expectedResults = Lists.newArrayList(
com.google.bigtable.admin.v2.Table.newBuilder()
.setName(TABLE_NAME.toString() + "1")
.build(),
com.google.bigtable.admin.v2.Table.newBuilder()
.setName(TABLE_NAME.toString() + "2")
.build());
// 3 Tables spread across 2 pages
List<com.google.bigtable.admin.v2.Table> expectedProtos = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
expectedProtos.add(
com.google.bigtable.admin.v2.Table.newBuilder()
.setName(TABLE_NAME.toString() + i)
.build());
}
// 2 on the first page
ListTablesPage page0 = Mockito.mock(ListTablesPage.class);
Mockito.when(page0.getValues()).thenReturn(expectedProtos.subList(0, 2));
Mockito.when(page0.getNextPageToken()).thenReturn("next-page");
Mockito.when(page0.hasNextPage()).thenReturn(true);

// 1 on the last page
ListTablesPage page1 = Mockito.mock(ListTablesPage.class);
Mockito.when(page1.getValues()).thenReturn(expectedProtos.subList(2, 3));

// Link page0 to page1
Mockito.when(page0.getNextPageAsync()).thenReturn(
ApiFutures.immediateFuture(page1)
);

Mockito.when(mockListTableCallable.futureCall(expectedRequest))
.thenReturn(ApiFutures.immediateFuture(expectedResponseWrapper));
// Link page to the response
ListTablesPagedResponse response0 = Mockito.mock(ListTablesPagedResponse.class);
Mockito.when(response0.getPage()).thenReturn(page0);

Mockito.when(expectedResponseWrapper.iterateAll())
.thenReturn(expectedResults);
Mockito.when(mockListTableCallable.futureCall(expectedRequest)).thenReturn(
ApiFutures.immediateFuture(response0)
);

// Execute
List<TableName> actualResults = adminClient.listTables();

// Verify
assertThat(actualResults).containsExactly(
TableName.parse(TABLE_NAME.toString() + "1"),
TableName.parse(TABLE_NAME.toString() + "2")
);
List<TableName> expectedResults = Lists.newArrayList();
for (com.google.bigtable.admin.v2.Table expectedProto : expectedProtos) {
expectedResults.add(TableName.parse(expectedProto.getName()));
}

assertThat(actualResults).containsExactlyElementsIn(expectedResults);
}

@Test
Expand Down

0 comments on commit a9860b3

Please sign in to comment.