From 0b87d973b6200e37004e79c7977170b4707d5d7d Mon Sep 17 00:00:00 2001 From: Siddharth Agrawal Date: Thu, 30 May 2024 16:52:53 -0700 Subject: [PATCH] feat: add opentelemetry counters for sent and acked messages Also add network latency, queue length and error counts. The metrics (other than error counts) are now reported periodically, every second. --- .../bigquery/storage/v1/ConnectionWorker.java | 167 ++++++++++++++++-- 1 file changed, 151 insertions(+), 16 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index ab0929e211..612841304c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -31,6 +31,7 @@ import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Int64Value; import io.grpc.Status; @@ -40,6 +41,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; import java.io.IOException; @@ -265,6 +267,7 @@ class ConnectionWorker implements AutoCloseable { private static Pattern streamPatternTable = Pattern.compile(tableMatching); private Meter writeMeter; static AttributeKey telemetryKeyTableId = AttributeKey.stringKey("table_id"); + static AttributeKey telemetryKeyWriterId = AttributeKey.stringKey("writer_id"); private static String dataflowPrefix = "dataflow:"; static List> telemetryKeysTraceId = new ArrayList>() { @@ -274,10 +277,25 @@ class ConnectionWorker implements AutoCloseable { add(AttributeKey.stringKey("trace_field_3")); } }; + static AttributeKey telemetryKeyErrorCode = AttributeKey.stringKey("error_code"); + static AttributeKey telemetryKeyIsRetry = AttributeKey.stringKey("is_retry"); private Attributes telemetryAttributes; - private LongCounter instrumentIncomingRequestCount; - private LongCounter instrumentIncomingRequestSize; - private LongCounter instrumentIncomingRequestRows; + private static final List METRICS_MILLISECONDS_LATENCY_BUCKETS = + ImmutableList.of(0L, 50L, 100L, 500L, 1000L, 5000L, 10000L, 20000L, 30000L, 60000L, 120000L); + + private static final class OpenTelemetryMetrics { + private LongCounter instrumentSentRequestCount; + private LongCounter instrumentSentRequestSize; + private LongCounter instrumentSentRequestRows; + private LongCounter instrumentAckedRequestCount; + private LongCounter instrumentAckedRequestSize; + private LongCounter instrumentAckedRequestRows; + private LongHistogram instrumentNetworkResponseLatency; + private LongCounter instrumentConnectionStartCount; + private LongCounter instrumentConnectionEndCount; + } + + private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics(); public static Boolean isDefaultStreamName(String streamName) { Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName); @@ -339,6 +357,7 @@ private Attributes buildOpenTelemetryAttributes() { if (!tableName.isEmpty()) { builder.put(telemetryKeyTableId, tableName); } + builder.put(telemetryKeyWriterId, writerId); setTraceIdAttributes(builder); return builder.build(); } @@ -353,6 +372,20 @@ private void refreshOpenTelemetryTableNameAttributes() { } } + private Attributes augmentAttributesWithErrorCode(Attributes attributes, String errorCode) { + AttributesBuilder builder = attributes.toBuilder(); + if ((errorCode != null) && !errorCode.isEmpty()) { + builder.put(telemetryKeyErrorCode, errorCode); + } + return builder.build(); + } + + private Attributes augmentAttributesWithRetry(Attributes attributes) { + AttributesBuilder builder = attributes.toBuilder(); + builder.put(telemetryKeyIsRetry, "1"); + return builder.build(); + } + @VisibleForTesting Attributes getTelemetryAttributes() { return telemetryAttributes; @@ -366,20 +399,87 @@ private void registerOpenTelemetryMetrics() { .setInstrumentationVersion( ConnectionWorker.class.getPackage().getImplementationVersion()) .build(); - instrumentIncomingRequestCount = + telemetryMetrics.instrumentSentRequestCount = + writeMeter + .counterBuilder("append_requests_sent") + .setDescription("Counts number of requests sent over the network") + .build(); + telemetryMetrics.instrumentSentRequestSize = + writeMeter + .counterBuilder("append_request_bytes_sent") + .setDescription("Counts byte size of requests sent over the network") + .build(); + telemetryMetrics.instrumentSentRequestRows = + writeMeter + .counterBuilder("append_rows_sent") + .setDescription("Counts number of request rows sent over the network") + .build(); + telemetryMetrics.instrumentAckedRequestCount = writeMeter - .counterBuilder("append_requests") - .setDescription("Counts number of incoming requests") + .counterBuilder("append_requests_acked") + .setDescription("Counts number of requests acked by the server") .build(); - instrumentIncomingRequestSize = + telemetryMetrics.instrumentAckedRequestSize = writeMeter - .counterBuilder("append_request_bytes") - .setDescription("Counts byte size of incoming requests") + .counterBuilder("append_request_bytes_acked") + .setDescription("Counts byte size of requests acked by the server") .build(); - instrumentIncomingRequestRows = + telemetryMetrics.instrumentAckedRequestRows = writeMeter - .counterBuilder("append_rows") - .setDescription("Counts number of incoming request rows") + .counterBuilder("append_rows_acked") + .setDescription("Counts number of request rows acked by the server") + .build(); + writeMeter + .gaugeBuilder("active_connection_count") + .ofLongs() + .setDescription("Reports number of active connections") + .buildWithCallback( + measurement -> { + int count = 0; + this.lock.lock(); + try { + if (streamConnectionIsConnected) { + count = 1; + } + } finally { + this.lock.unlock(); + } + measurement.record(count, getTelemetryAttributes()); + }); + writeMeter + .gaugeBuilder("inflight_queue_length") + .ofLongs() + .setDescription( + "Reports length of inflight queue. This queue contains sent append requests waiting for response from the server.") + .buildWithCallback( + measurement -> { + int length = 0; + this.lock.lock(); + try { + length = inflightRequestQueue.size(); + } finally { + this.lock.unlock(); + } + measurement.record(length, getTelemetryAttributes()); + }); + telemetryMetrics.instrumentNetworkResponseLatency = + writeMeter + .histogramBuilder("network_response_latency") + .ofLongs() + .setDescription( + "Reports time taken in milliseconds for a response to arrive once a message has been sent over the network.") + .setExplicitBucketBoundariesAdvice(METRICS_MILLISECONDS_LATENCY_BUCKETS) + .build(); + telemetryMetrics.instrumentConnectionStartCount = + writeMeter + .counterBuilder("connection_start_count") + .setDescription( + "Counts number of connection attempts made, regardless of whether these are initial or retry.") + .build(); + telemetryMetrics.instrumentConnectionEndCount = + writeMeter + .counterBuilder("connection_end_count") + .setDescription("Counts number of connection end events.") .build(); } @@ -469,6 +569,7 @@ public void run() { private void resetConnection() { log.info("Start connecting stream: " + streamName + " id: " + writerId); + telemetryMetrics.instrumentConnectionStartCount.add(1, getTelemetryAttributes()); if (this.streamConnection != null) { // It's safe to directly close the previous connection as the in flight messages // will be picked up by the next connection. @@ -615,9 +716,6 @@ private ApiFuture appendInternal( + requestWrapper.messageSize))); return requestWrapper.appendResult; } - instrumentIncomingRequestCount.add(1, getTelemetryAttributes()); - instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes()); - instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount()); this.lock.lock(); try { if (userClosed) { @@ -876,7 +974,8 @@ private void appendLoop() { } while (!localQueue.isEmpty()) { localQueue.peekFirst().setRequestSendQueueTime(); - AppendRowsRequest originalRequest = localQueue.pollFirst().message; + AppendRequestAndResponse requestWrapper = localQueue.pollFirst(); + AppendRowsRequest originalRequest = requestWrapper.message; AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder(); // Always respect the first writer schema seen by the loop. if (writerSchema == null) { @@ -928,6 +1027,15 @@ private void appendLoop() { // In the close case, the request is in the inflight queue, and will either be returned // to the user with an error, or will be resent. this.streamConnection.send(originalRequestBuilder.build()); + Attributes telemetryAttributes = getTelemetryAttributes(); + if (requestWrapper.retryCount > 0) { + telemetryAttributes = augmentAttributesWithRetry(telemetryAttributes); + } + telemetryMetrics.instrumentSentRequestCount.add(1, telemetryAttributes); + telemetryMetrics.instrumentSentRequestSize.add( + requestWrapper.messageSize, telemetryAttributes); + telemetryMetrics.instrumentSentRequestRows.add( + originalRequest.getProtoRows().getRows().getSerializedRowsCount(), telemetryAttributes); } } cleanupConnectionAndRequests(/* avoidBlocking= */ false); @@ -1195,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) { connectionRetryStartTime = 0; } if (!this.inflightRequestQueue.isEmpty()) { + Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp; + if (sendInstant != null) { + Duration durationLatency = Duration.between(sendInstant, Instant.now()); + telemetryMetrics.instrumentNetworkResponseLatency.record( + durationLatency.toMillis(), getTelemetryAttributes()); + } + requestWrapper = pollFirstInflightRequestQueue(); } else if (inflightCleanuped) { // It is possible when requestCallback is called, the inflight queue is already drained @@ -1213,6 +1328,19 @@ private void requestCallback(AppendRowsResponse response) { this.lock.unlock(); } + Attributes augmentedTelemetryAttributes = + augmentAttributesWithErrorCode( + getTelemetryAttributes(), + Code.values()[ + response.hasError() ? response.getError().getCode() : Status.Code.OK.ordinal()] + .toString()); + telemetryMetrics.instrumentAckedRequestCount.add(1, augmentedTelemetryAttributes); + telemetryMetrics.instrumentAckedRequestSize.add( + requestWrapper.messageSize, augmentedTelemetryAttributes); + telemetryMetrics.instrumentAckedRequestRows.add( + requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(), + augmentedTelemetryAttributes); + // Retries need to happen on the same thread as queue locking may occur if (response.hasError()) { if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) { @@ -1290,6 +1418,11 @@ private void doneCallback(Throwable finalStatus) { this.lock.lock(); try { this.streamConnectionIsConnected = false; + this.telemetryMetrics.instrumentConnectionEndCount.add( + 1, + augmentAttributesWithErrorCode( + getTelemetryAttributes(), + Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString())); if (connectionFinalStatus == null) { if (connectionRetryStartTime == 0) { connectionRetryStartTime = System.currentTimeMillis(); @@ -1301,6 +1434,8 @@ private void doneCallback(Throwable finalStatus) { || System.currentTimeMillis() - connectionRetryStartTime <= maxRetryDuration.toMillis())) { this.conectionRetryCountWithoutCallback++; + this.telemetryMetrics.instrumentConnectionStartCount.add( + 1, augmentAttributesWithRetry(getTelemetryAttributes())); log.info( "Connection is going to be reestablished with the next request. Retriable error " + finalStatus.toString()