Skip to content

Commit

Permalink
deps: update versions and allow disabling client side metrics (#4350)
Browse files Browse the repository at this point in the history
* deps: update client to latest version and allow disabling client side metrics

* update

* update pom

* update pom
  • Loading branch information
mutianf authored Apr 19, 2024
1 parent 33facf5 commit efd75e1
Show file tree
Hide file tree
Showing 16 changed files with 86 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,8 @@ public class BigtableOptionsFactory {
/** Override idle timeout, for testing only. */
@VisibleForTesting
public static final String BIGTABLE_TEST_IDLE_TIMEOUT_MS = "google.bigtable.idle.timeout.ms";

/** Sets if client side metrics should be enabled. Client side metrics is enabled by default. */
public static final String BIGTABLE_ENABLE_CLIENT_SIDE_METRICS =
"google.bigtable.enable.client.side.metrics";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -87,7 +87,7 @@ ApiFuture<Result> readRowAsync(

/** Read {@link Result} asynchronously, and pass them to a stream observer to be processed. */
// TODO: once veneer is implemented update this with gax's ResponseObserver.
void readRowsAsync(Query request, StreamObserver<Result> observer);
void readRowsAsync(Query request, ResponseObserver<Result> observer);

@Override
void close() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_DATA_CHANNEL_COUNT_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_EMULATOR_HOST_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_ENABLE_CLIENT_SIDE_METRICS;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_HOST_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_MUTATE_RPC_ATTEMPT_TIMEOUT_MS_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_MUTATE_RPC_TIMEOUT_MS_KEY;
Expand Down Expand Up @@ -81,6 +82,7 @@
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings;
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableExtendedConfiguration;
import com.google.cloud.bigtable.hbase.BigtableHBaseVersion;
Expand Down Expand Up @@ -340,6 +342,10 @@ private BigtableDataSettings buildBigtableDataSettings(ClientOperationTimeouts c
configureRetryableCallSettings(
dataBuilder.stubSettings().sampleRowKeysSettings(), clientTimeouts.getUnaryTimeouts());

if (!configuration.getBoolean(BIGTABLE_ENABLE_CLIENT_SIDE_METRICS, true)) {
dataBuilder.setMetricsProvider(NoopMetricsProvider.INSTANCE);
}

return dataBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StateCheckingResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
Expand All @@ -48,7 +47,6 @@
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import io.grpc.stub.StreamObserver;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -162,10 +160,8 @@ public ApiFuture<List<Result>> readRowsAsync(Query request) {
}

@Override
public void readRowsAsync(Query request, StreamObserver<Result> observer) {
delegate
.readRowsCallable(RESULT_ADAPTER)
.call(request, new StreamObserverAdapter<>(observer), createScanCallContext());
public void readRowsAsync(Query request, ResponseObserver<Result> observer) {
delegate.readRowsCallable(RESULT_ADAPTER).call(request, observer, createScanCallContext());
}

// Point reads are implemented using a streaming ReadRows RPC. So timeouts need to be managed
Expand Down Expand Up @@ -218,29 +214,6 @@ public void close() {
delegate.close();
}

/** wraps {@link StreamObserver} onto GCJ {@link com.google.api.gax.rpc.ResponseObserver}. */
private static class StreamObserverAdapter<T> extends StateCheckingResponseObserver<T> {
private final StreamObserver<T> delegate;

StreamObserverAdapter(StreamObserver<T> delegate) {
this.delegate = delegate;
}

protected void onStartImpl(StreamController controller) {}

protected void onResponseImpl(T response) {
this.delegate.onNext(response);
}

protected void onErrorImpl(Throwable t) {
this.delegate.onError(t);
}

protected void onCompleteImpl() {
this.delegate.onCompleted();
}
}

/**
* wraps {@link ServerStream} onto HBase {@link ResultScanner}. {@link PaginatedRowResultScanner}
* gets a paginator and a {@link Query.QueryPaginator} used to get a {@link ServerStream}<{@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.hbase.wrappers.veneer;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
Expand All @@ -27,7 +28,6 @@
import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper;
import com.google.cloud.bigtable.hbase.wrappers.veneer.SharedDataClientWrapperFactory.Key;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -99,7 +99,7 @@ public ApiFuture<List<Result>> readRowsAsync(Query request) {
}

@Override
public void readRowsAsync(Query request, StreamObserver<Result> observer) {
public void readRowsAsync(Query request, ResponseObserver<Result> observer) {
delegate.readRowsAsync(request, observer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import io.grpc.Status.Code;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -470,11 +469,14 @@ public void testReadRowsAsync() throws Exception {
public void testReadRowsAsyncWithStreamOb() {
final Exception readException = new Exception();
Query request = Query.create(TABLE_ID).rowKey(ROW_KEY);
StreamObserver<Result> resultStreamOb =
new StreamObserver<Result>() {
ResponseObserver<Result> resultStreamOb =
new ResponseObserver<Result>() {
@Override
public void onNext(Result result) {
assertResult(EXPECTED_RESULT, result);
public void onStart(StreamController controller) {}

@Override
public void onResponse(Result response) {
assertResult(EXPECTED_RESULT, response);
}

@Override
Expand All @@ -483,7 +485,7 @@ public void onError(Throwable throwable) {
}

@Override
public void onCompleted() {}
public void onComplete() {}
};
when(mockDataClient.readRowsCallable(Mockito.<RowResultAdapter>any()))
.thenReturn(mockStreamingCallable);
Expand Down
4 changes: 0 additions & 4 deletions bigtable-dataflow-parent/bigtable-beam-import/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ limitations under the License.
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-format</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.GcsStager;
import org.apache.beam.runners.dataflow.util.PackageUtil;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.sdk.util.construction.resources.PipelineResources;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files;
Expand Down
11 changes: 10 additions & 1 deletion bigtable-dataflow-parent/bigtable-hbase-beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@ limitations under the License.
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.14.0</version>
<version>2.20.0</version>
</dependency>

<!-- fix the version conflict where beam is using 1.8.2 and google-cloud-shared-config
is using a newer version -->
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>1.8.2</version>
</dependency>


<!-- fix conflict between hbase slf & beam -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ limitations under the License.
<classEntry>org/apache/hadoop/hbase/client/AbstractBigtableConnection</classEntry>
<classEntry>org/apache/hadoop/hbase/client/CommonConnection</classEntry>
<classEntry>org/apache/hadoop/hbase/client/BigtableAsyncRegistry</classEntry>
<classEntry>META-INF/versions/9/com/google/cloud/bigtable/repackaged</classEntry>
</allowedJarClassEntries>
</configuration>
</execution>
Expand Down
16 changes: 15 additions & 1 deletion bigtable-hbase-1.x-parent/bigtable-hbase-1.x-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ limitations under the License.
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
</transformer>
</transformers>
<filters>
<filter>
Expand Down Expand Up @@ -277,7 +282,15 @@ limitations under the License.
</relocation>


<!-- Opencensus related shading -->
<!-- Opentelemetry - this is a bit weird, as hbase-client shares this dependency -->
<relocation>
<pattern>io.opentelemetry</pattern>
<shadedPattern>com.google.bigtable.repackaged.io.opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>META-INF/versions/9/io/opentelemetry</pattern>
<shadedPattern>META-INF/versions/9/com/google/cloud/bigtable/repackaged/io/opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>io.opencensus</pattern>
<shadedPattern>com.google.bigtable.repackaged.io.opencensus</shadedPattern>
Expand Down Expand Up @@ -365,6 +378,7 @@ limitations under the License.
<classEntry>org/apache/hadoop/hbase/client/AbstractBigtableConnection</classEntry>
<classEntry>org/apache/hadoop/hbase/client/CommonConnection</classEntry>
<classEntry>org/apache/hadoop/hbase/client/BigtableAsyncRegistry</classEntry>
<classEntry>META-INF/versions/9/com/google/cloud/bigtable/repackaged</classEntry>
</allowedJarClassEntries>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ limitations under the License.
<classEntry>org/apache/hadoop/hbase/client/BigtableConnectionRegistry</classEntry>
<classEntry>org/apache/hadoop/hbase/client/BigtableAsyncRegistry</classEntry>
<classEntry>org/apache/hadoop/hbase/client/CommonConnection</classEntry>
<classEntry>META-INF/versions/9/com/google/cloud/bigtable/repackaged</classEntry>
</allowedJarClassEntries>
</configuration>
</execution>
Expand Down
16 changes: 16 additions & 0 deletions bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ limitations under the License.
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
</transformer>
</transformers>
<filters>
<filter>
Expand Down Expand Up @@ -269,6 +274,10 @@ limitations under the License.
<pattern>io.opentelemetry</pattern>
<shadedPattern>com.google.bigtable.repackaged.io.opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>META-INF/versions/9/io/opentelemetry</pattern>
<shadedPattern>META-INF/versions/9/com/google/cloud/bigtable/repackaged/io/opentelemetry</shadedPattern>
</relocation>
<!-- Opencensus related shading -->
<relocation>
<pattern>io.opencensus</pattern>
Expand Down Expand Up @@ -360,6 +369,7 @@ limitations under the License.
<classEntry>org/apache/hadoop/hbase/client/BigtableAsyncRegistry</classEntry>
<classEntry>org/apache/hadoop/hbase/client/CommonConnection</classEntry>
<classEntry>org/apache/hadoop/hbase/client/BigtableConnectionRegistry</classEntry>
<classEntry>META-INF/versions/9/com/google/cloud/bigtable/repackaged</classEntry>
</allowedJarClassEntries>
</configuration>
</execution>
Expand All @@ -380,6 +390,12 @@ limitations under the License.
<targetDependencies>
<targetDependency>org.apache.hbase:hbase-shaded-client</targetDependency>
</targetDependencies>
<!-- opentelemetry dependency is added to veneer since 2.38.0. We shade veneer so ignoring
opentelemetry when comparing with hbase-shaded-client is fine-->
<ignoredDependencies>
<dependency>io.opentelemetry:opentelemetry-api</dependency>
<dependency>io.opentelemetry:opentelemetry-context</dependency>
</ignoredDependencies>
</configuration>
</execution>
<execution>
Expand Down
4 changes: 4 additions & 0 deletions bigtable-hbase-2.x-parent/bigtable-hbase-2.x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ limitations under the License.
<!-- make sure that we are a strict superset of hbase -->
<targetDependency>org.apache.hbase:hbase-shaded-client</targetDependency>
</targetDependencies>
<ignoredDependencies>
<dependency>io.opentelemetry:opentelemetry-api</dependency>
<dependency>io.opentelemetry:opentelemetry-context</dependency>
</ignoredDependencies>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static java.util.stream.Collectors.toList;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Query;
Expand All @@ -34,7 +36,6 @@
import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
Expand Down Expand Up @@ -477,10 +478,13 @@ public void scan(Scan scan, final ScanResultConsumer consumer) {
Query query = hbaseAdapter.adapt(scan);
clientWrapper.readRowsAsync(
query,
new StreamObserver<Result>() {
new ResponseObserver<Result>() {
@Override
public void onNext(Result value) {
consumer.onNext(value);
public void onStart(StreamController controller) {}

@Override
public void onResponse(Result response) {
consumer.onNext(response);
}

@Override
Expand All @@ -489,7 +493,7 @@ public void onError(Throwable t) {
}

@Override
public void onCompleted() {
public void onComplete() {
consumer.onComplete();
}
});
Expand Down
Loading

0 comments on commit efd75e1

Please sign in to comment.