diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 76c0ed9ad2..2f3ce676cd 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -259,6 +259,7 @@ class ConnectionWorker implements AutoCloseable { private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/"; private static Pattern streamPatternTable = Pattern.compile(tableMatching); + private final boolean enableOpenTelemetry; private Meter writeMeter; static AttributeKey telemetryKeyTableId = AttributeKey.stringKey("table_id"); static AttributeKey telemetryKeyWriterId = AttributeKey.stringKey("writer_id"); @@ -480,7 +481,8 @@ public ConnectionWorker( @Nullable String compressorName, BigQueryWriteSettings clientSettings, RetrySettings retrySettings, - boolean enableRequestProfiler) + boolean enableRequestProfiler, + boolean enableOpenTelemetry) throws IOException { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); @@ -505,6 +507,7 @@ public ConnectionWorker( this.retrySettings = retrySettings; this.telemetryAttributes = buildOpenTelemetryAttributes(); this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler); + this.enableOpenTelemetry = enableOpenTelemetry; registerOpenTelemetryMetrics(); // Always recreate a client for connection worker. diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 6e5188fb7d..40d4481e87 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -206,6 +206,7 @@ public abstract static class Builder { private static Settings settings = Settings.builder().build(); private final boolean enableRequestProfiler; + private final boolean enableOpenTelemetry; ConnectionWorkerPool( long maxInflightRequests, @@ -216,7 +217,8 @@ public abstract static class Builder { @Nullable String comperssorName, BigQueryWriteSettings clientSettings, RetrySettings retrySettings, - boolean enableRequestProfiler) { + boolean enableRequestProfiler, + boolean enableOpenTelemetry) { this.maxInflightRequests = maxInflightRequests; this.maxInflightBytes = maxInflightBytes; this.maxRetryDuration = maxRetryDuration; @@ -227,6 +229,7 @@ public abstract static class Builder { this.currentMaxConnectionCount = settings.minConnectionsPerRegion(); this.retrySettings = retrySettings; this.enableRequestProfiler = enableRequestProfiler; + this.enableOpenTelemetry = enableOpenTelemetry; } /** @@ -409,7 +412,8 @@ private ConnectionWorker createConnectionWorker( compressorName, clientSettings, retrySettings, - enableRequestProfiler); + enableRequestProfiler, + enableOpenTelemetry); connectionWorkerPool.add(connectionWorker); log.info( String.format( diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 7cf0a4ec0c..83f45e4318 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -392,6 +392,12 @@ public Builder setEnableLatencyProfiler(boolean enableLatencyProfiler) { return this; } + /** Enable generation of metrics for OpenTelemetry. */ + public Builder setEnableOpenTelemetry(boolean enableOpenTelemetry) { + this.schemaAwareStreamWriterBuilder.setEnableOpenTelemetry(enableOpenTelemetry); + return this; + } + /** * Sets the default missing value interpretation value if the column is not presented in the * missing_value_interpretations map. diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java index c021980df5..8f45f0c5f5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -111,6 +111,7 @@ private SchemaAwareStreamWriter(Builder builder) if (builder.enableRequestProfiler) { requestProfilerHook.startPeriodicalReportFlushing(); } + streamWriterBuilder.setEnableOpenTelemetry(builder.enableOpenTelemetry); this.streamWriter = streamWriterBuilder.build(); this.streamName = builder.streamName; this.tableSchema = builder.tableSchema; @@ -477,6 +478,7 @@ public static final class Builder { private String clientId; private boolean enableRequestProfiler = false; + private boolean enableOpenTelemetry = false; private static final String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; @@ -702,6 +704,12 @@ public Builder setEnableLatencyProfiler(boolean enableLatencyProfiler) { return this; } + /** Enable generation of metrics for OpenTelemetry. */ + public Builder setEnableOpenTelemetry(boolean enableOpenTelemetry) { + this.enableOpenTelemetry = enableOpenTelemetry; + return this; + } + /** * Builds SchemaAwareStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 5b73b3e918..fabcac3b0b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -254,7 +254,8 @@ private StreamWriter(Builder builder) throws IOException { builder.compressorName, clientSettings, builder.retrySettings, - builder.enableRequestProfiler)); + builder.enableRequestProfiler, + builder.enableOpenTelemetry)); } else { if (!isDefaultStream(streamName)) { log.warning( @@ -321,7 +322,8 @@ private StreamWriter(Builder builder) throws IOException { builder.compressorName, client.getSettings(), builder.retrySettings, - builder.enableRequestProfiler); + builder.enableRequestProfiler, + builder.enableOpenTelemetry); })); validateFetchedConnectonPool(builder); // If the client is not from outside, then shutdown the client we created. @@ -699,6 +701,7 @@ public static final class Builder { MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; private boolean enableRequestProfiler = false; + private boolean enableOpenTelemetry = false; private RetrySettings retrySettings = null; @@ -857,6 +860,12 @@ public Builder setEnableLatencyProfiler(boolean enableLatencyProfiler) { return this; } + /** Enable generation of metrics for OpenTelemetry. */ + public Builder setEnableOpenTelemetry(boolean enableOpenTelemetry) { + this.enableOpenTelemetry = enableOpenTelemetry; + return this; + } + /** * Enable client lib automatic retries on request level errors. * diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index c628d9b23b..0ed2fc5705 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -565,6 +565,7 @@ ConnectionWorkerPool createConnectionWorkerPool( null, clientSettings, retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry*/ false); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 6bd3d93b1a..efcc253434 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -345,7 +345,8 @@ public void testAppendButInflightQueueFull() throws Exception { null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -403,7 +404,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception { null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -473,7 +475,8 @@ public void testLocationMismatch() throws Exception { null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -506,7 +509,8 @@ public void testStreamNameMismatch() throws Exception { null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -560,7 +564,8 @@ private ConnectionWorker createConnectionWorker( null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); } private ProtoSchema createProtoSchema(String protoName) { @@ -657,7 +662,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2); testBigQueryWrite.setResponseSleep(durationSleep); @@ -733,7 +739,8 @@ public void testLongTimeIdleWontFail() throws Exception { null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); long appendCount = 10; for (int i = 0; i < appendCount * 2; i++) { @@ -779,7 +786,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); Attributes attributes = connectionWorker.getTelemetryAttributes(); String attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId); @@ -820,7 +828,8 @@ void exerciseOpenTelemetryAttributesWithTraceId( null, client.getSettings(), retrySettings, - /*enableRequestProfiler=*/ false); + /*enableRequestProfiler=*/ false, + /*enableOpenTelemetry=*/ false); Attributes attributes = connectionWorker.getTelemetryAttributes(); checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);