Skip to content

Commit

Permalink
Addressed comments: Revived the getMissingValueInterpretationMap method and added Integration Test.
Browse files Browse the repository at this point in the history
  • Loading branch information
yifatgortler committed Sep 25, 2024
1 parent 547c4ff commit 671128a
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 10 deletions.
10 changes: 0 additions & 10 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,21 +197,11 @@
<differenceType>1001</differenceType>
<className>com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool</className>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
<method>java.util.Map getMissingValueInterpretationMap()</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
<method>void setMissingValueInterpretationMap(java.util.Map)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter</className>
<method>java.util.Map getMissingValueInterpretationMap()</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter</className>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ public long getInflightWaitSeconds() {
return this.schemaAwareStreamWriter.getInflightWaitSeconds();
}

/**
 * Returns the missing value interpretation map used for the writer.
 *
 * <p>The value is obtained by delegating to the wrapped schema-aware stream writer.
 *
 * @return the missing value interpretation map used for the writer
 */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
    getMissingValueInterpretationMap() {
  final Map<String, AppendRowsRequest.MissingValueInterpretation> interpretationMap =
      this.schemaAwareStreamWriter.getMissingValueInterpretationMap();
  return interpretationMap;
}

/**
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}

/**
 * Returns the missing value interpretation map used for the writer.
 *
 * <p>The value is obtained by delegating to the underlying stream writer.
 *
 * @return the missing value interpretation map used for the writer
 */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
    getMissingValueInterpretationMap() {
  final Map<String, AppendRowsRequest.MissingValueInterpretation> interpretationMap =
      streamWriter.getMissingValueInterpretationMap();
  return interpretationMap;
}

/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(createAppendResponse(1));
// Verify the map before the writer is refreshed
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
testBigQueryWrite.addResponse(createAppendResponse(2));
testBigQueryWrite.addResponse(createAppendResponse(3));

Expand Down Expand Up @@ -944,6 +946,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
|| testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());

// Verify the map after the writer is refreshed
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
assertEquals(
testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(),
MissingValueInterpretation.DEFAULT_VALUE);
Expand Down Expand Up @@ -1618,6 +1622,8 @@ public void testAppendWithMissingValueMap() throws Exception {
.setTraceId("test:empty")
.build()) {

assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,129 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
}
}

/**
 * Integration test: a {@code JsonStreamWriter} built with a missing value interpretation map
 * keeps reporting that same map through {@code getMissingValueInterpretationMap()} both before
 * and after the table schema is updated with an additional column.
 *
 * <p>The test also lists the table data afterwards and checks every written row: the defaulted
 * columns are populated, and the new column is present on rows appended after the schema update.
 */
@Test
public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap()
    throws DescriptorValidationException, ExecutionException, IOException, InterruptedException,
        ParseException {
  String tableName = "SchemaUpdateMissingValueMapTestTable";
  TableId tableId = TableId.of(DATASET, tableName);
  tableInfo = TableInfo.newBuilder(tableId, defaultValueTableDefinition).build();
  bigquery.create(tableInfo);
  TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
  WriteStream writeStream =
      client.createWriteStream(
          CreateWriteStreamRequest.newBuilder()
              .setParent(parent.toString())
              .setWriteStream(
                  WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
              .build());
  Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
  missingValueMap.put(
      "foo_with_default", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
  missingValueMap.put(
      "date_with_default_to_current", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);

  try (JsonStreamWriter jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName(), client)
          .setMissingValueInterpretationMap(missingValueMap)
          .build()) {
    // Verify the missing value map
    assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());

    // First append with the current schema
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("bar_without_default", "existing_col_before_update");
    JSONArray jsonArr = new JSONArray();
    jsonArr.put(jsonObject);
    ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr, 0);
    assertEquals(0, response1.get().getAppendResult().getOffset().getValue());

    // Add a column to the table
    Field newCol =
        Field.newBuilder("new_col_without_default", StandardSQLTypeName.STRING)
            .setMode(Field.Mode.NULLABLE)
            .build();
    ArrayList<Field> updatedFields =
        new ArrayList<>(defaultValueTableDefinition.getSchema().getFields());
    updatedFields.add(newCol);
    Schema updatedSchema = Schema.of(updatedFields);
    TableInfo updatedTableInfo =
        TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
    Table updatedTable = bigquery.update(updatedTableInfo);
    assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());

    // Continue writing rows until backend acknowledges schema update
    JSONObject jsonObject2 = new JSONObject();
    jsonObject2.put("bar_without_default", "no_schema_update_yet");
    JSONArray jsonArr2 = new JSONArray();
    jsonArr2.put(jsonObject2);

    int nextI = 0;
    for (int i = 1; i < 100; i++) {
      ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
      // Resolve the future once and reuse the response for both checks.
      AppendRowsResponse appendResponse = response2.get();
      assertEquals(i, appendResponse.getAppendResult().getOffset().getValue());
      if (appendResponse.hasUpdatedSchema()) {
        nextI = i + 1;
        break;
      }
      Thread.sleep(1000);
    }
    // Fail fast if no response carried an updated schema within the attempts above. Otherwise
    // nextI would still be 0 and the appends below would reuse offsets that were already
    // written, producing a misleading failure.
    assertTrue(nextI > 0);

    // Write using the new schema with 10 new requests
    JSONObject updatedCol = new JSONObject();
    updatedCol.put("bar_without_default", "existing_col");
    updatedCol.put("new_col_without_default", "new_col");
    JSONArray updatedJsonArr = new JSONArray();
    updatedJsonArr.put(updatedCol);
    for (int i = nextI; i < nextI + 10; i++) {
      ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(updatedJsonArr, i);
      assertEquals(i, response3.get().getAppendResult().getOffset().getValue());
    }

    // List all rows to verify table data correctness
    Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();

    // Verify 1st row (with "existing_col_before_update")
    FieldValueList currentRow = rowsIter.next();
    assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
    assertEquals("existing_col_before_update", currentRow.get(1).getStringValue());
    assertFalse(currentRow.get(2).getStringValue().isEmpty());
    // Check whether the recorded value is close enough.
    Instant parsedInstant =
        Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
    assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));

    // A few rows (with "no_schema_update_yet") until the schema was updated
    for (int j = 1; j < nextI; j++) {
      currentRow = rowsIter.next();
      assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
      assertEquals("no_schema_update_yet", currentRow.get(1).getStringValue());
      // Same non-empty check as for the other rows, so an empty value is reported as an
      // assertion failure rather than a number parsing error.
      assertFalse(currentRow.get(2).getStringValue().isEmpty());
      // Check whether the recorded value is close enough.
      parsedInstant =
          Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
      assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
    }
    // 10 rows after schema update with new column included
    for (int j = nextI; j < nextI + 10; j++) {
      currentRow = rowsIter.next();
      assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
      assertEquals("existing_col", currentRow.get(1).getStringValue());
      assertFalse(currentRow.get(2).getStringValue().isEmpty());
      // Check whether the recorded value is close enough.
      parsedInstant =
          Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
      assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
      // Verify the new column
      assertEquals("new_col", currentRow.get(3).getStringValue());
    }
    assertFalse(rowsIter.hasNext());

    // Verify that the missing value map hasn't changed
    assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
  }
}

@Test
public void testJsonStreamWriterWithFlexibleColumnName()
throws IOException, InterruptedException, ExecutionException,
Expand Down

0 comments on commit 671128a

Please sign in to comment.