Skip to content

Commit

Permalink
feat: add response protos
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed May 12, 2022
1 parent ca28be6 commit 61c1090
Show file tree
Hide file tree
Showing 10 changed files with 1,198 additions and 70 deletions.
17 changes: 15 additions & 2 deletions google-cloud-bigtable-stats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-monitoring</artifactId>
<version>3.2.9</version>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
Expand All @@ -46,12 +50,21 @@
<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-exporter-stats-stackdriver</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-monitoring</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>


<dependency>
<groupId>com.google.truth.extensions</groupId>
<artifactId>truth-proto-extension</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void recordApplicationLatency(
.put(BuiltinMeasureConstants.APPLICATION_LATENCIES, applicationLatency);

TagContextBuilder tagCtx = newTagContextBuilder(tableId, zone, cluster);

if (operationType == OperationType.ServerStreaming
&& spanName.getMethodName().equals("ReadRows")) {
tagCtx.putLocal(BuiltinMeasureConstants.STREAMING, TagValue.create("true"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void testStreamingOperation() throws InterruptedException {
tracer.recordGfeLatencies(serverLatency);
tracer.recordGfeMissingHeaders(connectivityErrorCount);
tracer.recordFirstResponseLatency(firstResponseLatency);

tracer.recordBatchRequestThrottled(throttlingLatency, TABLE_ID, ZONE, CLUSTER);

tracer.recordAttemptLevelWithoutStreaming(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import javax.annotation.Nonnull;

Expand Down Expand Up @@ -100,15 +102,15 @@ public void onError(Throwable t) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, t);
// try {
// byte[] trailers =
// responseMetadata
// .getTrailingMetadata()
// .get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
// ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
// tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
// } catch (NullPointerException | InvalidProtocolBufferException e) {
// }
try {
byte[] trailers =
responseMetadata
.getTrailingMetadata()
.get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}

outerObserver.onError(t);
}
Expand All @@ -118,14 +120,15 @@ public void onComplete() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
// try {
// byte[] trailers =
// responseMetadata.getTrailingMetadata().get(Metadata.Key.of(Util.TRAILER_KEY,
// Metadata.BINARY_BYTE_MARSHALLER));
// ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
// tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
// } catch (NullPointerException | InvalidProtocolBufferException e) {
// }
try {
byte[] trailers =
responseMetadata
.getTrailingMetadata()
.get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}

outerObserver.onComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import javax.annotation.Nonnull;

Expand Down Expand Up @@ -82,32 +84,31 @@ public void onFailure(Throwable throwable) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, throwable);
// try {
// byte[] trailers =
// responseMetadata
// .getTrailingMetadata()
// .get(Metadata.Key.of(Util.TRAILER_KEY,
// Metadata.BINARY_BYTE_MARSHALLER));
// ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
// tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
// } catch (NullPointerException | InvalidProtocolBufferException e) {
// }
try {
byte[] trailers =
responseMetadata
.getTrailingMetadata()
.get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}
}

@Override
public void onSuccess(ResponseT response) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
// try {
// byte[] trailers =
// responseMetadata
// .getTrailingMetadata()
// .get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
// ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
// tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
// } catch (NullPointerException | InvalidProtocolBufferException e) {
// }
try {
byte[] trailers =
responseMetadata
.getTrailingMetadata()
.get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.ResponseParams;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceHelper;
import com.google.cloud.bigtable.data.v2.models.Query;
Expand Down Expand Up @@ -74,7 +75,6 @@ public class BuiltinMetricsTracerTest {
private static final String CLUSTER_0 = "cluster-0";
private static final String ZONE_1 = "us-east-1";
private static final String CLUSTER_1 = "cluster-1";
private static final String UNDEFINED = "undefined";
private static final long FAKE_SERVER_TIMING = 50;
private static final long SERVER_LATENCY = 500;

Expand Down Expand Up @@ -163,30 +163,28 @@ public void sendHeaders(Metadata headers) {

@Override
public void close(Status status, Metadata trailers) {
// int currentCount = count.getAndIncrement();
// if (currentCount == 0) {
// ResponseParams params =
// ResponseParams.newBuilder()
// .setZoneId(ZONE_0)
// .setClusterId(CLUSTER_0)
// .build();
// byte[] byteArray = params.toByteArray();
// trailers.put(
// Metadata.Key.of(Util.TRAILER_KEY,
// Metadata.BINARY_BYTE_MARSHALLER),
// byteArray);
// } else {
// ResponseParams params =
// ResponseParams.newBuilder()
// .setClusterId(CLUSTER_1)
// .setZoneId(ZONE_1)
// .build();
// byte[] byteArray = params.toByteArray();
// trailers.put(
// Metadata.Key.of(Util.TRAILER_KEY,
// Metadata.BINARY_BYTE_MARSHALLER),
// byteArray);
// }
int currentCount = count.getAndIncrement();
if (currentCount == 0) {
ResponseParams params =
ResponseParams.newBuilder()
.setZoneId(ZONE_0)
.setClusterId(CLUSTER_0)
.build();
byte[] byteArray = params.toByteArray();
trailers.put(
Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER),
byteArray);
} else {
ResponseParams params =
ResponseParams.newBuilder()
.setClusterId(CLUSTER_1)
.setZoneId(ZONE_1)
.build();
byte[] byteArray = params.toByteArray();
trailers.put(
Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER),
byteArray);
}
super.close(status, trailers);
}
},
Expand Down Expand Up @@ -317,7 +315,6 @@ public void testMutateRowApplicationLatency() {
verify(builtinMetricsRecorder)
.recordApplicationLatency(
longValue.capture(), tableId.capture(), zone.capture(), cluster.capture());

// Application latency should be slightly less than the total delay between 2 requests observed
// from the server side. To make
// the test less flaky comparing with half of the server side delay here.
Expand All @@ -343,12 +340,12 @@ public void testMutateRowAttempts() {
verify(builtinMetricsRecorder, times(3))
.recordAttemptLevelWithStreaming(
status.capture(), tableId.capture(), zone.capture(), cluster.capture());
assertThat(zone.getAllValues().get(0)).isEqualTo(UNDEFINED);
assertThat(zone.getAllValues().get(1)).isEqualTo(UNDEFINED);
assertThat(zone.getAllValues().get(2)).isEqualTo(UNDEFINED);
assertThat(cluster.getAllValues().get(0)).isEqualTo(UNDEFINED);
assertThat(cluster.getAllValues().get(1)).isEqualTo(UNDEFINED);
assertThat(cluster.getAllValues().get(2)).isEqualTo(UNDEFINED);
assertThat(zone.getAllValues().get(0)).isEqualTo(ZONE_0);
assertThat(zone.getAllValues().get(1)).isEqualTo(ZONE_1);
assertThat(zone.getAllValues().get(2)).isEqualTo(ZONE_1);
assertThat(cluster.getAllValues().get(0)).isEqualTo(CLUSTER_0);
assertThat(cluster.getAllValues().get(1)).isEqualTo(CLUSTER_1);
assertThat(cluster.getAllValues().get(2)).isEqualTo(CLUSTER_1);
assertThat(status.getAllValues().get(0)).isEqualTo("UNAVAILABLE");
assertThat(status.getAllValues().get(1)).isEqualTo("UNAVAILABLE");
assertThat(status.getAllValues().get(2)).isEqualTo("OK");
Expand Down
Loading

0 comments on commit 61c1090

Please sign in to comment.