Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Cdc rebase #1378

Closed
wants to merge 70 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
88bf067
chore(deps): update dependency com.google.cloud:libraries-bom to v26 …
renovate-bot Jul 12, 2022
c7539f9
build: enable longpaths support for windows test (#1485) (#1310)
gcf-owl-bot[bot] Jul 13, 2022
de35414
build(deps): update dependency org.codehaus.mojo:extra-enforcer-rules…
renovate-bot Jul 13, 2022
ddae354
deps: update dependency org.graalvm.buildtools:junit-platform-native …
renovate-bot Jul 13, 2022
c0740fe
deps: update dependency org.graalvm.buildtools:native-maven-plugin to…
renovate-bot Jul 13, 2022
5ab424c
test(deps): update dependency com.google.cloud:google-cloud-conforman…
renovate-bot Jul 13, 2022
7c77879
fix: enable integration test for google-cloud-bigtable-stats (#1311)
mutianf Jul 15, 2022
85ff18c
chore: mark native image checks as required (#1313)
mpeddada1 Jul 18, 2022
0f9854b
test(deps): update dependency com.google.cloud:google-cloud-conforman…
renovate-bot Jul 18, 2022
2aa490c
ci: update template so GAPIC_AUTO repos do not require special approv…
gcf-owl-bot[bot] Jul 19, 2022
6629821
feat: use PingAndWarm request for channel priming (#1179)
mutianf Jul 19, 2022
d182bec
build(deps): update dependency org.apache.maven.plugins:maven-deploy-…
renovate-bot Jul 19, 2022
5282589
feat: add storage utilization gib per node for autoscaling (#1317)
kolea2 Jul 22, 2022
644454a
fix: fix race condition in BuiltinMetricsTracer (#1320)
mutianf Jul 26, 2022
57fe707
chore(main): release 2.10.0 (#1302)
release-please[bot] Jul 26, 2022
7655747
deps: update dependency org.junit.vintage:junit-vintage-engine to v5.…
renovate-bot Jul 26, 2022
30e7b90
chore(main): release 2.10.1-SNAPSHOT (#1321)
release-please[bot] Jul 26, 2022
1a5b3a2
fix: retry rst stream in mutations (#1327)
mutianf Jul 29, 2022
0a44123
chore(deps): update dependency com.google.cloud:google-cloud-bigtable…
renovate-bot Aug 1, 2022
4fdf37e
chore(main): release 2.10.1 (#1325)
release-please[bot] Aug 1, 2022
eca3969
build(deps): update dependency org.codehaus.mojo:extra-enforcer-rules…
renovate-bot Aug 1, 2022
fdabaaf
chore(main): release 2.10.2-SNAPSHOT (#1331)
release-please[bot] Aug 2, 2022
644aeb3
chore(deps): update dependency com.google.cloud:google-cloud-bigtable…
renovate-bot Aug 2, 2022
cb539b5
fix: add a ReadFirstRow callable to set future in onComplete (#1326)
mutianf Aug 2, 2022
98b3349
deps: upgrade shared config to 1.5.3, exclude google-http-client and …
blakeli0 Aug 2, 2022
c4b8c03
fix: The metadata could be returned in trailer or header depends on i…
mutianf Aug 3, 2022
bee0ca0
deps: update dependency com.google.cloud:google-cloud-shared-dependen…
renovate-bot Aug 3, 2022
2313980
chore(main): release 2.10.2 (#1335)
release-please[bot] Aug 8, 2022
f82104a
chore(main): release 2.10.3-SNAPSHOT (#1340)
release-please[bot] Aug 8, 2022
8071de6
fix: declaring 2 http libraries as runtime (#1341)
suztomo Aug 8, 2022
dca1535
chore(deps): update dependency com.google.cloud:google-cloud-bigtable…
renovate-bot Aug 10, 2022
a283659
chore(main): release 2.10.3 (#1343)
release-please[bot] Aug 11, 2022
6ff3970
chore(main): release 2.10.4-SNAPSHOT (#1348)
release-please[bot] Aug 11, 2022
b865578
chore(deps): update dependency com.google.cloud:google-cloud-bigtable…
renovate-bot Aug 11, 2022
7ce915e
feat: add stackdriver exporter (#1247)
mutianf Aug 12, 2022
d315473
chore(deps): update dependency com.google.cloud:libraries-bom to v26.…
renovate-bot Aug 12, 2022
8d353a2
chore: add opencensus lincese and update readme (#1353)
mutianf Aug 15, 2022
f399240
chore: add instructions to enable builtin metrics (#1358)
mutianf Aug 16, 2022
f8d97e5
deps: update dependency com.google.cloud:google-cloud-monitoring-bom …
renovate-bot Aug 16, 2022
cb2ff66
test(deps): update dependency org.mockito:mockito-core to v4.7.0 (#1356)
renovate-bot Aug 16, 2022
dcfd512
test(deps): update dependency com.google.cloud:google-cloud-conforman…
renovate-bot Aug 16, 2022
c7ffd6b
build(deps): update dependency org.apache.maven.plugins:maven-javadoc…
renovate-bot Aug 17, 2022
8c3edb0
chore(main): release 2.11.0 (#1350)
release-please[bot] Aug 22, 2022
f81f1a5
chore(main): release 2.11.1-SNAPSHOT (#1361)
release-please[bot] Aug 22, 2022
1ac859b
chore(deps): update dependency com.google.cloud:google-cloud-bigtable…
renovate-bot Aug 23, 2022
c039a83
test: add integration test for builtin metrics (#1360)
mutianf Aug 25, 2022
faa5b36
build(deps): update dependency org.apache.maven.shared:maven-dependen…
renovate-bot Aug 25, 2022
1683365
fix: reset a measure map everytime the stats are recorded (#1364)
mutianf Aug 26, 2022
d1b269f
chore(main): release 2.11.1 (#1365)
release-please[bot] Aug 29, 2022
6f4c01d
chore(main): release 2.11.2-SNAPSHOT (#1366)
release-please[bot] Aug 29, 2022
54bf5f5
chore(deps): update dependency com.google.cloud:google-cloud-bigtable…
renovate-bot Aug 29, 2022
d29fada
chore(deps): update dependency com.google.cloud:libraries-bom to v26.…
renovate-bot Aug 31, 2022
930d043
fix: make cloud-monitoring a runtime dependency (#1371)
mutianf Aug 31, 2022
72a7387
test: disable integration test (#1375)
mutianf Sep 1, 2022
4174f0d
deps: update dependency com.google.cloud:google-cloud-monitoring-bom …
renovate-bot Sep 2, 2022
b3cc7f0
test: fix metrics integration test, remove the server latency since t…
mutianf Sep 2, 2022
d02f0e5
feat: copy preview Change Streams API (#1309)
tonytanger Jul 14, 2022
5021a5e
feat: Add ListChangeStreamPartitions callable (#1312)
tengzhonger Jul 20, 2022
88a7de8
feat: Create ReadChangeStreamQuery and ChangeStreamRecode::Heartbeat/…
tengzhonger Jul 26, 2022
73ea672
feat: Add ChangeStreamMutation which is a ChangeStreamRecord (#1324)
tengzhonger Aug 1, 2022
4663279
feat: Add ChangeStreamRecordAdapter and ChangeStreamStateMachine (#1334)
tengzhonger Aug 3, 2022
d3119cf
feat: Add readChangeStream callables (#1338)
tengzhonger Aug 8, 2022
865ea62
feat: Expose some package-private methods to be used by CDC beam code…
tengzhonger Aug 9, 2022
611ae58
feat: Implement ReadChangeStreamResumptionStrategy (#1344)
tengzhonger Aug 10, 2022
2cde452
feat: Add toByteString/fromByteString for ChangeStreamContinuationTok…
tengzhonger Aug 11, 2022
76d9c45
feat!: rename ListChangeStreamPartitions to GenerateInitialChangeStre…
tonytanger Aug 12, 2022
fd1804f
feat: Change CDC related APIs to return ByteStringRange instead of Ro…
tengzhonger Aug 15, 2022
7d58536
feat: Return MutationType and bigtable.common.Status instead of raw p…
tengzhonger Aug 16, 2022
31c1c8d
feat: Expose CDC data API settings in EnhancedBigtableStubSettings (#…
tengzhonger Sep 1, 2022
53f5708
Merge branch 'cdc' into cdc_rebase
tengzhonger Sep 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Add ListChangeStreamPartitions callable (#1312)
* feat: Add ListChangeStreamPartitions callable

* feat: Change return type of ListChangeStreamPartitions to RowRange

* feat: Fix format for ListChangeStreamPartitions

* fix: Address comments for ListChangeStreamPartitionsCallable

* feat: Add comments for IntervalApi for ListChangeStreamPartitions

* feat: Ignore renaming of ReadRowsConvertExceptionCallable

Co-authored-by: Teng Zhong <tengzhong@google.com>
  • Loading branch information
tengzhonger and Teng Zhong committed Sep 2, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 5021a5efda05a916d548a41948fd850546955686
5 changes: 5 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -39,6 +39,11 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory</className>
</difference>
<!-- InternalApi was renamed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable</className>
</difference>
<!-- InternalApi that was removed -->
<difference>
<differenceType>8001</differenceType>
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
@@ -1489,6 +1490,143 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return stub.readModifyWriteRowCallable();
}

/**
* Convenience method for synchronously streaming the partitions of a table. The returned
* ServerStream instance is not threadsafe, it can only be used from single thread.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* try {
* ServerStream<RowRange> stream = bigtableDataClient.listChangeStreamPartitions(tableId);
* int count = 0;
*
* // Iterator style
* for (RowRange partition : stream) {
* if (++count > 10) {
* stream.cancel();
* break;
* }
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStream<RowRange> listChangeStreamPartitions(String tableId) {
return listChangeStreamPartitionsCallable().call(tableId);
}

/**
* Convenience method for asynchronously streaming the partitions of a table.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* bigtableDataClient.listChangeStreamPartitionsAsync(tableId, new ResponseObserver<RowRange>() {
* StreamController controller;
* int count = 0;
*
* public void onStart(StreamController controller) {
* this.controller = controller;
* }
* public void onResponse(RowRange partition) {
* if (++count > 10) {
* controller.cancel();
* return;
* }
* // Do something with partition
* }
* public void onError(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onComplete() {
* // Handle stream completion
* }
* });
* }
* }</pre>
*/
@InternalApi("Used in Changestream beam pipeline.")
public void listChangeStreamPartitionsAsync(String tableId, ResponseObserver<RowRange> observer) {
listChangeStreamPartitionsCallable().call(tableId, observer);
}

/**
* Streams back the results of the query. The returned callable object allows for customization of
* api invocation.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* // Iterator style
* try {
* for(RowRange partition : bigtableDataClient.listChangeStreamPartitionsCallable().call(tableId)) {
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Sync style
* try {
* List<RowRange> partitions = bigtableDataClient.listChangeStreamPartitionsCallable().all().call(tableId);
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Point look up
* ApiFuture<RowRange> partitionFuture =
* bigtableDataClient.listChangeStreamPartitionsCallable().first().futureCall(tableId);
*
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<RowRange>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onSuccess(RowRange result) {
* System.out.println("Got partition: " + result);
* }
* }, MoreExecutors.directExecutor());
*
* // etc
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStreamingCallable<String, RowRange> listChangeStreamPartitionsCallable() {
return stub.listChangeStreamPartitionsCallable();
}

/** Close the clients and releases all associated resources. */
@Override
public void close() {
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;

/**
* This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
*/
final class ConvertStreamExceptionCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

private final ServerStreamingCallable<RequestT, ResponseT> innerCallable;

public ConvertStreamExceptionCallable(
ServerStreamingCallable<RequestT, ResponseT> innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public void call(
RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) {
ConvertStreamExceptionResponseObserver<ResponseT> observer =
new ConvertStreamExceptionResponseObserver<>(responseObserver);
innerCallable.call(request, observer, context);
}

private class ConvertStreamExceptionResponseObserver<ResponseT>
implements ResponseObserver<ResponseT> {

private final ResponseObserver<ResponseT> outerObserver;

ConvertStreamExceptionResponseObserver(ResponseObserver<ResponseT> outerObserver) {
this.outerObserver = outerObserver;
}

@Override
public void onStart(StreamController controller) {
outerObserver.onStart(controller);
}

@Override
public void onResponse(ResponseT response) {
outerObserver.onResponse(response);
}

@Override
public void onError(Throwable t) {
outerObserver.onError(convertException(t));
}

@Override
public void onComplete() {
outerObserver.onComplete();
}
}

private Throwable convertException(Throwable t) {
// Long lived connections sometimes are disconnected via an RST frame. This error is
// transient and should be retried.
if (t instanceof InternalException && t.getMessage() != null) {
String error = t.getMessage().toLowerCase();
if (error.contains("rst_stream") || error.contains("rst stream")) {
return new InternalException(t, ((InternalException) t).getStatusCode(), true);
}
}
return t;
}
}
Loading