Skip to content

Commit

Permalink
Demonstrate implementing couple OpenTelemetry metrics in the core Java
Browse files Browse the repository at this point in the history
library.
  • Loading branch information
agrawal-siddharth committed May 14, 2024
1 parent cc9fdfd commit 5ad2501
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 1 deletion.
4 changes: 4 additions & 0 deletions google-cloud-bigquerystorage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@
<artifactId>google-auth-library-credentials</artifactId>
<version>1.23.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -250,6 +255,11 @@ class ConnectionWorker implements AutoCloseable {
private static String projectMatching = "projects/[^/]+/";
private static Pattern streamPatternProject = Pattern.compile(projectMatching);

private Meter writeMeter;
private final Attributes telemetryAttributes;
private LongCounter instrumentIncomingRequestCount;
private LongCounter instrumentIncomingRequestSize;

static final Pattern DEFAULT_STREAM_PATTERN =
Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$");

Expand Down Expand Up @@ -278,6 +288,70 @@ static String getRoutingHeader(String streamName, String location) {
return project + "locations/" + location;
}

private Attributes buildOpenTelemetryAttributes() {
AttributesBuilder builder = Attributes.builder().put("writerId", this.writerId);
builder.put("streamId", this.streamName);
if (!this.traceId.isEmpty()) {
String[] traceIdParts = this.traceId.split(":", 5);
for (int i = 0; i < traceIdParts.length; i++) {
builder.put("traceField" + i, traceIdParts[i]);
}
}
return builder.build();
}

private void registerOpenTelemetryMetrics() {
MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider();
writeMeter =
meterProvider
.meterBuilder("com.google.cloud.bigquery.storage.v1.write")
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
instrumentIncomingRequestCount =
writeMeter
.counterBuilder("incoming-request-count")
.setDescription("Counts number of incoming requests")
.build();
instrumentIncomingRequestSize =
writeMeter
.counterBuilder("incoming-request-size")
.setDescription("Counts byte size of incoming requests")
.build();
writeMeter
.gaugeBuilder("waiting-queue-length")
.ofLongs()
.setDescription(
"Reports length of waiting queue. This queue contains requests buffered in the client and not yet sent to the server.")
.buildWithCallback(
result -> {
long waitQueueSize = 0;
this.lock.lock();
try {
waitQueueSize = this.waitingRequestQueue.size();
} finally {
this.lock.unlock();
}
result.record(waitQueueSize, telemetryAttributes);
});
writeMeter
.gaugeBuilder("inflight-queue-length")
.ofLongs()
.setDescription(
"Reports length of inflight queue. This queue contains sent append requests waiting for response from server.")
.buildWithCallback(
result -> {
long inflightQueueSize = 0;
this.lock.lock();
try {
inflightQueueSize = this.inflightRequestQueue.size();
} finally {
this.lock.unlock();
}
result.record(inflightQueueSize, telemetryAttributes);
});
}

public ConnectionWorker(
String streamName,
String location,
Expand Down Expand Up @@ -312,6 +386,9 @@ public ConnectionWorker(
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.compressorName = compressorName;
this.retrySettings = retrySettings;
this.telemetryAttributes = buildOpenTelemetryAttributes();
registerOpenTelemetryMetrics();

// Always recreate a client for connection worker.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
Expand Down Expand Up @@ -506,6 +583,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
instrumentIncomingRequestCount.add(1, telemetryAttributes);
instrumentIncomingRequestSize.add(requestWrapper.messageSize, telemetryAttributes);
this.lock.lock();
try {
if (userClosed) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import java.util.logging.Logger;

/** Container for global singleton objects. */
class Singletons {

private static final Logger log = Logger.getLogger(Singletons.class.getName());

// Global OpenTelemetry instance
private static OpenTelemetry openTelemetry = null;

static OpenTelemetry getOpenTelemetry() {
if (openTelemetry == null) {
openTelemetry = GlobalOpenTelemetry.get();
}
return openTelemetry;
}
}
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@
<artifactId>json</artifactId>
<version>20240303</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.38.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down

0 comments on commit 5ad2501

Please sign in to comment.