diff --git a/README.md b/README.md
index 889f0774a1..b521689b43 100644
--- a/README.md
+++ b/README.md
@@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-bigquerystorage:3.5.1'
+implementation 'com.google.cloud:google-cloud-bigquerystorage:3.5.2'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "3.5.1"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "3.5.2"
```
@@ -221,7 +221,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg
-[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/3.5.1
+[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/3.5.2
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml
index 42680bc923..efe23cdbd0 100644
--- a/google-cloud-bigquerystorage/pom.xml
+++ b/google-cloud-bigquerystorage/pom.xml
@@ -156,6 +156,10 @@
google-auth-library-credentials
1.23.0
+
+ io.opentelemetry
+ opentelemetry-api
+
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index e295320153..ab0929e211 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -36,13 +36,21 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -253,6 +261,24 @@ class ConnectionWorker implements AutoCloseable {
static final Pattern DEFAULT_STREAM_PATTERN =
Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$");
+ private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/";
+ private static Pattern streamPatternTable = Pattern.compile(tableMatching);
+ private Meter writeMeter;
+ static AttributeKey telemetryKeyTableId = AttributeKey.stringKey("table_id");
+ private static String dataflowPrefix = "dataflow:";
+ static List> telemetryKeysTraceId =
+ new ArrayList>() {
+ {
+ add(AttributeKey.stringKey("trace_field_1"));
+ add(AttributeKey.stringKey("trace_field_2"));
+ add(AttributeKey.stringKey("trace_field_3"));
+ }
+ };
+ private Attributes telemetryAttributes;
+ private LongCounter instrumentIncomingRequestCount;
+ private LongCounter instrumentIncomingRequestSize;
+ private LongCounter instrumentIncomingRequestRows;
+
public static Boolean isDefaultStreamName(String streamName) {
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
return matcher.matches();
@@ -278,6 +304,85 @@ static String getRoutingHeader(String streamName, String location) {
return project + "locations/" + location;
}
+ private String getTableName() {
+ Matcher tableMatcher = streamPatternTable.matcher(this.streamName);
+ return tableMatcher.find() ? tableMatcher.group(1) : "";
+ }
+
+ private void setTraceIdAttributesPart(
+ AttributesBuilder builder,
+ String[] traceIdParts,
+ int indexPartsToCheck,
+ int indexTelemetryKeysToUse) {
+ if ((indexPartsToCheck < traceIdParts.length) && !traceIdParts[indexPartsToCheck].isEmpty()) {
+ builder.put(
+ telemetryKeysTraceId.get(indexTelemetryKeysToUse), traceIdParts[indexPartsToCheck]);
+ }
+ }
+
+ private void setTraceIdAttributes(AttributesBuilder builder) {
+ if ((this.traceId != null) && !this.traceId.isEmpty()) {
+ int indexDataflow = this.traceId.toLowerCase().indexOf(dataflowPrefix);
+ if (indexDataflow >= 0) {
+ String[] traceIdParts =
+ this.traceId.substring(indexDataflow + dataflowPrefix.length()).split(":", 8);
+ setTraceIdAttributesPart(builder, traceIdParts, 0, 0);
+ setTraceIdAttributesPart(builder, traceIdParts, 1, 1);
+ setTraceIdAttributesPart(builder, traceIdParts, 2, 2);
+ }
+ }
+ }
+
+ private Attributes buildOpenTelemetryAttributes() {
+ AttributesBuilder builder = Attributes.builder();
+ String tableName = getTableName();
+ if (!tableName.isEmpty()) {
+ builder.put(telemetryKeyTableId, tableName);
+ }
+ setTraceIdAttributes(builder);
+ return builder.build();
+ }
+
+ private void refreshOpenTelemetryTableNameAttributes() {
+ String tableName = getTableName();
+ if (!tableName.isEmpty()
+ && !tableName.equals(getTelemetryAttributes().get(telemetryKeyTableId))) {
+ AttributesBuilder builder = getTelemetryAttributes().toBuilder();
+ builder.put(telemetryKeyTableId, tableName);
+ this.telemetryAttributes = builder.build();
+ }
+ }
+
+ @VisibleForTesting
+ Attributes getTelemetryAttributes() {
+ return telemetryAttributes;
+ }
+
+ private void registerOpenTelemetryMetrics() {
+ MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider();
+ writeMeter =
+ meterProvider
+ .meterBuilder("com.google.cloud.bigquery.storage.v1.write")
+ .setInstrumentationVersion(
+ ConnectionWorker.class.getPackage().getImplementationVersion())
+ .build();
+ instrumentIncomingRequestCount =
+ writeMeter
+ .counterBuilder("append_requests")
+ .setDescription("Counts number of incoming requests")
+ .build();
+ instrumentIncomingRequestSize =
+ writeMeter
+ .counterBuilder("append_request_bytes")
+ .setDescription("Counts byte size of incoming requests")
+ .build();
+ instrumentIncomingRequestRows =
+ writeMeter
+ .counterBuilder("append_rows")
+ .setDescription("Counts number of incoming request rows")
+ .build();
+ }
+
public ConnectionWorker(
String streamName,
String location,
@@ -312,6 +417,9 @@ public ConnectionWorker(
this.inflightRequestQueue = new LinkedList();
this.compressorName = compressorName;
this.retrySettings = retrySettings;
+ this.telemetryAttributes = buildOpenTelemetryAttributes();
+ registerOpenTelemetryMetrics();
+
// Always recreate a client for connection worker.
HashMap newHeaders = new HashMap<>();
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
@@ -507,6 +615,9 @@ private ApiFuture appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
+ instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
+ instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
+ instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
this.lock.lock();
try {
if (userClosed) {
@@ -783,6 +894,7 @@ private void appendLoop() {
|| (originalRequest.getProtoRows().hasWriterSchema()
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
streamName = originalRequest.getWriteStream();
+ refreshOpenTelemetryTableNameAttributes();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForTableOrSchemaSwitch = true;
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
index 119b410c7a..185313f2b6 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
@@ -27,6 +27,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
+import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
@@ -238,9 +239,7 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows)
return append(streamWriter, rows, -1);
}
- /** Distributes the writing of a message to an underlying connection. */
- ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) {
- // We are in multiplexing mode after entering the following logic.
+ ConnectionWorker getConnectionWorker(StreamWriter streamWriter) {
ConnectionWorker connectionWorker;
lock.lock();
try {
@@ -277,6 +276,13 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows,
} finally {
lock.unlock();
}
+ return connectionWorker;
+ }
+
+ /** Distributes the writing of a message to an underlying connection. */
+ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) {
+ // We are in multiplexing mode after entering the following logic.
+ ConnectionWorker connectionWorker = getConnectionWorker(streamWriter);
Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture responseFuture =
connectionWorker.append(streamWriter, rows, offset);
@@ -294,6 +300,12 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows,
MoreExecutors.directExecutor());
}
+ @VisibleForTesting
+ Attributes getTelemetryAttributes(StreamWriter streamWriter) {
+ ConnectionWorker connectionWorker = getConnectionWorker(streamWriter);
+ return connectionWorker.getTelemetryAttributes();
+ }
+
/**
* Create a new connection if we haven't reached current maximum, or reuse an existing connection
* with least load.
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java
new file mode 100644
index 0000000000..aae8cd99dd
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.logging.Logger;
+
+/** Container for global singleton objects. */
+class Singletons {
+
+ private static final Logger log = Logger.getLogger(Singletons.class.getName());
+
+ // Global OpenTelemetry instance
+ private static OpenTelemetry openTelemetry = null;
+
+ static OpenTelemetry getOpenTelemetry() {
+ if (openTelemetry == null) {
+ openTelemetry = GlobalOpenTelemetry.get();
+ }
+ return openTelemetry;
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index 5745f43c0c..95ec77b229 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -34,6 +34,7 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
+import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
@@ -172,6 +173,15 @@ public ApiFuture append(
}
}
+ @VisibleForTesting
+ Attributes getTelemetryAttributes(StreamWriter streamWriter) {
+ if (getKind() == Kind.CONNECTION_WORKER) {
+ return connectionWorker().getTelemetryAttributes();
+ } else {
+ return connectionWorkerPool().getTelemetryAttributes(streamWriter);
+ }
+ }
+
public void close(StreamWriter streamWriter) {
if (getKind() == Kind.CONNECTION_WORKER) {
connectionWorker().close();
@@ -459,6 +469,11 @@ public ApiFuture append(ProtoRows rows, long offset) {
return this.singleConnectionOrConnectionPool.append(this, rows, offset);
}
+ @VisibleForTesting
+ Attributes getTelemetryAttributes() {
+ return this.singleConnectionOrConnectionPool.getTelemetryAttributes(this);
+ }
+
/**
* Returns the wait of a request in Client side before sending to the Server. Request could wait
* in Client because it reached the client side inflight request limit (adjustable when
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
index 3dab071d0d..936be9c8eb 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
@@ -33,6 +33,7 @@
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.StatusRuntimeException;
+import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -754,6 +755,97 @@ public void testLongTimeIdleWontFail() throws Exception {
}
}
+ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, String expected)
+ throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ streamName,
+ null,
+ schema1,
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ null,
+ null,
+ client.getSettings(),
+ retrySettings);
+
+ Attributes attributes = connectionWorker.getTelemetryAttributes();
+ String attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId);
+ assertEquals(expected, attributesTableId);
+ }
+
+ @Test
+ public void testOpenTelemetryAttributesWithStreamNames() throws Exception {
+ exerciseOpenTelemetryAttributesWithStreamNames(
+ "projects/my_project/datasets/my_dataset/tables/my_table/streams/my_stream",
+ "projects/my_project/datasets/my_dataset/tables/my_table");
+ exerciseOpenTelemetryAttributesWithStreamNames(
+ "projects/my_project/datasets/my_dataset/tables/my_table/",
+ "projects/my_project/datasets/my_dataset/tables/my_table");
+ exerciseOpenTelemetryAttributesWithStreamNames(
+ "projects/my_project/datasets/my_dataset/tables/", null);
+ }
+
+ void checkOpenTelemetryTraceIdAttribute(Attributes attributes, int index, String expected) {
+ String attributesTraceId = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(index));
+ assertEquals(expected, attributesTraceId);
+ }
+
+ void exerciseOpenTelemetryAttributesWithTraceId(
+ String traceId, String expectedField1, String expectedField2, String expectedField3)
+ throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ TEST_STREAM_1,
+ null,
+ schema1,
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ traceId,
+ null,
+ client.getSettings(),
+ retrySettings);
+
+ Attributes attributes = connectionWorker.getTelemetryAttributes();
+ checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);
+ checkOpenTelemetryTraceIdAttribute(attributes, 1, expectedField2);
+ checkOpenTelemetryTraceIdAttribute(attributes, 2, expectedField3);
+ }
+
+ @Test
+ public void testOpenTelemetryAttributesWithTraceId() throws Exception {
+ exerciseOpenTelemetryAttributesWithTraceId(null, null, null, null);
+ exerciseOpenTelemetryAttributesWithTraceId("a:b:c", null, null, null);
+ exerciseOpenTelemetryAttributesWithTraceId(
+ "java-streamwriter:HEAD+20240508-1544 Dataflow:monorail-c-multi:2024-05-08_11_44_34-6968230696879535523:1972585693681960752",
+ "monorail-c-multi",
+ "2024-05-08_11_44_34-6968230696879535523",
+ "1972585693681960752");
+ exerciseOpenTelemetryAttributesWithTraceId(
+ "Dataflow:2024-04-26_23_19_08-12221961051154168466",
+ "2024-04-26_23_19_08-12221961051154168466",
+ null,
+ null);
+ exerciseOpenTelemetryAttributesWithTraceId(
+ "Dataflow:writeapi3:2024-04-03_03_49_33-845412829237675723:63737042897365355",
+ "writeapi3",
+ "2024-04-03_03_49_33-845412829237675723",
+ "63737042897365355");
+ exerciseOpenTelemetryAttributesWithTraceId(
+ "java-streamwriter Dataflow:pubsub-to-bq-staging-tongruil-1024-static:2024-05-14_15_13_14-5530509399715326669:4531186922674871499",
+ "pubsub-to-bq-staging-tongruil-1024-static",
+ "2024-05-14_15_13_14-5530509399715326669",
+ "4531186922674871499");
+ exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow :c", null, null, null);
+ exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow:c:d", "c", "d", null);
+ }
+
@Test
public void testLocationName() throws Exception {
assertEquals(
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index 9211adeb94..43c90f64a4 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -56,6 +56,7 @@
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
@@ -823,6 +824,42 @@ public void testOneMaxInflightRequests_MultiplexingCase() throws Exception {
writer2.close();
}
+ @Test
+ public void testOpenTelemetryAttributes_MultiplexingCase() throws Exception {
+ ConnectionWorkerPool.setOptions(
+ Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
+ StreamWriter writer1 =
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setWriterSchema(createProtoSchema())
+ .setLocation("US")
+ .setEnableConnectionPool(true)
+ .build();
+ StreamWriter writer2 =
+ StreamWriter.newBuilder(TEST_STREAM_2, client)
+ .setWriterSchema(createProtoSchema())
+ .setLocation("US")
+ .setEnableConnectionPool(true)
+ .build();
+
+ testBigQueryWrite.addResponse(createAppendResponse(0));
+ testBigQueryWrite.addResponse(createAppendResponse(1));
+
+ ApiFuture appendFuture1 = sendTestMessage(writer1, new String[] {"A"});
+ assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
+ Attributes attributes = writer1.getTelemetryAttributes();
+ String attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId);
+ assertEquals("projects/p/datasets/d1/tables/t1", attributesTableId);
+
+ ApiFuture appendFuture2 = sendTestMessage(writer2, new String[] {"A"});
+ assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
+ attributes = writer2.getTelemetryAttributes();
+ attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId);
+ assertEquals("projects/p/datasets/d2/tables/t2", attributesTableId);
+
+ writer1.close();
+ writer2.close();
+ }
+
@Test
public void testProtoSchemaPiping_nonMultiplexingCase() throws Exception {
ProtoSchema protoSchema = createProtoSchema();
diff --git a/pom.xml b/pom.xml
index 6b3e98435a..fa38b7b153 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,7 +120,13 @@
json
20240303
-
+
+ io.opentelemetry
+ opentelemetry-bom
+ 1.38.0
+ pom
+ import
+