Skip to content

Commit

Permalink
add integration tests and fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jul 7, 2022
1 parent 2a86c37 commit 509ffdc
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,16 @@

/**
* This callable will
*
* <li>-Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC
* methods upon completion. The {@link BigtableTracer} will process metrics that were injected in
* the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment the
* gfe_header_missing_counter in this case.
*
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and cluster ids.
*
* <li>-Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon
* completion. The {@link BigtableTracer} will process metrics that were injected in the
* header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment
* the gfe_header_missing_counter in this case.
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* <li>-Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream.
*
* <li>This class is considered an internal implementation detail and not meant to be used by
* applications.
* applications.
*/
@InternalApi
public class BigtableTracerStreamingCallable<RequestT, ResponseT>
Expand Down Expand Up @@ -108,9 +105,7 @@ public void onError(Throwable t) {
tracer.recordGfeMetadata(latency, t);
try {
byte[] trailers =
responseMetadata
.getTrailingMetadata()
.get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
Expand All @@ -126,9 +121,7 @@ public void onComplete() {
tracer.recordGfeMetadata(latency, null);
try {
byte[] trailers =
responseMetadata
.getTrailingMetadata()
.get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@

/**
* This callable will:
*
* <li>- Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC
* methods upon completion. The {@link BigtableTracer} will process metrics that were injected in
* the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment the
* gfe_header_missing_counter in this case.
*
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and cluster ids.
*
* <li>- Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon
* completion. The {@link BigtableTracer} will process metrics that were injected in the
* header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment
* the gfe_header_missing_counter in this case.
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* <li>This class is considered an internal implementation detail and not meant to be used by
* applications.
* applications.
*/
@InternalApi
public class BigtableTracerUnaryCallable<RequestT, ResponseT>
Expand Down Expand Up @@ -86,9 +84,7 @@ public void onFailure(Throwable throwable) {
tracer.recordGfeMetadata(latency, throwable);
try {
byte[] trailers =
responseMetadata
.getTrailingMetadata()
.get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
Expand All @@ -102,9 +98,7 @@ public void onSuccess(ResponseT response) {
tracer.recordGfeMetadata(latency, null);
try {
byte[] trailers =
responseMetadata
.getMetadata()
.get(Metadata.Key.of(Util.TRAILER_KEY, Metadata.BINARY_BYTE_MARSHALLER));
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class Util {
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?<dur>\\d+)");

static final String TRAILER_KEY = "x-goog-ext-425905942-bin";
static final String RESPONSE_PRAMS_KEY = "x-goog-ext-425905942-bin";

/** Convert an exception into a value that can be used to create an OpenCensus tag value. */
static String extractStatus(@Nullable Throwable error) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.stats.BuiltinViews;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import com.google.common.collect.Lists;
import io.opencensus.stats.Stats;
import io.opencensus.stats.View;
import io.opencensus.stats.ViewData;
import io.opencensus.stats.ViewManager;
import io.opencensus.tags.TagValue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class StreamingMetricsMetadataIT {
@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();

@BeforeClass
public static void setUpClass() {
assume()
.withMessage("StreamingMetricsMetadataIT is not supported on Emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);
BuiltinViews.registerBigtableBuiltinViews();
}

@Test
public void testSuccess() throws Exception {
String prefix = UUID.randomUUID().toString();
String uniqueKey = prefix + "-read";

Query query = Query.create(testEnvRule.env().getTableId()).rowKey(uniqueKey);
ArrayList<Row> rows = Lists.newArrayList(testEnvRule.env().getDataClient().readRows(query));

// give opencensus some time to populate view data
Thread.sleep(100);

ViewManager viewManager = Stats.getViewManager();
ViewData viewData =
viewManager.getView(
View.Name.create("bigtable.googleapis.com/internal/client/operation_latencies"));

List<TagValue> tagValues =
viewData.getAggregationMap().entrySet().stream()
.map(Map.Entry::getKey)
.flatMap(x -> x.stream())
.collect(Collectors.toCollection(ArrayList::new));

ApiFuture<List<Cluster>> clustersFuture =
testEnvRule
.env()
.getInstanceAdminClient()
.listClustersAsync(testEnvRule.env().getInstanceId());
List<Cluster> clusters = clustersFuture.get(1, TimeUnit.MINUTES);

assertThat(tagValues).contains(TagValue.create(clusters.get(0).getZone()));
assertThat(tagValues).contains(TagValue.create(clusters.get(0).getId()));
}

@Test
public void testFailure() throws InterruptedException {
String prefix = UUID.randomUUID().toString();
String uniqueKey = prefix + "-read";

Query query = Query.create("non-exist-table");
try {
Lists.newArrayList(testEnvRule.env().getDataClient().readRows(query));
} catch (NotFoundException e) {
}

// give opencensus some time to populate view data
Thread.sleep(100);

ViewManager viewManager = Stats.getViewManager();
ViewData viewData =
viewManager.getView(
View.Name.create("bigtable.googleapis.com/internal/client/operation_latencies"));

List<TagValue> tagValues =
viewData.getAggregationMap().entrySet().stream()
.map(Map.Entry::getKey)
.flatMap(x -> x.stream())
.collect(Collectors.toCollection(ArrayList::new));

assertThat(tagValues).contains(TagValue.create("undefined"));
assertThat(tagValues).contains(TagValue.create("undefined"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.stats.BuiltinViews;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import io.opencensus.stats.Stats;
import io.opencensus.stats.View;
import io.opencensus.stats.ViewData;
import io.opencensus.stats.ViewManager;
import io.opencensus.tags.TagValue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class UnaryMetricsMetadataIT {
@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();

@BeforeClass
public static void setUpClass() {
assume()
.withMessage("UnaryMetricsMetadataIT is not supported on Emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);

BuiltinViews.registerBigtableBuiltinViews();
}

@Test
public void testSuccess() throws Exception {
String rowKey = UUID.randomUUID().toString();
String familyId = testEnvRule.env().getFamilyId();

ApiFuture<Void> future =
testEnvRule
.env()
.getDataClient()
.mutateRowCallable()
.futureCall(
RowMutation.create(testEnvRule.env().getTableId(), rowKey)
.setCell(familyId, "q", "myVal"));

future.get(1, TimeUnit.MINUTES);

// give opencensus some time to populate view data
Thread.sleep(100);

ViewManager viewManager = Stats.getViewManager();
ViewData viewData =
viewManager.getView(
View.Name.create("bigtable.googleapis.com/internal/client/operation_latencies"));

List<TagValue> tagValues =
viewData.getAggregationMap().entrySet().stream()
.map(Map.Entry::getKey)
.flatMap(x -> x.stream())
.collect(Collectors.toCollection(ArrayList::new));

ApiFuture<List<Cluster>> clustersFuture =
testEnvRule
.env()
.getInstanceAdminClient()
.listClustersAsync(testEnvRule.env().getInstanceId());
List<Cluster> clusters = clustersFuture.get(1, TimeUnit.MINUTES);

assertThat(tagValues).contains(TagValue.create(clusters.get(0).getZone()));
assertThat(tagValues).contains(TagValue.create(clusters.get(0).getId()));
}

@Test
public void testFailure() throws InterruptedException {
String rowKey = UUID.randomUUID().toString();
String familyId = testEnvRule.env().getFamilyId();

try {
testEnvRule
.env()
.getDataClient()
.mutateRowCallable()
.call(RowMutation.create("non-exist-table", rowKey).setCell(familyId, "q", "myVal"));
} catch (NotFoundException e) {
}

// give opencensus some time to populate view data
Thread.sleep(100);

ViewManager viewManager = Stats.getViewManager();
ViewData viewData =
viewManager.getView(
View.Name.create("bigtable.googleapis.com/internal/client/operation_latencies"));

List<TagValue> tagValues =
viewData.getAggregationMap().entrySet().stream()
.map(Map.Entry::getKey)
.flatMap(x -> x.stream())
.collect(Collectors.toCollection(ArrayList::new));

assertThat(tagValues).contains(TagValue.create("undefined"));
assertThat(tagValues).contains(TagValue.create("undefined"));
}
}
Loading

0 comments on commit 509ffdc

Please sign in to comment.