Skip to content

Commit

Permalink
feat: add response protos (#1246)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf authored Jul 11, 2022
1 parent 93edfe1 commit 52d59ce
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 29 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies

```Groovy
implementation platform('com.google.cloud:libraries-bom:25.4.0')
implementation platform('com.google.cloud:libraries-bom:26.0.0')
implementation 'com.google.cloud:google-cloud-bigtable'
```
Expand Down
2 changes: 1 addition & 1 deletion google-cloud-bigtable-stats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
<scope>test</scope>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
/** For registering built-in metric views */
@InternalApi("For internal use only")
public class BuiltinViews {

@VisibleForTesting
static final ImmutableSet<View> BIGTABLE_BUILTIN_VIEWS =
ImmutableSet.of(
Expand All @@ -44,7 +45,7 @@ void registerPrivateViews(ViewManager viewManager) {
}
}

public void registerBigtableBuiltinViews() {
public static void registerBigtableBuiltinViews() {
ViewManager viewManager = Stats.getViewManager();
for (View view : BIGTABLE_BUILTIN_VIEWS) {
viewManager.registerView(view);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.SpanName;
import io.opencensus.stats.Stats;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Wrapper class for accessing opencensus. We use a shaded version of opencensus to avoid polluting
Expand All @@ -34,4 +37,16 @@ public static StatsRecorderWrapper createRecorder(
return new StatsRecorderWrapper(
operationType, spanName, statsAttributes, Stats.getStatsRecorder());
}

// This is used in integration tests to get the tag value strings from view manager because Stats
// is relocated to com.google.bigtable.veneer.repackaged.io.opencensus.
@InternalApi("Visible for testing")
public static List<String> getOperationLatencyViewTagValueStrings() {
return Stats.getViewManager().getView(BuiltinViewConstants.OPERATION_LATENCIES_VIEW.getName())
.getAggregationMap().entrySet().stream()
.map(Map.Entry::getKey)
.flatMap(x -> x.stream())
.map(x -> x.asString())
.collect(Collectors.toCollection(ArrayList::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,26 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/**
* This callable will
*
* <p>-inject a {@link GrpcResponseMetadata} to access the headers and trailers returned by gRPC
* methods upon completion. The {@link BigtableTracer} will process metrics that were injected in
* the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment the
* gfe_header_missing_counter in this case.
*
* <p>-Call {@link BigtableTracer#onRequest()} to record the request events in a stream.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
* <li>-Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon
* completion. The {@link BigtableTracer} will process metrics that were injected in the
* header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment
* the gfe_header_missing_counter in this case.
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* <li>-Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream.
* <li>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class BigtableTracerStreamingCallable<RequestT, ResponseT>
Expand Down Expand Up @@ -102,6 +103,14 @@ public void onError(Throwable t) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, t);
try {
byte[] trailers =
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}

outerObserver.onError(t);
}

Expand All @@ -110,6 +119,14 @@ public void onComplete() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
try {
byte[] trailers =
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}

outerObserver.onComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import javax.annotation.Nonnull;

/**
* This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers
* returned by gRPC methods upon completion. The {@link BigtableTracer} will process metrics that
* were injected in the header/trailer and publish them to OpenCensus. If {@link
* GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
* reached GFE, and it'll increment the gfe_header_missing_counter in this case.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
* This callable will:
* <li>- Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon
* completion. The {@link BigtableTracer} will process metrics that were injected in the
* header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment
* the gfe_header_missing_counter in this case.
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* <li>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class BigtableTracerUnaryCallable<RequestT, ResponseT>
Expand Down Expand Up @@ -78,13 +82,27 @@ public void onFailure(Throwable throwable) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, throwable);
try {
byte[] trailers =
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}
}

@Override
public void onSuccess(ResponseT response) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
try {
byte[] trailers =
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class Util {
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?<dur>\\d+)");

static final String TRAILER_KEY = "x-goog-ext-425905942-bin";
static final String RESPONSE_PRAMS_KEY = "x-goog-ext-425905942-bin";

/** Convert an exception into a value that can be used to create an OpenCensus tag value. */
static String extractStatus(@Nullable Throwable error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.stats.BuiltinViews;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class StreamingMetricsMetadataIT {
@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();

@BeforeClass
public static void setUpClass() {
assume()
.withMessage("StreamingMetricsMetadataIT is not supported on Emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);
BuiltinViews.registerBigtableBuiltinViews();
}

@Test
public void testSuccess() throws Exception {
String prefix = UUID.randomUUID().toString();
String uniqueKey = prefix + "-read";

Query query = Query.create(testEnvRule.env().getTableId()).rowKey(uniqueKey);
ArrayList<Row> rows = Lists.newArrayList(testEnvRule.env().getDataClient().readRows(query));

ApiFuture<List<Cluster>> clustersFuture =
testEnvRule
.env()
.getInstanceAdminClient()
.listClustersAsync(testEnvRule.env().getInstanceId());

List<Cluster> clusters = clustersFuture.get(1, TimeUnit.MINUTES);

// give opencensus some time to populate view data
Thread.sleep(100);

List<String> tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings();
assertThat(tagValueStrings).contains(clusters.get(0).getZone());
assertThat(tagValueStrings).contains(clusters.get(0).getId());
}

@Test
public void testFailure() throws InterruptedException {
Query query = Query.create("non-exist-table");
try {
Lists.newArrayList(testEnvRule.env().getDataClient().readRows(query));
} catch (NotFoundException e) {
}

// give opencensus some time to populate view data
Thread.sleep(100);

List<String> tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings();
assertThat(tagValueStrings).contains("undefined");
assertThat(tagValueStrings).contains("undefined");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.stats.BuiltinViews;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class UnaryMetricsMetadataIT {
@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();

@BeforeClass
public static void setUpClass() {
assume()
.withMessage("UnaryMetricsMetadataIT is not supported on Emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);
BuiltinViews.registerBigtableBuiltinViews();
}

@Test
public void testSuccess() throws Exception {
String rowKey = UUID.randomUUID().toString();
String familyId = testEnvRule.env().getFamilyId();

ApiFuture<Void> future =
testEnvRule
.env()
.getDataClient()
.mutateRowCallable()
.futureCall(
RowMutation.create(testEnvRule.env().getTableId(), rowKey)
.setCell(familyId, "q", "myVal"));

future.get(1, TimeUnit.MINUTES);

ApiFuture<List<Cluster>> clustersFuture =
testEnvRule
.env()
.getInstanceAdminClient()
.listClustersAsync(testEnvRule.env().getInstanceId());
List<Cluster> clusters = clustersFuture.get(1, TimeUnit.MINUTES);

// give opencensus some time to populate view data
Thread.sleep(100);

List<String> tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings();
assertThat(tagValueStrings).contains(clusters.get(0).getZone());
assertThat(tagValueStrings).contains(clusters.get(0).getId());
}

@Test
public void testFailure() throws InterruptedException {
String rowKey = UUID.randomUUID().toString();
String familyId = testEnvRule.env().getFamilyId();

try {
testEnvRule
.env()
.getDataClient()
.mutateRowCallable()
.call(RowMutation.create("non-exist-table", rowKey).setCell(familyId, "q", "myVal"));
} catch (NotFoundException e) {
}

// give opencensus some time to populate view data
Thread.sleep(100);

List<String> tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings();
assertThat(tagValueStrings).contains("undefined");
assertThat(tagValueStrings).contains("undefined");
}
}
Loading

0 comments on commit 52d59ce

Please sign in to comment.