fix: Add paging to hbase client #4166
...hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java
@Override
public ResultScanner readRows(Query.QueryPaginator paginator, long maxSegmentByteSize) {
  return new PaginatedRowResultScanner(
      paginator, delegate, maxSegmentByteSize, () -> this.createScanCallContext());
Why do you defer the creation of the ScanCallContext? Re-creating it for each page extends the operation timeout across all of the pages. I think the operation deadline should stay consistent from the caller's perspective, regardless of whether the scan is broken up into multiple segments.
done
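The reviewer's point can be modelled with plain java.time: compute the absolute deadline once, when the scan starts, and give every page RPC only the time that remains. This is a hypothetical sketch (`ScanDeadline` is not part of the client, and it does not use the GrpcCallContext API); it only illustrates why re-creating the context per page stretches the effective timeout.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model of a scan-wide deadline: fixed once when the scan starts
 * and shared by every page RPC.
 */
final class ScanDeadline {
  private final Instant deadline;

  ScanDeadline(Instant scanStart, Duration operationTimeout) {
    // Computed once; building a fresh timeout for each page would restart the clock.
    this.deadline = scanStart.plus(operationTimeout);
  }

  /** Time left for a page RPC issued at {@code now}; never negative. */
  Duration remainingAt(Instant now) {
    Duration left = Duration.between(now, deadline);
    return left.isNegative() ? Duration.ZERO : left;
  }
}
```

Under this model, a page issued 45 seconds into a 60-second scan gets 15 seconds; if each page instead received the full timeout, a three-page scan could run for up to three times the configured limit.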
@@ -155,7 +170,8 @@ public void readRowsAsync(Query request, StreamObserver<Result> observer) {
    .call(request, new StreamObserverAdapter<>(observer), createScanCallContext());
}

// Point reads are implemented using a streaming ReadRows RPC. So timeouts need to be managed
// Point reads are implemented using a streaming ReadRows RPC. So timeouts need
// to be managed
please revert irrelevant changes to minimize the noise
done
return null;
}
}
scannerResultMeter.mark();
should this be in the if statement?
Done
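A minimal sketch of the pattern this question points at, assuming the intent is for the meter to count only rows actually handed to the caller. `MeteredScanner` and its counter are hypothetical stand-ins for the scanner and scannerResultMeter; the point is that the mark happens inside the branch that returns a row, so the final null does not inflate the count.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical scanner that counts only rows returned to the caller. */
final class MeteredScanner<T> {
  private final Iterator<T> rows;
  private long marked; // stands in for scannerResultMeter

  MeteredScanner(List<T> rows) {
    this.rows = rows.iterator();
  }

  /** Returns the next row, or null once the scan is exhausted. */
  T next() {
    if (rows.hasNext()) {
      T row = rows.next();
      marked++; // mark inside the branch: the terminating null is never counted
      return row;
    }
    return null;
  }

  long marked() {
    return marked;
  }
}
```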
...hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java
@@ -74,6 +80,8 @@ public class TestDataClientVeneerApi {
private static final String TABLE_ID = "fake-table";
private static final ByteString ROW_KEY = ByteString.copyFromUtf8("row-key");

private static AtomicBoolean cancelled = new AtomicBoolean(false);
why static?
Copy-pasted from Beam; removed.
} else {
  Query.QueryPaginator paginator =
      hbaseAdapter.adapt(scan).createPaginator(scan.getCaching());
  scanner = clientWrapper.readRows(paginator, -1);
Maybe we can pass in the default to begin with?
It's private, and I wouldn't want to expose it. This value is just for tests, so I don't think it's critical.
Could we move the default value into this class?
Done
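One reading of this suggestion, sketched with hypothetical names: rather than callers passing -1 as what appears to be a "use the default" sentinel (as in `clientWrapper.readRows(paginator, -1)` above), the class owns the default and exposes an overload, while tests still pass a tiny explicit budget. The constant's value is a placeholder; the PR does not state the real default, and the methods return the effective size only as a stand-in for building the scanner.

```java
/**
 * Hypothetical sketch: the wrapper owns the default segment size, so production
 * callers never need a sentinel such as -1.
 */
final class SegmentSizeDefaults {
  // Placeholder value for illustration only; not taken from the PR.
  static final long DEFAULT_MAX_SEGMENT_BYTE_SIZE = 64L * 1024 * 1024;

  private SegmentSizeDefaults() {}

  /** Production path: no size argument, so the class-owned default applies. */
  static long effectiveSegmentSize() {
    return effectiveSegmentSize(DEFAULT_MAX_SEGMENT_BYTE_SIZE);
  }

  /** Test path: an explicit, usually tiny, budget such as 3 bytes. */
  static long effectiveSegmentSize(long maxSegmentByteSize) {
    if (maxSegmentByteSize <= 0) {
      throw new IllegalArgumentException("maxSegmentByteSize must be positive");
    }
    return maxSegmentByteSize;
  }
}
```

Rejecting non-positive values makes an accidental leftover -1 fail loudly instead of silently meaning "default".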
...bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/DataClientWrapper.java
...e-hbase-integration-tests-common/src/test/java/com/google/cloud/bigtable/hbase/TestScan.java
ResultScanner noRowsResultScanner = dataClientWrapper.readRows(query.createPaginator(100), -1);
assertNull(noRowsResultScanner.next());

verify(mockDataClient, times(2)).readRowsCallable(Mockito.<RowResultAdapter>any());
I'm a bit confused by this test. Why would it be called twice when the page size is 100 and you're only returning 2 results?
I am calling readRows twice. Once with results, and once without
Ok, I see it now. However, I don't think this actually tests anything. I think we can set a smaller page size (for example, 2), return more rows (say 3), and verify that readRowsCallable is called 3 times.
Done
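The expected call count follows from a simple model of the paging loop. Assuming the paginated scanner keeps issuing page RPCs until one comes back empty (the extra empty page mocked at the end of the reviewer's testReadPaginatedRows suggestion further down implies this), 3 rows with a page size of 2 take three calls: 2 rows, 1 row, then an empty page. `PagingRpcCount` is a hypothetical helper, not part of the client.

```java
/** Hypothetical model of how many page RPCs a paginated scan issues. */
final class PagingRpcCount {
  private PagingRpcCount() {}

  /**
   * Assumes each page returns up to pageSize rows and the scan stops only after a
   * page comes back empty.
   */
  static int rpcCount(int totalRows, int pageSize) {
    if (pageSize <= 0 || totalRows < 0) {
      throw new IllegalArgumentException("pageSize must be positive and totalRows non-negative");
    }
    int calls = 0;
    int remaining = totalRows;
    while (true) {
      calls++; // one ReadRows call per page
      int returned = Math.min(pageSize, remaining);
      remaining -= returned;
      if (returned == 0) {
        return calls; // the empty page ends the scan
      }
    }
  }
}
```

Under the same assumption, the five-row example below (page size 2) needs four calls, which matches the four mocked pages.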
new Answer<Void>() {
  @Override
  public Void answer(InvocationOnMock invocation) throws Throwable {
    ((ResponseObserver) invocation.getArgument(1)).onResponse(Result.EMPTY_RESULT);
why do we return an empty result first?
That's similar to the test for the non-paginated scanner.
This test doesn't really make sense. You want to be testing what happens between pages. Something like this:
private static Result createRow(String key) {
return Result.create(
ImmutableList.<Cell>of(
new com.google.cloud.bigtable.hbase.adapters.read.RowCell(
Bytes.toBytes(key),
Bytes.toBytes("cf"),
Bytes.toBytes("q"),
10L,
Bytes.toBytes("value"),
ImmutableList.of("label"))));
}
@Test
public void testReadPaginatedRows() throws IOException {
Query query = Query.create(TABLE_ID).range("a", "z");
when(mockDataClient.readRowsCallable(Mockito.<RowResultAdapter>any()))
.thenReturn(mockStreamingCallable);
// First Page
doAnswer((args) -> {
ResponseObserver<Result> observer = args.getArgument(1);
observer.onResponse(createRow("a"));
observer.onResponse(createRow("b"));
observer.onComplete();
return null;
})
.when(mockStreamingCallable)
.call(eq(Query.create(TABLE_ID).range("a", "z").limit(2)), any(), any());
// 2nd Page
doAnswer((args) -> {
ResponseObserver<Result> observer = args.getArgument(1);
observer.onResponse(createRow("c"));
observer.onResponse(createRow("d"));
observer.onComplete();
return null;
})
.when(mockStreamingCallable)
.call(
eq(
Query.create(TABLE_ID)
.range(ByteStringRange.unbounded().startOpen("b").endOpen("z")).limit(2)),
any(), any());
// 3rd Page
doAnswer((args) -> {
ResponseObserver<Result> observer = args.getArgument(1);
observer.onResponse(createRow("e"));
observer.onComplete();
return null;
})
.when(mockStreamingCallable)
.call(
eq(
Query.create(TABLE_ID)
.range(ByteStringRange.unbounded().startOpen("d").endOpen("z")).limit(2)), any(), any());
// 4th Page (empty)
doAnswer((args) -> {
ResponseObserver<Result> observer = args.getArgument(1);
observer.onComplete();
return null;
})
.when(mockStreamingCallable)
.call(
eq(
Query.create(TABLE_ID)
.range(ByteStringRange.unbounded().startOpen("e").endOpen("z")).limit(2)), any(), any());
ResultScanner resultScanner = dataClientWrapper.readRows(query.createPaginator(2), 1000);
assertThat(resultScanner)
.comparingElementsUsing(Correspondence.transforming((Result r) -> new String(r.getRow()), "row key"))
.containsExactly("a", "b", "c", "d", "e");
}
Done
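The sequence of queries that the mocks above expect can be reproduced with a small model over a sorted key set: the first page reads the original range (closed at the start, open at the end), and each later page re-reads it with an open start just after the last row key seen. This is a hypothetical simulation using java.util.NavigableSet, not the QueryPaginator implementation, and it shares the stop-on-empty-page assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

/** Hypothetical simulation of page requests over a sorted set of row keys. */
final class PageBoundarySketch {
  private PageBoundarySketch() {}

  /**
   * Describes each page request for the range [start, end), e.g. "(b,z) limit 2".
   * Each later page starts open, just after the last key seen.
   */
  static List<String> pageRequests(NavigableSet<String> rowKeys, String start, String end, int limit) {
    List<String> requests = new ArrayList<>();
    String lastKey = null; // null until the first page returns a row
    while (true) {
      NavigableSet<String> range =
          lastKey == null
              ? rowKeys.subSet(start, true, end, false) // first page: [start, end)
              : rowKeys.subSet(lastKey, false, end, false); // later pages: (lastKey, end)
      requests.add((lastKey == null ? "[" + start : "(" + lastKey) + "," + end + ") limit " + limit);

      List<String> page = new ArrayList<>();
      for (String key : range) {
        if (page.size() == limit) {
          break; // the server honours the page limit
        }
        page.add(key);
      }
      if (page.isEmpty()) {
        return requests; // assumed stop rule: an empty page ends the scan
      }
      lastKey = page.get(page.size() - 1);
    }
  }
}
```

For rows a to e, range a to z and a limit of 2, this yields the four ranges stubbed above: [a,z), (b,z), (d,z) and (e,z).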
    Mockito.any(ResponseObserver.class),
    Mockito.any(GrpcCallContext.class));

ResultScanner resultScanner = dataClientWrapper.readRows(query.createPaginator(100), 3);
Same question here: why do we return an empty result? I don't think that's going to use any memory. Maybe you want to return another expected result and assert that readRowsCallable is called twice.
Again, I was just copying the old test. There's no real reason, but I don't think it's invalid.
Similar to above, I don't think this actually tests the memory buffer function. We should at least return another result and verify that readRowsCallable is called twice (the first request gets cancelled because we filled up the buffer, and the second one finishes the read).
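The buffer-and-cancel behaviour described here can be modelled as: accumulate row bytes for the current call, signal cancellation once the total reaches maxSegmentByteSize, and resume the next call just after the last buffered row key. `SegmentBuffer`, the per-row byte accounting and the "reach or exceed" threshold are all assumptions for illustration; the PR does not show how PaginatedRowResultScanner measures rows.

```java
/**
 * Hypothetical per-call buffer: tracks bytes received on the current ReadRows call
 * and tells the caller when to cancel it.
 */
final class SegmentBuffer {
  private final long maxSegmentByteSize;
  private long bufferedBytes;
  private String lastRowKey; // resume point for the next call

  SegmentBuffer(long maxSegmentByteSize) {
    this.maxSegmentByteSize = maxSegmentByteSize;
  }

  /**
   * Buffers one row. Returns true to keep streaming, or false when the budget is
   * reached and the current call should be cancelled.
   */
  boolean offer(String rowKey, int rowBytes) {
    bufferedBytes += rowBytes;
    lastRowKey = rowKey;
    return bufferedBytes < maxSegmentByteSize;
  }

  /** Row key after which the next call should start (exclusive), or null if none yet. */
  String resumeAfter() {
    return lastRowKey;
  }
}
```

With a 3-byte budget, as in `readRows(query.createPaginator(100), 3)`, the very first row already triggers cancellation, so a test that stubs a second response can assert that a second ReadRows call resumes after that row.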
doAnswer(
What's the difference between lines 353-372 and lines 331-351? They seem to be duplicates.
Fixed
}

@Test
public void testRead100Rows() throws IOException {
Was this a new test that's added to test the paginator?
Yes. Adjusted to new paginator impl
This test isn't really testing anything; I would just drop it.
Done
...e-hbase-integration-tests-common/src/test/java/com/google/cloud/bigtable/hbase/TestScan.java
  testManyResultsInScanner(105);
}

private void testManyResultsInScanner(int rowsToWrite) throws IOException {
Sorry, I should have been clearer in my previous comment. I think we want to test getScanner both without pagination and with pagination. So maybe we can add a parameter:
testManyResultsInScanner(int rowsToWrite, boolean withPagination);
And add one more test: testManyResultsInScanner(100, false);
Done
public void testReadRows_Errors() throws IOException {
  Query query = Query.create(TABLE_ID).rowKey(ROW_KEY);
  when(mockDataClient.readRowsCallable(Mockito.<RowResultAdapter>any()))
      .thenThrow(new RuntimeException())
This doesn't make sense: readRowsCallable() will never throw an exception. You want to test what happens when the server returns an error. I think something like this:
public void testReadRows_Errors() throws IOException {
Query query = Query.create(TABLE_ID).rowKey(ROW_KEY);
when(mockDataClient.readRowsCallable(any(RowResultAdapter.class)))
.thenReturn(mockStreamingCallable);
when(mockStreamingCallable.call(any(Query.class), any(GrpcCallContext.class)))
.thenReturn(serverStream);
when(serverStream.iterator())
.thenReturn(new Iterator<Result>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public Result next() {
throw new InternalException("fake error", null, GrpcStatusCode.of(Code.INTERNAL), false);
}
})
.thenReturn(ImmutableList.<Result>of().iterator());
assertThrows(Exception.class, () -> dataClientWrapper.readRows(query).next());
ResultScanner noRowsResultScanner = dataClientWrapper.readRows(query);
assertNull(noRowsResultScanner.next());
noRowsResultScanner.close();
verify(mockDataClient, times(2)).readRowsCallable(Mockito.<RowResultAdapter>any());
verify(serverStream, times(2)).iterator();
verify(mockStreamingCallable, times(2))
.call(any(Query.class), any(GrpcCallContext.class));
}
Done
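The reviewer's point, that a server error arrives through the response stream rather than from readRowsCallable(), can be sketched without Mockito: starting the call succeeds, and the failure only surfaces when the caller pulls the next row. `LazyScanner` is hypothetical, and IllegalStateException stands in for gax's InternalException.

```java
import java.util.Iterator;
import java.util.function.Supplier;

/** Hypothetical scanner over a server stream; errors surface lazily from next(). */
final class LazyScanner<T> {
  private final Iterator<T> stream;

  LazyScanner(Supplier<Iterator<T>> startCall) {
    // Starting the call succeeds; any server error arrives later with the data.
    this.stream = startCall.get();
  }

  /** Returns the next row, or null when the stream is exhausted. Server errors propagate. */
  T next() {
    return stream.hasNext() ? stream.next() : null;
  }

  /** A stream that reports data but fails on read, like the mocked iterator above. */
  static <T> Iterator<T> failingStream(RuntimeException error) {
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return true;
      }

      @Override
      public T next() {
        throw error;
      }
    };
  }
}
```

This is why the suggested test asserts the exception on `readRows(query).next()` and then checks that a fresh scanner over an empty stream simply returns null.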
}

@Test
public void testReadRowsLowMemory() throws IOException {
Similar to above, you want to be testing the RPCs between pages, so something like this:
public void testReadRowsLowMemory() throws IOException {
Query query = Query.create(TABLE_ID);
when(mockDataClient.readRowsCallable(any(RowResultAdapter.class)))
.thenReturn(mockStreamingCallable);
StreamController mockController = Mockito.mock(StreamController.class);
doAnswer(invocation -> {
cancelled.set(true);
return null;
})
.when(mockController)
.cancel();
// First call: stream up to 1000 rows until the scanner cancels the request
doAnswer(
(Answer<Void>) invocation -> {
ResponseObserver<Result> observer = invocation.getArgument(1);
observer.onStart(mockController);
for (int i = 0; i < 1000 && !cancelled.get(); i++) {
observer.onResponse(createRow(String.format("row%010d", i)));
Thread.sleep(10);
}
observer.onComplete();
return null;
})
// Second call: the resumed read completes without returning rows
.doAnswer(
(Answer<Void>) invocation -> {
ResponseObserver<Result> observer = invocation.getArgument(1);
observer.onComplete();
return null;
})
.when(mockStreamingCallable)
.call(any(), any(), any());
ResultScanner resultScanner = dataClientWrapper.readRows(query.createPaginator(100), 3);
// Consume the stream
Lists.newArrayList(resultScanner);
verify(mockStreamingCallable, times(2))
.call(
any(Query.class),
any(ResponseObserver.class),
any(GrpcCallContext.class));
assertTrue(cancelled.get());
}
Done