Skip to content

Commit

Permalink
feat: add OpenTelemetry counters for sent and acked messages
Browse files Browse the repository at this point in the history
Also add network latency, queue length and error counts.

The metrics (other than error counts) are now reported periodically,
every second.
  • Loading branch information
agrawal-siddharth committed Jul 20, 2024
1 parent e6a6cb4 commit 0b87d97
Showing 1 changed file with 151 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand All @@ -40,6 +41,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
Expand Down Expand Up @@ -265,6 +267,7 @@ class ConnectionWorker implements AutoCloseable {
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
private Meter writeMeter;
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
static AttributeKey<String> telemetryKeyWriterId = AttributeKey.stringKey("writer_id");
private static String dataflowPrefix = "dataflow:";
static List<AttributeKey<String>> telemetryKeysTraceId =
new ArrayList<AttributeKey<String>>() {
Expand All @@ -274,10 +277,25 @@ class ConnectionWorker implements AutoCloseable {
add(AttributeKey.stringKey("trace_field_3"));
}
};
static AttributeKey<String> telemetryKeyErrorCode = AttributeKey.stringKey("error_code");
static AttributeKey<String> telemetryKeyIsRetry = AttributeKey.stringKey("is_retry");
private Attributes telemetryAttributes;
private LongCounter instrumentIncomingRequestCount;
private LongCounter instrumentIncomingRequestSize;
private LongCounter instrumentIncomingRequestRows;
// Bucket boundaries, in milliseconds, supplied as explicit bucket-boundary advice when the
// "network_response_latency" histogram is built. They range from 0 ms up to 120000 ms.
private static final List<Long> METRICS_MILLISECONDS_LATENCY_BUCKETS =
ImmutableList.of(0L, 50L, 100L, 500L, 1000L, 5000L, 10000L, 20000L, 30000L, 60000L, 120000L);

/**
 * Groups the OpenTelemetry instruments recorded by a connection worker.
 *
 * <p>The fields are not final: they are assigned after construction, when the instruments are
 * built from the meter in {@code registerOpenTelemetryMetrics()}.
 */
private static final class OpenTelemetryMetrics {
// "append_requests_sent": number of requests sent over the network.
private LongCounter instrumentSentRequestCount;
// "append_request_bytes_sent": byte size of requests sent over the network.
private LongCounter instrumentSentRequestSize;
// "append_rows_sent": number of request rows sent over the network.
private LongCounter instrumentSentRequestRows;
// "append_requests_acked": number of requests acked by the server.
private LongCounter instrumentAckedRequestCount;
// "append_request_bytes_acked": byte size of requests acked by the server.
private LongCounter instrumentAckedRequestSize;
// "append_rows_acked": number of request rows acked by the server.
private LongCounter instrumentAckedRequestRows;
// "network_response_latency": milliseconds between sending a message and its response arriving.
private LongHistogram instrumentNetworkResponseLatency;
// "connection_start_count": connection attempts made, whether initial or retry.
private LongCounter instrumentConnectionStartCount;
// "connection_end_count": connection end events.
private LongCounter instrumentConnectionEndCount;
}

private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics();

public static Boolean isDefaultStreamName(String streamName) {
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
Expand Down Expand Up @@ -339,6 +357,7 @@ private Attributes buildOpenTelemetryAttributes() {
if (!tableName.isEmpty()) {
builder.put(telemetryKeyTableId, tableName);
}
builder.put(telemetryKeyWriterId, writerId);
setTraceIdAttributes(builder);
return builder.build();
}
Expand All @@ -353,6 +372,20 @@ private void refreshOpenTelemetryTableNameAttributes() {
}
}

/**
 * Builds a copy of {@code attributes} that also carries the error-code attribute.
 *
 * <p>If {@code errorCode} is null or empty, no attribute is added and the returned copy holds
 * the same entries as the input.
 *
 * @param attributes base attributes to copy; must not be null
 * @param errorCode error code to attach under {@code telemetryKeyErrorCode}; may be null or empty
 * @return a newly built {@code Attributes} instance
 */
private Attributes augmentAttributesWithErrorCode(Attributes attributes, String errorCode) {
  boolean hasErrorCode = errorCode != null && !errorCode.isEmpty();
  if (!hasErrorCode) {
    // Nothing to attach: still go through the builder so a freshly built instance is returned.
    return attributes.toBuilder().build();
  }
  return attributes.toBuilder().put(telemetryKeyErrorCode, errorCode).build();
}

/**
 * Builds a copy of {@code attributes} flagged as a retry.
 *
 * @param attributes base attributes to copy; must not be null
 * @return a newly built {@code Attributes} instance with {@code telemetryKeyIsRetry} set to
 *     {@code "1"}
 */
private Attributes augmentAttributesWithRetry(Attributes attributes) {
  return attributes.toBuilder().put(telemetryKeyIsRetry, "1").build();
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
return telemetryAttributes;
Expand All @@ -366,20 +399,87 @@ private void registerOpenTelemetryMetrics() {
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
instrumentIncomingRequestCount =
telemetryMetrics.instrumentSentRequestCount =
writeMeter
.counterBuilder("append_requests_sent")
.setDescription("Counts number of requests sent over the network")
.build();
telemetryMetrics.instrumentSentRequestSize =
writeMeter
.counterBuilder("append_request_bytes_sent")
.setDescription("Counts byte size of requests sent over the network")
.build();
telemetryMetrics.instrumentSentRequestRows =
writeMeter
.counterBuilder("append_rows_sent")
.setDescription("Counts number of request rows sent over the network")
.build();
telemetryMetrics.instrumentAckedRequestCount =
writeMeter
.counterBuilder("append_requests")
.setDescription("Counts number of incoming requests")
.counterBuilder("append_requests_acked")
.setDescription("Counts number of requests acked by the server")
.build();
instrumentIncomingRequestSize =
telemetryMetrics.instrumentAckedRequestSize =
writeMeter
.counterBuilder("append_request_bytes")
.setDescription("Counts byte size of incoming requests")
.counterBuilder("append_request_bytes_acked")
.setDescription("Counts byte size of requests acked by the server")
.build();
instrumentIncomingRequestRows =
telemetryMetrics.instrumentAckedRequestRows =
writeMeter
.counterBuilder("append_rows")
.setDescription("Counts number of incoming request rows")
.counterBuilder("append_rows_acked")
.setDescription("Counts number of request rows acked by the server")
.build();
writeMeter
.gaugeBuilder("active_connection_count")
.ofLongs()
.setDescription("Reports number of active connections")
.buildWithCallback(
measurement -> {
int count = 0;
this.lock.lock();
try {
if (streamConnectionIsConnected) {
count = 1;
}
} finally {
this.lock.unlock();
}
measurement.record(count, getTelemetryAttributes());
});
writeMeter
.gaugeBuilder("inflight_queue_length")
.ofLongs()
.setDescription(
"Reports length of inflight queue. This queue contains sent append requests waiting for response from the server.")
.buildWithCallback(
measurement -> {
int length = 0;
this.lock.lock();
try {
length = inflightRequestQueue.size();
} finally {
this.lock.unlock();
}
measurement.record(length, getTelemetryAttributes());
});
telemetryMetrics.instrumentNetworkResponseLatency =
writeMeter
.histogramBuilder("network_response_latency")
.ofLongs()
.setDescription(
"Reports time taken in milliseconds for a response to arrive once a message has been sent over the network.")
.setExplicitBucketBoundariesAdvice(METRICS_MILLISECONDS_LATENCY_BUCKETS)
.build();
telemetryMetrics.instrumentConnectionStartCount =
writeMeter
.counterBuilder("connection_start_count")
.setDescription(
"Counts number of connection attempts made, regardless of whether these are initial or retry.")
.build();
telemetryMetrics.instrumentConnectionEndCount =
writeMeter
.counterBuilder("connection_end_count")
.setDescription("Counts number of connection end events.")
.build();
}

Expand Down Expand Up @@ -469,6 +569,7 @@ public void run() {

private void resetConnection() {
log.info("Start connecting stream: " + streamName + " id: " + writerId);
telemetryMetrics.instrumentConnectionStartCount.add(1, getTelemetryAttributes());
if (this.streamConnection != null) {
// It's safe to directly close the previous connection as the in flight messages
// will be picked up by the next connection.
Expand Down Expand Up @@ -615,9 +716,6 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
this.lock.lock();
try {
if (userClosed) {
Expand Down Expand Up @@ -876,7 +974,8 @@ private void appendLoop() {
}
while (!localQueue.isEmpty()) {
localQueue.peekFirst().setRequestSendQueueTime();
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
AppendRequestAndResponse requestWrapper = localQueue.pollFirst();
AppendRowsRequest originalRequest = requestWrapper.message;
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
// Always respect the first writer schema seen by the loop.
if (writerSchema == null) {
Expand Down Expand Up @@ -928,6 +1027,15 @@ private void appendLoop() {
// In the close case, the request is in the inflight queue, and will either be returned
// to the user with an error, or will be resent.
this.streamConnection.send(originalRequestBuilder.build());
Attributes telemetryAttributes = getTelemetryAttributes();
if (requestWrapper.retryCount > 0) {
telemetryAttributes = augmentAttributesWithRetry(telemetryAttributes);
}
telemetryMetrics.instrumentSentRequestCount.add(1, telemetryAttributes);
telemetryMetrics.instrumentSentRequestSize.add(
requestWrapper.messageSize, telemetryAttributes);
telemetryMetrics.instrumentSentRequestRows.add(
originalRequest.getProtoRows().getRows().getSerializedRowsCount(), telemetryAttributes);
}
}
cleanupConnectionAndRequests(/* avoidBlocking= */ false);
Expand Down Expand Up @@ -1195,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) {
connectionRetryStartTime = 0;
}
if (!this.inflightRequestQueue.isEmpty()) {
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
if (sendInstant != null) {
Duration durationLatency = Duration.between(sendInstant, Instant.now());
telemetryMetrics.instrumentNetworkResponseLatency.record(
durationLatency.toMillis(), getTelemetryAttributes());
}

requestWrapper = pollFirstInflightRequestQueue();
} else if (inflightCleanuped) {
// It is possible when requestCallback is called, the inflight queue is already drained
Expand All @@ -1213,6 +1328,19 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.unlock();
}

Attributes augmentedTelemetryAttributes =
augmentAttributesWithErrorCode(
getTelemetryAttributes(),
Code.values()[
response.hasError() ? response.getError().getCode() : Status.Code.OK.ordinal()]
.toString());
telemetryMetrics.instrumentAckedRequestCount.add(1, augmentedTelemetryAttributes);
telemetryMetrics.instrumentAckedRequestSize.add(
requestWrapper.messageSize, augmentedTelemetryAttributes);
telemetryMetrics.instrumentAckedRequestRows.add(
requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(),
augmentedTelemetryAttributes);

// Retries need to happen on the same thread as queue locking may occur
if (response.hasError()) {
if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) {
Expand Down Expand Up @@ -1290,6 +1418,11 @@ private void doneCallback(Throwable finalStatus) {
this.lock.lock();
try {
this.streamConnectionIsConnected = false;
this.telemetryMetrics.instrumentConnectionEndCount.add(
1,
augmentAttributesWithErrorCode(
getTelemetryAttributes(),
Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString()));
if (connectionFinalStatus == null) {
if (connectionRetryStartTime == 0) {
connectionRetryStartTime = System.currentTimeMillis();
Expand All @@ -1301,6 +1434,8 @@ private void doneCallback(Throwable finalStatus) {
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
this.telemetryMetrics.instrumentConnectionStartCount.add(
1, augmentAttributesWithRetry(getTelemetryAttributes()));
log.info(
"Connection is going to be reestablished with the next request. Retriable error "
+ finalStatus.toString()
Expand Down

0 comments on commit 0b87d97

Please sign in to comment.