diff --git a/README.md b/README.md index 889f0774a1..b521689b43 100644 --- a/README.md +++ b/README.md @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:3.5.1' +implementation 'com.google.cloud:google-cloud-bigquerystorage:3.5.2' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "3.5.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "3.5.2" ``` @@ -221,7 +221,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/3.5.1 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/3.5.2 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 1faff53cd4..16632bc616 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -156,6 +156,10 @@ google-auth-library-credentials 1.23.0 + + io.opentelemetry + opentelemetry-api + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index e295320153..ab0929e211 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -36,13 +36,21 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -253,6 +261,24 @@ class ConnectionWorker implements AutoCloseable { static final Pattern DEFAULT_STREAM_PATTERN = Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$"); + private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/"; + private static Pattern streamPatternTable = Pattern.compile(tableMatching); + private Meter writeMeter; + static AttributeKey telemetryKeyTableId = AttributeKey.stringKey("table_id"); + private static String dataflowPrefix = "dataflow:"; + static List> telemetryKeysTraceId = + new ArrayList>() { + { + add(AttributeKey.stringKey("trace_field_1")); + add(AttributeKey.stringKey("trace_field_2")); + add(AttributeKey.stringKey("trace_field_3")); + } + }; + private Attributes telemetryAttributes; + private LongCounter instrumentIncomingRequestCount; + private LongCounter instrumentIncomingRequestSize; + private LongCounter instrumentIncomingRequestRows; + public static Boolean isDefaultStreamName(String streamName) { Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName); return matcher.matches(); @@ -278,6 +304,85 @@ static String getRoutingHeader(String streamName, String location) { return project + "locations/" + location; } + private String getTableName() { + Matcher tableMatcher = streamPatternTable.matcher(this.streamName); + return tableMatcher.find() ? tableMatcher.group(1) : ""; + } + + private void setTraceIdAttributesPart( + AttributesBuilder builder, + String[] traceIdParts, + int indexPartsToCheck, + int indexTelemetryKeysToUse) { + if ((indexPartsToCheck < traceIdParts.length) && !traceIdParts[indexPartsToCheck].isEmpty()) { + builder.put( + telemetryKeysTraceId.get(indexTelemetryKeysToUse), traceIdParts[indexPartsToCheck]); + } + } + + private void setTraceIdAttributes(AttributesBuilder builder) { + if ((this.traceId != null) && !this.traceId.isEmpty()) { + int indexDataflow = this.traceId.toLowerCase().indexOf(dataflowPrefix); + if (indexDataflow >= 0) { + String[] traceIdParts = + this.traceId.substring(indexDataflow + dataflowPrefix.length()).split(":", 8); + setTraceIdAttributesPart(builder, traceIdParts, 0, 0); + setTraceIdAttributesPart(builder, traceIdParts, 1, 1); + setTraceIdAttributesPart(builder, traceIdParts, 2, 2); + } + } + } + + private Attributes buildOpenTelemetryAttributes() { + AttributesBuilder builder = Attributes.builder(); + String tableName = getTableName(); + if (!tableName.isEmpty()) { + builder.put(telemetryKeyTableId, tableName); + } + setTraceIdAttributes(builder); + return builder.build(); + } + + private void refreshOpenTelemetryTableNameAttributes() { + String tableName = getTableName(); + if (!tableName.isEmpty() + && !tableName.equals(getTelemetryAttributes().get(telemetryKeyTableId))) { + AttributesBuilder builder = getTelemetryAttributes().toBuilder(); + builder.put(telemetryKeyTableId, tableName); + this.telemetryAttributes = builder.build(); + } + } + + @VisibleForTesting + Attributes getTelemetryAttributes() { + return telemetryAttributes; + } + + private void registerOpenTelemetryMetrics() { + MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider(); + writeMeter = + meterProvider + .meterBuilder("com.google.cloud.bigquery.storage.v1.write") + .setInstrumentationVersion( + ConnectionWorker.class.getPackage().getImplementationVersion()) + .build(); + instrumentIncomingRequestCount = + writeMeter + .counterBuilder("append_requests") + .setDescription("Counts number of incoming requests") + .build(); + instrumentIncomingRequestSize = + writeMeter + .counterBuilder("append_request_bytes") + .setDescription("Counts byte size of incoming requests") + .build(); + instrumentIncomingRequestRows = + writeMeter + .counterBuilder("append_rows") + .setDescription("Counts number of incoming request rows") + .build(); + } + public ConnectionWorker( String streamName, String location, @@ -312,6 +417,9 @@ public ConnectionWorker( this.inflightRequestQueue = new LinkedList(); this.compressorName = compressorName; this.retrySettings = retrySettings; + this.telemetryAttributes = buildOpenTelemetryAttributes(); + registerOpenTelemetryMetrics(); + // Always recreate a client for connection worker. HashMap newHeaders = new HashMap<>(); newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); @@ -507,6 +615,9 @@ private ApiFuture appendInternal( + requestWrapper.messageSize))); return requestWrapper.appendResult; } + instrumentIncomingRequestCount.add(1, getTelemetryAttributes()); + instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes()); + instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount()); this.lock.lock(); try { if (userClosed) { @@ -783,6 +894,7 @@ private void appendLoop() { || (originalRequest.getProtoRows().hasWriterSchema() && !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) { streamName = originalRequest.getWriteStream(); + refreshOpenTelemetryTableNameAttributes(); writerSchema = originalRequest.getProtoRows().getWriterSchema(); isMultiplexing = true; firstRequestForTableOrSchemaSwitch = true; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 119b410c7a..185313f2b6 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -27,6 +27,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; +import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.util.Collections; import java.util.Comparator; @@ -238,9 +239,7 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows) return append(streamWriter, rows, -1); } - /** Distributes the writing of a message to an underlying connection. */ - ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { - // We are in multiplexing mode after entering the following logic. + ConnectionWorker getConnectionWorker(StreamWriter streamWriter) { ConnectionWorker connectionWorker; lock.lock(); try { @@ -277,6 +276,13 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, } finally { lock.unlock(); } + return connectionWorker; + } + + /** Distributes the writing of a message to an underlying connection. */ + ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { + // We are in multiplexing mode after entering the following logic. + ConnectionWorker connectionWorker = getConnectionWorker(streamWriter); Stopwatch stopwatch = Stopwatch.createStarted(); ApiFuture responseFuture = connectionWorker.append(streamWriter, rows, offset); @@ -294,6 +300,12 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, MoreExecutors.directExecutor()); } + @VisibleForTesting + Attributes getTelemetryAttributes(StreamWriter streamWriter) { + ConnectionWorker connectionWorker = getConnectionWorker(streamWriter); + return connectionWorker.getTelemetryAttributes(); + } + /** * Create a new connection if we haven't reached current maximum, or reuse an existing connection * with least load. diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java new file mode 100644 index 0000000000..aae8cd99dd --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import java.util.logging.Logger; + +/** Container for global singleton objects. */ +class Singletons { + + private static final Logger log = Logger.getLogger(Singletons.class.getName()); + + // Global OpenTelemetry instance + private static OpenTelemetry openTelemetry = null; + + static OpenTelemetry getOpenTelemetry() { + if (openTelemetry == null) { + openTelemetry = GlobalOpenTelemetry.get(); + } + return openTelemetry; + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 5745f43c0c..95ec77b229 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -34,6 +34,7 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.time.Duration; import java.util.HashMap; @@ -172,6 +173,15 @@ public ApiFuture append( } } + @VisibleForTesting + Attributes getTelemetryAttributes(StreamWriter streamWriter) { + if (getKind() == Kind.CONNECTION_WORKER) { + return connectionWorker().getTelemetryAttributes(); + } else { + return connectionWorkerPool().getTelemetryAttributes(streamWriter); + } + } + public void close(StreamWriter streamWriter) { if (getKind() == Kind.CONNECTION_WORKER) { connectionWorker().close(); @@ -459,6 +469,11 @@ public ApiFuture append(ProtoRows rows, long offset) { return this.singleConnectionOrConnectionPool.append(this, rows, offset); } + @VisibleForTesting + Attributes getTelemetryAttributes() { + return this.singleConnectionOrConnectionPool.getTelemetryAttributes(this); + } + /** * Returns the wait of a request in Client side before sending to the Server. Request could wait * in Client because it reached the client side inflight request limit (adjustable when diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 3dab071d0d..936be9c8eb 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -33,6 +33,7 @@ import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -754,6 +755,97 @@ public void testLongTimeIdleWontFail() throws Exception { } } + private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, String expected) + throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + ConnectionWorker connectionWorker = + new ConnectionWorker( + streamName, + null, + schema1, + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + null, + null, + client.getSettings(), + retrySettings); + + Attributes attributes = connectionWorker.getTelemetryAttributes(); + String attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId); + assertEquals(expected, attributesTableId); + } + + @Test + public void testOpenTelemetryAttributesWithStreamNames() throws Exception { + exerciseOpenTelemetryAttributesWithStreamNames( + "projects/my_project/datasets/my_dataset/tables/my_table/streams/my_stream", + "projects/my_project/datasets/my_dataset/tables/my_table"); + exerciseOpenTelemetryAttributesWithStreamNames( + "projects/my_project/datasets/my_dataset/tables/my_table/", + "projects/my_project/datasets/my_dataset/tables/my_table"); + exerciseOpenTelemetryAttributesWithStreamNames( + "projects/my_project/datasets/my_dataset/tables/", null); + } + + void checkOpenTelemetryTraceIdAttribute(Attributes attributes, int index, String expected) { + String attributesTraceId = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(index)); + assertEquals(expected, attributesTraceId); + } + + void exerciseOpenTelemetryAttributesWithTraceId( + String traceId, String expectedField1, String expectedField2, String expectedField3) + throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + null, + schema1, + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + traceId, + null, + client.getSettings(), + retrySettings); + + Attributes attributes = connectionWorker.getTelemetryAttributes(); + checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1); + checkOpenTelemetryTraceIdAttribute(attributes, 1, expectedField2); + checkOpenTelemetryTraceIdAttribute(attributes, 2, expectedField3); + } + + @Test + public void testOpenTelemetryAttributesWithTraceId() throws Exception { + exerciseOpenTelemetryAttributesWithTraceId(null, null, null, null); + exerciseOpenTelemetryAttributesWithTraceId("a:b:c", null, null, null); + exerciseOpenTelemetryAttributesWithTraceId( + "java-streamwriter:HEAD+20240508-1544 Dataflow:monorail-c-multi:2024-05-08_11_44_34-6968230696879535523:1972585693681960752", + "monorail-c-multi", + "2024-05-08_11_44_34-6968230696879535523", + "1972585693681960752"); + exerciseOpenTelemetryAttributesWithTraceId( + "Dataflow:2024-04-26_23_19_08-12221961051154168466", + "2024-04-26_23_19_08-12221961051154168466", + null, + null); + exerciseOpenTelemetryAttributesWithTraceId( + "Dataflow:writeapi3:2024-04-03_03_49_33-845412829237675723:63737042897365355", + "writeapi3", + "2024-04-03_03_49_33-845412829237675723", + "63737042897365355"); + exerciseOpenTelemetryAttributesWithTraceId( + "java-streamwriter Dataflow:pubsub-to-bq-staging-tongruil-1024-static:2024-05-14_15_13_14-5530509399715326669:4531186922674871499", + "pubsub-to-bq-staging-tongruil-1024-static", + "2024-05-14_15_13_14-5530509399715326669", + "4531186922674871499"); + exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow :c", null, null, null); + exerciseOpenTelemetryAttributesWithTraceId("a:b dataflow:c:d", "c", "d", null); + } + @Test public void testLocationName() throws Exception { assertEquals( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 06a558e658..af7c98d611 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -56,6 +56,7 @@ import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.time.Instant; import java.util.ArrayList; @@ -823,6 +824,42 @@ public void testOneMaxInflightRequests_MultiplexingCase() throws Exception { writer2.close(); } + @Test + public void testOpenTelemetryAttributes_MultiplexingCase() throws Exception { + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build()); + StreamWriter writer1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(createProtoSchema()) + .setLocation("US") + .setEnableConnectionPool(true) + .build(); + StreamWriter writer2 = + StreamWriter.newBuilder(TEST_STREAM_2, client) + .setWriterSchema(createProtoSchema()) + .setLocation("US") + .setEnableConnectionPool(true) + .build(); + + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = sendTestMessage(writer1, new String[] {"A"}); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + Attributes attributes = writer1.getTelemetryAttributes(); + String attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId); + assertEquals("projects/p/datasets/d1/tables/t1", attributesTableId); + + ApiFuture appendFuture2 = sendTestMessage(writer2, new String[] {"A"}); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + attributes = writer2.getTelemetryAttributes(); + attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId); + assertEquals("projects/p/datasets/d2/tables/t2", attributesTableId); + + writer1.close(); + writer2.close(); + } + @Test public void testProtoSchemaPiping_nonMultiplexingCase() throws Exception { ProtoSchema protoSchema = createProtoSchema(); diff --git a/pom.xml b/pom.xml index 490dfde116..868f6784f5 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,13 @@ json 20240303 - + + io.opentelemetry + opentelemetry-bom + 1.38.0 + pom + import +