From 33d860a5d32edda92332d20c1733c00903657b36 Mon Sep 17 00:00:00 2001 From: yayi Date: Tue, 23 Feb 2021 18:53:35 -0800 Subject: [PATCH 1/4] feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones. --- .../storage/v1beta2/StreamWriterV2.java | 19 ++++++++++++++++++- .../storage/v1beta2/StreamWriterV2Test.java | 14 ++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 6c4973043c..bebf672b9a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -296,6 +296,7 @@ public void close() { * It takes requests from waiting queue and sends them to server. */ private void appendLoop() { + boolean isFirstRequestInConnection = true; Deque localQueue = new LinkedList(); while (!waitingQueueDrained()) { this.lock.lock(); @@ -322,7 +323,11 @@ private void appendLoop() { // TODO: Add reconnection here. while (!localQueue.isEmpty()) { - this.streamConnection.send(localQueue.pollFirst().message); + AppendRowsRequest preparedRequest = + prepareRequestBasedOnPosition( + localQueue.pollFirst().message, isFirstRequestInConnection); + this.streamConnection.send(preparedRequest); + isFirstRequestInConnection = false; } } @@ -371,6 +376,18 @@ private void waitForDoneCallback() { } } + private AppendRowsRequest prepareRequestBasedOnPosition( + AppendRowsRequest original, boolean isFirstRequest) { + AppendRowsRequest.Builder requestBuilder = original.toBuilder(); + if (isFirstRequest) { + requestBuilder.setWriteStream(this.streamName); + } else { + requestBuilder.clearWriteStream(); + requestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } + return requestBuilder.build(); + } + private void cleanupInflightRequests() { Throwable finalStatus; Deque localQueue = new LinkedList(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index be51412c2e..2b8c89ed68 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1beta2; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -192,6 +193,19 @@ public void testAppendSuccess() throws Exception { } assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + if (i == 0) { + // First request received by server should have schema and stream name. + assertTrue(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + } else { + // Following request should not have schema and stream name. + assertFalse(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), ""); + } + } + writer.close(); } From 25d9874fd777ddf3936ee985b7e8751652044a05 Mon Sep 17 00:00:00 2001 From: yayi Date: Tue, 23 Feb 2021 18:53:35 -0800 Subject: [PATCH 2/4] feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones --- .../storage/v1beta2/StreamWriterV2.java | 19 ++++++++++++++++++- .../storage/v1beta2/StreamWriterV2Test.java | 14 ++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 6c4973043c..bebf672b9a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -296,6 +296,7 @@ public void close() { * It takes requests from waiting queue and sends them to server. */ private void appendLoop() { + boolean isFirstRequestInConnection = true; Deque localQueue = new LinkedList(); while (!waitingQueueDrained()) { this.lock.lock(); @@ -322,7 +323,11 @@ private void appendLoop() { // TODO: Add reconnection here. while (!localQueue.isEmpty()) { - this.streamConnection.send(localQueue.pollFirst().message); + AppendRowsRequest preparedRequest = + prepareRequestBasedOnPosition( + localQueue.pollFirst().message, isFirstRequestInConnection); + this.streamConnection.send(preparedRequest); + isFirstRequestInConnection = false; } } @@ -371,6 +376,18 @@ private void waitForDoneCallback() { } } + private AppendRowsRequest prepareRequestBasedOnPosition( + AppendRowsRequest original, boolean isFirstRequest) { + AppendRowsRequest.Builder requestBuilder = original.toBuilder(); + if (isFirstRequest) { + requestBuilder.setWriteStream(this.streamName); + } else { + requestBuilder.clearWriteStream(); + requestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } + return requestBuilder.build(); + } + private void cleanupInflightRequests() { Throwable finalStatus; Deque localQueue = new LinkedList(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index be51412c2e..2b8c89ed68 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1beta2; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -192,6 +193,19 @@ public void testAppendSuccess() throws Exception { } assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + if (i == 0) { + // First request received by server should have schema and stream name. + assertTrue(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + } else { + // Following request should not have schema and stream name. + assertFalse(serverRequest.getProtoRows().hasWriterSchema()); + assertEquals(serverRequest.getWriteStream(), ""); + } + } + writer.close(); } From 296062de490d999072872efa365f8fadcf545ca0 Mon Sep 17 00:00:00 2001 From: yayi Date: Thu, 25 Feb 2021 14:48:14 -0800 Subject: [PATCH 3/4] feat: StreamWriterV2 to support traceId --- .../storage/v1beta2/StreamWriterV2.java | 19 +++++++++++++++++-- .../storage/v1beta2/StreamWriterV2Test.java | 5 ++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 6990f13173..157324ae0b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -41,8 +41,6 @@ * *

TODO: Attach schema. * - *

TODO: Attach traceId. - * *

TODO: Support batching. * *

TODO: Support schema change. @@ -69,6 +67,11 @@ public class StreamWriterV2 implements AutoCloseable { */ private final long maxInflightBytes; + /* + * TraceId for debugging purpose. + */ + private final String traceId; + /* * Tracks current inflight requests in the stream. */ @@ -137,6 +140,7 @@ private StreamWriterV2(Builder builder) throws IOException { this.streamName = builder.streamName; this.maxInflightRequests = builder.maxInflightRequest; this.maxInflightBytes = builder.maxInflightBytes; + this.traceId = builder.traceId; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); if (builder.client == null) { @@ -381,6 +385,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition( AppendRowsRequest.Builder requestBuilder = original.toBuilder(); if (isFirstRequest) { requestBuilder.setWriteStream(this.streamName); + if (this.traceId != null) { + requestBuilder.setTraceId(this.traceId); + } } else { requestBuilder.clearWriteStream(); requestBuilder.getProtoRowsBuilder().clearWriterSchema(); @@ -485,6 +492,8 @@ public static final class Builder { private CredentialsProvider credentialsProvider = BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + private String traceId = null; + private Builder(String streamName) { this.streamName = Preconditions.checkNotNull(streamName); this.client = null; @@ -531,6 +540,12 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { return this; } + /** TraceId for debuging purpose. */ + public Builder setTraceId(String traceId) { + this.traceId = traceId; + return this; + } + /** Builds the {@code StreamWriterV2}. */ public StreamWriterV2 build() throws IOException { return new StreamWriterV2(this); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index f32487afb1..2a2cb77236 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -54,6 +54,7 @@ public class StreamWriterV2Test { private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName()); private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private static final String TEST_TRACE_ID = "test trace id"; private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; private static MockServiceHelper serviceHelper; @@ -84,7 +85,7 @@ public void tearDown() throws Exception { } private StreamWriterV2 getTestStreamWriterV2() throws IOException { - return StreamWriterV2.newBuilder(TEST_STREAM, client).build(); + return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build(); } private AppendRowsRequest createAppendRequest(String[] messages, long offset) { @@ -205,10 +206,12 @@ public void testAppendSuccess() throws Exception { // First request received by server should have schema and stream name. assertTrue(serverRequest.getProtoRows().hasWriterSchema()); assertEquals(serverRequest.getWriteStream(), TEST_STREAM); + assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID); } else { // Following request should not have schema and stream name. assertFalse(serverRequest.getProtoRows().hasWriterSchema()); assertEquals(serverRequest.getWriteStream(), ""); + assertEquals(serverRequest.getTraceId(), ""); } } From 5ca7a289ce30a0c2da1ad2796f947637f71554eb Mon Sep 17 00:00:00 2001 From: yayi Date: Mon, 1 Mar 2021 18:16:05 -0800 Subject: [PATCH 4/4] Enforce traceId to be the format of A:B --- .../storage/v1beta2/StreamWriterV2.java | 10 +++++- .../storage/v1beta2/StreamWriterV2Test.java | 35 +++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 51296e9096..33abf53bf4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -600,8 +600,16 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { return this; } - /** TraceId for debuging purpose. */ + /** + * Sets traceId for debuging purpose. TraceId must follow the format of + * CustomerDomain:DebugString, e.g. DATAFLOW:job_id_x. + */ public Builder setTraceId(String traceId) { + int colonIndex = traceId.indexOf(':'); + if (colonIndex == -1 || colonIndex == 0 || colonIndex == traceId.length() - 1) { + throw new IllegalArgumentException( + "TraceId must follow the format of A:B. Actual:" + traceId); + } this.traceId = traceId; return this; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index fed7ca778d..69aa4341a0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -54,7 +54,7 @@ public class StreamWriterV2Test { private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName()); private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; - private static final String TEST_TRACE_ID = "test trace id"; + private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; private static MockServiceHelper serviceHelper; @@ -212,7 +212,10 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception { @Test public void testAppendWithRowsSuccess() throws Exception { StreamWriterV2 writer = - StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build(); + StreamWriterV2.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .build(); long appendCount = 100; for (int i = 0; i < appendCount; i++) { @@ -272,6 +275,34 @@ public void run() throws Throwable { assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided")); } + @Test + public void testInvalidTraceId() throws Exception { + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc"); + } + }); + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc:"); + } + }); + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriterV2.newBuilder(TEST_STREAM).setTraceId(":abc"); + } + }); + } + @Test public void testAppendSuccessAndConnectionError() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2();