From e5d93ae1d0b057bf7a9ab327d50cbda3f40ddff1 Mon Sep 17 00:00:00 2001 From: yifatgortler Date: Tue, 30 Jul 2024 06:55:00 +0000 Subject: [PATCH 1/3] Persist missingValueInterpretationMap in StreamWriter's Builder. In case the StreamWriter is recreated, the map will be used in the new StreamWriter too. --- .../clirr-ignored-differences.xml | 25 +++++ .../bigquery/storage/v1/JsonStreamWriter.java | 33 +++--- .../storage/v1/SchemaAwareStreamWriter.java | 35 +++--- .../bigquery/storage/v1/StreamWriter.java | 40 ++++--- .../storage/v1/JsonStreamWriterTest.java | 100 ++++++++++++++++-- .../bigquery/storage/v1/StreamWriterTest.java | 65 +++++++----- 6 files changed, 210 insertions(+), 88 deletions(-) diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index da5460a154..bcb75e3ca8 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -197,5 +197,30 @@ 1001 com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool + + 7002 + com/google/cloud/bigquery/storage/v1/JsonStreamWriter + java.util.Map getMissingValueInterpretationMap() + + + 7002 + com/google/cloud/bigquery/storage/v1/JsonStreamWriter + void setMissingValueInterpretationMap(java.util.Map) + + + 7002 + com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter + java.util.Map getMissingValueInterpretationMap() + + + 7002 + com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter + void setMissingValueInterpretationMap(java.util.Map) + + + 7002 + com/google/cloud/bigquery/storage/v1/StreamWriter + void setMissingValueInterpretationMap(java.util.Map) + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 83f45e4318..f16cee9b54 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -119,24 +119,6 @@ public long getInflightWaitSeconds() { return this.schemaAwareStreamWriter.getInflightWaitSeconds(); } - /** - * Sets the missing value interpretation map for the JsonStreamWriter. The input - * missingValueInterpretationMap is used for all append requests unless otherwise changed. - * - * @param missingValueInterpretationMap the missing value interpretation map used by the - * JsonStreamWriter. - */ - public void setMissingValueInterpretationMap( - Map missingValueInterpretationMap) { - this.schemaAwareStreamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); - } - - /** @return the missing value interpretation map used for the writer. */ - public Map - getMissingValueInterpretationMap() { - return this.schemaAwareStreamWriter.getMissingValueInterpretationMap(); - } - /** * newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by * StreamWriter by default. @@ -414,6 +396,21 @@ public Builder setDefaultMissingValueInterpretation( return this; } + /** + * Sets the missing value interpretation map for the JsonStreamWriter. The input + * missingValueInterpretationMap is used for all append requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by the + * JsonStreamWriter. + * @return Builder + */ + public Builder setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + this.schemaAwareStreamWriterBuilder.setMissingValueInterpretationMap( + missingValueInterpretationMap); + return this; + } + /** * Builds JsonStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java index 8f45f0c5f5..0c4b76eba5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -105,6 +105,7 @@ private SchemaAwareStreamWriter(Builder builder) streamWriterBuilder.setLocation(builder.location); streamWriterBuilder.setDefaultMissingValueInterpretation( builder.defaultMissingValueInterpretation); + streamWriterBuilder.setMissingValueInterpretationMap(builder.missingValueInterpretationMap); streamWriterBuilder.setClientId(builder.clientId); streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler); requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler); @@ -298,24 +299,6 @@ public long getInflightWaitSeconds() { return streamWriter.getInflightWaitSeconds(); } - /** - * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input - * missingValueInterpretationMap is used for all append requests unless otherwise changed. - * - * @param missingValueInterpretationMap the missing value interpretation map used by the - * SchemaAwareStreamWriter. - */ - public void setMissingValueInterpretationMap( - Map missingValueInterpretationMap) { - streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); - } - - /** @return the missing value interpretation map used for the writer. */ - public Map - getMissingValueInterpretationMap() { - return streamWriter.getMissingValueInterpretationMap(); - } - /** Sets all StreamWriter settings. */ private void setStreamWriterSettings( @Nullable TransportChannelProvider channelProvider, @@ -475,6 +458,8 @@ public static final class Builder { private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + private Map + missingValueInterpretationMap = new HashMap(); private String clientId; private boolean enableRequestProfiler = false; @@ -684,6 +669,20 @@ public Builder setDefaultMissingValueInterpretation( return this; } + /** + * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input + * missingValueInterpretationMap is used for all append requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by the + * SchemaAwareStreamWriter. + * @return Builder + */ + public Builder setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + this.missingValueInterpretationMap = missingValueInterpretationMap; + return this; + } + /** * Sets the RetrySettings to use for in-stream error retry. * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index fabcac3b0b..a4223adb60 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -68,11 +68,6 @@ public class StreamWriter implements AutoCloseable { // Cache of location info for a given dataset. private static Map projectAndDatasetToLocation = new ConcurrentHashMap<>(); - // Map of fields to their MissingValueInterpretation, which dictates how a field should be - // populated when it is missing from an input user row. - private Map missingValueInterpretationMap = - new HashMap(); - /* * The identifier of stream to write to. */ @@ -103,6 +98,11 @@ public class StreamWriter implements AutoCloseable { private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + // Map of fields to their MissingValueInterpretation, which dictates how a field should be + // populated when it is missing from an input user row. + private Map missingValueInterpretationMap = + new HashMap(); + /** * Stream can access a single connection or a pool of connection depending on whether multiplexing * is enabled. @@ -229,6 +229,7 @@ private StreamWriter(Builder builder) throws IOException { this.streamName = builder.streamName; this.writerSchema = builder.writerSchema; this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation; + this.missingValueInterpretationMap = builder.missingValueInterpretationMap; BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder); this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler); @@ -420,18 +421,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) { } } - /** - * Sets the missing value interpretation map for the stream writer. The input - * missingValueInterpretationMap is used for all write requests unless otherwise changed. - * - * @param missingValueInterpretationMap the missing value interpretation map used by stream - * writer. - */ - public void setMissingValueInterpretationMap( - Map missingValueInterpretationMap) { - this.missingValueInterpretationMap = missingValueInterpretationMap; - } - /** * Schedules the writing of rows at the end of current stream. * @@ -700,6 +689,9 @@ public static final class Builder { private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + private Map + missingValueInterpretationMap = new HashMap(); + private boolean enableRequestProfiler = false; private boolean enableOpenTelemetry = false; @@ -851,6 +843,20 @@ public Builder setDefaultMissingValueInterpretation( return this; } + /** + * Sets the missing value interpretation map for the stream writer. The input + * missingValueInterpretationMap is used for all write requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by stream + * writer. + * @return Builder + */ + public Builder setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + this.missingValueInterpretationMap = missingValueInterpretationMap; + return this; + } + /** * Enable a latency profiler that would periodically generate a detailed latency report for the * top latency requests. This is currently an experimental API. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 4aea9c1af5..9e9e56a688 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -868,6 +868,91 @@ public void testSimpleSchemaUpdate_skipRefreshWriterIfSchemaProvided() throws Ex } } + @Test + public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception { + testBigQueryWrite.addResponse( + WriteStream.newBuilder() + .setName(TEST_STREAM) + .setTableSchema(TABLE_SCHEMA) + .setLocation("us") + .build()); + Map missingValueMap = new HashMap<>(); + missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); + missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); + + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM) + .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE) + .setMissingValueInterpretationMap(missingValueMap) + .build()) { + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + testBigQueryWrite.addResponse(createAppendResponse(2)); + testBigQueryWrite.addResponse(createAppendResponse(3)); + + // First batch of appends. First append request will return an updated-schema, but the second + // and maybe the third append will be processed before the first response will refresh the + // StreamWriter. + JSONObject foo = new JSONObject(); + foo.put("foo", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + ApiFuture appendFuture1 = writer.append(jsonArr); + ApiFuture appendFuture2 = writer.append(jsonArr); + ApiFuture appendFuture3 = writer.append(jsonArr); + + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + + // Another append, this time with columns to match the updated schema. + JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("foo", "aaa"); + updatedFoo.put("bar", "bbb"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + ApiFuture appendFuture4 = writer.append(updatedJsonArr); + + assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); + assertEquals(4, testBigQueryWrite.getAppendRequests().size()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(3) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(3) + .getProtoRows() + .getRows() + .getSerializedRows(0), + UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString()); + + assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + assertTrue( + testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema() + || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema()); + + assertEquals( + testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(), + MissingValueInterpretation.DEFAULT_VALUE); + assertEquals( + testBigQueryWrite.getAppendRequests().get(3).getMissingValueInterpretations(), + missingValueMap); + } + } + @Test public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception { TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); @@ -1523,14 +1608,15 @@ public void testAppendWithMissingValueMap() throws Exception { JSONArray jsonArr = new JSONArray(); jsonArr.put(flexible); - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) { + Map missingValueMap = new HashMap<>(); + missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); + missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); - Map missingValueMap = new HashMap<>(); - missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); - missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); - writer.setMissingValueInterpretationMap(missingValueMap); - assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema) + .setMissingValueInterpretationMap(missingValueMap) + .setTraceId("test:empty") + .build()) { testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 5bd242e0ee..1aa43c92ce 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -171,12 +171,15 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException { .build(); } - private StreamWriter getTestStreamWriter() throws IOException { + private StreamWriter.Builder getTestStreamWriterBuilder() throws IOException { return StreamWriter.newBuilder(TEST_STREAM_1, client) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) - .setMaxRetryDuration(java.time.Duration.ofSeconds(5)) - .build(); + .setMaxRetryDuration(java.time.Duration.ofSeconds(5)); + } + + private StreamWriter getTestStreamWriter() throws IOException { + return getTestStreamWriterBuilder().build(); } private StreamWriter getTestStreamWriterRetryEnabled() throws IOException { @@ -1583,47 +1586,53 @@ public void testCloseDisconnectedStream() throws Exception { @Test public void testSetAndGetMissingValueInterpretationMap() throws Exception { - StreamWriter writer = getTestStreamWriter(); + StreamWriter.Builder writerBuilder = getTestStreamWriterBuilder(); Map missingValueMap = new HashMap(); missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); - writer.setMissingValueInterpretationMap(missingValueMap); + writerBuilder.setMissingValueInterpretationMap(missingValueMap); + StreamWriter writer = writerBuilder.build(); assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); } @Test - public void testAppendWithMissingValueMap() throws Exception { - StreamWriter writer = getTestStreamWriter(); + public void testAppendWithoutMissingValueMap() throws Exception { + try (StreamWriter writer = getTestStreamWriter()) { - long appendCount = 2; - testBigQueryWrite.addResponse(createAppendResponse(0)); - testBigQueryWrite.addResponse(createAppendResponse(1)); + testBigQueryWrite.addResponse(createAppendResponse(0)); - List> futures = new ArrayList<>(); - // The first append doesn't use a missing value map. - futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0)); + ApiFuture responseFuture = + writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); - // The second append uses a missing value map. + assertEquals(0, responseFuture.get().getAppendResult().getOffset().getValue()); + + verifyAppendRequests(1); + assertTrue(testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations().isEmpty()); + } + } + + @Test + public void testAppendWithMissingValueMap() throws Exception { Map missingValueMap = new HashMap(); missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); - writer.setMissingValueInterpretationMap(missingValueMap); - futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1)); - for (int i = 0; i < appendCount; i++) { - assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); - } + try (StreamWriter writer = + getTestStreamWriterBuilder().setMissingValueInterpretationMap(missingValueMap).build()) { - // Ensure that the AppendRowsRequest for the first append operation does not have a missing - // value map, and that the second AppendRowsRequest has the missing value map provided in the - // second append. - verifyAppendRequests(appendCount); - AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0); - AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1); - assertTrue(request1.getMissingValueInterpretations().isEmpty()); - assertEquals(request2.getMissingValueInterpretations(), missingValueMap); + testBigQueryWrite.addResponse(createAppendResponse(0)); - writer.close(); + ApiFuture responseFuture = + writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); + + assertEquals(0, responseFuture.get().getAppendResult().getOffset().getValue()); + + verifyAppendRequests(1); + + assertEquals( + testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations(), + missingValueMap); + } } @Test(timeout = 10000) From 94cb8d02b678fa8639ebd339ead07d0248efa26f Mon Sep 17 00:00:00 2001 From: yifatgortler Date: Tue, 30 Jul 2024 09:46:25 +0000 Subject: [PATCH 2/3] Fix format --- .../com/google/cloud/bigquery/storage/v1/StreamWriterTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 1aa43c92ce..8468fb712f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1607,7 +1607,8 @@ public void testAppendWithoutMissingValueMap() throws Exception { assertEquals(0, responseFuture.get().getAppendResult().getOffset().getValue()); verifyAppendRequests(1); - assertTrue(testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations().isEmpty()); + assertTrue( + testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations().isEmpty()); } } From 99a6a8881fd99d29588b413e555ba996d2f2886a Mon Sep 17 00:00:00 2001 From: yifatgortler Date: Wed, 25 Sep 2024 19:07:50 +0000 Subject: [PATCH 3/3] Addressed comments: Revived the getMissingValueInterpretationMap method and added Integration Test. --- .../clirr-ignored-differences.xml | 10 -- .../bigquery/storage/v1/JsonStreamWriter.java | 6 + .../storage/v1/SchemaAwareStreamWriter.java | 6 + .../storage/v1/JsonStreamWriterTest.java | 6 + .../it/ITBigQueryWriteManualClientTest.java | 123 ++++++++++++++++++ 5 files changed, 141 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index bcb75e3ca8..8e6c8958df 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -197,21 +197,11 @@ 1001 com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool - - 7002 - com/google/cloud/bigquery/storage/v1/JsonStreamWriter - java.util.Map getMissingValueInterpretationMap() - 7002 com/google/cloud/bigquery/storage/v1/JsonStreamWriter void setMissingValueInterpretationMap(java.util.Map) - - 7002 - com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter - java.util.Map getMissingValueInterpretationMap() - 7002 com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index f16cee9b54..d6446113bd 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -119,6 +119,12 @@ public long getInflightWaitSeconds() { return this.schemaAwareStreamWriter.getInflightWaitSeconds(); } + /** @return the missing value interpretation map used for the writer. */ + public Map + getMissingValueInterpretationMap() { + return this.schemaAwareStreamWriter.getMissingValueInterpretationMap(); + } + /** * newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by * StreamWriter by default. diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java index 0c4b76eba5..b16a1d81de 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -299,6 +299,12 @@ public long getInflightWaitSeconds() { return streamWriter.getInflightWaitSeconds(); } + /** @return the missing value interpretation map used for the writer. */ + public Map + getMissingValueInterpretationMap() { + return streamWriter.getMissingValueInterpretationMap(); + } + /** Sets all StreamWriter settings. */ private void setStreamWriterSettings( @Nullable TransportChannelProvider channelProvider, diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 9e9e56a688..c6e920192b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -893,6 +893,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception { .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); testBigQueryWrite.addResponse(createAppendResponse(1)); + // Verify the map before the writer is refreshed + assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); testBigQueryWrite.addResponse(createAppendResponse(2)); testBigQueryWrite.addResponse(createAppendResponse(3)); @@ -944,6 +946,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception { testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema() || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema()); + // Verify the map after the writer is refreshed + assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); assertEquals( testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(), MissingValueInterpretation.DEFAULT_VALUE); @@ -1618,6 +1622,8 @@ public void testAppendWithMissingValueMap() throws Exception { .setTraceId("test:empty") .build()) { + assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); + testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() .setAppendResult( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 9d29d4cfd2..58f80dbce4 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -1306,6 +1306,129 @@ public void testJsonStreamWriterSchemaUpdateConcurrent() } } + @Test + public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap() + throws DescriptorValidationException, ExecutionException, IOException, InterruptedException, + ParseException { + String tableName = "SchemaUpdateMissingValueMapTestTable"; + TableId tableId = TableId.of(DATASET, tableName); + tableInfo = TableInfo.newBuilder(tableId, defaultValueTableDefinition).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + Map missingValueMap = new HashMap<>(); + missingValueMap.put( + "foo_with_default", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); + missingValueMap.put( + "date_with_default_to_current", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); + + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), client) + .setMissingValueInterpretationMap(missingValueMap) + .build()) { + // Verify the missing value map + assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap()); + + // First append with the current schema + JSONObject jsonObject = new JSONObject(); + jsonObject.put("bar_without_default", "existing_col_before_update"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(jsonObject); + ApiFuture response1 = jsonStreamWriter.append(jsonArr, 0); + assertEquals(0, response1.get().getAppendResult().getOffset().getValue()); + + // Add a column to the table + Field newCol = + Field.newBuilder("new_col_without_default", StandardSQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build(); + ArrayList updatedFields = + new ArrayList<>(defaultValueTableDefinition.getSchema().getFields()); + updatedFields.add(newCol); + Schema updatedSchema = Schema.of(updatedFields); + TableInfo updatedTableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build(); + Table updatedTable = bigquery.update(updatedTableInfo); + assertEquals(updatedSchema, updatedTable.getDefinition().getSchema()); + + // Continue writing rows until backend acknowledges schema update + JSONObject jsonObject2 = new JSONObject(); + jsonObject2.put("bar_without_default", "no_schema_update_yet"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(jsonObject2); + + int nextI = 0; + for (int i = 1; i < 100; i++) { + ApiFuture response2 = jsonStreamWriter.append(jsonArr2, i); + assertEquals(i, response2.get().getAppendResult().getOffset().getValue()); + if (response2.get().hasUpdatedSchema()) { + nextI = i + 1; + break; + } else { + Thread.sleep(1000); + } + } + + // Write using the new schema with 10 new requests + JSONObject updatedCol = new JSONObject(); + updatedCol.put("bar_without_default", "existing_col"); + updatedCol.put("new_col_without_default", "new_col"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedCol); + for (int i = nextI; i < nextI + 10; i++) { + ApiFuture response3 = jsonStreamWriter.append(updatedJsonArr, i); + assertEquals(i, response3.get().getAppendResult().getOffset().getValue()); + } + + // List all rows to verify table data correctness + Iterator rowsIter = bigquery.listTableData(tableId).getValues().iterator(); + + // Verify 1st row (with "existing_col_before_update") + FieldValueList currentRow = rowsIter.next(); + assertEquals("default_value_for_test", currentRow.get(0).getStringValue()); + assertEquals("existing_col_before_update", currentRow.get(1).getStringValue()); + assertFalse(currentRow.get(2).getStringValue().isEmpty()); + // Check whether the recorded value is close enough. + Instant parsedInstant = + Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue()); + assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS))); + + // A few rows (with "no_schema_update_yet") until the schema was updated + for (int j = 1; j < nextI; j++) { + currentRow = rowsIter.next(); + assertEquals("default_value_for_test", currentRow.get(0).getStringValue()); + assertEquals("no_schema_update_yet", currentRow.get(1).getStringValue()); + // Check whether the recorded value is close enough. + parsedInstant = + Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue()); + assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS))); + } + // 10 rows after schema update with new column included + for (int j = nextI; j < nextI + 10; j++) { + currentRow = rowsIter.next(); + assertEquals("default_value_for_test", currentRow.get(0).getStringValue()); + assertEquals("existing_col", currentRow.get(1).getStringValue()); + assertFalse(currentRow.get(2).getStringValue().isEmpty()); + // Check whether the recorded value is close enough. + parsedInstant = + Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue()); + assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS))); + // Verify the new column + assertEquals("new_col", currentRow.get(3).getStringValue()); + } + assertFalse(rowsIter.hasNext()); + + // Verify that the missing value map hasn't changed + assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap()); + } + } + @Test public void testJsonStreamWriterWithFlexibleColumnName() throws IOException, InterruptedException, ExecutionException,