diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 35d6f1da7..783a079f7 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -245,6 +245,9 @@ class ConnectionWorker implements AutoCloseable { private final RequestProfiler.RequestProfilerHook requestProfilerHook; private final TelemetryMetrics telemetryMetrics; + /** Indicate whether this connection is created during multiplexing mode. */ + private final Boolean isMultiplexing; + private static String projectMatching = "projects/[^/]+/"; private static Pattern streamPatternProject = Pattern.compile(projectMatching); @@ -327,7 +330,8 @@ public ConnectionWorker( BigQueryWriteSettings clientSettings, RetrySettings retrySettings, boolean enableRequestProfiler, - boolean enableOpenTelemetry) + boolean enableOpenTelemetry, + boolean isMultiplexing) throws IOException { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); @@ -353,6 +357,7 @@ public ConnectionWorker( this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler); this.telemetryMetrics = new TelemetryMetrics(this, enableOpenTelemetry, getTableName(), writerId, traceId); + this.isMultiplexing = isMultiplexing; // Always recreate a client for connection worker. HashMap newHeaders = new HashMap<>(); @@ -744,8 +749,6 @@ private void appendLoop() { // Indicate whether we are at the first request after switching destination. // True means the schema and other metadata are needed. boolean firstRequestForTableOrSchemaSwitch = true; - // Represent whether we have entered multiplexing. - boolean isMultiplexing = false; while (!waitingQueueDrained()) { this.lock.lock(); @@ -848,7 +851,6 @@ private void appendLoop() { streamName = originalRequest.getWriteStream(); telemetryMetrics.refreshOpenTelemetryTableNameAttributes(getTableName()); writerSchema = originalRequest.getProtoRows().getWriterSchema(); - isMultiplexing = true; firstRequestForTableOrSchemaSwitch = true; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 40d4481e8..8550e553d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -413,7 +413,8 @@ private ConnectionWorker createConnectionWorker( clientSettings, retrySettings, enableRequestProfiler, - enableOpenTelemetry); + enableOpenTelemetry, + /*isMultiplexing=*/ true); connectionWorkerPool.add(connectionWorker); log.info( String.format( diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index a4223adb6..2e5c5881c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -256,7 +256,8 @@ private StreamWriter(Builder builder) throws IOException { clientSettings, builder.retrySettings, builder.enableRequestProfiler, - builder.enableOpenTelemetry)); + builder.enableOpenTelemetry, + /*isMultiplexing=*/ false)); } else { if (!isDefaultStream(streamName)) { log.warning( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 2e540d7b2..8c6c8ae81 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -87,7 +87,7 @@ public void setUp() throws Exception { @Test public void testMultiplexedAppendSuccess() throws Exception { - try (ConnectionWorker connectionWorker = createConnectionWorker()) { + try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) { long appendCount = 20; for (long i = 0; i < appendCount; i++) { testBigQueryWrite.addResponse(createAppendResponse(i)); @@ -150,7 +150,7 @@ public void testMultiplexedAppendSuccess() throws Exception { // We will get the request as the pattern of: // (writer_stream: t1, schema: t1) - // (writer_stream: _, schema: _) + // (writer_stream: t1, schema: _) // (writer_stream: t2, schema: t2) -> multiplexing entered. // (writer_stream: t2, schema: _) // (writer_stream: t1, schema: t1) @@ -164,11 +164,7 @@ public void testMultiplexedAppendSuccess() throws Exception { break; case 1: // The write stream is empty until we enter multiplexing. - if (i == 1) { - assertThat(serverRequest.getWriteStream()).isEmpty(); - } else { - assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); - } + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); // Schema is empty if not at the first request after table switch. assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); break; @@ -198,7 +194,7 @@ public void testMultiplexedAppendSuccess() throws Exception { @Test public void testAppendInSameStream_switchSchema() throws Exception { - try (ConnectionWorker connectionWorker = createConnectionWorker()) { + try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) { long appendCount = 20; for (long i = 0; i < appendCount; i++) { testBigQueryWrite.addResponse(createAppendResponse(i)); @@ -279,26 +275,20 @@ public void testAppendInSameStream_switchSchema() throws Exception { // We will get the request as the pattern of: // (writer_stream: t1, schema: schema1) - // (writer_stream: _, schema: _) + // (writer_stream: t1, schema: _) // (writer_stream: t1, schema: schema3) // (writer_stream: t1, schema: _) // (writer_stream: t1, schema: schema1) // (writer_stream: t1, schema: _) switch (i % 4) { case 0: - if (i == 0) { - assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); - } + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); assertThat( serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) .isEqualTo("foo"); break; case 1: - if (i == 1) { - assertThat(serverRequest.getWriteStream()).isEmpty(); - } else { - assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); - } + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); // Schema is empty if not at the first request after table switch. assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); break; @@ -346,7 +336,8 @@ public void testAppendButInflightQueueFull() throws Exception { client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ false); + /*enableOpenTelemetry=*/ false, + /*isMultiplexing=*/ false); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -405,7 +396,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception { client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ false); + /*enableOpenTelemetry=*/ false, + /*isMultiplexing=*/ true); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -476,7 +468,8 @@ public void testLocationMismatch() throws Exception { client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ false); + /*enableOpenTelemetry=*/ false, + /*isMultiplexing=*/ true); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -510,7 +503,8 @@ public void testStreamNameMismatch() throws Exception { client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ false); + /*enableOpenTelemetry=*/ false, + /*isMultiplexing=*/ true); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -539,13 +533,13 @@ private AppendRowsResponse createAppendResponse(long offset) { .build(); } - private ConnectionWorker createConnectionWorker() throws IOException { + private ConnectionWorker createMultiplexedConnectionWorker() throws IOException { // By default use only the first table as table reference. - return createConnectionWorker( + return createMultiplexedConnectionWorker( TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5)); } - private ConnectionWorker createConnectionWorker( + private ConnectionWorker createMultiplexedConnectionWorker( String streamName, String traceId, long maxRequests, @@ -565,7 +559,8 @@ private ConnectionWorker createConnectionWorker( client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ false); + /*enableOpenTelemetry=*/ false, + /*isMultiplexing=*/ true); } private ProtoSchema createProtoSchema(String protoName) { @@ -663,7 +658,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ false); + /*enableOpenTelemetry=*/ false, + /*isMultiplexing*/ false); org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2); testBigQueryWrite.setResponseSleep(durationSleep); @@ -740,7 +736,8 @@ public void testLongTimeIdleWontFail() throws Exception { client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ false); + /*enableOpenTelemetry=*/ false, + /*isMultiplexing*/ false); long appendCount = 10; for (int i = 0; i < appendCount * 2; i++) { @@ -787,7 +784,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ true); + /*enableOpenTelemetry=*/ true, + /*isMultiplexing*/ false); Attributes attributes = connectionWorker.getTelemetryAttributes(); String attributesTableId = attributes.get(TelemetryMetrics.telemetryKeyTableId); @@ -829,7 +827,8 @@ void exerciseOpenTelemetryAttributesWithTraceId( client.getSettings(), retrySettings, /*enableRequestProfiler=*/ false, - /*enableOpenTelemetry=*/ true); + /*enableOpenTelemetry=*/ true, + /*isMultiplexing*/ false); Attributes attributes = connectionWorker.getTelemetryAttributes(); checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 8468fb712..027dd3f15 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -949,11 +949,7 @@ public void testProtoSchemaPiping_multiplexingCase() throws Exception { assertEquals( appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance()); // Before entering multiplexing (i == 1) case, the write stream won't be populated. - if (i == 1) { - assertEquals(appendRowsRequest.getWriteStream(), ""); - } else { - assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1); - } + assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1); } else if (i % 4 == 2) { assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema2); assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);