diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index beb4a0ef37..8eab8bfdde 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -59,6 +59,7 @@
*
TODO: support updated schema
*/
class ConnectionWorker implements AutoCloseable {
+
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
// Maximum wait time on inflight quota before error out.
@@ -280,6 +281,8 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows,
requestBuilder.setOffset(Int64Value.of(offset));
}
requestBuilder.setWriteStream(streamWriter.getStreamName());
+ requestBuilder.putAllMissingValueInterpretations(
+ streamWriter.getMissingValueInterpretationMap());
return appendInternal(streamWriter, requestBuilder.build());
}
@@ -853,6 +856,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() {
// Class that wraps AppendRowsRequest and its corresponding Response future.
static final class AppendRequestAndResponse {
+
final SettableApiFuture appendResult;
final AppendRowsRequest message;
final long messageSize;
@@ -884,6 +888,7 @@ public Load getLoad() {
*/
@AutoValue
public abstract static class Load {
+
// Consider the load on this worker to be overwhelmed when above some percentage of
// in-flight bytes or in-flight requests count.
private static double overwhelmedInflightCount = 0.2;
@@ -957,6 +962,7 @@ static void setMaxInflightQueueWaitTime(long waitTime) {
@AutoValue
abstract static class TableSchemaAndTimestamp {
+
// Shows the timestamp updated schema is reported from response
abstract long updateTimeStamp();
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
index 24061878f2..edf40c1e64 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -275,6 +275,24 @@ public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}
+ /**
+ * Sets the missing value interpretation map for the JsonStreamWriter. The input
+ * missingValueInterpretationMap is used for all append requests unless otherwise changed.
+ *
+ * @param missingValueInterpretationMap the missing value interpretation map used by the
+ * JsonStreamWriter.
+ */
+ public void setMissingValueInterpretationMap(
+ Map missingValueInterpretationMap) {
+ streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
+ }
+
+ /** @return the missing value interpretation map used for the writer. */
+ public Map
+ getMissingValueInterpretationMap() {
+ return streamWriter.getMissingValueInterpretationMap();
+ }
+
/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index 77bad3eb24..ff965f0477 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -32,6 +32,7 @@
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
+import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -61,6 +62,11 @@ public class StreamWriter implements AutoCloseable {
// Cache of location info for a given dataset.
private static Map projectAndDatasetToLocation = new ConcurrentHashMap<>();
+ // Map of fields to their MissingValueInterpretation, which dictates how a field should be
+ // populated when it is missing from an input user row.
+ private Map missingValueInterpretationMap =
+ new HashMap();
+
/*
* The identifier of stream to write to.
*/
@@ -336,6 +342,18 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
}
}
+ /**
+ * Sets the missing value interpretation map for the stream writer. The input
+ * missingValueInterpretationMap is used for all write requests unless otherwise changed.
+ *
+ * @param missingValueInterpretationMap the missing value interpretation map used by stream
+ * writer.
+ */
+ public void setMissingValueInterpretationMap(
+ Map missingValueInterpretationMap) {
+ this.missingValueInterpretationMap = missingValueInterpretationMap;
+ }
+
/**
* Schedules the writing of rows at the end of current stream.
*
@@ -419,6 +437,12 @@ public String getLocation() {
return location;
}
+ /** @return the missing value interpretation map used for the writer. */
+ public Map
+ getMissingValueInterpretationMap() {
+ return missingValueInterpretationMap;
+ }
+
/**
* @return if a stream writer can no longer be used for writing. It is due to either the
* StreamWriter is explicitly closed or the underlying connection is broken when connection
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 691ec4afde..a748678839 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -48,6 +48,7 @@
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -1290,4 +1291,43 @@ private AppendRowsResponse createAppendResponse(long offset) {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build())
.build();
}
+
+ @Test
+ public void testAppendWithMissingValueMap() throws Exception {
+ TableFieldSchema field =
+ TableFieldSchema.newBuilder()
+ .setType(TableFieldSchema.Type.STRING)
+ .setMode(TableFieldSchema.Mode.NULLABLE)
+ .setName("test-列")
+ .build();
+ TableSchema tableSchema = TableSchema.newBuilder().addFields(0, field).build();
+ FlexibleType expectedProto = FlexibleType.newBuilder().setColDGVzdC3LiJc("allen").build();
+ JSONObject flexible = new JSONObject();
+ flexible.put("test-列", "allen");
+ JSONArray jsonArr = new JSONArray();
+ jsonArr.put(flexible);
+
+ try (JsonStreamWriter writer =
+ getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {
+
+ Map missingValueMap = new HashMap();
+ missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
+ missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+ writer.setMissingValueInterpretationMap(missingValueMap);
+ assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
+
+ testBigQueryWrite.addResponse(
+ AppendRowsResponse.newBuilder()
+ .setAppendResult(
+ AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
+ .build());
+
+ ApiFuture appendFuture = writer.append(jsonArr);
+ assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
+ appendFuture.get();
+ assertEquals(
+ testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations(),
+ missingValueMap);
+ }
+ }
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index 383301d820..43c5fd2bea 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -49,7 +49,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -67,6 +69,7 @@
@RunWith(JUnit4.class)
public class StreamWriterTest {
+
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
@@ -1227,6 +1230,51 @@ public void testCloseDisconnectedStream() throws Exception {
writer.close();
}
+ @Test
+ public void testSetAndGetMissingValueInterpretationMap() throws Exception {
+ StreamWriter writer = getTestStreamWriter();
+ Map missingValueMap = new HashMap();
+ missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
+ missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+ writer.setMissingValueInterpretationMap(missingValueMap);
+ assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
+ }
+
+ @Test
+ public void testAppendWithMissingValueMap() throws Exception {
+ StreamWriter writer = getTestStreamWriter();
+
+ long appendCount = 2;
+ testBigQueryWrite.addResponse(createAppendResponse(0));
+ testBigQueryWrite.addResponse(createAppendResponse(1));
+
+ List> futures = new ArrayList<>();
+ // The first append doesn't use a missing value map.
+ futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0));
+
+ // The second append uses a missing value map.
+ Map missingValueMap = new HashMap();
+ missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
+ missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+ writer.setMissingValueInterpretationMap(missingValueMap);
+ futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1));
+
+ for (int i = 0; i < appendCount; i++) {
+ assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
+ }
+
+ // Ensure that the AppendRowsRequest for the first append operation does not have a missing
+ // value map, and that the second AppendRowsRequest has the missing value map provided in the
+ // second append.
+ verifyAppendRequests(appendCount);
+ AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0);
+ AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1);
+ assertTrue(request1.getMissingValueInterpretations().isEmpty());
+ assertEquals(request2.getMissingValueInterpretations(), missingValueMap);
+
+ writer.close();
+ }
+
@Test(timeout = 10000)
public void testStreamWriterUserCloseMultiplexing() throws Exception {
StreamWriter writer =