From 97b69e8e97627529adf861b69be4c5ad4ed19c2a Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Mon, 27 Nov 2017 11:20:28 -0800 Subject: [PATCH 1/7] Add Dataflow SpannerIO sample --- dataflow/spanner-io/pom.xml | 95 +++++++++++++++ .../java/com/example/dataflow/Spanner.java | 112 ++++++++++++++++++ 2 files changed, 207 insertions(+) create mode 100644 dataflow/spanner-io/pom.xml create mode 100644 dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java diff --git a/dataflow/spanner-io/pom.xml b/dataflow/spanner-io/pom.xml new file mode 100644 index 00000000000..674e61c1319 --- /dev/null +++ b/dataflow/spanner-io/pom.xml @@ -0,0 +1,95 @@ + + + 4.0.0 + + com.example.dataflow + dataflow-spanner + 1.0-SNAPSHOT + jar + + + UTF-8 + 1.8 + 2.3.0-SNAPSHOT + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.5 + + 1.8 + 1.8 + + + + + + + + + + org.apache.beam + beam-sdks-java-core + ${apache_beam.version} + + + + org.apache.beam + beam-sdks-java-io-google-cloud-platform + ${apache_beam.version} + + + com.google.api.grpc + grpc-google-common-protos + + + + + + org.apache.beam + beam-runners-direct-java + ${apache_beam.version} + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${apache_beam.version} + + + + + com.google.cloud + google-cloud-spanner + 0.20.0-beta + + + + com.google.api.grpc + proto-google-cloud-spanner-admin-database-v1 + 0.1.9 + + + + + com.google.api.grpc + proto-google-common-protos + 0.1.9 + + + + + + org.slf4j + slf4j-jdk14 + 1.7.25 + + + + + diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java new file mode 100644 index 00000000000..031818801ce --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java @@ -0,0 +1,112 @@ +package com.example.dataflow; + +import com.google.cloud.spanner.Struct; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.ToString; +import org.apache.beam.sdk.values.PCollection; + +/** + * Created by dcavazos on 10/18/17. + */ +public class Spanner { + + public interface Options extends PipelineOptions { + @Description("Spanner instance ID to query from") + @Validation.Required + String getInstanceId(); + void setInstanceId(String value); + + @Description("Spanner database name to query from") + @Validation.Required + String getDatabaseName(); + void setDatabaseName(String value); + + @Description("Spanner table name to query from") + @Validation.Required + String getTableName(); + void setTableName(String value); + + @Description("Output filename for records count") + @Validation.Required + String getOutputCount(); + void setOutputCount(String value); + + @Description("Output filename for records size") + @Validation.Required + String getOutputSize(); + void setOutputSize(String value); + } + + private static class EstimateStructSizeFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) throws Exception { + Struct struct = c.element(); + long result = 0; + for (int i = 0; i < struct.getColumnCount(); i++) { + if (struct.isNull(i)) { + continue; + } + + switch (struct.getColumnType(i).getCode()) { + case BOOL: + result += 1; + break; + case INT64: + case FLOAT64: + result += 8; + break; + case BYTES: + result += struct.getBytes(i).length(); + break; + case STRING: + result += struct.getString(i).length(); + break; + case TIMESTAMP: + case DATE: + result += 12; + break; + case ARRAY: + throw new IllegalArgumentException("Arrays are not supported :("); + case STRUCT: + throw new IllegalArgumentException("Structs are not supported :("); + } + } + c.output(result); + } + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + Pipeline p = Pipeline.create(options); + + PCollection records = p + .apply(SpannerIO.read() + .withInstanceId(options.getInstanceId()) + .withDatabaseId(options.getDatabaseName()) + .withQuery("SELECT * FROM " + options.getTableName()) + ); + + records + .apply(Count.globally()) + .apply(ToString.elements()) + .apply(TextIO.write().to(options.getOutputCount())); + + records + .apply(ParDo.of(new EstimateStructSizeFn())) + .apply(Sum.longsGlobally()) + .apply(ToString.elements()) + .apply(TextIO.write().to(options.getOutputSize())); + + p.run().waitUntilFinish(); + } +} From 2e8aee588346b3de3edf234f9fb0b987910ce01b Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Mon, 27 Nov 2017 15:40:46 -0800 Subject: [PATCH 2/7] Removed counts and just left the size estimation --- .../java/com/example/dataflow/Spanner.java | 95 ++++++++++--------- 1 file changed, 52 insertions(+), 43 deletions(-) diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java index 031818801ce..8dfdb3a82d8 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java @@ -1,3 +1,19 @@ +/* + Copyright 2016, Google, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + package com.example.dataflow; import com.google.cloud.spanner.Struct; @@ -10,17 +26,16 @@ import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.values.PCollection; -/** - * Created by dcavazos on 10/18/17. - */ public class Spanner { public interface Options extends PipelineOptions { + @Description("Spanner instance ID to query from") @Validation.Required String getInstanceId(); @@ -28,51 +43,48 @@ public interface Options extends PipelineOptions { @Description("Spanner database name to query from") @Validation.Required - String getDatabaseName(); - void setDatabaseName(String value); + String getDatabaseId(); + void setDatabaseId(String value); @Description("Spanner table name to query from") @Validation.Required String getTableName(); void setTableName(String value); - @Description("Output filename for records count") - @Validation.Required - String getOutputCount(); - void setOutputCount(String value); - @Description("Output filename for records size") @Validation.Required - String getOutputSize(); - void setOutputSize(String value); + String getOutput(); + void setOutput(String value); } - private static class EstimateStructSizeFn extends DoFn { - @ProcessElement public void processElement(ProcessContext c) throws Exception { - Struct struct = c.element(); - long result = 0; - for (int i = 0; i < struct.getColumnCount(); i++) { - if (struct.isNull(i)) { + public static class EstimateStructSizeFn extends DoFn { + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Struct row = c.element(); + long sum = 0; + for (int i = 0; i < row.getColumnCount(); i++) { + if (row.isNull(i)) { continue; } - switch (struct.getColumnType(i).getCode()) { + switch (row.getColumnType(i).getCode()) { case BOOL: - result += 1; + sum += 1; break; case INT64: case FLOAT64: - result += 8; + sum += 8; + break; + case TIMESTAMP: + case DATE: + sum += 12; break; case BYTES: - result += struct.getBytes(i).length(); + sum += row.getBytes(i).length(); break; case STRING: - result += struct.getString(i).length(); - break; - case TIMESTAMP: - case DATE: - result += 12; + sum += row.getString(i).length(); break; case ARRAY: throw new IllegalArgumentException("Arrays are not supported :("); @@ -80,32 +92,29 @@ private static class EstimateStructSizeFn extends DoFn { throw new IllegalArgumentException("Structs are not supported :("); } } - c.output(result); + c.output(sum); } } public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - Pipeline p = Pipeline.create(options); - PCollection records = p - .apply(SpannerIO.read() - .withInstanceId(options.getInstanceId()) - .withDatabaseId(options.getDatabaseName()) - .withQuery("SELECT * FROM " + options.getTableName()) - ); - - records - .apply(Count.globally()) - .apply(ToString.elements()) - .apply(TextIO.write().to(options.getOutputCount())); + String instanceId = options.getInstanceId(); + String databaseId = options.getDatabaseId(); + String query = "SELECT * FROM " + options.getTableName(); - records + PCollection tableEstimatedSize = p + .apply(SpannerIO.read() + .withInstanceId(instanceId) + .withDatabaseId(databaseId) + .withQuery(query)) .apply(ParDo.of(new EstimateStructSizeFn())) - .apply(Sum.longsGlobally()) + .apply(Sum.longsGlobally()); + + tableEstimatedSize .apply(ToString.elements()) - .apply(TextIO.write().to(options.getOutputSize())); + .apply(TextIO.write().to(options.getOutput())); p.run().waitUntilFinish(); } From f57da144fa3064b673de18cb4aba3fdd67d47fac Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Mon, 27 Nov 2017 15:51:28 -0800 Subject: [PATCH 3/7] Tidy up dependencies --- dataflow/spanner-io/pom.xml | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/dataflow/spanner-io/pom.xml b/dataflow/spanner-io/pom.xml index 674e61c1319..55555c30583 100644 --- a/dataflow/spanner-io/pom.xml +++ b/dataflow/spanner-io/pom.xml @@ -63,6 +63,12 @@ + + com.google.api.grpc + proto-google-common-protos + 0.1.9 + + com.google.cloud google-cloud-spanner @@ -75,14 +81,6 @@ 0.1.9 - - - com.google.api.grpc - proto-google-common-protos - 0.1.9 - - - org.slf4j From f9de752bff201f6fd404e286356b31331441044c Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Tue, 28 Nov 2017 16:37:05 -0800 Subject: [PATCH 4/7] Add sample to write to Spanner --- .../com/example/dataflow/SpannerWrite.java | 231 ++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java new file mode 100644 index 00000000000..872b7559b9a --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java @@ -0,0 +1,231 @@ +/* + Copyright 2017, Google, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.example.dataflow; + +import com.google.cloud.spanner.Mutation; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* +This sample demonstrates how to write to a Spanner table. + +## Prerequisites +* Maven installed +* Set up GCP default credentials, one of the following: + - export GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json + - gcloud auth application-default login + [https://developers.google.com/identity/protocols/application-default-credentials] +* Create the Spanner table to write to, you'll need: + - Instance ID + - Database ID + - Singers Table with schema: + *singerId: INT64 NOT NULL + firstName: STRING NOT NULL + lastName: STRING NOT NULL + - Albums Table with schema: + singerId: INT64 NOT NULL + *albumId: INT64 NOT NULL + albumTitle: STRING NOT NULL + [https://cloud.google.com/spanner/docs/quickstart-console] + +## How to run +cd java-docs-samples/dataflow/spanner-io +mvn clean +mvn compile +mvn exec:java \ + -Dexec.mainClass=com.example.dataflow.SpannerWrite \ + -Dexec.args="--instanceId=my-instance-id \ + --databaseId=my-database-id \ + --singersTable=my_singers_table \ + --albumsTable=my_albums_table" +*/ + +public class SpannerWrite { + + static final String DELIMITER = "\t"; + + public interface Options extends PipelineOptions { + + @Description("Singers filename in the format: singer_id\tfirst_name\tlast_name") + @Default.String("data/singers.txt") + String getSingersFilename(); + void setSingersFilename(String value); + + @Description("Albums filename in the format: singer_id\talbum_id\talbum_title") + @Default.String("data/albums.txt") + String getAlbumsFilename(); + void setAlbumsFilename(String value); + + @Description("Spanner instance ID to write to") + @Validation.Required + String getInstanceId(); + void setInstanceId(String value); + + @Description("Spanner database name to write to") + @Validation.Required + String getDatabaseId(); + void setDatabaseId(String value); + + @Description("Spanner singers table name to write to") + @Validation.Required + String getSingersTable(); + void setSingersTable(String value); + + @Description("Spanner albums table name to write to") + @Validation.Required + String getAlbumsTable(); + void setAlbumsTable(String value); + } + + @DefaultCoder(AvroCoder.class) + static class Singer { + long singerId; + String firstName; + String lastName; + + Singer() {} + + Singer(long singerId, String firstName, String lastName) { + this.singerId = singerId; + this.firstName = firstName; + this.lastName = lastName; + } + } + + @DefaultCoder(AvroCoder.class) + static class Album { + long singerId; + long albumId; + String albumTitle; + + Album() {} + + Album(long singerId, long albumId, String albumTitle) { + this.singerId = singerId; + this.albumId = albumId; + this.albumTitle = albumTitle; + } + } + + /** + * Parses each tab-delimited line into a Singer object. The line format is the following: + * singer_id\tfirstName\tlastName + */ + static class ParseSinger extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(ParseSinger.class); + + @ProcessElement + public void processElement(ProcessContext c) { + String[] columns = c.element().split(DELIMITER); + try { + Long singerId = Long.parseLong(columns[0].trim()); + String firstName = columns[1].trim(); + String lastName = columns[2].trim(); + c.output(new Singer(singerId, firstName, lastName)); + } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) { + LOG.info("ParseSinger: parse error on '" + c.element() + "': " + e.getMessage()); + } + } + } + + /** + * Parses each tab-delimited line into an Album object. The line format is the following: + * singer_id\talbumId\talbumTitle + */ + static class ParseAlbum extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(ParseAlbum.class); + + @ProcessElement + public void processElement(ProcessContext c) { + String[] columns = c.element().split(DELIMITER); + try { + Long singerId = Long.parseLong(columns[0].trim()); + Long albumId = Long.parseLong(columns[1].trim()); + String albumTitle = columns[2].trim(); + c.output(new Album(singerId, albumId, albumTitle)); + } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) { + LOG.info("ParseAlbum: parse error on '" + c.element() + "': " + e.getMessage()); + } + } + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + String instanceId = options.getInstanceId(); + String databaseId = options.getDatabaseId(); + String singersTable = options.getSingersTable(); + String albumsTable = options.getAlbumsTable(); + + // Read singers from a tab-delimited file + p.apply("ReadSingers", TextIO.read().from(options.getSingersFilename())) + // Parse the tab-delimited lines into Singer objects + .apply("ParseSingers", ParDo.of(new ParseSinger())) + // Spanner expects a Mutation object, so create it using the Singer's data + .apply("CreateSingerMutation", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + Singer singer = c.element(); + c.output(Mutation.newInsertOrUpdateBuilder(singersTable) + .set("singerId").to(singer.singerId) + .set("firstName").to(singer.firstName) + .set("lastName").to(singer.lastName) + .build()); + } + })) + // Finally write the Mutations to Spanner + .apply("WriteSingers", SpannerIO.write() + .withInstanceId(instanceId) + .withDatabaseId(databaseId)); + + // Read albums from a tab-delimited file + p.apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename())) + // Parse the tab-delimited lines into Album objects + .apply("ParseAlbums", ParDo.of(new ParseAlbum())) + // Spanner expects a Mutation object, so create it using the Album's data + .apply("CreateAlbumMutation", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + Album album = c.element(); + c.output(Mutation.newInsertOrUpdateBuilder(albumsTable) + .set("singerId").to(album.singerId) + .set("albumId").to(album.albumId) + .set("albumTitle").to(album.albumTitle) + .build()); + } + })) + // Finally write the Mutations to Spanner + .apply("WriteAlbums", SpannerIO.write() + .withInstanceId(instanceId) + .withDatabaseId(databaseId)); + + p.run().waitUntilFinish(); + } +} From 19aee9b7aa080de61fa337729ad48f84c0cc8414 Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Tue, 28 Nov 2017 16:37:38 -0800 Subject: [PATCH 5/7] Add dataflow module --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index 8957fedce88..bcb6cf2d19e 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,8 @@ bigquery/cloud-client bigquery/rest + dataflow/spanner-io + datastore datastore/cloud-client From 6e67266d866091436ba3048e5d6bdf2eb8813342 Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Tue, 28 Nov 2017 16:38:04 -0800 Subject: [PATCH 6/7] Cleaned up dependencies --- dataflow/spanner-io/pom.xml | 38 ++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/dataflow/spanner-io/pom.xml b/dataflow/spanner-io/pom.xml index 55555c30583..dcbc54c8460 100644 --- a/dataflow/spanner-io/pom.xml +++ b/dataflow/spanner-io/pom.xml @@ -1,4 +1,18 @@ - + @@ -12,7 +26,9 @@ UTF-8 1.8 - 2.3.0-SNAPSHOT + 1.8 + 1.8 + 2.2.0 @@ -20,11 +36,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.5 - - 1.8 - 1.8 - + 3.7.0 @@ -63,24 +75,12 @@ - - com.google.api.grpc - proto-google-common-protos - 0.1.9 - - com.google.cloud google-cloud-spanner 0.20.0-beta - - com.google.api.grpc - proto-google-cloud-spanner-admin-database-v1 - 0.1.9 - - org.slf4j From 8cba515d28338fb14693e9bc66b5de3b8a4d030c Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Tue, 28 Nov 2017 16:38:29 -0800 Subject: [PATCH 7/7] Added more comments and a description on how to run --- .../{Spanner.java => SpannerRead.java} | 43 ++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) rename dataflow/spanner-io/src/main/java/com/example/dataflow/{Spanner.java => SpannerRead.java} (72%) diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java similarity index 72% rename from dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java rename to dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java index 8dfdb3a82d8..2ffea7fe208 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/Spanner.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java @@ -1,5 +1,5 @@ /* - Copyright 2016, Google, Inc. + Copyright 2017, Google, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -32,7 +32,33 @@ import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.values.PCollection; -public class Spanner { +/* +This sample demonstrates how to read from a Spanner table. + +## Prerequisites +* Maven installed +* Set up GCP default credentials, one of the following: + - export GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json + - gcloud auth application-default login + [https://developers.google.com/identity/protocols/application-default-credentials] +* Create the Spanner table to read from, you'll need: + - Instance ID + - Database ID + - Any table, preferably populated + [https://cloud.google.com/spanner/docs/quickstart-console] + +## How to run +cd java-docs-samples/dataflow/spanner-io +mvn clean +mvn compile +mvn exec:java \ + -Dexec.mainClass=com.example.dataflow.SpannerRead \ + -Dexec.args="--instanceId=my-instance-id \ + --databaseId=my-database-id \ + --table=my_table \ + --output=path/to/output_file" +*/ +public class SpannerRead { public interface Options extends PipelineOptions { @@ -48,8 +74,8 @@ public interface Options extends PipelineOptions { @Description("Spanner table name to query from") @Validation.Required - String getTableName(); - void setTableName(String value); + String getTable(); + void setTable(String value); @Description("Output filename for records size") @Validation.Required @@ -57,6 +83,9 @@ public interface Options extends PipelineOptions { void setOutput(String value); } + /** + * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported. + */ public static class EstimateStructSizeFn extends DoFn { @ProcessElement @@ -102,16 +131,20 @@ public static void main(String[] args) { String instanceId = options.getInstanceId(); String databaseId = options.getDatabaseId(); - String query = "SELECT * FROM " + options.getTableName(); + String query = "SELECT * FROM " + options.getTable(); PCollection tableEstimatedSize = p + // Query for all the columns and rows in the specified Spanner table .apply(SpannerIO.read() .withInstanceId(instanceId) .withDatabaseId(databaseId) .withQuery(query)) + // Estimate the size of every row .apply(ParDo.of(new EstimateStructSizeFn())) + // Sum all the row sizes to get the total estimated size of the table .apply(Sum.longsGlobally()); + // Write the total size to a file tableEstimatedSize .apply(ToString.elements()) .apply(TextIO.write().to(options.getOutput()));