make metrics consistent with cloud monitoring
mutianf committed Jul 26, 2022
1 parent 80ea5b1 commit 1c4215f
Showing 8 changed files with 54 additions and 34 deletions.
@@ -60,8 +60,6 @@ class BigtableStackdriverExportUtils {
   private static final io.opencensus.common.Function<ExplicitOptions, BucketOptions>
       bucketOptionsExplicitFunction;

-  private static final String DOMAIN = "bigtable.googleapis.com/internal/client/";
-
   private static final Set<String> PROMOTED_RESOURCE_LABELS =
       ImmutableSet.of(
           BuiltinMeasureConstants.PROJECT_ID.getName(),
@@ -142,7 +140,7 @@ static TimeSeries convertTimeSeries(
         metricTagValues.add(labelValues.get(i));
       }
     }
-    metricTagKeys.add(LabelKey.create(BuiltinMeasureConstants.CLIENT_ID.getName(), "client id"));
+    metricTagKeys.add(LabelKey.create(BuiltinMeasureConstants.CLIENT_UID.getName(), "client id"));
     metricTagValues.add(LabelValue.create(clientId));

     TimeSeries.Builder builder = TimeSeries.newBuilder();
@@ -161,7 +159,7 @@ static com.google.api.Metric createMetric(
   static com.google.api.Metric createMetric(
       String metricName, List<LabelKey> labelKeys, List<LabelValue> labelValues) {
     com.google.api.Metric.Builder builder = com.google.api.Metric.newBuilder();
-    builder.setType(DOMAIN + metricName);
+    builder.setType(metricName);
     Map<String, String> stringTagMap = Maps.newHashMap();

     for (int i = 0; i < labelValues.size(); ++i) {
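Note on the createMetric change above: with the DOMAIN constant removed, the exporter no longer prepends "bigtable.googleapis.com/internal/client/" to the metric name, so the names passed in are expected to arrive already fully qualified, which matches the view names used later in this commit. A minimal sketch of what the builder call now produces; the metric type string is taken from a view name shown in this diff, while the label key/value pair is purely illustrative:

    import com.google.api.Metric;

    public class MetricTypeSketch {
      public static void main(String[] args) {
        // The view name already carries the full domain, so setType receives it unchanged.
        Metric metric =
            Metric.newBuilder()
                .setType("bigtable.googleapis.com/internal/client/retry_count")
                .putLabels("app_profile", "default") // illustrative label only
                .build();
        System.out.println(metric.getType());
      }
    }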
@@ -40,7 +40,7 @@ public class BigtableStackdriverStatsExporter {
   @GuardedBy("monitor")
   private static BigtableStackdriverStatsExporter instance = null;

-  private static final Duration EXPORT_INTERVAL = Duration.create(600, 0);
+  private static final Duration EXPORT_INTERVAL = Duration.create(60, 0);
   private static final String RESOURCE_TYPE = "bigtable_client_raw";

   private final IntervalMetricReader intervalMetricReader;
@@ -23,12 +23,11 @@
 class BuiltinMeasureConstants {
   // Monitored resource TagKeys
   static final TagKey PROJECT_ID = TagKey.create("project_id");
-  static final TagKey INSTANCE_ID = TagKey.create("instance_id");
+  static final TagKey INSTANCE_ID = TagKey.create("instance");
   static final TagKey CLUSTER = TagKey.create("cluster");
   static final TagKey TABLE = TagKey.create("table");
   static final TagKey ZONE = TagKey.create("zone");
-  // Placeholder TagKey to be used in Stackdriver exporter
-  static final TagKey CLIENT_ID = TagKey.create("client_id");
+  static final TagKey CLIENT_UID = TagKey.create("client_uid");

   // Metrics TagKeys
   static final TagKey APP_PROFILE = TagKey.create("app_profile");
@@ -59,7 +59,7 @@ class BuiltinViewConstants {
                   1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0,
                   100.0)));

-  private static final Aggregation AGGREGATION_ERROR_COUNT = Sum.create();
+  private static final Aggregation AGGREGATION_COUNT = Sum.create();

   static final View OPERATION_LATENCIES_VIEW =
       View.create(
@@ -102,7 +102,7 @@ class BuiltinViewConstants {
           View.Name.create("bigtable.googleapis.com/internal/client/retry_count"),
           "The number of additional RPCs sent after the initial attempt.",
           RETRY_COUNT,
-          AGGREGATION_RETRY_COUNT,
+          AGGREGATION_COUNT,
           ImmutableList.of(
               PROJECT_ID,
               INSTANCE_ID,
@@ -154,7 +154,7 @@ class BuiltinViewConstants {
           View.Name.create("bigtable.googleapis.com/internal/client/connectivity_error_count"),
           "Number of requests that failed to reach the Google datacenter. (Requests without google response headers).",
           CONNECTIVITY_ERROR_COUNT,
-          AGGREGATION_ERROR_COUNT,
+          AGGREGATION_COUNT,
           ImmutableList.of(
               PROJECT_ID,
               INSTANCE_ID,
@@ -173,15 +173,7 @@ class BuiltinViewConstants {
           APPLICATION_LATENCIES,
           AGGREGATION_WITH_MILLIS_HISTOGRAM,
           ImmutableList.of(
-              PROJECT_ID,
-              INSTANCE_ID,
-              APP_PROFILE,
-              METHOD,
-              STREAMING,
-              CLIENT_NAME,
-              CLUSTER,
-              ZONE,
-              TABLE));
+              PROJECT_ID, INSTANCE_ID, APP_PROFILE, METHOD, CLIENT_NAME, CLUSTER, ZONE, TABLE));

   static final View THROTTLING_LATENCIES_VIEW =
       View.create(
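For context, these view definitions are plain OpenCensus views: Sum.create() for counters such as retry_count and connectivity_error_count, distribution histograms for latencies, and tag keys from BuiltinMeasureConstants. Below is a minimal, self-contained sketch of a Sum view in the same shape; the measure and view names are hypothetical, and only the tag-key strings ("project_id", "instance", "client_uid") come from this commit:

    import com.google.common.collect.ImmutableList;
    import io.opencensus.stats.Aggregation;
    import io.opencensus.stats.Aggregation.Sum;
    import io.opencensus.stats.Measure.MeasureLong;
    import io.opencensus.stats.View;
    import io.opencensus.tags.TagKey;

    public class CountViewSketch {
      // Hypothetical measure standing in for a constant such as RETRY_COUNT.
      private static final MeasureLong EXAMPLE_COUNT =
          MeasureLong.create("example/count", "Hypothetical counter measure.", "1");

      // Tag keys named as in this commit: the resource label is now "instance",
      // and the exporter-only key is "client_uid".
      private static final TagKey PROJECT_ID = TagKey.create("project_id");
      private static final TagKey INSTANCE_ID = TagKey.create("instance");
      private static final TagKey CLIENT_UID = TagKey.create("client_uid");

      private static final Aggregation AGGREGATION_COUNT = Sum.create();

      // A Sum-aggregated view in the same shape as retry_count / connectivity_error_count.
      static final View EXAMPLE_COUNT_VIEW =
          View.create(
              View.Name.create("example.googleapis.com/internal/client/example_count"),
              "Hypothetical count view following the pattern used in this commit.",
              EXAMPLE_COUNT,
              AGGREGATION_COUNT,
              ImmutableList.of(PROJECT_ID, INSTANCE_ID, CLIENT_UID));
    }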
@@ -134,7 +134,7 @@ public void testTimeSeries() {
     assertThat(timeSeries.getMetric().getLabelsMap())
         .containsAtLeast(BuiltinMeasureConstants.APP_PROFILE.getName(), appProfileId);
     assertThat(timeSeries.getMetric().getLabelsMap())
-        .containsKey(BuiltinMeasureConstants.CLIENT_ID.getName());
+        .containsKey(BuiltinMeasureConstants.CLIENT_UID.getName());

     assertThat(timeSeries.getPoints(0).getValue().getDoubleValue()).isEqualTo(fakeValue);
   }
@@ -201,7 +201,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
     ImmutableMap<String, String> builtinAttributes =
         ImmutableMap.<String, String>builder()
             .put("project_id", settings.getProjectId())
-            .put("instance_id", settings.getInstanceId())
+            .put("instance", settings.getInstanceId())
             .put("app_profile", settings.getAppProfileId())
             .build();
     // Inject Opencensus instrumentation
@@ -335,10 +335,13 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
     ServerStreamingCallable<Query, RowT> readRowsUserCallable =
         new ReadRowsUserCallable<>(readRowsCallable, requestContext);

+    ServerStreamingCallable<Query, RowT> withBigtableTracer =
+        new BigtableTracerStreamingCallable<>(readRowsUserCallable);
+
     SpanName span = getSpanName("ReadRows");
     ServerStreamingCallable<Query, RowT> traced =
         new TracedServerStreamingCallable<>(
-            readRowsUserCallable, clientContext.getTracerFactory(), span);
+            withBigtableTracer, clientContext.getTracerFactory(), span);

     return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
   }
@@ -367,10 +367,19 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
             .build(),
         rowAdapter);

-    UnaryCallable<Query, RowT> readRowCallable =
-        new ReadRowsUserCallable<>(readRowsCallable, requestContext).first();
+    ServerStreamingCallable<Query, RowT> readRowCallable =
+        new ReadRowsUserCallable<>(readRowsCallable, requestContext);

-    return createUserFacingUnaryCallable("ReadRow", readRowCallable);
+    ServerStreamingCallable<Query, RowT> withBigtableTracer =
+        new BigtableTracerStreamingCallable<>(readRowCallable);
+
+    SpanName span = getSpanName("ReadRow");
+
+    ServerStreamingCallable<Query, RowT> traced =
+        new TracedServerStreamingCallable(
+            withBigtableTracer, clientContext.getTracerFactory(), span);
+
+    return traced.first().withDefaultCallContext(clientContext.getDefaultCallContext());
   }

   /**
@@ -432,13 +444,10 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
     ServerStreamingCallable<ReadRowsRequest, RowT> watched =
         Callables.watched(merging, innerSettings, clientContext);

-    ServerStreamingCallable<ReadRowsRequest, RowT> withBigtableTracer =
-        new BigtableTracerStreamingCallable<>(watched);
-
     // Retry logic is split into 2 parts to workaround a rare edge case described in
     // ReadRowsRetryCompletedCallable
     ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
-        new ReadRowsRetryCompletedCallable<>(withBigtableTracer);
+        new ReadRowsRetryCompletedCallable<>(watched);

     ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
         Callables.retrying(retrying1, innerSettings, clientContext);
@@ -22,6 +22,7 @@
 import com.google.api.gax.rpc.ServerStreamingCallable;
 import com.google.api.gax.rpc.StreamController;
 import com.google.bigtable.v2.ResponseParams;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -104,8 +105,13 @@ public void onError(Throwable t) {
       Long latency = Util.getGfeLatency(metadata);
       tracer.recordGfeMetadata(latency, t);
       try {
+        Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
         byte[] trailers =
-            metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
+            MoreObjects.firstNonNull(
+                metadata.get(
+                    Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER)),
+                trailingMetadata.get(
+                    Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER)));
         ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
         tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
       } catch (NullPointerException | InvalidProtocolBufferException e) {
@@ -120,8 +126,13 @@ public void onComplete() {
       Long latency = Util.getGfeLatency(metadata);
       tracer.recordGfeMetadata(latency, null);
       try {
+        Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
         byte[] trailers =
-            metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
+            MoreObjects.firstNonNull(
+                metadata.get(
+                    Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER)),
+                trailingMetadata.get(
+                    Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER)));
         ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
         tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
       } catch (NullPointerException | InvalidProtocolBufferException e) {
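The change in onError and onComplete above (and mirrored in the unary callable below) reads the response-params bytes from the initial headers first and falls back to the trailing metadata. MoreObjects.firstNonNull throws a NullPointerException when both sources are missing, which the existing catch clause already handles alongside InvalidProtocolBufferException. A self-contained sketch of that lookup, using a hypothetical binary key name in place of Util.RESPONSE_PRAMS_KEY:

    import com.google.common.base.MoreObjects;
    import io.grpc.Metadata;

    public class ResponseParamsLookupSketch {
      // Hypothetical binary key name; the real constant is Util.RESPONSE_PRAMS_KEY.
      private static final Metadata.Key<byte[]> RESPONSE_PARAMS_KEY =
          Metadata.Key.of("x-goog-ext-example-bin", Metadata.BINARY_BYTE_MARSHALLER);

      // Returns the bytes from the headers if present, otherwise from the trailers.
      // Throws NullPointerException when neither carries the key, mirroring the
      // behavior the callables catch together with InvalidProtocolBufferException.
      static byte[] responseParamsBytes(Metadata headers, Metadata trailers) {
        return MoreObjects.firstNonNull(
            headers.get(RESPONSE_PARAMS_KEY), trailers.get(RESPONSE_PARAMS_KEY));
      }

      public static void main(String[] args) {
        Metadata headers = new Metadata(); // empty: no params in the headers
        Metadata trailers = new Metadata();
        trailers.put(RESPONSE_PARAMS_KEY, new byte[] {1, 2, 3});
        System.out.println(responseParamsBytes(headers, trailers).length); // 3
      }
    }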
@@ -23,6 +23,7 @@
 import com.google.api.gax.rpc.ApiCallContext;
 import com.google.api.gax.rpc.UnaryCallable;
 import com.google.bigtable.v2.ResponseParams;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -83,8 +84,13 @@ public void onFailure(Throwable throwable) {
       Long latency = Util.getGfeLatency(metadata);
       tracer.recordGfeMetadata(latency, throwable);
       try {
+        Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
         byte[] trailers =
-            metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
+            MoreObjects.firstNonNull(
+                metadata.get(
+                    Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER)),
+                trailingMetadata.get(
+                    Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER)));
         ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
         tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
       } catch (NullPointerException | InvalidProtocolBufferException e) {
@@ -97,8 +103,13 @@ public void onSuccess(ResponseT response) {
       Long latency = Util.getGfeLatency(metadata);
       tracer.recordGfeMetadata(latency, null);
       try {
+        Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
         byte[] trailers =
-            metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
+            MoreObjects.firstNonNull(
+                metadata.get(
+                    Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER)),
+                trailingMetadata.get(
+                    Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER)));
         ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
         tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
       } catch (NullPointerException | InvalidProtocolBufferException e) {
