From d9914ea9f0311a6276bd751ad4948e84abe7cbaf Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 24 Feb 2023 01:55:32 +0000 Subject: [PATCH 01/12] feat: add back header setting --- .../cloud/bigquery/storage/v1/ConnectionWorker.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 19d8911ee8..3445ea4906 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; @@ -236,7 +237,12 @@ public ConnectionWorker( this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); // Always recreate a client for connection worker. - this.client = BigQueryWriteClient.create(clientSettings); + HashMap newHeaders = new HashMap<>(); + newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); + newHeaders.put("x-goog-request-params", "write_stream=" + streamName); + BigQueryWriteSettings stubSettings = + clientSettings.toBuilder().setHeaderProvider(FixedHeaderProvider.create(newHeaders)).build(); + this.client = BigQueryWriteClient.create(stubSettings); this.appendThread = new Thread( From 3144544bae2da3ab3b91c08b6ba19b866b402cdb Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 24 Feb 2023 01:55:59 +0000 Subject: [PATCH 02/12] . --- .../google/cloud/bigquery/storage/v1/ConnectionWorker.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 3445ea4906..098503b717 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -241,7 +241,10 @@ public ConnectionWorker( newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); newHeaders.put("x-goog-request-params", "write_stream=" + streamName); BigQueryWriteSettings stubSettings = - clientSettings.toBuilder().setHeaderProvider(FixedHeaderProvider.create(newHeaders)).build(); + clientSettings + .toBuilder() + .setHeaderProvider(FixedHeaderProvider.create(newHeaders)) + .build(); this.client = BigQueryWriteClient.create(stubSettings); this.appendThread = From 2aa87f9eed9180e66a7dbee6dabf80b8dd198814 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 24 Feb 2023 02:03:27 +0000 Subject: [PATCH 03/12] . --- .../bigquery/storage/v1/ConnectionWorker.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 098503b717..be701dea09 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -205,6 +205,8 @@ class ConnectionWorker implements AutoCloseable { private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null; private long testOnlyAppendLoopSleepTime = 0; + private BigQueryWriteSettings clientSettings = null; + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -236,16 +238,7 @@ public ConnectionWorker( this.traceId = traceId; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); - // Always recreate a client for connection worker. - HashMap newHeaders = new HashMap<>(); - newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); - newHeaders.put("x-goog-request-params", "write_stream=" + streamName); - BigQueryWriteSettings stubSettings = - clientSettings - .toBuilder() - .setHeaderProvider(FixedHeaderProvider.create(newHeaders)) - .build(); - this.client = BigQueryWriteClient.create(stubSettings); + this.clientSettings = clientSettings; this.appendThread = new Thread( @@ -571,6 +564,20 @@ private void appendLoop() { // should happen before the call to resetConnection. As it is unknown when the connection // could be closed and the doneCallback called, and thus clearing the flag. lock.lock(); + if (this.client == null) { + // Always recreate a client for connection worker. + HashMap newHeaders = new HashMap<>(); + newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); + newHeaders.put( + "x-goog-request-params", + "write_stream=" + localQueue.pollFirst().message.getWriteStream()); + BigQueryWriteSettings stubSettings = + clientSettings + .toBuilder() + .setHeaderProvider(FixedHeaderProvider.create(newHeaders)) + .build(); + this.client = BigQueryWriteClient.create(stubSettings); + } try { this.streamConnectionIsConnected = true; } finally { From bd39dfb221801b0ed9db9e5ae835b03e738c00b6 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 24 Feb 2023 02:06:44 +0000 Subject: [PATCH 04/12] . --- .../google/cloud/bigquery/storage/v1/ConnectionWorker.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index be701dea09..4d09094dc3 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -513,7 +513,7 @@ public void close() { * * It takes requests from waiting queue and sends them to server. */ - private void appendLoop() { + private void appendLoop() throws IOException { Deque localQueue = new LinkedList(); boolean streamNeedsConnecting = false; @@ -570,7 +570,7 @@ private void appendLoop() { newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); newHeaders.put( "x-goog-request-params", - "write_stream=" + localQueue.pollFirst().message.getWriteStream()); + "write_stream=" + localQueue.peekFirst().message.getWriteStream()); BigQueryWriteSettings stubSettings = clientSettings .toBuilder() From e1b147d51df51612477608a0ac268f5af8d1a0a6 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 24 Feb 2023 02:20:41 +0000 Subject: [PATCH 05/12] . --- .../bigquery/storage/v1/ConnectionWorker.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 4d09094dc3..e2ea582c4d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -513,7 +513,7 @@ public void close() { * * It takes requests from waiting queue and sends them to server. */ - private void appendLoop() throws IOException { + private void appendLoop() { Deque localQueue = new LinkedList(); boolean streamNeedsConnecting = false; @@ -565,18 +565,31 @@ private void appendLoop() throws IOException { // could be closed and the doneCallback called, and thus clearing the flag. lock.lock(); if (this.client == null) { - // Always recreate a client for connection worker. - HashMap newHeaders = new HashMap<>(); - newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); - newHeaders.put( - "x-goog-request-params", - "write_stream=" + localQueue.peekFirst().message.getWriteStream()); - BigQueryWriteSettings stubSettings = - clientSettings - .toBuilder() - .setHeaderProvider(FixedHeaderProvider.create(newHeaders)) - .build(); - this.client = BigQueryWriteClient.create(stubSettings); + try { + // Always recreate a client for connection worker. + HashMap newHeaders = new HashMap<>(); + newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); + newHeaders.put( + "x-goog-request-params", + "write_stream=" + localQueue.peekFirst().message.getWriteStream()); + BigQueryWriteSettings stubSettings = + clientSettings + .toBuilder() + .setHeaderProvider(FixedHeaderProvider.create(newHeaders)) + .build(); + this.client = BigQueryWriteClient.create(stubSettings); + } catch (IOException e) { + connectionFinalStatus = e; + while (!localQueue.isEmpty()) { + AppendRequestAndResponse requestWrapper = localQueue.pollFirst(); + requestWrapper.appendResult.setException(e); + } + while (!this.waitingRequestQueue.isEmpty()) { + AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + requestWrapper.appendResult.setException(e); + } + return; + } } try { this.streamConnectionIsConnected = true; From 42b6066ca952de89086f4fbb44b09ead2cfab4ac Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 28 Feb 2023 00:03:34 +0000 Subject: [PATCH 06/12] . --- .../bigquery/storage/v1/ConnectionWorker.java | 51 +++++++++---------- .../storage/v1/ConnectionWorkerPool.java | 7 +-- .../bigquery/storage/v1/StreamWriter.java | 1 + 3 files changed, 28 insertions(+), 31 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index e2ea582c4d..93d3898651 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -78,6 +78,11 @@ class ConnectionWorker implements AutoCloseable { */ private String streamName; + /* + * The location of this connection. + */ + private String location; + /* * The proto schema of rows to write. This schema can change during multiplexing. */ @@ -214,6 +219,7 @@ public static long getApiMaxRequestBytes() { public ConnectionWorker( String streamName, + String location, ProtoSchema writerSchema, long maxInflightRequests, long maxInflightBytes, @@ -226,6 +232,7 @@ public ConnectionWorker( this.hasMessageInWaitingQueue = lock.newCondition(); this.inflightReduced = lock.newCondition(); this.streamName = streamName; + this.location = location; this.maxRetryDuration = maxRetryDuration; if (writerSchema == null) { throw new StatusRuntimeException( @@ -238,7 +245,18 @@ public ConnectionWorker( this.traceId = traceId; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); - this.clientSettings = clientSettings; + // Always recreate a client for connection worker. + HashMap newHeaders = new HashMap<>(); + newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); + newHeaders.put( + "x-goog-request-params", + "write_location=" + this.location); + BigQueryWriteSettings stubSettings = + clientSettings + .toBuilder() + .setHeaderProvider(FixedHeaderProvider.create(newHeaders)) + .build(); + this.client = BigQueryWriteClient.create(clientSettings); this.appendThread = new Thread( @@ -324,6 +342,10 @@ Boolean isUserClosed() { } } + String getWriteLocation() { + return this.location; + } + private ApiFuture appendInternal( StreamWriter streamWriter, AppendRowsRequest message) { AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter); @@ -564,33 +586,6 @@ private void appendLoop() { // should happen before the call to resetConnection. As it is unknown when the connection // could be closed and the doneCallback called, and thus clearing the flag. lock.lock(); - if (this.client == null) { - try { - // Always recreate a client for connection worker. - HashMap newHeaders = new HashMap<>(); - newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); - newHeaders.put( - "x-goog-request-params", - "write_stream=" + localQueue.peekFirst().message.getWriteStream()); - BigQueryWriteSettings stubSettings = - clientSettings - .toBuilder() - .setHeaderProvider(FixedHeaderProvider.create(newHeaders)) - .build(); - this.client = BigQueryWriteClient.create(stubSettings); - } catch (IOException e) { - connectionFinalStatus = e; - while (!localQueue.isEmpty()) { - AppendRequestAndResponse requestWrapper = localQueue.pollFirst(); - requestWrapper.appendResult.setException(e); - } - while (!this.waitingRequestQueue.isEmpty()) { - AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); - requestWrapper.appendResult.setException(e); - } - return; - } - } try { this.streamConnectionIsConnected = true; } finally { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 8fcb84165e..55f5f644b0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -288,7 +288,7 @@ private ConnectionWorker createOrReuseConnectionWorker( String streamReference = streamWriter.getStreamName(); if (connectionWorkerPool.size() < currentMaxConnectionCount) { // Always create a new connection if we haven't reached current maximum. - return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); + return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema()); } else { ConnectionWorker existingBestConnection = pickBestLoadConnection( @@ -304,7 +304,7 @@ private ConnectionWorker createOrReuseConnectionWorker( if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) { currentMaxConnectionCount = settings.maxConnectionsPerRegion(); } - return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); + return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema()); } else { // Stick to the original connection if all the connections are overwhelmed. if (existingConnectionWorker != null) { @@ -359,7 +359,7 @@ static ConnectionWorker pickBestLoadConnection( * a single stream reference. This is because createConnectionWorker(...) is called via * computeIfAbsent(...) which is at most once per key. */ - private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema writeSchema) + private ConnectionWorker createConnectionWorker(String streamName, String location, ProtoSchema writeSchema) throws IOException { if (enableTesting) { // Though atomic integer is super lightweight, add extra if check in case adding future logic. @@ -368,6 +368,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w ConnectionWorker connectionWorker = new ConnectionWorker( streamName, + location, writeSchema, maxInflightRequests, maxInflightBytes, diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index ffc1290a78..b21a52a63d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -208,6 +208,7 @@ private StreamWriter(Builder builder) throws IOException { SingleConnectionOrConnectionPool.ofSingleConnection( new ConnectionWorker( builder.streamName, + builder.location, builder.writerSchema, builder.maxInflightRequest, builder.maxInflightBytes, From b0757efab021a654502b15d45b2b334525c49b8a Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 28 Feb 2023 00:36:38 +0000 Subject: [PATCH 07/12] . --- .../bigquery/storage/v1/ConnectionWorker.java | 13 +++-- .../storage/v1/ConnectionWorkerPool.java | 12 +++-- .../storage/v1/ConnectionWorkerTest.java | 47 +++++++++++++++++++ 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 93d3898651..a249f2b231 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -248,9 +248,7 @@ public ConnectionWorker( // Always recreate a client for connection worker. HashMap newHeaders = new HashMap<>(); newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); - newHeaders.put( - "x-goog-request-params", - "write_location=" + this.location); + newHeaders.put("x-goog-request-params", "write_location=" + this.location); BigQueryWriteSettings stubSettings = clientSettings .toBuilder() @@ -317,6 +315,15 @@ public void run(Throwable finalStatus) { /** Schedules the writing of rows at given offset. */ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { + if (streamWriter.getLocation() != this.location) { + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription( + "StreamWriter with location " + + streamWriter.getLocation() + + " is scheduled to use a connection with location " + + this.location)); + } Preconditions.checkNotNull(streamWriter); AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows( diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 55f5f644b0..83be8ce52a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -288,7 +288,8 @@ private ConnectionWorker createOrReuseConnectionWorker( String streamReference = streamWriter.getStreamName(); if (connectionWorkerPool.size() < currentMaxConnectionCount) { // Always create a new connection if we haven't reached current maximum. - return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema()); + return createConnectionWorker( + streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema()); } else { ConnectionWorker existingBestConnection = pickBestLoadConnection( @@ -304,7 +305,10 @@ private ConnectionWorker createOrReuseConnectionWorker( if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) { currentMaxConnectionCount = settings.maxConnectionsPerRegion(); } - return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema()); + return createConnectionWorker( + streamWriter.getStreamName(), + streamWriter.getLocation(), + streamWriter.getProtoSchema()); } else { // Stick to the original connection if all the connections are overwhelmed. if (existingConnectionWorker != null) { @@ -359,8 +363,8 @@ static ConnectionWorker pickBestLoadConnection( * a single stream reference. This is because createConnectionWorker(...) is called via * computeIfAbsent(...) which is at most once per key. */ - private ConnectionWorker createConnectionWorker(String streamName, String location, ProtoSchema writeSchema) - throws IOException { + private ConnectionWorker createConnectionWorker( + String streamName, String location, ProtoSchema writeSchema) throws IOException { if (enableTesting) { // Though atomic integer is super lightweight, add extra if check in case adding future logic. testValueCreateConnectionCount.getAndIncrement(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index fbd0850ee0..7137c23e93 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -309,6 +309,7 @@ public void testAppendButInflightQueueFull() throws Exception { ConnectionWorker connectionWorker = new ConnectionWorker( TEST_STREAM_1, + "us", createProtoSchema("foo"), 6, 100000, @@ -360,6 +361,7 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception { ConnectionWorker connectionWorker = new ConnectionWorker( TEST_STREAM_1, + "us", createProtoSchema("foo"), 100000, 100000, @@ -411,6 +413,50 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception { assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen."); } + @Test + public void testLocationMismatch() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(schema1) + .setLocation("eu") + .build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + "us", + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); + ConnectionWorker.setMaxInflightQueueWaitTime(500); + + connectionWorker.setTestOnlyRunTimeExceptionInAppendLoop( + new RuntimeException("Any exception can happen.")); + // Sleep 1 second before erroring out. + connectionWorker.setTestOnlyAppendLoopSleepTime(1000L); + + // In total insert 5 requests, + List> futures = new ArrayList<>(); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + sendTestMessage( + connectionWorker, + sw1, + createFooProtoRows(new String[] {String.valueOf(0)}), + 0)); + assertThat( + ex.getMessage() + .contains( + "StreamWriter with location eu is scheduled to use a connection with location us")); + } + @Test public void testExponentialBackoff() throws Exception { assertThat(ConnectionWorker.calculateSleepTimeMilli(0)).isEqualTo(1); @@ -440,6 +486,7 @@ private ConnectionWorker createConnectionWorker( throws IOException { return new ConnectionWorker( streamName, + "us", createProtoSchema("foo"), maxRequests, maxBytes, From c6631551dc60069d509acdf4fdb76ee708d08a9f Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 28 Feb 2023 00:51:35 +0000 Subject: [PATCH 08/12] . --- .../bigquery/storage/v1/ConnectionWorker.java | 24 ++++++++-- .../storage/v1/ConnectionWorkerTest.java | 47 ++++++++++++++----- 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index a249f2b231..0a4c812182 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -81,7 +81,7 @@ class ConnectionWorker implements AutoCloseable { /* * The location of this connection. */ - private String location; + private String location = null; /* * The proto schema of rows to write. This schema can change during multiplexing. @@ -232,7 +232,9 @@ public ConnectionWorker( this.hasMessageInWaitingQueue = lock.newCondition(); this.inflightReduced = lock.newCondition(); this.streamName = streamName; - this.location = location; + if (location != null && !location.isEmpty()) { + this.location = location; + } this.maxRetryDuration = maxRetryDuration; if (writerSchema == null) { throw new StatusRuntimeException( @@ -248,7 +250,11 @@ public ConnectionWorker( // Always recreate a client for connection worker. HashMap newHeaders = new HashMap<>(); newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); - newHeaders.put("x-goog-request-params", "write_location=" + this.location); + if (this.location == null) { + newHeaders.put("x-goog-request-params", "write_stream=" + this.streamName); + } else { + newHeaders.put("x-goog-request-params", "write_location=" + this.location); + } BigQueryWriteSettings stubSettings = clientSettings .toBuilder() @@ -315,7 +321,8 @@ public void run(Throwable finalStatus) { /** Schedules the writing of rows at given offset. */ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { - if (streamWriter.getLocation() != this.location) { + if (this.location != null && this.location != streamWriter.getLocation()) { + log.info("111111"); throw new StatusRuntimeException( Status.fromCode(Code.INVALID_ARGUMENT) .withDescription( @@ -323,6 +330,15 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, + streamWriter.getLocation() + " is scheduled to use a connection with location " + this.location)); + } else if (this.location == null && streamWriter.getStreamName() != this.streamName) { + log.info("2222222"); + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription( + "StreamWriter with stream name " + + streamWriter.getStreamName() + + " is scheduled to use a connection with stream name " + + this.streamName)); } Preconditions.checkNotNull(streamWriter); AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 7137c23e93..cca35a2b8b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,6 +47,7 @@ @RunWith(JUnit4.class) public class ConnectionWorkerTest { + private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/s1"; private static final String TEST_STREAM_2 = "projects/p2/datasets/d2/tables/t2/streams/s2"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; @@ -432,16 +434,36 @@ public void testLocationMismatch() throws Exception { FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, client.getSettings()); - testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); - ConnectionWorker.setMaxInflightQueueWaitTime(500); - - connectionWorker.setTestOnlyRunTimeExceptionInAppendLoop( - new RuntimeException("Any exception can happen.")); - // Sleep 1 second before erroring out. - connectionWorker.setTestOnlyAppendLoopSleepTime(1000L); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + sendTestMessage( + connectionWorker, + sw1, + createFooProtoRows(new String[] {String.valueOf(0)}), + 0)); + assertEquals( + "INVALID_ARGUMENT: StreamWriter with location eu is scheduled to use a connection with location us", + ex.getMessage()); + } - // In total insert 5 requests, - List> futures = new ArrayList<>(); + @Test + public void testStreamNameMismatch() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_2, + null, + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -451,10 +473,9 @@ public void testLocationMismatch() throws Exception { sw1, createFooProtoRows(new String[] {String.valueOf(0)}), 0)); - assertThat( - ex.getMessage() - .contains( - "StreamWriter with location eu is scheduled to use a connection with location us")); + assertEquals( + "INVALID_ARGUMENT: StreamWriter with stream name projects/p1/datasets/d1/tables/t1/streams/s1 is scheduled to use a connection with stream name projects/p2/datasets/d2/tables/t2/streams/s2", + ex.getMessage()); } @Test From 4d8881c740f3a9b406fe788ac2071d6e6a301b49 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 28 Feb 2023 00:54:44 +0000 Subject: [PATCH 09/12] . --- .../com/google/cloud/bigquery/storage/v1/ConnectionWorker.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 0a4c812182..c9976d149a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -210,8 +210,6 @@ class ConnectionWorker implements AutoCloseable { private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null; private long testOnlyAppendLoopSleepTime = 0; - private BigQueryWriteSettings clientSettings = null; - /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) From cf802a2bbacb061875b0cc96548c5bcdfa747e43 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 28 Feb 2023 00:56:17 +0000 Subject: [PATCH 10/12] . --- .../com/google/cloud/bigquery/storage/v1/ConnectionWorker.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index c9976d149a..d63e0871d4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -320,7 +320,6 @@ public void run(Throwable finalStatus) { /** Schedules the writing of rows at given offset. */ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { if (this.location != null && this.location != streamWriter.getLocation()) { - log.info("111111"); throw new StatusRuntimeException( Status.fromCode(Code.INVALID_ARGUMENT) .withDescription( @@ -329,7 +328,6 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, + " is scheduled to use a connection with location " + this.location)); } else if (this.location == null && streamWriter.getStreamName() != this.streamName) { - log.info("2222222"); throw new StatusRuntimeException( Status.fromCode(Code.INVALID_ARGUMENT) .withDescription( From 48b78c79c63ca08c52e46ef55b057d260ce3aa1d Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 28 Feb 2023 21:38:46 +0000 Subject: [PATCH 11/12] . --- .../com/google/cloud/bigquery/storage/v1/ConnectionWorker.java | 1 + .../cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java | 1 + 2 files changed, 2 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index d63e0871d4..835d411714 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -328,6 +328,7 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, + " is scheduled to use a connection with location " + this.location)); } else if (this.location == null && streamWriter.getStreamName() != this.streamName) { + // Location is null implies this is non-multiplexed connection. throw new StatusRuntimeException( Status.fromCode(Code.INVALID_ARGUMENT) .withDescription( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 980772b2ff..e558d567c8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -430,6 +430,7 @@ private StreamWriter getTestStreamWriter(String streamName) throws IOException { return StreamWriter.newBuilder(streamName) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) + .setLocation("us") .setCredentialsProvider(NoCredentialsProvider.create()) .setChannelProvider(serviceHelper.createChannelProvider()) .build(); From 73832c908f421786098bfbacbb1c101b94450163 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 28 Feb 2023 21:52:57 +0000 Subject: [PATCH 12/12] . --- .../storage/v1/ConnectionWorkerTest.java | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index cca35a2b8b..13711bddd0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -86,10 +86,12 @@ public void testMultiplexedAppendSuccess() throws Exception { StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1, client) .setWriterSchema(createProtoSchema("foo")) + .setLocation("us") .build(); StreamWriter sw2 = StreamWriter.newBuilder(TEST_STREAM_2, client) .setWriterSchema(createProtoSchema("complicate")) + .setLocation("us") .build(); // We do a pattern of: // send to stream1, string1 @@ -207,11 +209,20 @@ public void testAppendInSameStream_switchSchema() throws Exception { // send to stream1, schema1 // ... StreamWriter sw1 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); StreamWriter sw2 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema2).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema2) + .build(); StreamWriter sw3 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema3).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema3) + .build(); for (long i = 0; i < appendCount; i++) { switch ((int) i % 4) { case 0: @@ -307,7 +318,10 @@ public void testAppendInSameStream_switchSchema() throws Exception { public void testAppendButInflightQueueFull() throws Exception { ProtoSchema schema1 = createProtoSchema("foo"); StreamWriter sw1 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); ConnectionWorker connectionWorker = new ConnectionWorker( TEST_STREAM_1, @@ -359,7 +373,10 @@ public void testAppendButInflightQueueFull() throws Exception { public void testThrowExceptionWhileWithinAppendLoop() throws Exception { ProtoSchema schema1 = createProtoSchema("foo"); StreamWriter sw1 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); ConnectionWorker connectionWorker = new ConnectionWorker( TEST_STREAM_1,