Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add paging to hbase client #4166

Merged
merged 87 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 70 commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
ef49c26
Added paging to hbase client
ron-gal Sep 21, 2023
f5710b2
minor fixes
ron-gal Sep 21, 2023
ecd88f9
minor fixes
ron-gal Sep 22, 2023
88373d0
Fix tests
ron-gal Sep 22, 2023
2144a6f
Fix tests
ron-gal Sep 22, 2023
63c7492
Fix lint
ron-gal Sep 22, 2023
40e5759
test
ron-gal Sep 25, 2023
6d15df1
test
ron-gal Sep 25, 2023
5254712
test
ron-gal Sep 25, 2023
d1dc685
test
ron-gal Sep 26, 2023
424f453
test
ron-gal Sep 26, 2023
8a0d1f5
test
ron-gal Sep 26, 2023
0f14715
test
ron-gal Sep 26, 2023
22ae1fc
test
ron-gal Sep 26, 2023
84be8a0
test
ron-gal Sep 26, 2023
dc6a9dc
test
ron-gal Sep 26, 2023
b8d05d3
test
ron-gal Sep 26, 2023
5b99b77
test
ron-gal Sep 26, 2023
435bb73
test
ron-gal Sep 26, 2023
e4f34fc
test
ron-gal Sep 27, 2023
b539bb5
test
ron-gal Sep 27, 2023
34f00d1
test
ron-gal Sep 27, 2023
a944e6b
test
ron-gal Sep 28, 2023
244bbb7
test
ron-gal Sep 28, 2023
a1e3d54
test
ron-gal Sep 28, 2023
2aa5d17
test
ron-gal Sep 29, 2023
0093a78
test
ron-gal Sep 29, 2023
1667188
test
ron-gal Sep 29, 2023
f394ba2
test
ron-gal Sep 29, 2023
d1ef3b6
test
ron-gal Oct 2, 2023
423df71
test
ron-gal Oct 2, 2023
3d9f18c
Brought back test and fixed page size handling
ron-gal Oct 3, 2023
fdf511b
fixed test
ron-gal Oct 3, 2023
fd15972
add tests
ron-gal Oct 4, 2023
2a3b125
minor refactor
ron-gal Oct 5, 2023
a93d3bb
minor refactor
ron-gal Oct 5, 2023
55af2e0
add error handling tests
ron-gal Oct 5, 2023
52425f1
fix format
ron-gal Oct 5, 2023
cced23a
Add protection against OOM exceptions
ron-gal Oct 6, 2023
06b8b2e
Add protection against OOM exceptions
ron-gal Oct 6, 2023
b50ec9f
Remove useless assertion
ron-gal Oct 17, 2023
6535e6c
handle setCaching properly
ron-gal Oct 24, 2023
d025473
handle setCaching properly
ron-gal Oct 24, 2023
1e61132
handle setCaching properly
ron-gal Oct 24, 2023
9df66fe
handle setCaching properly
ron-gal Oct 24, 2023
cdf7347
handle setCaching properly
ron-gal Oct 24, 2023
951ebef
handle setCaching properly
ron-gal Oct 25, 2023
7ba3c64
remove useless code
ron-gal Nov 2, 2023
d4ed8e5
Get page size directly from the paginator
ron-gal Nov 2, 2023
d4bede1
fix lint
ron-gal Nov 2, 2023
06227d2
cancel serverStream when reaching memory limit
ron-gal Nov 2, 2023
947608a
add test for low memory
ron-gal Nov 2, 2023
e23101c
add test for low memory
ron-gal Nov 2, 2023
4e15493
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 2, 2023
56bb3f1
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 3, 2023
34653ed
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 6, 2023
a082a9e
fix lint
ron-gal Nov 6, 2023
871d66b
update java-bigtable dependency
ron-gal Nov 7, 2023
06ce458
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 8, 2023
a95e218
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 8, 2023
03c0e3b
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 15, 2023
b5fdeeb
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 15, 2023
b0c7cbf
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 16, 2023
262525f
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 16, 2023
7382aa9
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 21, 2023
5b510a1
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 21, 2023
80ffea0
Fixed several PR comments
ron-gal Nov 21, 2023
da22f6e
Fixed several PR comments
ron-gal Nov 22, 2023
7dd6b49
Fixed several PR comments
ron-gal Nov 22, 2023
40545c1
Moved to async API
ron-gal Nov 22, 2023
fb86765
Fixed several PR comments
ron-gal Nov 29, 2023
91ee89e
Fixed several PR comments
ron-gal Nov 29, 2023
c91d6f9
Fixed several PR comments
ron-gal Nov 29, 2023
410d203
Fixed several PR comments
ron-gal Nov 29, 2023
ce514ee
Fixed several PR comments
ron-gal Nov 29, 2023
bf23528
Fixed several PR comments
ron-gal Nov 29, 2023
a819985
Merge branch 'googleapis:main' into hbase_paging
ron-gal Nov 29, 2023
a6738f5
Fixed according to PR comments
ron-gal Dec 7, 2023
52d8a12
Fix wrong advance
ron-gal Dec 8, 2023
ab65b95
Fixed according to PR
ron-gal Dec 13, 2023
1fc9369
Fixed according to PR
ron-gal Dec 13, 2023
048ca63
Fixed according to PR
ron-gal Dec 13, 2023
e72dca0
remove test
ron-gal Dec 13, 2023
f6132ba
adjust tests according to PR
ron-gal Dec 13, 2023
a519462
adjust tests according to PR
ron-gal Dec 14, 2023
96e49fb
Merge branch 'googleapis:main' into hbase_paging
ron-gal Feb 12, 2024
8bccc3c
fix bug found on beam
ron-gal Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,22 @@ public void testGetScannerNoQualifiers() throws IOException {
}

@Test
public void test100ResultsInScanner() throws IOException {
public void testManyResultsInScanner_lessThanPageSize() throws IOException {
testManyResultsInScanner(95);
}

@Test
public void testManyResultsInScanner_equalToPageSize() throws IOException {
testManyResultsInScanner(100);
}

@Test
public void testManyResultsInScanner_greaterThanPageSize() throws IOException {
testManyResultsInScanner(105);
}
mutianf marked this conversation as resolved.
Show resolved Hide resolved

private void testManyResultsInScanner(int rowsToWrite) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I should be more clear on my previous comment. I think we want to test both getScanner without pagination and getScanner with pagiation. So maybe we can add a parameter:

testManyResultsInScanner(int rowsToWrite, boolean with pagination);

And add one more test: testManyResultsInScanner(100, false);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

String prefix = "scan_row_";
int rowsToWrite = 100;

// Initialize variables
Table table = getDefaultTable();
Expand Down Expand Up @@ -210,7 +223,7 @@ public void test100ResultsInScanner() throws IOException {

Scan scan = new Scan();
scan.withStartRow(rowKeys[0])
.withStopRow(rowFollowing(rowKeys[rowsToWrite - 1]))
.withStopRow(rowFollowingSameLength(rowKeys[rowsToWrite - 1]))
mutianf marked this conversation as resolved.
Show resolved Hide resolved
.addFamily(COLUMN_FAMILY);

try (ResultScanner resultScanner = table.getScanner(scan)) {
Expand Down Expand Up @@ -277,7 +290,7 @@ public void testScanDelete() throws IOException {

Scan scan = new Scan();
scan.withStartRow(rowKeys[0])
.withStopRow(rowFollowing(rowKeys[rowsToWrite - 1]))
.withStopRow(rowFollowingSameLength(rowKeys[rowsToWrite - 1]))
.addFamily(COLUMN_FAMILY);
int deleteCount = 0;
try (ResultScanner resultScanner = table.getScanner(scan)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.hbase.adapters.Adapters;
Expand Down Expand Up @@ -298,8 +299,14 @@ public ResultScanner getScanner(final Scan scan) throws IOException {
LOG.trace("getScanner(Scan)");
Span span = TRACER.spanBuilder("BigtableTable.scan").startSpan();
try (Scope scope = TRACER.withSpan(span)) {

final ResultScanner scanner = clientWrapper.readRows(hbaseAdapter.adapt(scan));
ResultScanner scanner;
if (scan.getCaching() == -1) {
scanner = clientWrapper.readRows(hbaseAdapter.adapt(scan));
} else {
Query.QueryPaginator paginator =
hbaseAdapter.adapt(scan).createPaginator(scan.getCaching());
scanner = clientWrapper.readRows(paginator, -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can pass in the default to begin with?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's private, and I wouldn't want to expose it. This value is just for tests, so I don't think it's critical.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could move the default value in this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
if (hasWhileMatchFilter(scan.getFilter())) {
return Adapters.BIGTABLE_WHILE_MATCH_RESULT_RESULT_SCAN_ADAPTER.adapt(scanner, span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,6 @@ ApiFuture<Result> readRowAsync(

@Override
void close() throws IOException;

ResultScanner readRows(Query.QueryPaginator paginator, long maxSegmentByteSize);
mutianf marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StateCheckingResponseObserver;
import com.google.api.gax.rpc.StreamController;
Expand All @@ -43,13 +44,21 @@
import com.google.cloud.bigtable.metrics.Timer;
import com.google.cloud.bigtable.metrics.Timer.Context;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Result;
Expand Down Expand Up @@ -134,6 +143,12 @@ public Result apply(Row row) {
MoreExecutors.directExecutor());
}

@Override
public ResultScanner readRows(Query.QueryPaginator paginator, long maxSegmentByteSize) {
return new PaginatedRowResultScanner(
paginator, delegate, maxSegmentByteSize, () -> this.createScanCallContext());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you defer the creation of the ScanCallContext? The effect of re-creating it is that you are extending the operation timeout across all of the pages. I think the operation deadline should stay consistent from the callers perspective regardless if its broken up into multiple segments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

@Override
public ResultScanner readRows(Query request) {
return new RowResultScanner(
Expand All @@ -155,7 +170,8 @@ public void readRowsAsync(Query request, StreamObserver<Result> observer) {
.call(request, new StreamObserverAdapter<>(observer), createScanCallContext());
}

// Point reads are implemented using a streaming ReadRows RPC. So timeouts need to be managed
// Point reads are implemented using a streaming ReadRows RPC. So timeouts need
// to be managed
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please revert irrelevant changes to minimize the noise

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// similar to scans below.
private ApiCallContext createReadRowCallContext() {
GrpcCallContext ctx = GrpcCallContext.createDefault();
Expand All @@ -179,9 +195,11 @@ private ApiCallContext createReadRowCallContext() {
}

// Support 2 bigtable-hbase features not directly available in veneer:
// - per attempt deadlines - vener doesn't implement deadlines for attempts. To workaround this,
// the timeouts are set per call in the ApiCallContext. However this creates a separate issue of
// over running the operation deadline, so gRPC deadline is also set.
// - per attempt deadlines - vener doesn't implement deadlines for attempts. To
// workaround this,
// the timeouts are set per call in the ApiCallContext. However this creates a
// separate issue of
// over running the operation deadline, so gRPC deadline is also set.
private GrpcCallContext createScanCallContext() {
GrpcCallContext ctx = GrpcCallContext.createDefault();
OperationTimeouts callSettings = clientOperationTimeouts.getScanTimeouts();
Expand Down Expand Up @@ -228,9 +246,167 @@ protected void onCompleteImpl() {
}
}

/**
* wraps {@link ServerStream} onto HBase {@link ResultScanner}. {@link PaginatedRowResultScanner}
* gets a paginator and a {@link Query.QueryPaginator} used to get a {@link ServerStream}<{@link
* Result}> using said paginator to iterate over pages of rows. The {@link Query.QueryPaginator}
* pageSize property indicates the size of each page in every API call. A cache of a maximum size
* of 1.1*pageSize and a minimum of 0.1*pageSize is held at all times. In order to avoid OOM
* exceptions, there is a limit for the total byte size held in cache.
*/
static class PaginatedRowResultScanner extends AbstractClientScanner {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
// Percentage of max number of rows allowed in the buffer
private static final double WATERMARK_PERCENTAGE = .1;
private static final int MIN_BYTE_BUFFER_SIZE = 100 * 1024 * 1024;
private static final double DEFAULT_BYTE_LIMIT_PERCENTAGE = .1;
private static final RowResultAdapter RESULT_ADAPTER = new RowResultAdapter();

private final Meter scannerResultMeter =
BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "scanner.results");
private final Timer scannerResultTimer =
BigtableClientMetrics.timer(
BigtableClientMetrics.MetricLevel.Debug, "scanner.results.latency");
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

private ByteString lastSeenRowKey = ByteString.EMPTY;
private Boolean hasMore = true;
private final Queue<Result> buffer;
private final Query.QueryPaginator paginator;
private final int refillSegmentWaterMark;
mutianf marked this conversation as resolved.
Show resolved Hide resolved

private final BigtableDataClient dataClient;

private static final long DEFAULT_MAX_SEGMENT_SIZE =
(long)
Math.max(
MIN_BYTE_BUFFER_SIZE,
(Runtime.getRuntime().totalMemory() * DEFAULT_BYTE_LIMIT_PERCENTAGE));
private final long maxSegmentByteSize;

private long currentByteSize = 0;

private @Nullable Future<List<Result>> future;
private Supplier<GrpcCallContext> createScanCallContext;

PaginatedRowResultScanner(
Query.QueryPaginator paginator,
BigtableDataClient dataClient,
long maxSegmentByteSize,
Supplier<GrpcCallContext> createScanCallContext) {
if (maxSegmentByteSize < 0) {
maxSegmentByteSize = DEFAULT_MAX_SEGMENT_SIZE;
}
this.maxSegmentByteSize = maxSegmentByteSize;

this.paginator = paginator;
this.dataClient = dataClient;
this.buffer = new ArrayDeque<>();
this.refillSegmentWaterMark =
(int) Math.max(1, paginator.getPageSize() * WATERMARK_PERCENTAGE);
this.createScanCallContext = createScanCallContext;
this.future = fetchNextSegment();
}

@Override
public Result next() {
try (Context ignored = scannerResultTimer.time()) {
if (this.buffer.size() < this.refillSegmentWaterMark && this.future == null && hasMore) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
future = fetchNextSegment();
}
if (this.buffer.isEmpty() && this.future != null) {
try {
this.waitReadRowsFuture();
} catch (IOException e) {
return null;
}
}
scannerResultMeter.mark();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be in the if statement?

Copy link
Contributor Author

@ron-gal ron-gal Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Result result = this.buffer.poll();
if (result != null) {
currentByteSize -= Result.getTotalSizeOfCells(result);
}
return result;
}
}

@Override
public void close() {
if (this.future != null) {
this.future.cancel(true);
}
}

public boolean renewLease() {
return true;
}

private Future<List<Result>> fetchNextSegment() {
SettableFuture<List<Result>> resultsFuture = SettableFuture.create();

dataClient
.readRowsCallable(RESULT_ADAPTER)
.call(
paginator.getNextQuery(),
new ResponseObserver<Result>() {
private StreamController controller;
List<Result> results = new ArrayList();

long currentByteSize = 0;
boolean byteLimitReached = false;

@Override
public void onStart(StreamController controller) {
this.controller = controller;
}

@Override
public void onResponse(Result result) {
// calculate size of the response
currentByteSize += Result.getTotalSizeOfCells(result);
results.add(result);
if (result != null && result.rawCells() != null) {
lastSeenRowKey = RESULT_ADAPTER.getKey(result);
}

if (currentByteSize > maxSegmentByteSize) {
byteLimitReached = true;
controller.cancel();
return;
}
}

@Override
public void onError(Throwable t) {
resultsFuture.setException(t);
}

@Override
public void onComplete() {
hasMore = paginator.advance(lastSeenRowKey);
resultsFuture.set(results);
}
},
this.createScanCallContext.get());
return resultsFuture;
}

private void waitReadRowsFuture() throws IOException {
try {
List<Result> results = future.get();
this.buffer.addAll(results);
this.hasMore = this.paginator.advance(this.lastSeenRowKey);
this.future = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw new IOException(cause);
}
}
}

/** wraps {@link ServerStream} onto HBase {@link ResultScanner}. */
private static class RowResultScanner extends AbstractClientScanner {

private final Meter scannerResultMeter =
BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "scanner.results");
private final Timer scannerResultTimer =
Expand Down Expand Up @@ -260,11 +436,11 @@ public Result next() {

@Override
public void close() {
serverStream.cancel();
this.serverStream.cancel();
}

public boolean renewLease() {
throw new UnsupportedOperationException("renewLease");
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,9 @@ public void close() throws IOException {
delegate.close();
owner.release(key);
}

@Override
public ResultScanner readRows(Query.QueryPaginator paginator, long maxSegmentByteSize) {
return delegate.readRows(paginator, maxSegmentByteSize);
}
}
Loading