Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add opentelemetry counters for sent and acked messages #2532

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand All @@ -40,6 +41,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
Expand Down Expand Up @@ -259,6 +261,7 @@ class ConnectionWorker implements AutoCloseable {
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
private Meter writeMeter;
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
static AttributeKey<String> telemetryKeyWriterId = AttributeKey.stringKey("writer_id");
private static String dataflowPrefix = "dataflow:";
static List<AttributeKey<String>> telemetryKeysTraceId =
new ArrayList<AttributeKey<String>>() {
Expand All @@ -268,10 +271,25 @@ class ConnectionWorker implements AutoCloseable {
add(AttributeKey.stringKey("trace_field_3"));
}
};
static AttributeKey<String> telemetryKeyErrorCode = AttributeKey.stringKey("error_code");
static AttributeKey<String> telemetryKeyIsRetry = AttributeKey.stringKey("is_retry");
private Attributes telemetryAttributes;
private LongCounter instrumentIncomingRequestCount;
private LongCounter instrumentIncomingRequestSize;
private LongCounter instrumentIncomingRequestRows;
// Latency buckets are based on a list of 1.5 ^ n
private static final List<Long> METRICS_MILLISECONDS_LATENCY_BUCKETS =
ImmutableList.of(
0L, 17L, 38L, 86L, 195L, 438L, 985L, 2217L, 4988L, 11223L, 25251L, 56815L, 127834L,
287627L, 647160L);

private static final class OpenTelemetryMetrics {
private LongCounter instrumentAckedRequestCount;
private LongCounter instrumentAckedRequestSize;
private LongCounter instrumentAckedRequestRows;
private LongHistogram instrumentNetworkResponseLatency;
private LongCounter instrumentConnectionStartCount;
private LongCounter instrumentConnectionEndCount;
}

private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics();

public static Boolean isDefaultStreamName(String streamName) {
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
Expand Down Expand Up @@ -327,16 +345,21 @@ private void setTraceIdAttributes(AttributesBuilder builder) {
}
}

// Specify common attributes for all metrics.
// For example, table name and writer id.
// Metrics dashboards can be filtered on available attributes.
private Attributes buildOpenTelemetryAttributes() {
AttributesBuilder builder = Attributes.builder();
String tableName = getTableName();
if (!tableName.isEmpty()) {
builder.put(telemetryKeyTableId, tableName);
}
builder.put(telemetryKeyWriterId, writerId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comment to buildOpenTelemetryAttributes, what kind of attributes it is building and does this apply to all metrics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

setTraceIdAttributes(builder);
return builder.build();
}

// Refresh the table name attribute when multiplexing switches between tables.
private void refreshOpenTelemetryTableNameAttributes() {
String tableName = getTableName();
if (!tableName.isEmpty()
Expand All @@ -347,6 +370,22 @@ private void refreshOpenTelemetryTableNameAttributes() {
}
}

// Build new attributes augmented with an error code string.
private Attributes augmentAttributesWithErrorCode(Attributes attributes, String errorCode) {
AttributesBuilder builder = attributes.toBuilder();
if ((errorCode != null) && !errorCode.isEmpty()) {
builder.put(telemetryKeyErrorCode, errorCode);
}
return builder.build();
}

// Build new attributes augmented with a flag indicating this was a retry.
private Attributes augmentAttributesWithRetry(Attributes attributes) {
AttributesBuilder builder = attributes.toBuilder();
builder.put(telemetryKeyIsRetry, "1");
return builder.build();
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
return telemetryAttributes;
Expand All @@ -360,20 +399,72 @@ private void registerOpenTelemetryMetrics() {
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
instrumentIncomingRequestCount =
telemetryMetrics.instrumentAckedRequestCount =
writeMeter
.counterBuilder("append_requests_acked")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a TODO, I am wondering if it is possible to have a Retry attribute to the metric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.setDescription("Counts number of requests acked by the server")
.build();
telemetryMetrics.instrumentAckedRequestSize =
writeMeter
.counterBuilder("append_request_bytes_acked")
.setDescription("Counts byte size of requests acked by the server")
.build();
telemetryMetrics.instrumentAckedRequestRows =
writeMeter
.counterBuilder("append_rows_acked")
.setDescription("Counts number of request rows acked by the server")
.build();
writeMeter
.gaugeBuilder("active_connection_count")
.ofLongs()
.setDescription("Reports number of active connections")
.buildWithCallback(
measurement -> {
int count = 0;
this.lock.lock();
try {
if (streamConnectionIsConnected) {
count = 1;
}
} finally {
this.lock.unlock();
}
measurement.record(count, getTelemetryAttributes());
});
writeMeter
.gaugeBuilder("inflight_queue_length")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will need connection_id as metrics field, otherwise it doesn't make too much sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added writer_id as an attribute.

.ofLongs()
.setDescription(
"Reports length of inflight queue. This queue contains sent append requests waiting for response from the server.")
.buildWithCallback(
measurement -> {
int length = 0;
this.lock.lock();
try {
length = inflightRequestQueue.size();
} finally {
this.lock.unlock();
}
measurement.record(length, getTelemetryAttributes());
});
telemetryMetrics.instrumentNetworkResponseLatency =
writeMeter
.counterBuilder("append_requests")
.setDescription("Counts number of incoming requests")
.histogramBuilder("network_response_latency")
.ofLongs()
.setDescription(
"Reports time taken in milliseconds for a response to arrive once a message has been sent over the network.")
.setExplicitBucketBoundariesAdvice(METRICS_MILLISECONDS_LATENCY_BUCKETS)
.build();
instrumentIncomingRequestSize =
telemetryMetrics.instrumentConnectionStartCount =
writeMeter
.counterBuilder("append_request_bytes")
.setDescription("Counts byte size of incoming requests")
.counterBuilder("connection_start_count")
.setDescription(
"Counts number of connection attempts made, regardless of whether these are initial or retry.")
.build();
instrumentIncomingRequestRows =
telemetryMetrics.instrumentConnectionEndCount =
writeMeter
.counterBuilder("append_rows")
.setDescription("Counts number of incoming request rows")
.counterBuilder("connection_end_count")
.setDescription("Counts number of connection end events.")
.build();
}

Expand Down Expand Up @@ -465,6 +556,7 @@ public void run() {

private void resetConnection() {
log.info("Start connecting stream: " + streamName + " id: " + writerId);
telemetryMetrics.instrumentConnectionStartCount.add(1, getTelemetryAttributes());
if (this.streamConnection != null) {
// It's safe to directly close the previous connection as the in flight messages
// will be picked up by the next connection.
Expand Down Expand Up @@ -618,9 +710,6 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
this.lock.lock();
try {
if (userClosed) {
Expand Down Expand Up @@ -1214,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) {
connectionRetryStartTime = 0;
}
if (!this.inflightRequestQueue.isEmpty()) {
Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp;
if (sendInstant != null) {
Duration durationLatency = Duration.between(sendInstant, Instant.now());
telemetryMetrics.instrumentNetworkResponseLatency.record(
durationLatency.toMillis(), getTelemetryAttributes());
}

requestWrapper = pollFirstInflightRequestQueue();
requestProfilerHook.endOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
Expand All @@ -1234,6 +1330,22 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.unlock();
}

Attributes augmentedTelemetryAttributes =
augmentAttributesWithErrorCode(
getTelemetryAttributes(),
Code.values()[
response.hasError() ? response.getError().getCode() : Status.Code.OK.ordinal()]
.toString());
if (requestWrapper.retryCount > 0) {
augmentedTelemetryAttributes = augmentAttributesWithRetry(augmentedTelemetryAttributes);
}
telemetryMetrics.instrumentAckedRequestCount.add(1, augmentedTelemetryAttributes);
telemetryMetrics.instrumentAckedRequestSize.add(
requestWrapper.messageSize, augmentedTelemetryAttributes);
telemetryMetrics.instrumentAckedRequestRows.add(
requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(),
augmentedTelemetryAttributes);

// Retries need to happen on the same thread as queue locking may occur
if (response.hasError()) {
if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) {
Expand Down Expand Up @@ -1316,6 +1428,11 @@ private void doneCallback(Throwable finalStatus) {
this.lock.lock();
try {
this.streamConnectionIsConnected = false;
this.telemetryMetrics.instrumentConnectionEndCount.add(
1,
augmentAttributesWithErrorCode(
getTelemetryAttributes(),
Code.values()[Status.fromThrowable(finalStatus).getCode().ordinal()].toString()));
if (connectionFinalStatus == null) {
if (connectionRetryStartTime == 0) {
connectionRetryStartTime = System.currentTimeMillis();
Expand All @@ -1327,6 +1444,8 @@ private void doneCallback(Throwable finalStatus) {
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
this.telemetryMetrics.instrumentConnectionStartCount.add(
1, augmentAttributesWithRetry(getTelemetryAttributes()));
log.info(
"Connection is going to be reestablished with the next request. Retriable error "
+ finalStatus.toString()
Expand Down
Loading