From 671128a44057174740e854941fdf3559840a84a7 Mon Sep 17 00:00:00 2001
From: yifatgortler
Date: Wed, 25 Sep 2024 19:07:50 +0000
Subject: [PATCH] Addressed comments: Revived the getMissingValueInterpretationMap method and added Integration Test.

---
 .../clirr-ignored-differences.xml             |  10 --
 .../bigquery/storage/v1/JsonStreamWriter.java |   6 +
 .../storage/v1/SchemaAwareStreamWriter.java   |   6 +
 .../storage/v1/JsonStreamWriterTest.java      |   6 +
 .../it/ITBigQueryWriteManualClientTest.java   | 123 ++++++++++++++++++
 5 files changed, 141 insertions(+), 10 deletions(-)

diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index bcb75e3ca8..8e6c8958df 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -197,21 +197,11 @@
     <differenceType>1001</differenceType>
     <className>com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool</className>
   </difference>
-  <difference>
-    <differenceType>7002</differenceType>
-    <className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
-    <method>java.util.Map getMissingValueInterpretationMap()</method>
-  </difference>
   <difference>
     <differenceType>7002</differenceType>
     <className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
     <method>void setMissingValueInterpretationMap(java.util.Map)</method>
   </difference>
-  <difference>
-    <differenceType>7002</differenceType>
-    <className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter</className>
-    <method>java.util.Map getMissingValueInterpretationMap()</method>
-  </difference>
   <difference>
     <differenceType>7002</differenceType>
     <className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter</className>
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
index f16cee9b54..d6446113bd 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -119,6 +119,12 @@ public long getInflightWaitSeconds() {
     return this.schemaAwareStreamWriter.getInflightWaitSeconds();
   }
 
+  /** @return the missing value interpretation map used for the writer. */
+  public Map<String, AppendRowsRequest.MissingValueInterpretation>
+      getMissingValueInterpretationMap() {
+    return this.schemaAwareStreamWriter.getMissingValueInterpretationMap();
+  }
+
   /**
    * newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
    * StreamWriter by default.
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
index 0c4b76eba5..b16a1d81de 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
@@ -299,6 +299,12 @@ public long getInflightWaitSeconds() {
     return streamWriter.getInflightWaitSeconds();
   }
 
+  /** @return the missing value interpretation map used for the writer. */
+  public Map<String, AppendRowsRequest.MissingValueInterpretation>
+      getMissingValueInterpretationMap() {
+    return streamWriter.getMissingValueInterpretationMap();
+  }
+
   /** Sets all StreamWriter settings. */
   private void setStreamWriterSettings(
       @Nullable TransportChannelProvider channelProvider,
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 9e9e56a688..c6e920192b 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -893,6 +893,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
               .setUpdatedSchema(UPDATED_TABLE_SCHEMA)
               .build());
       testBigQueryWrite.addResponse(createAppendResponse(1));
+      // Verify the map before the writer is refreshed
+      assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
       testBigQueryWrite.addResponse(createAppendResponse(2));
       testBigQueryWrite.addResponse(createAppendResponse(3));
 
@@ -944,6 +946,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
           testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
               || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
 
+      // Verify the map after the writer is refreshed
+      assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
       assertEquals(
           testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(),
           MissingValueInterpretation.DEFAULT_VALUE);
@@ -1618,6 +1622,8 @@ public void testAppendWithMissingValueMap() throws Exception {
             .setTraceId("test:empty")
             .build()) {
 
+      assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
+
       testBigQueryWrite.addResponse(
           AppendRowsResponse.newBuilder()
               .setAppendResult(
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index 9d29d4cfd2..58f80dbce4 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -1306,6 +1306,129 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
     }
   }
 
+  @Test
+  public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap()
+      throws DescriptorValidationException, ExecutionException, IOException, InterruptedException,
+          ParseException {
+    String tableName = "SchemaUpdateMissingValueMapTestTable";
+    TableId tableId = TableId.of(DATASET, tableName);
+    tableInfo = TableInfo.newBuilder(tableId, defaultValueTableDefinition).build();
+    bigquery.create(tableInfo);
+    TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(parent.toString())
+                .setWriteStream(
+                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+                .build());
+    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
+    missingValueMap.put(
+        "foo_with_default", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+    missingValueMap.put(
+        "date_with_default_to_current", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
+
+    try (JsonStreamWriter jsonStreamWriter =
+        JsonStreamWriter.newBuilder(writeStream.getName(), client)
+            .setMissingValueInterpretationMap(missingValueMap)
+            .build()) {
+      // Verify the missing value map
+      assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
+
+      // First append with the current schema
+      JSONObject jsonObject = new JSONObject();
+      jsonObject.put("bar_without_default", "existing_col_before_update");
+      JSONArray jsonArr = new JSONArray();
+      jsonArr.put(jsonObject);
+      ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr, 0);
+      assertEquals(0, response1.get().getAppendResult().getOffset().getValue());
+
+      // Add a column to the table
+      Field newCol =
+          Field.newBuilder("new_col_without_default", StandardSQLTypeName.STRING)
+              .setMode(Field.Mode.NULLABLE)
+              .build();
+      ArrayList<Field> updatedFields =
+          new ArrayList<>(defaultValueTableDefinition.getSchema().getFields());
+      updatedFields.add(newCol);
+      Schema updatedSchema = Schema.of(updatedFields);
+      TableInfo updatedTableInfo =
+          TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
+      Table updatedTable = bigquery.update(updatedTableInfo);
+      assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());
+
+      // Continue writing rows until backend acknowledges schema update
+      JSONObject jsonObject2 = new JSONObject();
+      jsonObject2.put("bar_without_default", "no_schema_update_yet");
+      JSONArray jsonArr2 = new JSONArray();
+      jsonArr2.put(jsonObject2);
+
+      int nextI = 0;
+      for (int i = 1; i < 100; i++) {
+        ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
+        assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
+        if (response2.get().hasUpdatedSchema()) {
+          nextI = i + 1;
+          break;
+        } else {
+          Thread.sleep(1000);
+        }
+      }
+
+      // Write using the new schema with 10 new requests
+      JSONObject updatedCol = new JSONObject();
+      updatedCol.put("bar_without_default", "existing_col");
+      updatedCol.put("new_col_without_default", "new_col");
+      JSONArray updatedJsonArr = new JSONArray();
+      updatedJsonArr.put(updatedCol);
+      for (int i = nextI; i < nextI + 10; i++) {
+        ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(updatedJsonArr, i);
+        assertEquals(i, response3.get().getAppendResult().getOffset().getValue());
+      }
+
+      // List all rows to verify table data correctness
+      Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();
+
+      // Verify 1st row (with "existing_col_before_update")
+      FieldValueList currentRow = rowsIter.next();
+      assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+      assertEquals("existing_col_before_update", currentRow.get(1).getStringValue());
+      assertFalse(currentRow.get(2).getStringValue().isEmpty());
+      // Check whether the recorded value is close enough.
+      Instant parsedInstant =
+          Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+      assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+
+      // A few rows (with "no_schema_update_yet") until the schema was updated
+      for (int j = 1; j < nextI; j++) {
+        currentRow = rowsIter.next();
+        assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+        assertEquals("no_schema_update_yet", currentRow.get(1).getStringValue());
+        // Check whether the recorded value is close enough.
+        parsedInstant =
+            Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+        assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+      }
+      // 10 rows after schema update with new column included
+      for (int j = nextI; j < nextI + 10; j++) {
+        currentRow = rowsIter.next();
+        assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+        assertEquals("existing_col", currentRow.get(1).getStringValue());
+        assertFalse(currentRow.get(2).getStringValue().isEmpty());
+        // Check whether the recorded value is close enough.
+        parsedInstant =
+            Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+        assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+        // Verify the new column
+        assertEquals("new_col", currentRow.get(3).getStringValue());
+      }
+      assertFalse(rowsIter.hasNext());
+
+      // Verify that the missing value map hasn't changed
+      assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
+    }
+  }
+
   @Test
   public void testJsonStreamWriterWithFlexibleColumnName()
       throws IOException, InterruptedException, ExecutionException,