From cb18d288f78773af60b9fddc583f46a571cbc3f4 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Thu, 3 Nov 2022 23:27:24 -0700 Subject: [PATCH] feat: Add schema comparision to the main request loop for multiplexing to correctly update schema (#1865) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparision in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- README.md | 2 +- .../bigquery/storage/v1/ConnectionWorker.java | 22 ++-- .../storage/v1/ConnectionWorkerTest.java | 112 ++++++++++++++++++ .../bigquery/storage/v1/StreamWriterTest.java | 111 ++++++++++++++++- 4 files changed, 234 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index b559028a9f..d4d0d70de7 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.1.3') +implementation platform('com.google.cloud:libraries-bom:26.1.4') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 74d0644902..aef972470a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -457,7 +457,6 @@ private void appendLoop() { && !streamName.isEmpty() && !originalRequest.getWriteStream().equals(streamName)) { streamName = originalRequest.getWriteStream(); - writerSchema = originalRequest.getProtoRows().getWriterSchema(); isMultiplexing = true; firstRequestForDestinationSwitch = true; } @@ -470,17 +469,22 @@ private void appendLoop() { if (this.traceId != null) { originalRequestBuilder.setTraceId(this.traceId); } - firstRequestForDestinationSwitch = false; - } else if (isMultiplexing) { - // If we are not at the first request after table switch, but we are in multiplexing - // mode, we only need the stream name but not the schema in the request. - originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema(); - } else { - // If we are not at the first request or in multiplexing, create request with no schema - // and no stream name. + } else if (!isMultiplexing) { + // If we are not in multiplexing and not in the first request, clear the stream name. originalRequestBuilder.clearWriteStream(); + } + + // We don't use message differencer to speed up the comparing process. + // `equals(...)` can bring us false positive, e.g. two repeated field can be considered the + // same but is not considered equals(). However as long as it's never provide false negative + // we will always correctly pass writer schema to backend. + if (firstRequestForDestinationSwitch + || !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) { + writerSchema = originalRequest.getProtoRows().getWriterSchema(); + } else { originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema(); } + firstRequestForDestinationSwitch = false; // Send should only throw an exception if there is a problem with the request. The catch // block will handle this case, and return the exception with the result. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index e6067be735..a2258ad430 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -169,6 +169,118 @@ public void testMultiplexedAppendSuccess() throws Exception { } } + @Test + public void testAppendInSameStream_switchSchema() throws Exception { + try (ConnectionWorker connectionWorker = createConnectionWorker()) { + long appendCount = 20; + for (long i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + List> futures = new ArrayList<>(); + + // Schema1 and schema2 are the same content, but different instance. + ProtoSchema schema1 = createProtoSchema("foo"); + ProtoSchema schema2 = createProtoSchema("foo"); + // Schema3 is a different schema + ProtoSchema schema3 = createProtoSchema("bar"); + + // We do a pattern of: + // send to stream1, schema1 + // send to stream1, schema2 + // send to stream1, schema3 + // send to stream1, schema3 + // send to stream1, schema1 + // ... + for (long i = 0; i < appendCount; i++) { + switch ((int) i % 4) { + case 0: + futures.add( + sendTestMessage( + connectionWorker, + TEST_STREAM_1, + schema1, + createFooProtoRows(new String[] {String.valueOf(i)}), + i)); + break; + case 1: + futures.add( + sendTestMessage( + connectionWorker, + TEST_STREAM_1, + schema2, + createFooProtoRows(new String[] {String.valueOf(i)}), + i)); + break; + case 2: + case 3: + futures.add( + sendTestMessage( + connectionWorker, + TEST_STREAM_1, + schema3, + createFooProtoRows(new String[] {String.valueOf(i)}), + i)); + break; + default: // fall out + break; + } + } + // In the real world the response won't contain offset for default stream, but we use offset + // here just to test response. + for (int i = 0; i < appendCount; i++) { + Int64Value offset = futures.get(i).get().getAppendResult().getOffset(); + assertThat(offset).isEqualTo(Int64Value.of(i)); + } + assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + assertThat(serverRequest.getProtoRows().getRows().getSerializedRowsCount()) + .isGreaterThan(0); + assertThat(serverRequest.getOffset().getValue()).isEqualTo(i); + + // We will get the request as the pattern of: + // (writer_stream: t1, schema: schema1) + // (writer_stream: _, schema: _) + // (writer_stream: _, schema: schema3) + // (writer_stream: _, schema: _) + // (writer_stream: _, schema: schema1) + // (writer_stream: _, schema: _) + switch (i % 4) { + case 0: + if (i == 0) { + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); + } + assertThat( + serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) + .isEqualTo("foo"); + break; + case 1: + assertThat(serverRequest.getWriteStream()).isEmpty(); + // Schema is empty if not at the first request after table switch. + assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); + break; + case 2: + assertThat(serverRequest.getWriteStream()).isEmpty(); + // Schema is populated after table switch. + assertThat( + serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) + .isEqualTo("bar"); + break; + case 3: + assertThat(serverRequest.getWriteStream()).isEmpty(); + // Schema is empty if not at the first request after table switch. + assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); + break; + default: // fall out + break; + } + } + + assertThat(connectionWorker.getLoad().destinationCount()).isEqualTo(1); + assertThat(connectionWorker.getLoad().inFlightRequestsBytes()).isEqualTo(0); + } + } + private AppendRowsResponse createAppendResponse(long offset) { return AppendRowsResponse.newBuilder() .setAppendResult( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 3f029ac811..851abc9a49 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -61,8 +61,8 @@ @RunWith(JUnit4.class) public class StreamWriterTest { private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName()); - private static final String TEST_STREAM_1 = "projects/p/datasets/d/tables/t/streams/s"; - private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s"; + private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/s1"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; @@ -112,13 +112,17 @@ private StreamWriter getTestStreamWriter() throws IOException { } private ProtoSchema createProtoSchema() { + return createProtoSchema("foo"); + } + + private ProtoSchema createProtoSchema(String fieldName) { return ProtoSchema.newBuilder() .setProtoDescriptor( DescriptorProtos.DescriptorProto.newBuilder() .setName("Message") .addField( DescriptorProtos.FieldDescriptorProto.newBuilder() - .setName("foo") + .setName(fieldName) .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) .setNumber(1) .build()) @@ -562,6 +566,107 @@ public void testOneMaxInflightRequests_MultiplexingCase() throws Exception { writer2.close(); } + @Test + public void testProtoSchemaPiping_nonMultiplexingCase() throws Exception { + ProtoSchema protoSchema = createProtoSchema(); + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(protoSchema) + .setMaxInflightBytes(1) + .build(); + long appendCount = 5; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + + for (int i = 0; i < appendCount; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i); + assertEquals(i, appendRowsRequest.getOffset().getValue()); + if (i == 0) { + appendRowsRequest.getProtoRows().getWriterSchema().equals(protoSchema); + assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1); + } else { + appendRowsRequest.getProtoRows().getWriterSchema().equals(ProtoSchema.getDefaultInstance()); + } + } + writer.close(); + } + + @Test + public void testProtoSchemaPiping_multiplexingCase() throws Exception { + // Use the shared connection mode. + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build()); + ProtoSchema schema1 = createProtoSchema("Schema1"); + ProtoSchema schema2 = createProtoSchema("Schema2"); + StreamWriter writer1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(schema1) + .setLocation("US") + .setEnableConnectionPool(true) + .setMaxInflightRequests(1) + .build(); + StreamWriter writer2 = + StreamWriter.newBuilder(TEST_STREAM_2, client) + .setWriterSchema(schema2) + .setMaxInflightRequests(1) + .setEnableConnectionPool(true) + .setLocation("US") + .build(); + + long appendCountPerStream = 5; + for (int i = 0; i < appendCountPerStream * 4; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + List> futures = new ArrayList<>(); + // In total insert append `appendCountPerStream` * 4 requests. + // We insert using the pattern of streamWriter1, streamWriter1, streamWriter2, streamWriter2 + for (int i = 0; i < appendCountPerStream; i++) { + futures.add(writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4)); + futures.add(writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 1)); + futures.add(writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 2)); + futures.add(writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 3)); + } + + for (int i = 0; i < appendCountPerStream * 4; i++) { + AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i); + assertEquals(i, appendRowsRequest.getOffset().getValue()); + if (i % 4 == 0) { + assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema1); + assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1); + } else if (i % 4 == 1) { + assertEquals( + appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance()); + // Before entering multiplexing (i == 1) case, the write stream won't be populated. + if (i == 1) { + assertEquals(appendRowsRequest.getWriteStream(), ""); + } else { + assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1); + } + } else if (i % 4 == 2) { + assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema2); + assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2); + } else { + assertEquals( + appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance()); + assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2); + } + } + + writer1.close(); + writer2.close(); + } + @Test public void testAppendsWithTinyMaxInflightBytes() throws Exception { StreamWriter writer =