From 61e4a43f3b8b0eb344f5f932161fc7ac6dae43cc Mon Sep 17 00:00:00 2001 From: Farhan Ahmed Date: Wed, 4 Oct 2023 17:14:22 -0400 Subject: [PATCH] feat: Adding CDC Sample --- .../AppendCompleteCallback.java | 62 ++++++++ .../BqToBqStorageSchemaConverter.java | 96 ++++++++++++ .../bigquerystorage/JsonWriterStreamCdc.java | 142 ++++++++++++++++++ .../JsonWriterStreamCdcIT.java | 80 ++++++++++ .../src/test/resources/ModifiedCustomers.json | 37 +++++ .../src/test/resources/NewCustomers.json | 83 ++++++++++ 6 files changed, 500 insertions(+) create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/AppendCompleteCallback.java create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/JsonWriterStreamCdcIT.java create mode 100644 samples/snippets/src/test/resources/ModifiedCustomers.json create mode 100644 samples/snippets/src/test/resources/NewCustomers.json diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/AppendCompleteCallback.java b/samples/snippets/src/main/java/com/example/bigquerystorage/AppendCompleteCallback.java new file mode 100644 index 0000000000..4556cd66fa --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/AppendCompleteCallback.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import org.json.JSONArray; +import org.json.JSONObject; + +class AppendCompleteCallback implements ApiFutureCallback { + private static final Object lock = new Object(); + private static int batchCount = 0; + + public void onSuccess(AppendRowsResponse response) { + synchronized (lock) { + if (response.hasError()) { + System.out.format("Error: %s\n", response.getError()); + } else { + ++batchCount; + System.out.format("Wrote batch %d\n", batchCount); + } + } + } + + public void onFailure(Throwable throwable) { + System.out.format("Error: %s\n", throwable.toString()); + } +} diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java new file mode 100644 index 0000000000..5131618488 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Class is copied from java-bigquerystorage/samples snippet, as a temporary workaround + * to the fact there is no built-in converter between the REST object + * {@see com.google.cloud.bigquery.Schema} + * and the gRPC/Proto based {@see com.google.cloud.bigquery.storage.v1.TableSchema}. + * https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java + */ + +package com.example.bigquerystorage; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.collect.ImmutableMap; + +/** Converts structure from BigQuery client to BigQueryStorage client */ +public class BqToBqStorageSchemaConverter { + private static ImmutableMap BQTableSchemaModeMap = + ImmutableMap.of( + Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE, + Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED, + Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED); + + private static ImmutableMap BQTableSchemaTypeMap = + new ImmutableMap.Builder() + .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL) + .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES) + .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE) + .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME) + .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE) + .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY) + .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64) + .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC) + .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING) + .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT) + .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME) + .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP) + .build(); + + /** + * Converts from BigQuery client Table Schema to bigquery storage API Table Schema. + * + * @param schema the BigQuery client Table Schema + * @return the bigquery storage API Table Schema + */ + public static TableSchema convertTableSchema(Schema schema) { + TableSchema.Builder result = TableSchema.newBuilder(); + for (int i = 0; i < schema.getFields().size(); i++) { + result.addFields(i, convertFieldSchema(schema.getFields().get(i))); + } + return result.build(); + } + + /** + * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema. + * + * @param field the BigQuery client Field Schema + * @return the bigquery storage API Field Schema + */ + public static TableFieldSchema convertFieldSchema(Field field) { + TableFieldSchema.Builder result = TableFieldSchema.newBuilder(); + if (field.getMode() == null) { + field = field.toBuilder().setMode(Field.Mode.NULLABLE).build(); + } + result.setMode(BQTableSchemaModeMap.get(field.getMode())); + result.setName(field.getName()); + result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType())); + if (field.getDescription() != null) { + result.setDescription(field.getDescription()); + } + if (field.getSubFields() != null) { + for (int i = 0; i < field.getSubFields().size(); i++) { + result.addFields(i, convertFieldSchema(field.getSubFields().get(i))); + } + } + return result.build(); + } +} diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java b/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java new file mode 100644 index 0000000000..43e2f31afb --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java @@ -0,0 +1,142 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import org.json.JSONArray; +import org.json.JSONObject; + +public class JsonWriterStreamCdc { + + private static final String CHANGE_TYPE_PSEUDO_COLUMN = "_change_type"; + + public static void main(String[] args) throws Exception { + if (args.length < 4) { + System.out.println("Arguments: project, dataset, table, source_file"); + return; + } + + String projectId = args[0]; + String datasetName = args[1]; + String tableName = args[2]; + String dataFile = args[3]; + createDestinationTable(projectId, datasetName, tableName); + writeToDefaultStream(projectId, datasetName, tableName, dataFile); + } + + public static void createDestinationTable( + String projectId, String datasetName, String tableName) { + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + // Create a schema that matches the source data. + Schema schema = + Schema.of( + Field.of("Customer_ID", StandardSQLTypeName.STRING), + Field.of("Customer_Enrollment_Date", StandardSQLTypeName.NUMERIC), + Field.of("Customer_Address", StandardSQLTypeName.STRING), + Field.of("Customer_Tier", StandardSQLTypeName.STRING), + Field.of("Active_Subscriptions", StandardSQLTypeName.STRUCT)); + + // Create a table that uses this schema. + TableId tableId = TableId.of(projectId, datasetName, tableName); + Table table = bigquery.getTable(tableId); + if (table == null) { + TableInfo tableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + } + } + + // writeToDefaultStream: Writes records from the source file to the destination table. + public static void writeToDefaultStream( + String projectId, String datasetName, String tableName, String dataFile) + throws DescriptorValidationException, InterruptedException, IOException { + + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Get the schema of the destination table and convert to the equivalent BigQueryStorage type. + Table table = bigquery.getTable(datasetName, tableName); + Schema schema = table.getDefinition().getSchema(); + TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); + + // Use the JSON stream writer to send records in JSON format. + TableName parentTable = TableName.of(projectId, datasetName, tableName); + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(parentTable.toString(), + addPseudoColumns(tableSchema)) + .build()) { + // Read JSON data from the source file and send it to the Write API. + BufferedReader reader = new BufferedReader(new FileReader(dataFile)); + String line = reader.readLine(); + while (line != null) { + // As a best practice, send batches of records, instead of single records at a time. + JSONArray jsonArr = new JSONArray(); + for (int i = 0; i < 100; i++) { + JSONObject record = new JSONObject(line); + jsonArr.put(record); + line = reader.readLine(); + if (line == null) { + break; + } + } // batch + ApiFuture future = writer.append(jsonArr); + // The append method is asynchronous. Rather than waiting for the method to complete, + // which can hurt performance, register a completion callback and continue streaming. + ApiFutures.addCallback( + future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); + } + } + } + + private static TableSchema addPseudoColumns(TableSchema tableSchema) { + return tableSchema + .toBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName(CHANGE_TYPE_PSEUDO_COLUMN) + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .build(); + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/JsonWriterStreamCdcIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/JsonWriterStreamCdcIT.java new file mode 100644 index 0000000000..5f52cad156 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/JsonWriterStreamCdcIT.java @@ -0,0 +1,80 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class JsonWriterStreamCdcIT { + + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + private PrintStream out; + private BigQuery bigquery; + private String datasetName; + + @BeforeClass + public static void beforeClass() {} + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset for each test. + datasetName = "JAVA_WRITER_STREAM_CDC_TEST" + UUID.randomUUID().toString().substring(0, 8); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + } + + @Test + public void testJsonWriterDefaultStream() throws Exception { + Path dataFilePath = FileSystems.getDefault().getPath("src/test/resources", "NewCustomers.json"); + + System.out.println(dataFilePath.toString()); + String[] args = {GOOGLE_CLOUD_PROJECT, datasetName, "github", dataFilePath.toString()}; + JsonWriterStreamCdc.main(args); + assertThat(bout.toString()).contains("Wrote batch"); + } + + @After + public void tearDown() { + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); + System.setOut(null); + } +} diff --git a/samples/snippets/src/test/resources/ModifiedCustomers.json b/samples/snippets/src/test/resources/ModifiedCustomers.json new file mode 100644 index 0000000000..171bd56d5f --- /dev/null +++ b/samples/snippets/src/test/resources/ModifiedCustomers.json @@ -0,0 +1,37 @@ +[ + { + "Customer_ID": 1, + "Customer_Enrollment_Date": 19301, + "Customer_Name": "Nick_2.0", + "Customer_Address": "1600 Amphitheatre Pkwy, Mountain View, CA", + "Customer_Tier": "Platinum", + "Active_Subscriptions": "{\"Internet_Subscription\":\"Paid\",\"Music_Subscription\":\"Paid\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 2, + "Customer_Enrollment_Date": 19318, + "Customer_Name": "Heather", + "Customer_Address": "285 Fulton St, New York, NY", + "Customer_Tier": "Commercial", + "Active_Subscriptions": "{\"TV_Subscription\":\"Free\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 7, + "_CHANGE_TYPE": "DELETE" + }, + { + "Customer_ID": 8, + "_CHANGE_TYPE": "DELETE" + }, + { + "Customer_ID": 10, + "Customer_Enrollment_Date": 19410, + "Customer_Name": "Melody", + "Customer_Address": "345 Spear St, San Francisco, CA", + "Customer_Tier": "Commercial", + "Active_Subscriptions": "{\"Music_Subscription\":\"Free\"}", + "_CHANGE_TYPE": "UPSERT" + } +] diff --git a/samples/snippets/src/test/resources/NewCustomers.json b/samples/snippets/src/test/resources/NewCustomers.json new file mode 100644 index 0000000000..20542b9554 --- /dev/null +++ b/samples/snippets/src/test/resources/NewCustomers.json @@ -0,0 +1,83 @@ +[ + { + "Customer_ID": 1, + "Customer_Enrollment_Date": 19301, + "Customer_Name": "Nick", + "Customer_Address": "1600 Amphitheatre Pkwy, Mountain View, CA", + "Customer_Tier": "Commercial", + "Active_Subscriptions": "{\"Internet_Subscription\":\"Trial\",\"Music_Subscription\":\"Free\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 2, + "Customer_Enrollment_Date": 19318, + "Customer_Name": "Heather", + "Customer_Address": "350 Fifth Avenue, New York, NY", + "Customer_Tier": "Commercial", + "Active_Subscriptions": "{}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 3, + "Customer_Enrollment_Date": 19250, + "Customer_Name": "Lyle", + "Customer_Address": "10 Downing Street, London, England", + "Customer_Tier": "Enterprise", + "Active_Subscriptions": "{\"Internet_Subscription\":\"Paid\",\"Music_Subscription\":\"Paid\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 4, + "Customer_Enrollment_Date": 19140, + "Customer_Name": "Heidi", + "Customer_Address": "4059 Mt Lee Dr., Hollywood, CA", + "Customer_Tier": "Commercial", + "Active_Subscriptions": "{\"TV_Subscription\":\"Free\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 5, + "Customer_Enrollment_Date": 19299, + "Customer_Name": "Paul", + "Customer_Address": "221 B Baker St, London, England", + "Customer_Tier": "Commercial", + "Active_Subscriptions": "{\"Music_Subscription\":\"Free\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 6, + "Customer_Enrollment_Date": 19329, + "Customer_Name": "Dylan", + "Customer_Address": "1 Dr Carlton B Goodlett Pl, San Francisco, CA", + "Customer_Tier": "Commercial", + "Active_Subscriptions": "{\"TV_Subscription\":\"Trial\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 7, + "Customer_Enrollment_Date": 19400, + "Customer_Name": "Monica", + "Customer_Address": "Piazza del Colosseo, 1, 00184 Roma RM, Italy", + "Customer_Tier": "Commercial", + "Active_Subscriptions": "{\"Internet_Subscription\":\"Paid\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 8, + "Customer_Enrollment_Date": 19377, + "Customer_Name": "Katie", + "Customer_Address": "11 Wall Street, New York, NY", + "Customer_Tier": "Enterprise", + "Active_Subscriptions": "{\"Music_Subscription\":\"Paid\"}", + "_CHANGE_TYPE": "UPSERT" + }, + { + "Customer_ID": 9, + "Customer_Enrollment_Date": 19410, + "Customer_Name": "Jeremy", + "Customer_Address": "1600 Pennsylvania Avenue, Washington DC", + "Customer_Tier": "Enterprise", + "Active_Subscriptions": "{\"Internet_Subscription\":\"Paid\",\"TV_Subscription\":\"Paid\",\"Music_Subscription\":\"Trial\"}", + "_CHANGE_TYPE": "UPSERT" + } +]