From 33d860a5d32edda92332d20c1733c00903657b36 Mon Sep 17 00:00:00 2001 From: yayi Date: Tue, 23 Feb 2021 18:53:35 -0800 Subject: [PATCH 1/2] feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones. --- .../storage/v1beta2/StreamWriterV2.java | 19 ++++++++++++++++++- .../storage/v1beta2/StreamWriterV2Test.java | 14 ++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 6c4973043c..bebf672b9a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -296,6 +296,7 @@ public void close() { * It takes requests from waiting queue and sends them to server. */ private void appendLoop() { + boolean isFirstRequestInConnection = true; Deque localQueue = new LinkedList(); while (!waitingQueueDrained()) { this.lock.lock(); @@ -322,7 +323,11 @@ private void appendLoop() { // TODO: Add reconnection here. while (!localQueue.isEmpty()) { - this.streamConnection.send(localQueue.pollFirst().message); + AppendRowsRequest preparedRequest = + prepareRequestBasedOnPosition( + localQueue.pollFirst().message, isFirstRequestInConnection); + this.streamConnection.send(preparedRequest); + isFirstRequestInConnection = false; } } @@ -371,6 +376,18 @@ private void waitForDoneCallback() { } } + private AppendRowsRequest prepareRequestBasedOnPosition( + AppendRowsRequest original, boolean isFirstRequest) { + AppendRowsRequest.Builder requestBuilder = original.toBuilder(); + if (isFirstRequest) { + requestBuilder.setWriteStream(this.streamName); + } else { + requestBuilder.clearWriteStream(); + requestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } + return requestBuilder.build(); + } + private void cleanupInflightRequests() { Throwable finalStatus; Deque localQueue = new LinkedList(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index be51412c2e..2b8c89ed68 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1beta2; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -192,6 +193,19 @@ public void testAppendSuccess() throws Exception { } assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + if (i == 0) { + // First request received by server should have schema and stream name. + assertTrue(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + } else { + // Following request should not have schema and stream name. + assertFalse(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), ""); + } + } + writer.close(); } From 25d9874fd777ddf3936ee985b7e8751652044a05 Mon Sep 17 00:00:00 2001 From: yayi Date: Tue, 23 Feb 2021 18:53:35 -0800 Subject: [PATCH 2/2] feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones --- .../storage/v1beta2/StreamWriterV2.java | 19 ++++++++++++++++++- .../storage/v1beta2/StreamWriterV2Test.java | 14 ++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 6c4973043c..bebf672b9a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -296,6 +296,7 @@ public void close() { * It takes requests from waiting queue and sends them to server. */ private void appendLoop() { + boolean isFirstRequestInConnection = true; Deque localQueue = new LinkedList(); while (!waitingQueueDrained()) { this.lock.lock(); @@ -322,7 +323,11 @@ private void appendLoop() { // TODO: Add reconnection here. while (!localQueue.isEmpty()) { - this.streamConnection.send(localQueue.pollFirst().message); + AppendRowsRequest preparedRequest = + prepareRequestBasedOnPosition( + localQueue.pollFirst().message, isFirstRequestInConnection); + this.streamConnection.send(preparedRequest); + isFirstRequestInConnection = false; } } @@ -371,6 +376,18 @@ private void waitForDoneCallback() { } } + private AppendRowsRequest prepareRequestBasedOnPosition( + AppendRowsRequest original, boolean isFirstRequest) { + AppendRowsRequest.Builder requestBuilder = original.toBuilder(); + if (isFirstRequest) { + requestBuilder.setWriteStream(this.streamName); + } else { + requestBuilder.clearWriteStream(); + requestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } + return requestBuilder.build(); + } + private void cleanupInflightRequests() { Throwable finalStatus; Deque localQueue = new LinkedList(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index be51412c2e..2b8c89ed68 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1beta2; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -192,6 +193,19 @@ public void testAppendSuccess() throws Exception { } assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + if (i == 0) { + // First request received by server should have schema and stream name. + assertTrue(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + } else { + // Following request should not have schema and stream name. + assertFalse(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), ""); + } + } + writer.close(); }