diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index af6cbe8c3a..76c0ed9ad2 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -31,6 +31,7 @@ import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Int64Value; import io.grpc.Status; @@ -40,6 +41,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; import java.io.IOException; @@ -259,6 +261,7 @@ class ConnectionWorker implements AutoCloseable { private static Pattern streamPatternTable = Pattern.compile(tableMatching); private Meter writeMeter; static AttributeKey telemetryKeyTableId = AttributeKey.stringKey("table_id"); + static AttributeKey telemetryKeyWriterId = AttributeKey.stringKey("writer_id"); private static String dataflowPrefix = "dataflow:"; static List> telemetryKeysTraceId = new ArrayList>() { @@ -268,10 +271,25 @@ class ConnectionWorker implements AutoCloseable { add(AttributeKey.stringKey("trace_field_3")); } }; + static AttributeKey telemetryKeyErrorCode = AttributeKey.stringKey("error_code"); + static AttributeKey telemetryKeyIsRetry = AttributeKey.stringKey("is_retry"); private Attributes telemetryAttributes; - private LongCounter instrumentIncomingRequestCount; - private LongCounter instrumentIncomingRequestSize; - private LongCounter instrumentIncomingRequestRows; + // Latency buckets are based on a list of 1.5 ^ n + private static final List METRICS_MILLISECONDS_LATENCY_BUCKETS = + ImmutableList.of( + 0L, 17L, 38L, 86L, 195L, 438L, 985L, 2217L, 4988L, 11223L, 25251L, 56815L, 127834L, + 287627L, 647160L); + + private static final class OpenTelemetryMetrics { + private LongCounter instrumentAckedRequestCount; + private LongCounter instrumentAckedRequestSize; + private LongCounter instrumentAckedRequestRows; + private LongHistogram instrumentNetworkResponseLatency; + private LongCounter instrumentConnectionStartCount; + private LongCounter instrumentConnectionEndCount; + } + + private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics(); public static Boolean isDefaultStreamName(String streamName) { Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName); @@ -327,16 +345,21 @@ private void setTraceIdAttributes(AttributesBuilder builder) { } } + // Specify common attributes for all metrics. + // For example, table name and writer id. + // Metrics dashboards can be filtered on available attributes. private Attributes buildOpenTelemetryAttributes() { AttributesBuilder builder = Attributes.builder(); String tableName = getTableName(); if (!tableName.isEmpty()) { builder.put(telemetryKeyTableId, tableName); } + builder.put(telemetryKeyWriterId, writerId); setTraceIdAttributes(builder); return builder.build(); } + // Refresh the table name attribute when multiplexing switches between tables. private void refreshOpenTelemetryTableNameAttributes() { String tableName = getTableName(); if (!tableName.isEmpty() @@ -347,6 +370,22 @@ private void refreshOpenTelemetryTableNameAttributes() { } } + // Build new attributes augmented with an error code string. + private Attributes augmentAttributesWithErrorCode(Attributes attributes, String errorCode) { + AttributesBuilder builder = attributes.toBuilder(); + if ((errorCode != null) && !errorCode.isEmpty()) { + builder.put(telemetryKeyErrorCode, errorCode); + } + return builder.build(); + } + + // Build new attributes augmented with a flag indicating this was a retry. + private Attributes augmentAttributesWithRetry(Attributes attributes) { + AttributesBuilder builder = attributes.toBuilder(); + builder.put(telemetryKeyIsRetry, "1"); + return builder.build(); + } + @VisibleForTesting Attributes getTelemetryAttributes() { return telemetryAttributes; @@ -360,20 +399,72 @@ private void registerOpenTelemetryMetrics() { .setInstrumentationVersion( ConnectionWorker.class.getPackage().getImplementationVersion()) .build(); - instrumentIncomingRequestCount = + telemetryMetrics.instrumentAckedRequestCount = + writeMeter + .counterBuilder("append_requests_acked") + .setDescription("Counts number of requests acked by the server") + .build(); + telemetryMetrics.instrumentAckedRequestSize = + writeMeter + .counterBuilder("append_request_bytes_acked") + .setDescription("Counts byte size of requests acked by the server") + .build(); + telemetryMetrics.instrumentAckedRequestRows = + writeMeter + .counterBuilder("append_rows_acked") + .setDescription("Counts number of request rows acked by the server") + .build(); + writeMeter + .gaugeBuilder("active_connection_count") + .ofLongs() + .setDescription("Reports number of active connections") + .buildWithCallback( + measurement -> { + int count = 0; + this.lock.lock(); + try { + if (streamConnectionIsConnected) { + count = 1; + } + } finally { + this.lock.unlock(); + } + measurement.record(count, getTelemetryAttributes()); + }); + writeMeter + .gaugeBuilder("inflight_queue_length") + .ofLongs() + .setDescription( + "Reports length of inflight queue. This queue contains sent append requests waiting for response from the server.") + .buildWithCallback( + measurement -> { + int length = 0; + this.lock.lock(); + try { + length = inflightRequestQueue.size(); + } finally { + this.lock.unlock(); + } + measurement.record(length, getTelemetryAttributes()); + }); + telemetryMetrics.instrumentNetworkResponseLatency = writeMeter - .counterBuilder("append_requests") - .setDescription("Counts number of incoming requests") + .histogramBuilder("network_response_latency") + .ofLongs() + .setDescription( + "Reports time taken in milliseconds for a response to arrive once a message has been sent over the network.") + .setExplicitBucketBoundariesAdvice(METRICS_MILLISECONDS_LATENCY_BUCKETS) .build(); - instrumentIncomingRequestSize = + telemetryMetrics.instrumentConnectionStartCount = writeMeter - .counterBuilder("append_request_bytes") - .setDescription("Counts byte size of incoming requests") + .counterBuilder("connection_start_count") + .setDescription( + "Counts number of connection attempts made, regardless of whether these are initial or retry.") .build(); - instrumentIncomingRequestRows = + telemetryMetrics.instrumentConnectionEndCount = writeMeter - .counterBuilder("append_rows") - .setDescription("Counts number of incoming request rows") + .counterBuilder("connection_end_count") + .setDescription("Counts number of connection end events.") .build(); } @@ -465,6 +556,7 @@ public void run() { private void resetConnection() { log.info("Start connecting stream: " + streamName + " id: " + writerId); + telemetryMetrics.instrumentConnectionStartCount.add(1, getTelemetryAttributes()); if (this.streamConnection != null) { // It's safe to directly close the previous connection as the in flight messages // will be picked up by the next connection. @@ -618,9 +710,6 @@ private ApiFuture appendInternal( + requestWrapper.messageSize))); return requestWrapper.appendResult; } - instrumentIncomingRequestCount.add(1, getTelemetryAttributes()); - instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes()); - instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount()); this.lock.lock(); try { if (userClosed) { @@ -1214,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) { connectionRetryStartTime = 0; } if (!this.inflightRequestQueue.isEmpty()) { + Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp; + if (sendInstant != null) { + Duration durationLatency = Duration.between(sendInstant, Instant.now()); + telemetryMetrics.instrumentNetworkResponseLatency.record( + durationLatency.toMillis(), getTelemetryAttributes()); + } + requestWrapper = pollFirstInflightRequestQueue(); requestProfilerHook.endOperation( RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId); @@ -1234,6 +1330,22 @@ private void requestCallback(AppendRowsResponse response) { this.lock.unlock(); } + Attributes augmentedTelemetryAttributes = + augmentAttributesWithErrorCode( + getTelemetryAttributes(), + Code.values()[ + response.hasError() ? response.getError().getCode() : Status.Code.OK.ordinal()] + .toString()); + if (requestWrapper.retryCount > 0) { + augmentedTelemetryAttributes = augmentAttributesWithRetry(augmentedTelemetryAttributes); + } + telemetryMetrics.instrumentAckedRequestCount.add(1, augmentedTelemetryAttributes); + telemetryMetrics.instrumentAckedRequestSize.add( + requestWrapper.messageSize, augmentedTelemetryAttributes); + telemetryMetrics.instrumentAckedRequestRows.add( + requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(), + augmentedTelemetryAttributes); + // Retries need to happen on the same thread as queue locking may occur if (response.hasError()) { if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) { @@ -1316,6 +1428,11 @@ private void doneCallback(Throwable finalStatus) { this.lock.lock(); try { this.streamConnectionIsConnected = false; + this.telemetryMetrics.instrumentConnectionEndCount.add( + 1, + augmentAttributesWithErrorCode( + getTelemetryAttributes(), + Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString())); if (connectionFinalStatus == null) { if (connectionRetryStartTime == 0) { connectionRetryStartTime = System.currentTimeMillis(); @@ -1327,6 +1444,8 @@ private void doneCallback(Throwable finalStatus) { || System.currentTimeMillis() - connectionRetryStartTime <= maxRetryDuration.toMillis())) { this.conectionRetryCountWithoutCallback++; + this.telemetryMetrics.instrumentConnectionStartCount.add( + 1, augmentAttributesWithRetry(getTelemetryAttributes())); log.info( "Connection is going to be reestablished with the next request. Retriable error " + finalStatus.toString()