diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index da5460a154..8e6c8958df 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -197,5 +197,20 @@
1001
com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool
+
+ 7002
+ com/google/cloud/bigquery/storage/v1/JsonStreamWriter
+ void setMissingValueInterpretationMap(java.util.Map)
+
+
+ 7002
+ com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter
+ void setMissingValueInterpretationMap(java.util.Map)
+
+
+ 7002
+ com/google/cloud/bigquery/storage/v1/StreamWriter
+ void setMissingValueInterpretationMap(java.util.Map)
+
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
index 83f45e4318..d6446113bd 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -119,18 +119,6 @@ public long getInflightWaitSeconds() {
return this.schemaAwareStreamWriter.getInflightWaitSeconds();
}
- /**
- * Sets the missing value interpretation map for the JsonStreamWriter. The input
- * missingValueInterpretationMap is used for all append requests unless otherwise changed.
- *
- * @param missingValueInterpretationMap the missing value interpretation map used by the
- * JsonStreamWriter.
- */
- public void setMissingValueInterpretationMap(
- Map missingValueInterpretationMap) {
- this.schemaAwareStreamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
- }
-
/** @return the missing value interpretation map used for the writer. */
public Map
getMissingValueInterpretationMap() {
@@ -414,6 +402,21 @@ public Builder setDefaultMissingValueInterpretation(
return this;
}
+ /**
+ * Sets the missing value interpretation map for the JsonStreamWriter. The input
+ * missingValueInterpretationMap is used for all append requests unless otherwise changed.
+ *
+ * @param missingValueInterpretationMap the missing value interpretation map used by the
+ * JsonStreamWriter.
+ * @return Builder
+ */
+ public Builder setMissingValueInterpretationMap(
+ Map missingValueInterpretationMap) {
+ this.schemaAwareStreamWriterBuilder.setMissingValueInterpretationMap(
+ missingValueInterpretationMap);
+ return this;
+ }
+
/**
* Builds JsonStreamWriter
*
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
index 8f45f0c5f5..b16a1d81de 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
@@ -105,6 +105,7 @@ private SchemaAwareStreamWriter(Builder builder)
streamWriterBuilder.setLocation(builder.location);
streamWriterBuilder.setDefaultMissingValueInterpretation(
builder.defaultMissingValueInterpretation);
+ streamWriterBuilder.setMissingValueInterpretationMap(builder.missingValueInterpretationMap);
streamWriterBuilder.setClientId(builder.clientId);
streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler);
requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
@@ -298,18 +299,6 @@ public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}
- /**
- * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input
- * missingValueInterpretationMap is used for all append requests unless otherwise changed.
- *
- * @param missingValueInterpretationMap the missing value interpretation map used by the
- * SchemaAwareStreamWriter.
- */
- public void setMissingValueInterpretationMap(
- Map missingValueInterpretationMap) {
- streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
- }
-
/** @return the missing value interpretation map used for the writer. */
public Map
getMissingValueInterpretationMap() {
@@ -475,6 +464,8 @@ public static final class Builder {
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
+ private Map
+ missingValueInterpretationMap = new HashMap();
private String clientId;
private boolean enableRequestProfiler = false;
@@ -684,6 +675,20 @@ public Builder setDefaultMissingValueInterpretation(
return this;
}
+ /**
+ * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input
+ * missingValueInterpretationMap is used for all append requests unless otherwise changed.
+ *
+ * @param missingValueInterpretationMap the missing value interpretation map used by the
+ * SchemaAwareStreamWriter.
+ * @return Builder
+ */
+ public Builder setMissingValueInterpretationMap(
+ Map missingValueInterpretationMap) {
+ this.missingValueInterpretationMap = missingValueInterpretationMap;
+ return this;
+ }
+
/**
* Sets the RetrySettings to use for in-stream error retry.
*
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index fabcac3b0b..a4223adb60 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -68,11 +68,6 @@ public class StreamWriter implements AutoCloseable {
// Cache of location info for a given dataset.
private static Map projectAndDatasetToLocation = new ConcurrentHashMap<>();
- // Map of fields to their MissingValueInterpretation, which dictates how a field should be
- // populated when it is missing from an input user row.
- private Map missingValueInterpretationMap =
- new HashMap();
-
/*
* The identifier of stream to write to.
*/
@@ -103,6 +98,11 @@ public class StreamWriter implements AutoCloseable {
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
+ // Map of fields to their MissingValueInterpretation, which dictates how a field should be
+ // populated when it is missing from an input user row.
+ private Map missingValueInterpretationMap =
+ new HashMap();
+
/**
* Stream can access a single connection or a pool of connection depending on whether multiplexing
* is enabled.
@@ -229,6 +229,7 @@ private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation;
+ this.missingValueInterpretationMap = builder.missingValueInterpretationMap;
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
this.requestProfilerHook =
new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
@@ -420,18 +421,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
}
}
- /**
- * Sets the missing value interpretation map for the stream writer. The input
- * missingValueInterpretationMap is used for all write requests unless otherwise changed.
- *
- * @param missingValueInterpretationMap the missing value interpretation map used by stream
- * writer.
- */
- public void setMissingValueInterpretationMap(
- Map missingValueInterpretationMap) {
- this.missingValueInterpretationMap = missingValueInterpretationMap;
- }
-
/**
* Schedules the writing of rows at the end of current stream.
*
@@ -700,6 +689,9 @@ public static final class Builder {
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
+ private Map
+ missingValueInterpretationMap = new HashMap();
+
private boolean enableRequestProfiler = false;
private boolean enableOpenTelemetry = false;
@@ -851,6 +843,20 @@ public Builder setDefaultMissingValueInterpretation(
return this;
}
+ /**
+ * Sets the missing value interpretation map for the stream writer. The input
+ * missingValueInterpretationMap is used for all write requests unless otherwise changed.
+ *
+ * @param missingValueInterpretationMap the missing value interpretation map used by stream
+ * writer.
+ * @return Builder
+ */
+ public Builder setMissingValueInterpretationMap(
+ Map missingValueInterpretationMap) {
+ this.missingValueInterpretationMap = missingValueInterpretationMap;
+ return this;
+ }
+
/**
* Enable a latency profiler that would periodically generate a detailed latency report for the
* top latency requests. This is currently an experimental API.
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 4aea9c1af5..c6e920192b 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -868,6 +868,95 @@ public void testSimpleSchemaUpdate_skipRefreshWriterIfSchemaProvided() throws Ex
}
}
+ @Test
+ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
+ testBigQueryWrite.addResponse(
+ WriteStream.newBuilder()
+ .setName(TEST_STREAM)
+ .setTableSchema(TABLE_SCHEMA)
+ .setLocation("us")
+ .build());
+ Map missingValueMap = new HashMap<>();
+ missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
+ missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+
+ try (JsonStreamWriter writer =
+ getTestJsonStreamWriterBuilder(TEST_STREAM)
+ .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
+ .setMissingValueInterpretationMap(missingValueMap)
+ .build()) {
+
+ testBigQueryWrite.addResponse(
+ AppendRowsResponse.newBuilder()
+ .setAppendResult(
+ AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
+ .setUpdatedSchema(UPDATED_TABLE_SCHEMA)
+ .build());
+ testBigQueryWrite.addResponse(createAppendResponse(1));
+ // Verify the map before the writer is refreshed
+ assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
+ testBigQueryWrite.addResponse(createAppendResponse(2));
+ testBigQueryWrite.addResponse(createAppendResponse(3));
+
+ // First batch of appends. First append request will return an updated-schema, but the second
+ // and maybe the third append will be processed before the first response will refresh the
+ // StreamWriter.
+ JSONObject foo = new JSONObject();
+ foo.put("foo", "aaa");
+ JSONArray jsonArr = new JSONArray();
+ jsonArr.put(foo);
+
+ ApiFuture appendFuture1 = writer.append(jsonArr);
+ ApiFuture appendFuture2 = writer.append(jsonArr);
+ ApiFuture appendFuture3 = writer.append(jsonArr);
+
+ assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
+ assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
+ assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
+
+ // Another append, this time with columns to match the updated schema.
+ JSONObject updatedFoo = new JSONObject();
+ updatedFoo.put("foo", "aaa");
+ updatedFoo.put("bar", "bbb");
+ JSONArray updatedJsonArr = new JSONArray();
+ updatedJsonArr.put(updatedFoo);
+ ApiFuture appendFuture4 = writer.append(updatedJsonArr);
+
+ assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
+ assertEquals(4, testBigQueryWrite.getAppendRequests().size());
+ assertEquals(
+ 1,
+ testBigQueryWrite
+ .getAppendRequests()
+ .get(3)
+ .getProtoRows()
+ .getRows()
+ .getSerializedRowsCount());
+ assertEquals(
+ testBigQueryWrite
+ .getAppendRequests()
+ .get(3)
+ .getProtoRows()
+ .getRows()
+ .getSerializedRows(0),
+ UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString());
+
+ assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
+ assertTrue(
+ testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
+ || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
+
+ // Verify the map after the writer is refreshed
+ assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
+ assertEquals(
+ testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(),
+ MissingValueInterpretation.DEFAULT_VALUE);
+ assertEquals(
+ testBigQueryWrite.getAppendRequests().get(3).getMissingValueInterpretations(),
+ missingValueMap);
+ }
+ }
+
@Test
public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
@@ -1523,13 +1612,16 @@ public void testAppendWithMissingValueMap() throws Exception {
JSONArray jsonArr = new JSONArray();
jsonArr.put(flexible);
+ Map missingValueMap = new HashMap<>();
+ missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
+ missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+
try (JsonStreamWriter writer =
- getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {
+ getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema)
+ .setMissingValueInterpretationMap(missingValueMap)
+ .setTraceId("test:empty")
+ .build()) {
- Map missingValueMap = new HashMap<>();
- missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
- missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
- writer.setMissingValueInterpretationMap(missingValueMap);
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
testBigQueryWrite.addResponse(
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index 5bd242e0ee..8468fb712f 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -171,12 +171,15 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
.build();
}
- private StreamWriter getTestStreamWriter() throws IOException {
+ private StreamWriter.Builder getTestStreamWriterBuilder() throws IOException {
return StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
- .setMaxRetryDuration(java.time.Duration.ofSeconds(5))
- .build();
+ .setMaxRetryDuration(java.time.Duration.ofSeconds(5));
+ }
+
+ private StreamWriter getTestStreamWriter() throws IOException {
+ return getTestStreamWriterBuilder().build();
}
private StreamWriter getTestStreamWriterRetryEnabled() throws IOException {
@@ -1583,47 +1586,54 @@ public void testCloseDisconnectedStream() throws Exception {
@Test
public void testSetAndGetMissingValueInterpretationMap() throws Exception {
- StreamWriter writer = getTestStreamWriter();
+ StreamWriter.Builder writerBuilder = getTestStreamWriterBuilder();
Map missingValueMap = new HashMap();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
- writer.setMissingValueInterpretationMap(missingValueMap);
+ writerBuilder.setMissingValueInterpretationMap(missingValueMap);
+ StreamWriter writer = writerBuilder.build();
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
}
@Test
- public void testAppendWithMissingValueMap() throws Exception {
- StreamWriter writer = getTestStreamWriter();
+ public void testAppendWithoutMissingValueMap() throws Exception {
+ try (StreamWriter writer = getTestStreamWriter()) {
- long appendCount = 2;
- testBigQueryWrite.addResponse(createAppendResponse(0));
- testBigQueryWrite.addResponse(createAppendResponse(1));
+ testBigQueryWrite.addResponse(createAppendResponse(0));
- List> futures = new ArrayList<>();
- // The first append doesn't use a missing value map.
- futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0));
+ ApiFuture responseFuture =
+ writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0);
- // The second append uses a missing value map.
+ assertEquals(0, responseFuture.get().getAppendResult().getOffset().getValue());
+
+ verifyAppendRequests(1);
+ assertTrue(
+ testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations().isEmpty());
+ }
+ }
+
+ @Test
+ public void testAppendWithMissingValueMap() throws Exception {
Map missingValueMap = new HashMap();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
- writer.setMissingValueInterpretationMap(missingValueMap);
- futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1));
- for (int i = 0; i < appendCount; i++) {
- assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
- }
+ try (StreamWriter writer =
+ getTestStreamWriterBuilder().setMissingValueInterpretationMap(missingValueMap).build()) {
- // Ensure that the AppendRowsRequest for the first append operation does not have a missing
- // value map, and that the second AppendRowsRequest has the missing value map provided in the
- // second append.
- verifyAppendRequests(appendCount);
- AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0);
- AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1);
- assertTrue(request1.getMissingValueInterpretations().isEmpty());
- assertEquals(request2.getMissingValueInterpretations(), missingValueMap);
+ testBigQueryWrite.addResponse(createAppendResponse(0));
- writer.close();
+ ApiFuture responseFuture =
+ writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0);
+
+ assertEquals(0, responseFuture.get().getAppendResult().getOffset().getValue());
+
+ verifyAppendRequests(1);
+
+ assertEquals(
+ testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations(),
+ missingValueMap);
+ }
}
@Test(timeout = 10000)
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index 9d29d4cfd2..58f80dbce4 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -1306,6 +1306,129 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
}
}
+ @Test
+ public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap()
+ throws DescriptorValidationException, ExecutionException, IOException, InterruptedException,
+ ParseException {
+ String tableName = "SchemaUpdateMissingValueMapTestTable";
+ TableId tableId = TableId.of(DATASET, tableName);
+ tableInfo = TableInfo.newBuilder(tableId, defaultValueTableDefinition).build();
+ bigquery.create(tableInfo);
+ TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+ WriteStream writeStream =
+ client.createWriteStream(
+ CreateWriteStreamRequest.newBuilder()
+ .setParent(parent.toString())
+ .setWriteStream(
+ WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+ .build());
+ Map missingValueMap = new HashMap<>();
+ missingValueMap.put(
+ "foo_with_default", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+ missingValueMap.put(
+ "date_with_default_to_current", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+
+ try (JsonStreamWriter jsonStreamWriter =
+ JsonStreamWriter.newBuilder(writeStream.getName(), client)
+ .setMissingValueInterpretationMap(missingValueMap)
+ .build()) {
+ // Verify the missing value map
+ assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
+
+ // First append with the current schema
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("bar_without_default", "existing_col_before_update");
+ JSONArray jsonArr = new JSONArray();
+ jsonArr.put(jsonObject);
+ ApiFuture response1 = jsonStreamWriter.append(jsonArr, 0);
+ assertEquals(0, response1.get().getAppendResult().getOffset().getValue());
+
+ // Add a column to the table
+ Field newCol =
+ Field.newBuilder("new_col_without_default", StandardSQLTypeName.STRING)
+ .setMode(Field.Mode.NULLABLE)
+ .build();
+ ArrayList updatedFields =
+ new ArrayList<>(defaultValueTableDefinition.getSchema().getFields());
+ updatedFields.add(newCol);
+ Schema updatedSchema = Schema.of(updatedFields);
+ TableInfo updatedTableInfo =
+ TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
+ Table updatedTable = bigquery.update(updatedTableInfo);
+ assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());
+
+ // Continue writing rows until backend acknowledges schema update
+ JSONObject jsonObject2 = new JSONObject();
+ jsonObject2.put("bar_without_default", "no_schema_update_yet");
+ JSONArray jsonArr2 = new JSONArray();
+ jsonArr2.put(jsonObject2);
+
+ int nextI = 0;
+ for (int i = 1; i < 100; i++) {
+ ApiFuture response2 = jsonStreamWriter.append(jsonArr2, i);
+ assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
+ if (response2.get().hasUpdatedSchema()) {
+ nextI = i + 1;
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+
+ // Write using the new schema with 10 new requests
+ JSONObject updatedCol = new JSONObject();
+ updatedCol.put("bar_without_default", "existing_col");
+ updatedCol.put("new_col_without_default", "new_col");
+ JSONArray updatedJsonArr = new JSONArray();
+ updatedJsonArr.put(updatedCol);
+ for (int i = nextI; i < nextI + 10; i++) {
+ ApiFuture response3 = jsonStreamWriter.append(updatedJsonArr, i);
+ assertEquals(i, response3.get().getAppendResult().getOffset().getValue());
+ }
+
+ // List all rows to verify table data correctness
+ Iterator rowsIter = bigquery.listTableData(tableId).getValues().iterator();
+
+ // Verify 1st row (with "existing_col_before_update")
+ FieldValueList currentRow = rowsIter.next();
+ assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+ assertEquals("existing_col_before_update", currentRow.get(1).getStringValue());
+ assertFalse(currentRow.get(2).getStringValue().isEmpty());
+ // Check whether the recorded value is close enough.
+ Instant parsedInstant =
+ Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+ assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+
+ // A few rows (with "no_schema_update_yet") until the schema was updated
+ for (int j = 1; j < nextI; j++) {
+ currentRow = rowsIter.next();
+ assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+ assertEquals("no_schema_update_yet", currentRow.get(1).getStringValue());
+ // Check whether the recorded value is close enough.
+ parsedInstant =
+ Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+ assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+ }
+ // 10 rows after schema update with new column included
+ for (int j = nextI; j < nextI + 10; j++) {
+ currentRow = rowsIter.next();
+ assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+ assertEquals("existing_col", currentRow.get(1).getStringValue());
+ assertFalse(currentRow.get(2).getStringValue().isEmpty());
+ // Check whether the recorded value is close enough.
+ parsedInstant =
+ Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+ assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+ // Verify the new column
+ assertEquals("new_col", currentRow.get(3).getStringValue());
+ }
+ assertFalse(rowsIter.hasNext());
+
+ // Verify that the missing value map hasn't changed
+ assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
+ }
+ }
+
@Test
public void testJsonStreamWriterWithFlexibleColumnName()
throws IOException, InterruptedException, ExecutionException,