diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
index eb614ff48e..a16273db44 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.util.logging.Logger;
@@ -34,7 +35,10 @@
 /**
  * A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
  * built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
- * calls StreamWriter's append() method to write to BigQuery tables.
+ * calls StreamWriter's append() method to write to BigQuery tables. It retains all StreamWriter
+ * functionality and adds one feature: schema update support. If the BigQuery table schema is
+ * updated, data that conforms to the new schema can be ingested after some delay (on the order
+ * of minutes).
  */
 public class JsonStreamWriter implements AutoCloseable {
   private static String streamPatternString =
@@ -81,27 +85,49 @@ private JsonStreamWriter(Builder builder)
   /**
    * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
    * data to protobuf messages, then using StreamWriter's append() to write the data at current end
-   * of stream.
+   * of stream. If there is a schema update, the current StreamWriter is closed and a new
+   * StreamWriter is created with the updated TableSchema.
    *
    * @param jsonArr The JSON array that contains JSONObjects to be written
    * @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
    *     ApiFuture
    */
-  public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
+  public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
+      throws IOException, DescriptorValidationException {
     return append(jsonArr, -1);
   }
 
   /**
    * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
    * data to protobuf messages, then using StreamWriter's append() to write the data at the
-   * specified offset.
+   * specified offset. If there is a schema update, the current StreamWriter is closed and a new
+   * StreamWriter is created with the updated TableSchema.
    *
    * @param jsonArr The JSON array that contains JSONObjects to be written
    * @param offset Offset for deduplication
    * @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
    *     ApiFuture
    */
-  public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
+  public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
+      throws IOException, DescriptorValidationException {
+    // Handle schema updates in a thread-safe way by locking down the operation.
+    synchronized (this) {
+      TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
+      if (updatedSchema != null) {
+        // Close the current StreamWriter.
+        this.streamWriter.close();
+        // Update JsonStreamWriter's TableSchema and Descriptor.
+        this.tableSchema = updatedSchema;
+        this.descriptor =
+            BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
+        // Create a new underlying StreamWriter with the updated TableSchema and Descriptor.
+        this.streamWriter =
+            streamWriterBuilder
+                .setWriterSchema(ProtoSchemaConverter.convert(this.descriptor))
+                .build();
+      }
+    }
     ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
     // Any error in convertJsonToProtoMessage will throw an
     // IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
@@ -155,9 +181,9 @@ private void setStreamWriterSettings(
       streamWriterBuilder.setEndpoint(endpoint);
     }
     if (traceId != null) {
-      streamWriterBuilder.setTraceId("JsonWriterBeta_" + traceId);
+      streamWriterBuilder.setTraceId("JsonWriter_" + traceId);
     } else {
-      streamWriterBuilder.setTraceId("JsonWriterBeta:null");
+      streamWriterBuilder.setTraceId("JsonWriter:null");
     }
     if (flowControlSettings != null) {
       if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
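Note that both append() overloads now declare IOException and DescriptorValidationException, so existing callers must handle the schema-rebuild path. A minimal usage sketch, assuming an existing COMMITTED WriteStream (`writeStream`) on a table whose schema may change underneath the writer; the column name is hypothetical:

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import org.json.JSONArray;
import org.json.JSONObject;

public class JsonWriterSchemaUpdateSketch {
  // Sketch only: `writeStream` is assumed to be a COMMITTED stream created elsewhere.
  static void writeRows(WriteStream writeStream) throws Exception {
    try (JsonStreamWriter writer =
        JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
      JSONObject row = new JSONObject();
      row.put("col1", "aaa"); // "col1" is a hypothetical STRING column
      JSONArray rows = new JSONArray().put(row);
      // append() may now close and rebuild the underlying StreamWriter, hence
      // the IOException / DescriptorValidationException in its signature.
      ApiFuture<AppendRowsResponse> future = writer.append(rows);
      long offset = future.get().getAppendResult().getOffset().getValue();
      System.out.println("Appended at offset " + offset);
      // Once a response carries an updated schema, a later append() swaps in the
      // new TableSchema transparently; rows may then include the new column.
    }
  }
}
```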
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index 1cac296a03..8845e85743 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -44,8 +44,6 @@
  * A BigQuery Stream Writer that can be used to write data into BigQuery Table.
  *
  * <p>TODO: Support batching.
- *
- * <p>TODO: Support schema change.
  */
 public class StreamWriter implements AutoCloseable {
   private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
@@ -135,6 +133,12 @@ public class StreamWriter implements AutoCloseable {
   @GuardedBy("lock")
   private final Deque<AppendRequestAndResponse> inflightRequestQueue;
 
+  /*
+   * Contains the updated TableSchema.
+   */
+  @GuardedBy("lock")
+  private TableSchema updatedSchema;
+
   /*
    * A client used to interact with BigQuery.
    */
@@ -526,6 +530,9 @@ private void cleanupInflightRequests() {
   private void requestCallback(AppendRowsResponse response) {
     AppendRequestAndResponse requestWrapper;
     this.lock.lock();
+    if (response.hasUpdatedSchema()) {
+      this.updatedSchema = response.getUpdatedSchema();
+    }
     try {
       // Had a successful connection with at least one result, reset retries.
       // conectionRetryCountWithoutCallback is reset so that only multiple retries, without
@@ -622,7 +629,12 @@ public static StreamWriter.Builder newBuilder(String streamName) {
     return new StreamWriter.Builder(streamName);
   }
 
-  /** A builder of {@link StreamWriterV2}s. */
+  /** Thread-safe getter of updated TableSchema */
+  public synchronized TableSchema getUpdatedSchema() {
+    return this.updatedSchema;
+  }
+
+  /** A builder of {@link StreamWriter}s. */
   public static final class Builder {
     private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
 
@@ -649,6 +661,8 @@ public static final class Builder {
 
     private String traceId = null;
 
+    private TableSchema updatedTableSchema = null;
+
     private Builder(String streamName) {
       this.streamName = Preconditions.checkNotNull(streamName);
       this.client = null;
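StreamWriter only records the updated schema it sees on append responses; it is up to the caller (here, JsonStreamWriter) to poll getUpdatedSchema() and rebuild the writer. A condensed sketch of that contract, mirroring the JsonStreamWriter.append() change above (`streamWriter` and `streamName` are assumed to exist in the surrounding code):

```java
// getUpdatedSchema() returns null until a response carrying updated_schema
// has been processed by requestCallback().
TableSchema updated = streamWriter.getUpdatedSchema();
if (updated != null) {
  // Derive a new proto descriptor from the new table schema and rebuild.
  Descriptor descriptor =
      BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updated);
  streamWriter.close();
  streamWriter =
      StreamWriter.newBuilder(streamName)
          .setWriterSchema(ProtoSchemaConverter.convert(descriptor))
          .build();
}
```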
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index cebd8cc7c2..24475c03e3 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -16,6 +16,7 @@
 package com.google.cloud.bigquery.storage.v1;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import com.google.api.core.ApiFuture;
 import com.google.api.gax.core.ExecutorProvider;
@@ -26,6 +27,7 @@
 import com.google.api.gax.grpc.testing.MockServiceHelper;
 import com.google.cloud.bigquery.storage.test.JsonTest;
 import com.google.cloud.bigquery.storage.test.Test.FooType;
+import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Int64Value;
 import com.google.protobuf.Timestamp;
@@ -193,7 +195,7 @@ public void testSingleAppendSimpleJson() throws Exception {
             .getSerializedRows(0),
         expectedProto.toByteString());
     assertEquals(
-        testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta_test:empty");
+        testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
   }
 }
@@ -284,8 +286,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
             .getProtoRows()
             .getRows()
             .getSerializedRowsCount());
-    assertEquals(
-        testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta:null");
+    assertEquals(testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter:null");
     for (int i = 0; i < 4; i++) {
       assertEquals(
           testBigQueryWrite
@@ -388,4 +389,111 @@ public void testCreateDefaultStream() throws Exception {
       assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
     }
   }
+
+  @Test
+  public void testSimpleSchemaUpdate() throws Exception {
+    try (JsonStreamWriter writer =
+        getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
+      testBigQueryWrite.addResponse(
+          AppendRowsResponse.newBuilder()
+              .setAppendResult(
+                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
+              .setUpdatedSchema(UPDATED_TABLE_SCHEMA)
+              .build());
+      testBigQueryWrite.addResponse(
+          AppendRowsResponse.newBuilder()
+              .setAppendResult(
+                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
+              .build());
+      testBigQueryWrite.addResponse(
+          AppendRowsResponse.newBuilder()
+              .setAppendResult(
+                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
+              .build());
+      testBigQueryWrite.addResponse(
+          AppendRowsResponse.newBuilder()
+              .setAppendResult(
+                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
+              .build());
+      // First three appends use the original schema.
+      JSONObject foo = new JSONObject();
+      foo.put("foo", "aaa");
+      JSONArray jsonArr = new JSONArray();
+      jsonArr.put(foo);
+
+      ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
+      ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
+      ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);
+
+      assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
+      assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
+      assertEquals(
+          1,
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(0)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRowsCount());
+      assertEquals(
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(0)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRows(0),
+          FooType.newBuilder().setFoo("aaa").build().toByteString());
+
+      assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
+      assertEquals(
+          1,
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(1)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRowsCount());
+      assertEquals(
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(1)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRows(0),
+          FooType.newBuilder().setFoo("aaa").build().toByteString());
+
+      // Fourth append uses the updated schema, picked up from the first
+      // response's updated_schema field.
+      JSONObject updatedFoo = new JSONObject();
+      updatedFoo.put("foo", "aaa");
+      updatedFoo.put("bar", "bbb");
+      JSONArray updatedJsonArr = new JSONArray();
+      updatedJsonArr.put(updatedFoo);
+
+      ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);
+
+      assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
+      assertEquals(4, testBigQueryWrite.getAppendRequests().size());
+      assertEquals(
+          1,
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(3)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRowsCount());
+      assertEquals(
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(3)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRows(0),
+          UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString());
+
+      assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
+      assertTrue(
+          testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
+              || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
+    }
+  }
 }
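The integration tests below follow a common recipe: update the table schema through the BigQuery API, keep appending rows shaped like the old schema until some response reports hasUpdatedSchema(), and only then send rows that use the new column. A condensed sketch of that polling loop (`fillerRows` and the retry bound are placeholders):

```java
int acknowledgedAt = -1;
for (int i = 1; i < 100; i++) {
  ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(fillerRows, i);
  if (response.get().hasUpdatedSchema()) {
    // The backend has acknowledged the new column; rows using the updated
    // schema are safe to append from offset i + 1 onward.
    acknowledgedAt = i;
    break;
  }
  Thread.sleep(1000); // schema propagation can take on the order of minutes
}
```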
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index 9f65a74ca7..a6b386f2b2 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -28,11 +28,14 @@
 import com.google.cloud.bigquery.storage.test.Test.*;
 import com.google.cloud.bigquery.storage.v1.*;
 import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Logger;
 import org.json.JSONArray;
 import org.json.JSONObject;
@@ -406,6 +409,215 @@ public void testJsonStreamWriterWithDefaultStream()
     }
   }
 
+  @Test
+  public void testJsonStreamWriterSchemaUpdate()
+      throws DescriptorValidationException, IOException, InterruptedException, ExecutionException {
+    String tableName = "SchemaUpdateTestTable";
+    TableId tableId = TableId.of(DATASET, tableName);
+    Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
+    Schema originalSchema = Schema.of(col1);
+    TableInfo tableInfo =
+        TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
+    bigquery.create(tableInfo);
+    TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(parent.toString())
+                .setWriteStream(
+                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+                .build());
+    try (JsonStreamWriter jsonStreamWriter =
+        JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
+      // Write the first row.
+      JSONObject foo = new JSONObject();
+      foo.put("col1", "aaa");
+      JSONArray jsonArr = new JSONArray();
+      jsonArr.put(foo);
+      ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, 0);
+      assertEquals(0, response.get().getAppendResult().getOffset().getValue());
+
+      // Update the table schema with a new column.
+      Field col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build();
+      Schema updatedSchema = Schema.of(ImmutableList.of(col1, col2));
+      TableInfo updatedTableInfo =
+          TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
+      Table updatedTable = bigquery.update(updatedTableInfo);
+      assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());
+
+      // Continue writing rows until the backend acknowledges the schema update.
+      JSONObject foo2 = new JSONObject();
+      foo2.put("col1", "bbb");
+      JSONArray jsonArr2 = new JSONArray();
+      jsonArr2.put(foo2);
+
+      int next = 0;
+      for (int i = 1; i < 100; i++) {
+        ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
+        assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
+        if (response2.get().hasUpdatedSchema()) {
+          next = i;
+          break;
+        } else {
+          Thread.sleep(1000);
+        }
+      }
+
+      // Write rows with the updated schema.
+      JSONObject updatedFoo = new JSONObject();
+      updatedFoo.put("col1", "ccc");
+      updatedFoo.put("col2", "ddd");
+      JSONArray updatedJsonArr = new JSONArray();
+      updatedJsonArr.put(updatedFoo);
+      for (int i = 0; i < 10; i++) {
+        ApiFuture<AppendRowsResponse> response3 =
+            jsonStreamWriter.append(updatedJsonArr, next + 1 + i);
+        assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
+      }
+
+      // Verify table data correctness.
+      Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();
+      // One row of aaa.
+      assertEquals("aaa", rowsIter.next().get(0).getStringValue());
+      // A few rows of bbb.
+      for (int j = 1; j <= next; j++) {
+        assertEquals("bbb", rowsIter.next().get(0).getStringValue());
+      }
+      // Ten rows of ccc, ddd.
+      for (int j = next + 1; j < next + 1 + 10; j++) {
+        FieldValueList temp = rowsIter.next();
+        assertEquals("ccc", temp.get(0).getStringValue());
+        assertEquals("ddd", temp.get(1).getStringValue());
+      }
+      assertFalse(rowsIter.hasNext());
+    }
+  }
Field.newBuilder("col2", StandardSQLTypeName.STRING).build(); + Schema updatedSchema = Schema.of(ImmutableList.of(col1, col2)); + TableInfo updatedTableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build(); + Table updatedTable = bigquery.update(updatedTableInfo); + assertEquals(updatedSchema, updatedTable.getDefinition().getSchema()); + + // continue writing rows until backend acknowledges schema update + JSONObject foo2 = new JSONObject(); + foo2.put("col1", "bbb"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(foo2); + + int next = 0; + for (int i = 1; i < 100; i++) { + ApiFuture response2 = jsonStreamWriter.append(jsonArr2, i); + assertEquals(i, response2.get().getAppendResult().getOffset().getValue()); + if (response2.get().hasUpdatedSchema()) { + next = i; + break; + } else { + Thread.sleep(1000); + } + } + + // write rows with updated schema. + JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("col1", "ccc"); + updatedFoo.put("col2", "ddd"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + for (int i = 0; i < 10; i++) { + ApiFuture response3 = + jsonStreamWriter.append(updatedJsonArr, next + 1 + i); + assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue()); + } + + // verify table data correctness + Iterator rowsIter = bigquery.listTableData(tableId).getValues().iterator(); + // 1 row of aaa + assertEquals("aaa", rowsIter.next().get(0).getStringValue()); + // a few rows of bbb + for (int j = 1; j <= next; j++) { + assertEquals("bbb", rowsIter.next().get(0).getStringValue()); + } + // 10 rows of ccc, ddd + for (int j = next + 1; j < next + 1 + 10; j++) { + FieldValueList temp = rowsIter.next(); + assertEquals("ccc", temp.get(0).getStringValue()); + assertEquals("ddd", temp.get(1).getStringValue()); + } + assertFalse(rowsIter.hasNext()); + } + } + + @Test + public void testJsonStreamWriterSchemaUpdateConcurrent() + throws DescriptorValidationException, IOException, InterruptedException { + // Create test table and test stream + String tableName = "ConcurrentSchemaUpdateTestTable"; + TableId tableId = TableId.of(DATASET, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema originalSchema = Schema.of(col1); + TableInfo tableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + + // Create test JSON objects + JSONObject foo = new JSONObject(); + foo.put("col1", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + JSONObject foo2 = new JSONObject(); + foo2.put("col1", "bbb"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(foo2); + + JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("col1", "ccc"); + updatedFoo.put("col2", "ddd"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + + // Prepare updated schema + Field col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build(); + Schema updatedSchema = Schema.of(ImmutableList.of(col1, col2)); + TableInfo updatedTableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build(); + + // Start 
+    // Start writing using the JsonStreamWriter.
+    try (JsonStreamWriter jsonStreamWriter =
+        JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
+      int numberOfThreads = 5;
+      ExecutorService streamTaskExecutor = Executors.newFixedThreadPool(numberOfThreads);
+      CountDownLatch latch = new CountDownLatch(numberOfThreads);
+      // Used to verify data correctness.
+      AtomicInteger next = new AtomicInteger();
+
+      // Update the TableSchema asynchronously.
+      Runnable updateTableSchemaTask =
+          () -> {
+            Table updatedTable = bigquery.update(updatedTableInfo);
+            assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());
+          };
+      streamTaskExecutor.execute(updateTableSchemaTask);
+
+      // Stream data asynchronously.
+      for (int i = 0; i < numberOfThreads; i++) {
+        streamTaskExecutor.submit(
+            () -> {
+              // Write 2 rows of aaa on each thread.
+              for (int j = 0; j < 2; j++) {
+                try {
+                  jsonStreamWriter.append(jsonArr);
+                  next.getAndIncrement();
+                } catch (IOException | DescriptorValidationException e) {
+                  e.printStackTrace();
+                }
+              }
+
+              // Write filler rows of bbb until the backend acknowledges the
+              // schema update, since propagation may be delayed.
+              for (int w = 0; w < 15; w++) {
+                ApiFuture<AppendRowsResponse> response2 = null;
+                try {
+                  response2 = jsonStreamWriter.append(jsonArr2);
+                  next.getAndIncrement();
+                } catch (IOException | DescriptorValidationException e) {
+                  LOG.severe("Issue with append " + e.getMessage());
+                }
+                try {
+                  assert response2 != null;
+                  if (response2.get().hasUpdatedSchema()) {
+                    break;
+                  } else {
+                    Thread.sleep(1000);
+                  }
+                } catch (InterruptedException | ExecutionException e) {
+                  LOG.severe("Issue with append " + e.getMessage());
+                }
+              }
+
+              // Write 5 rows of ccc,ddd on each thread.
+              for (int m = 0; m < 5; m++) {
+                try {
+                  jsonStreamWriter.append(updatedJsonArr);
+                  next.getAndIncrement();
+                } catch (IOException | DescriptorValidationException e) {
+                  LOG.severe("Issue with append " + e.getMessage());
+                }
+              }
+              latch.countDown();
+            });
+      }
+      latch.await();
+
+      // Verify that the last 5 rows streamed are ccc,ddd.
+      Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();
+
+      int position = 0;
+      while (rowsIter.hasNext()) {
+        FieldValueList row = rowsIter.next();
+        position++;
+        if (position > next.get() - 5) {
+          assertEquals("ccc", row.get(0).getStringValue());
+          assertEquals("ddd", row.get(1).getStringValue());
+        }
+      }
+    }
+  }
+
   @Test
   public void testComplicateSchemaWithPendingStream()
       throws IOException, InterruptedException, ExecutionException {