From 98d7e446b75aba02ce27cdcb5e835c3fd0f3ad54 Mon Sep 17 00:00:00 2001 From: risnayak <36604411+risnayak@users.noreply.github.com> Date: Tue, 21 Feb 2023 11:30:24 -0800 Subject: [PATCH] feat: add functions to set missing value map in the stream writers (#1966) * feat: add functions to set missing value map in the stream writers * fix syntax error * fix lint errors --------- Co-authored-by: Rishabh Nayak --- .../bigquery/storage/v1/ConnectionWorker.java | 6 +++ .../bigquery/storage/v1/JsonStreamWriter.java | 18 +++++++ .../bigquery/storage/v1/StreamWriter.java | 24 ++++++++++ .../storage/v1/JsonStreamWriterTest.java | 40 ++++++++++++++++ .../bigquery/storage/v1/StreamWriterTest.java | 48 +++++++++++++++++++ 5 files changed, 136 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index beb4a0ef37..8eab8bfdde 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -59,6 +59,7 @@ *

TODO: support updated schema */ class ConnectionWorker implements AutoCloseable { + private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); // Maximum wait time on inflight quota before error out. @@ -280,6 +281,8 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, requestBuilder.setOffset(Int64Value.of(offset)); } requestBuilder.setWriteStream(streamWriter.getStreamName()); + requestBuilder.putAllMissingValueInterpretations( + streamWriter.getMissingValueInterpretationMap()); return appendInternal(streamWriter, requestBuilder.build()); } @@ -853,6 +856,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() { // Class that wraps AppendRowsRequest and its corresponding Response future. static final class AppendRequestAndResponse { + final SettableApiFuture appendResult; final AppendRowsRequest message; final long messageSize; @@ -884,6 +888,7 @@ public Load getLoad() { */ @AutoValue public abstract static class Load { + // Consider the load on this worker to be overwhelmed when above some percentage of // in-flight bytes or in-flight requests count. private static double overwhelmedInflightCount = 0.2; @@ -957,6 +962,7 @@ static void setMaxInflightQueueWaitTime(long waitTime) { @AutoValue abstract static class TableSchemaAndTimestamp { + // Shows the timestamp updated schema is reported from response abstract long updateTimeStamp(); diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 24061878f2..edf40c1e64 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -275,6 +275,24 @@ public long getInflightWaitSeconds() { return streamWriter.getInflightWaitSeconds(); } + /** + * Sets the missing value interpretation map for the JsonStreamWriter. The input + * missingValueInterpretationMap is used for all append requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by the + * JsonStreamWriter. + */ + public void setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); + } + + /** @return the missing value interpretation map used for the writer. */ + public Map + getMissingValueInterpretationMap() { + return streamWriter.getMissingValueInterpretationMap(); + } + /** Sets all StreamWriter settings. */ private void setStreamWriterSettings( @Nullable TransportChannelProvider channelProvider, diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 77bad3eb24..ff965f0477 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -32,6 +32,7 @@ import io.grpc.StatusRuntimeException; import java.io.IOException; import java.time.Duration; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -61,6 +62,11 @@ public class StreamWriter implements AutoCloseable { // Cache of location info for a given dataset. private static Map projectAndDatasetToLocation = new ConcurrentHashMap<>(); + // Map of fields to their MissingValueInterpretation, which dictates how a field should be + // populated when it is missing from an input user row. + private Map missingValueInterpretationMap = + new HashMap(); + /* * The identifier of stream to write to. */ @@ -336,6 +342,18 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) { } } + /** + * Sets the missing value interpretation map for the stream writer. The input + * missingValueInterpretationMap is used for all write requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by stream + * writer. + */ + public void setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + this.missingValueInterpretationMap = missingValueInterpretationMap; + } + /** * Schedules the writing of rows at the end of current stream. * @@ -419,6 +437,12 @@ public String getLocation() { return location; } + /** @return the missing value interpretation map used for the writer. */ + public Map + getMissingValueInterpretationMap() { + return missingValueInterpretationMap; + } + /** * @return if a stream writer can no longer be used for writing. It is due to either the * StreamWriter is explicitly closed or the underlying connection is broken when connection diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 691ec4afde..a748678839 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -48,6 +48,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -1290,4 +1291,43 @@ private AppendRowsResponse createAppendResponse(long offset) { AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build()) .build(); } + + @Test + public void testAppendWithMissingValueMap() throws Exception { + TableFieldSchema field = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("test-列") + .build(); + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, field).build(); + FlexibleType expectedProto = FlexibleType.newBuilder().setColDGVzdC3LiJc("allen").build(); + JSONObject flexible = new JSONObject(); + flexible.put("test-列", "allen"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(flexible); + + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) { + + Map missingValueMap = new HashMap(); + missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); + missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); + writer.setMissingValueInterpretationMap(missingValueMap); + assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + + ApiFuture appendFuture = writer.append(jsonArr); + assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue()); + appendFuture.get(); + assertEquals( + testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations(), + missingValueMap); + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 383301d820..43c5fd2bea 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -49,7 +49,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -67,6 +69,7 @@ @RunWith(JUnit4.class) public class StreamWriterTest { + private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName()); private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default"; private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; @@ -1227,6 +1230,51 @@ public void testCloseDisconnectedStream() throws Exception { writer.close(); } + @Test + public void testSetAndGetMissingValueInterpretationMap() throws Exception { + StreamWriter writer = getTestStreamWriter(); + Map missingValueMap = new HashMap(); + missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); + missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); + writer.setMissingValueInterpretationMap(missingValueMap); + assertEquals(missingValueMap, writer.getMissingValueInterpretationMap()); + } + + @Test + public void testAppendWithMissingValueMap() throws Exception { + StreamWriter writer = getTestStreamWriter(); + + long appendCount = 2; + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + List> futures = new ArrayList<>(); + // The first append doesn't use a missing value map. + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0)); + + // The second append uses a missing value map. + Map missingValueMap = new HashMap(); + missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); + missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); + writer.setMissingValueInterpretationMap(missingValueMap); + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1)); + + for (int i = 0; i < appendCount; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + + // Ensure that the AppendRowsRequest for the first append operation does not have a missing + // value map, and that the second AppendRowsRequest has the missing value map provided in the + // second append. + verifyAppendRequests(appendCount); + AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0); + AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1); + assertTrue(request1.getMissingValueInterpretations().isEmpty()); + assertEquals(request2.getMissingValueInterpretations(), missingValueMap); + + writer.close(); + } + @Test(timeout = 10000) public void testStreamWriterUserCloseMultiplexing() throws Exception { StreamWriter writer =