-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Add paging to hbase client #4166
Changes from 8 commits
ef49c26
f5710b2
ecd88f9
88373d0
2144a6f
63c7492
40e5759
6d15df1
5254712
d1dc685
424f453
8a0d1f5
0f14715
22ae1fc
84be8a0
dc6a9dc
b8d05d3
5b99b77
435bb73
e4f34fc
b539bb5
34f00d1
a944e6b
244bbb7
a1e3d54
2aa5d17
0093a78
1667188
f394ba2
d1ef3b6
423df71
3d9f18c
fdf511b
fd15972
2a3b125
a93d3bb
55af2e0
52425f1
cced23a
06b8b2e
b50ec9f
6535e6c
d025473
1e61132
9df66fe
cdf7347
951ebef
7ba3c64
d4ed8e5
d4bede1
06227d2
947608a
e23101c
4e15493
56bb3f1
34653ed
a082a9e
871d66b
06ce458
a95e218
03c0e3b
b5fdeeb
b0c7cbf
262525f
7382aa9
5b510a1
80ffea0
da22f6e
7dd6b49
40545c1
fb86765
91ee89e
c91d6f9
410d203
ce514ee
bf23528
a819985
a6738f5
52d8a12
ab65b95
1fc9369
048ca63
e72dca0
f6132ba
a519462
96e49fb
8bccc3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,8 +47,10 @@ | |
import io.grpc.CallOptions; | ||
import io.grpc.Deadline; | ||
import io.grpc.stub.StreamObserver; | ||
import java.util.ArrayDeque; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Queue; | ||
import java.util.concurrent.TimeUnit; | ||
import javax.annotation.Nullable; | ||
import org.apache.hadoop.hbase.client.AbstractClientScanner; | ||
|
@@ -60,6 +62,7 @@ | |
public class DataClientVeneerApi implements DataClientWrapper { | ||
|
||
private static final RowResultAdapter RESULT_ADAPTER = new RowResultAdapter(); | ||
private static final int PAGE_SIZE = 100; | ||
|
||
private final BigtableDataClient delegate; | ||
private final ClientOperationTimeouts clientOperationTimeouts; | ||
|
@@ -70,6 +73,10 @@ public class DataClientVeneerApi implements DataClientWrapper { | |
this.clientOperationTimeouts = clientOperationTimeouts; | ||
} | ||
|
||
interface paginatorFunction { | ||
public ServerStream<Result> func(Query.QueryPaginator paginator); | ||
} | ||
|
||
@Override | ||
public BulkMutationWrapper createBulkMutation(String tableId) { | ||
return new BulkMutationVeneerApi(delegate.newBulkMutationBatcher(tableId), 0); | ||
|
@@ -136,8 +143,14 @@ public Result apply(Row row) { | |
|
||
@Override | ||
public ResultScanner readRows(Query request) { | ||
Query.QueryPaginator paginator = request.createPaginator(PAGE_SIZE); | ||
return new RowResultScanner( | ||
delegate.readRowsCallable(RESULT_ADAPTER).call(request, createScanCallContext())); | ||
paginator, | ||
(p) -> { | ||
return delegate | ||
.readRowsCallable(RESULT_ADAPTER) | ||
.call(p.getNextQuery(), createScanCallContext()); | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: you dont need the curly brackets or return
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
@Override | ||
|
@@ -155,7 +168,8 @@ public void readRowsAsync(Query request, StreamObserver<Result> observer) { | |
.call(request, new StreamObserverAdapter<>(observer), createScanCallContext()); | ||
} | ||
|
||
// Point reads are implemented using a streaming ReadRows RPC. So timeouts need to be managed | ||
// Point reads are implemented using a streaming ReadRows RPC. So timeouts need | ||
// to be managed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please revert irrelevant changes to minimize the noise There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// similar to scans below. | ||
private ApiCallContext createReadRowCallContext() { | ||
GrpcCallContext ctx = GrpcCallContext.createDefault(); | ||
|
@@ -179,9 +193,11 @@ private ApiCallContext createReadRowCallContext() { | |
} | ||
|
||
// Support 2 bigtable-hbase features not directly available in veneer: | ||
// - per attempt deadlines - vener doesn't implement deadlines for attempts. To workaround this, | ||
// the timeouts are set per call in the ApiCallContext. However this creates a separate issue of | ||
// over running the operation deadline, so gRPC deadline is also set. | ||
// - per attempt deadlines - vener doesn't implement deadlines for attempts. To | ||
// workaround this, | ||
// the timeouts are set per call in the ApiCallContext. However this creates a | ||
// separate issue of | ||
// over running the operation deadline, so gRPC deadline is also set. | ||
private GrpcCallContext createScanCallContext() { | ||
GrpcCallContext ctx = GrpcCallContext.createDefault(); | ||
OperationTimeouts callSettings = clientOperationTimeouts.getScanTimeouts(); | ||
|
@@ -230,41 +246,67 @@ protected void onCompleteImpl() { | |
|
||
/** wraps {@link ServerStream} onto HBase {@link ResultScanner}. */ | ||
private static class RowResultScanner extends AbstractClientScanner { | ||
// Percentage of max number of rows allowed in the buffer | ||
private static final double WATERMARK_PERCENTAGE = .1; | ||
private static final RowResultAdapter RESULT_ADAPTER = new RowResultAdapter(); | ||
|
||
private final Meter scannerResultMeter = | ||
BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "scanner.results"); | ||
private final Timer scannerResultTimer = | ||
BigtableClientMetrics.timer( | ||
BigtableClientMetrics.MetricLevel.Debug, "scanner.results.latency"); | ||
|
||
private final ServerStream<Result> serverStream; | ||
private final Iterator<Result> iterator; | ||
|
||
RowResultScanner(ServerStream<Result> serverStream) { | ||
this.serverStream = serverStream; | ||
this.iterator = serverStream.iterator(); | ||
private ServerStream<Result> serverStream; | ||
private ByteString lastSeenRowKey = ByteString.EMPTY; | ||
private final Queue<Result> buffer; | ||
private final Query.QueryPaginator paginator; | ||
private final paginatorFunction wrapper; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wrapper is fairly ambiguous here. I think you want something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
private final int refillSegmentWaterMark; | ||
mutianf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
RowResultScanner(Query.QueryPaginator paginator, paginatorFunction wrapper) { | ||
this.paginator = paginator; | ||
this.wrapper = wrapper; | ||
this.buffer = new ArrayDeque<>(); | ||
this.refillSegmentWaterMark = (int) (PAGE_SIZE * WATERMARK_PERCENTAGE); | ||
mutianf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Override | ||
public Result next() { | ||
try (Context ignored = scannerResultTimer.time()) { | ||
if (!iterator.hasNext()) { | ||
// null signals EOF | ||
return null; | ||
if (this.buffer.size() < this.refillSegmentWaterMark && this.serverStream == null) { | ||
this.serverStream = this.wrapper.func(this.paginator); | ||
} | ||
if (this.buffer.isEmpty() && this.serverStream != null) { | ||
this.waitReadRowsFuture(); | ||
} | ||
|
||
scannerResultMeter.mark(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be in the if statement? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return iterator.next(); | ||
return this.buffer.poll(); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
serverStream.cancel(); | ||
if (this.serverStream != null) { | ||
this.serverStream.cancel(); | ||
} | ||
} | ||
|
||
public boolean renewLease() { | ||
throw new UnsupportedOperationException("renewLease"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why throw? Why not just always return true? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
private void waitReadRowsFuture() { | ||
Iterator<Result> iterator = this.serverStream.iterator(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be significantly more efficient if you use the async api to fetch the next batch. ie SettableFuture<List<Result>> resultsFuture = SettableFuture.create();
dataClient.readRowsCallable(myRowMerger).call(req, new ResponseObserver() {
List<Result> results = new ArrayList();
onStart(Controller c) {
this.controller = c;
}
onResponse(Result r) {
results.add(r);
if (thresholdReached) {
this.controller.cancel();
}
}
onComplete() {
resultsFuture.set(results);
}
onError(e) {
resultsFuture.setExceptionally(e);
});
List<Result> results = resultsFuture.get();
.... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
while (iterator.hasNext()) { | ||
Result result = iterator.next(); | ||
this.buffer.add(result); | ||
if (result == null || result.rawCells() == null) { | ||
mutianf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
continue; | ||
} | ||
this.lastSeenRowKey = RESULT_ADAPTER.getKey(result); | ||
} | ||
this.paginator.advance(lastSeenRowKey); | ||
this.serverStream = null; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why lower case?
Also why do you need a separate interface for this? Can it be
Function<Query.QueryPaginator, ServerStream<Result>>
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, why do we need a closure to begin with? why not pass the client directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't know about Function<...> :( Done this way.
Passing the client feels like it's giving the scanner too much power, when all I want it to do is call a specific function that given a paginator - gets the next page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ended up passing the client, to be able to use the async API