From 847c313eae379d84bdb803660dad3eb19520d4be Mon Sep 17 00:00:00 2001 From: yifatgortler Date: Fri, 26 Jul 2024 23:31:35 +0000 Subject: [PATCH] Persist missingValueInterpretationMap in StreamWriter's Builder. In case the StreamWriter is recreated, the map will be used in the new StreamWriter too. --- .../clirr-ignored-differences.xml | 25 +++++ .../bigquery/storage/v1/JsonStreamWriter.java | 32 +++--- .../storage/v1/SchemaAwareStreamWriter.java | 34 +++--- .../bigquery/storage/v1/StreamWriter.java | 27 ++++- .../storage/v1/JsonStreamWriterTest.java | 100 ++++++++++++++++-- .../bigquery/storage/v1/StreamWriterTest.java | 71 ++++++++----- 6 files changed, 213 insertions(+), 76 deletions(-) diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index da5460a154..bcb75e3ca8 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -197,5 +197,30 @@ 1001 com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool + + 7002 + com/google/cloud/bigquery/storage/v1/JsonStreamWriter + java.util.Map getMissingValueInterpretationMap() + + + 7002 + com/google/cloud/bigquery/storage/v1/JsonStreamWriter + void setMissingValueInterpretationMap(java.util.Map) + + + 7002 + com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter + java.util.Map getMissingValueInterpretationMap() + + + 7002 + com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter + void setMissingValueInterpretationMap(java.util.Map) + + + 7002 + com/google/cloud/bigquery/storage/v1/StreamWriter + void setMissingValueInterpretationMap(java.util.Map) + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 83f45e4318..e11a1729c6 100644 --- 
a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -119,24 +119,6 @@ public long getInflightWaitSeconds() { return this.schemaAwareStreamWriter.getInflightWaitSeconds(); } - /** - * Sets the missing value interpretation map for the JsonStreamWriter. The input - * missingValueInterpretationMap is used for all append requests unless otherwise changed. - * - * @param missingValueInterpretationMap the missing value interpretation map used by the - * JsonStreamWriter. - */ - public void setMissingValueInterpretationMap( - Map missingValueInterpretationMap) { - this.schemaAwareStreamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); - } - - /** @return the missing value interpretation map used for the writer. */ - public Map - getMissingValueInterpretationMap() { - return this.schemaAwareStreamWriter.getMissingValueInterpretationMap(); - } - /** * newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by * StreamWriter by default. @@ -414,6 +396,20 @@ public Builder setDefaultMissingValueInterpretation( return this; } + /** + * Sets the missing value interpretation map for the JsonStreamWriter. The input + * missingValueInterpretationMap is used for all append requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by the + * JsonStreamWriter. 
+ */ + public Builder setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + this.schemaAwareStreamWriterBuilder.setMissingValueInterpretationMap( + missingValueInterpretationMap); + return this; + } + /** * Builds JsonStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java index 8f45f0c5f5..1ff767a279 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -105,6 +105,7 @@ private SchemaAwareStreamWriter(Builder builder) streamWriterBuilder.setLocation(builder.location); streamWriterBuilder.setDefaultMissingValueInterpretation( builder.defaultMissingValueInterpretation); + streamWriterBuilder.setMissingValueInterpretationMap(builder.missingValueInterpretationMap); streamWriterBuilder.setClientId(builder.clientId); streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler); requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler); @@ -298,24 +299,6 @@ public long getInflightWaitSeconds() { return streamWriter.getInflightWaitSeconds(); } - /** - * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input - * missingValueInterpretationMap is used for all append requests unless otherwise changed. - * - * @param missingValueInterpretationMap the missing value interpretation map used by the - * SchemaAwareStreamWriter. - */ - public void setMissingValueInterpretationMap( - Map missingValueInterpretationMap) { - streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); - } - - /** @return the missing value interpretation map used for the writer. 
*/ - public Map - getMissingValueInterpretationMap() { - return streamWriter.getMissingValueInterpretationMap(); - } - /** Sets all StreamWriter settings. */ private void setStreamWriterSettings( @Nullable TransportChannelProvider channelProvider, @@ -475,6 +458,8 @@ public static final class Builder { private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + private Map + missingValueInterpretationMap = new HashMap(); private String clientId; private boolean enableRequestProfiler = false; @@ -684,6 +669,19 @@ public Builder setDefaultMissingValueInterpretation( return this; } + /** + * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input + * missingValueInterpretationMap is used for all append requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by the + * SchemaAwareStreamWriter. + */ + public Builder setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + this.missingValueInterpretationMap = missingValueInterpretationMap; + return this; + } + /** * Sets the RetrySettings to use for in-stream error retry. * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index fabcac3b0b..4c4e5638dc 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -68,11 +68,6 @@ public class StreamWriter implements AutoCloseable { // Cache of location info for a given dataset. 
private static Map projectAndDatasetToLocation = new ConcurrentHashMap<>(); - // Map of fields to their MissingValueInterpretation, which dictates how a field should be - // populated when it is missing from an input user row. - private Map missingValueInterpretationMap = - new HashMap(); - /* * The identifier of stream to write to. */ @@ -103,6 +98,11 @@ public class StreamWriter implements AutoCloseable { private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + // Map of fields to their MissingValueInterpretation, which dictates how a field should be + // populated when it is missing from an input user row. + private Map missingValueInterpretationMap = + new HashMap(); + /** * Stream can access a single connection or a pool of connection depending on whether multiplexing * is enabled. @@ -229,6 +229,7 @@ private StreamWriter(Builder builder) throws IOException { this.streamName = builder.streamName; this.writerSchema = builder.writerSchema; this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation; + this.missingValueInterpretationMap = builder.missingValueInterpretationMap; BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder); this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler); @@ -700,6 +701,9 @@ public static final class Builder { private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + private Map + missingValueInterpretationMap = new HashMap(); + private boolean enableRequestProfiler = false; private boolean enableOpenTelemetry = false; @@ -851,6 +855,19 @@ public Builder setDefaultMissingValueInterpretation( return this; } + /** + * Sets the missing value interpretation map for the stream writer. 
The input + * missingValueInterpretationMap is used for all write requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by stream + * writer. + */ + public Builder setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + this.missingValueInterpretationMap = missingValueInterpretationMap; + return this; + } + /** * Enable a latency profiler that would periodically generate a detailed latency report for the * top latency requests. This is currently an experimental API. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 4aea9c1af5..9e9e56a688 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -868,6 +868,91 @@ public void testSimpleSchemaUpdate_skipRefreshWriterIfSchemaProvided() throws Ex } } + @Test + public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception { + testBigQueryWrite.addResponse( + WriteStream.newBuilder() + .setName(TEST_STREAM) + .setTableSchema(TABLE_SCHEMA) + .setLocation("us") + .build()); + Map missingValueMap = new HashMap<>(); + missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); + missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); + + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM) + .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE) + .setMissingValueInterpretationMap(missingValueMap) + .build()) { + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + 
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + testBigQueryWrite.addResponse(createAppendResponse(2)); + testBigQueryWrite.addResponse(createAppendResponse(3)); + + // First batch of appends. First append request will return an updated-schema, but the second + // and maybe the third append will be processed before the first response will refresh the + // StreamWriter. + JSONObject foo = new JSONObject(); + foo.put("foo", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + ApiFuture appendFuture1 = writer.append(jsonArr); + ApiFuture appendFuture2 = writer.append(jsonArr); + ApiFuture appendFuture3 = writer.append(jsonArr); + + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + + // Another append, this time with columns to match the updated schema. 
+ JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("foo", "aaa"); + updatedFoo.put("bar", "bbb"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + ApiFuture appendFuture4 = writer.append(updatedJsonArr); + + assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); + assertEquals(4, testBigQueryWrite.getAppendRequests().size()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(3) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(3) + .getProtoRows() + .getRows() + .getSerializedRows(0), + UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString()); + + assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + assertTrue( + testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema() + || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema()); + + assertEquals( + testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(), + MissingValueInterpretation.DEFAULT_VALUE); + assertEquals( + testBigQueryWrite.getAppendRequests().get(3).getMissingValueInterpretations(), + missingValueMap); + } + } + @Test public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception { TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); @@ -1523,14 +1608,15 @@ public void testAppendWithMissingValueMap() throws Exception { JSONArray jsonArr = new JSONArray(); jsonArr.put(flexible); - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) { + Map missingValueMap = new HashMap<>(); + missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); + missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); - Map missingValueMap = new 
HashMap<>(); - missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); - missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); - writer.setMissingValueInterpretationMap(missingValueMap); - assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema) + .setMissingValueInterpretationMap(missingValueMap) + .setTraceId("test:empty") + .build()) { testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 43abdb5999..cd849fc50f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -171,12 +171,15 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException { .build(); } - private StreamWriter getTestStreamWriter() throws IOException { + private StreamWriter.Builder getTestStreamWriterBuilder() throws IOException { return StreamWriter.newBuilder(TEST_STREAM_1, client) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) - .setMaxRetryDuration(java.time.Duration.ofSeconds(5)) - .build(); + .setMaxRetryDuration(java.time.Duration.ofSeconds(5)); + } + + private StreamWriter getTestStreamWriter() throws IOException { + return getTestStreamWriterBuilder().build(); } private StreamWriter getTestStreamWriterRetryEnabled() throws IOException { @@ -1581,47 +1584,59 @@ public void testCloseDisconnectedStream() throws Exception { @Test public void testSetAndGetMissingValueInterpretationMap() throws Exception { - StreamWriter writer = getTestStreamWriter(); + StreamWriter.Builder writerBuilder = 
getTestStreamWriterBuilder(); Map missingValueMap = new HashMap(); missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); - writer.setMissingValueInterpretationMap(missingValueMap); + writerBuilder.setMissingValueInterpretationMap(missingValueMap); + StreamWriter writer = writerBuilder.build(); assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); } @Test - public void testAppendWithMissingValueMap() throws Exception { - StreamWriter writer = getTestStreamWriter(); + public void testAppendWithoutMissingValueMap() throws Exception { + try (StreamWriter writer = getTestStreamWriter()) { - long appendCount = 2; - testBigQueryWrite.addResponse(createAppendResponse(0)); - testBigQueryWrite.addResponse(createAppendResponse(1)); + testBigQueryWrite.addResponse(createAppendResponse(0)); - List> futures = new ArrayList<>(); - // The first append doesn't use a missing value map. - futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0)); + // This append doesn't use a missing value map. + ApiFuture responseFuture = + writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); - // The second append uses a missing value map. + assertEquals(0, responseFuture.get().getAppendResult().getOffset().getValue()); + + // Ensure that the AppendRowsRequest for the only append operation does not have a missing + // value map, because no missing value map was set on the builder that created this + // writer. 
+ verifyAppendRequests(1); + AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0); + assertTrue(request1.getMissingValueInterpretations().isEmpty()); + } + } + + @Test + public void testAppendWithMissingValueMap() throws Exception { Map missingValueMap = new HashMap(); missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); - writer.setMissingValueInterpretationMap(missingValueMap); - futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1)); - for (int i = 0; i < appendCount; i++) { - assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); - } + try (StreamWriter writer = + getTestStreamWriterBuilder().setMissingValueInterpretationMap(missingValueMap).build()) { - // Ensure that the AppendRowsRequest for the first append operation does not have a missing - // value map, and that the second AppendRowsRequest has the missing value map provided in the - // second append. - verifyAppendRequests(appendCount); - AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0); - AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1); - assertTrue(request1.getMissingValueInterpretations().isEmpty()); - assertEquals(request2.getMissingValueInterpretations(), missingValueMap); + testBigQueryWrite.addResponse(createAppendResponse(0)); - writer.close(); + // The append uses the missing value map configured on the builder. + ApiFuture responseFuture = + writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); + + assertEquals(0, responseFuture.get().getAppendResult().getOffset().getValue()); + + verifyAppendRequests(1); + + assertEquals( + testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations(), + missingValueMap); + } } @Test(timeout = 10000)