diff --git a/dataflow/spanner-io/pom.xml b/dataflow/spanner-io/pom.xml
new file mode 100644
index 00000000000..dcbc54c8460
--- /dev/null
+++ b/dataflow/spanner-io/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.example.dataflow</groupId>
+  <artifactId>dataflow-spanner</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <java.version>1.8</java.version>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+    <apache_beam.version>2.2.0</apache_beam.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.7.0</version>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Beam SDK core -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <version>${apache_beam.version}</version>
+    </dependency>
+
+    <!-- Google Cloud Platform IO connectors, including SpannerIO -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <version>${apache_beam.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.api.grpc</groupId>
+          <artifactId>grpc-google-common-protos</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Runs the pipeline locally -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${apache_beam.version}</version>
+    </dependency>
+
+    <!-- Runs the pipeline on Google Cloud Dataflow -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+      <version>${apache_beam.version}</version>
+    </dependency>
+
+    <!-- Cloud Spanner client library -->
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-spanner</artifactId>
+      <version>0.20.0-beta</version>
+    </dependency>
+
+    <!-- slf4j binding that routes the samples' logging to java.util.logging -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>1.7.25</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java
new file mode 100644
index 00000000000..2ffea7fe208
--- /dev/null
+++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java
@@ -0,0 +1,154 @@
+/*
+  Copyright 2017, Google, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+package com.example.dataflow;
+
+import com.google.cloud.spanner.Struct;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.ToString;
+import org.apache.beam.sdk.values.PCollection;
+
+/*
+This sample demonstrates how to read from a Spanner table: it queries every row,
+estimates the size of each one, and writes the table's total estimated size to a file.
+
+## Prerequisites
+* Maven installed
+* GCP default credentials set up in one of the following ways:
+  - export GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json
+  - gcloud auth application-default login
+  [https://developers.google.com/identity/protocols/application-default-credentials]
+* A Spanner table to read from; you'll need:
+  - Instance ID
+  - Database ID
+  - Any table, preferably populated
+  [https://cloud.google.com/spanner/docs/quickstart-console]
+
+## How to run
+cd java-docs-samples/dataflow/spanner-io
+mvn clean
+mvn compile
+mvn exec:java \
+    -Dexec.mainClass=com.example.dataflow.SpannerRead \
+    -Dexec.args="--instanceId=my-instance-id \
+                 --databaseId=my-database-id \
+                 --table=my_table \
+                 --output=path/to/output_file"
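+
+To run on Google Cloud Dataflow instead of the local direct runner (this is
+what the beam-runners-google-cloud-dataflow-java dependency in pom.xml is for),
+pass the standard Beam runner options as well; for example (the project ID and
+bucket names are placeholders):
+mvn exec:java \
+    -Dexec.mainClass=com.example.dataflow.SpannerRead \
+    -Dexec.args="--runner=DataflowRunner \
+                 --project=my-project-id \
+                 --tempLocation=gs://my-bucket/tmp \
+                 --instanceId=my-instance-id \
+                 --databaseId=my-database-id \
+                 --table=my_table \
+                 --output=gs://my-bucket/output"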
+*/
+public class SpannerRead {
+
+  public interface Options extends PipelineOptions {
+
+    @Description("Spanner instance ID to query from")
+    @Validation.Required
+    String getInstanceId();
+    void setInstanceId(String value);
+
+    @Description("Spanner database name to query from")
+    @Validation.Required
+    String getDatabaseId();
+    void setDatabaseId(String value);
+
+    @Description("Spanner table name to query from")
+    @Validation.Required
+    String getTable();
+    void setTable(String value);
+
+    @Description("Output filename for the table's estimated size")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  /**
+   * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported.
+   * The estimate is a rough heuristic: for example, a row of (BOOL, INT64, STRING "abc") counts
+   * as 1 + 8 + 3 = 12 bytes, where the STRING contribution is its length in UTF-16 code units
+   * rather than its encoded size.
+   */
+  public static class EstimateStructSizeFn extends DoFn<Struct, Long> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Struct row = c.element();
+      long sum = 0;
+      for (int i = 0; i < row.getColumnCount(); i++) {
+        if (row.isNull(i)) {
+          continue;
+        }
+
+        switch (row.getColumnType(i).getCode()) {
+          case BOOL:
+            sum += 1;
+            break;
+          case INT64:
+          case FLOAT64:
+            sum += 8;
+            break;
+          case TIMESTAMP:
+          case DATE:
+            sum += 12;
+            break;
+          case BYTES:
+            sum += row.getBytes(i).length();
+            break;
+          case STRING:
+            sum += row.getString(i).length();
+            break;
+          case ARRAY:
+            throw new IllegalArgumentException("Arrays are not supported :(");
+          case STRUCT:
+            throw new IllegalArgumentException("Structs are not supported :(");
+        }
+      }
+      c.output(sum);
+    }
+  }
+
+  public static void main(String[] args) {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    String instanceId = options.getInstanceId();
+    String databaseId = options.getDatabaseId();
+    String query = "SELECT * FROM " + options.getTable();
+
+    PCollection<Long> tableEstimatedSize = p
+        // Query for all the columns and rows in the specified Spanner table
+        .apply(SpannerIO.read()
+            .withInstanceId(instanceId)
+            .withDatabaseId(databaseId)
+            .withQuery(query))
+        // Estimate the size of every row
+        .apply(ParDo.of(new EstimateStructSizeFn()))
+        // Sum all the row sizes to get the total estimated size of the table
+        .apply(Sum.longsGlobally());
+
+    // Write the total size to a file
+    tableEstimatedSize
+        .apply(ToString.elements())
+        .apply(TextIO.write().to(options.getOutput()));
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java
new file mode 100644
index 00000000000..872b7559b9a
--- /dev/null
+++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java
@@ -0,0 +1,231 @@
+/*
+  Copyright 2017, Google, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+package com.example.dataflow;
+
+import com.google.cloud.spanner.Mutation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+This sample demonstrates how to write to a Spanner table: it reads singers and
+albums from tab-delimited files and upserts a row for each record.
+
+## Prerequisites
+* Maven installed
+* GCP default credentials set up in one of the following ways:
+  - export GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json
+  - gcloud auth application-default login
+  [https://developers.google.com/identity/protocols/application-default-credentials]
+* The Spanner tables to write to; you'll need:
+  - Instance ID
+  - Database ID
+  - A Singers table with schema (* marks the primary key):
+    *singerId: INT64 NOT NULL
+    firstName: STRING NOT NULL
+    lastName: STRING NOT NULL
+  - An Albums table with schema:
+    singerId: INT64 NOT NULL
+    *albumId: INT64 NOT NULL
+    albumTitle: STRING NOT NULL
+  [https://cloud.google.com/spanner/docs/quickstart-console]
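+
+For reference, DDL along these lines creates the two tables (the table names
+and the STRING(MAX) lengths are illustrative; the names just need to match the
+--singersTable and --albumsTable flags shown below):
+
+CREATE TABLE Singers (
+  singerId   INT64 NOT NULL,
+  firstName  STRING(MAX) NOT NULL,
+  lastName   STRING(MAX) NOT NULL,
+) PRIMARY KEY (singerId);
+
+CREATE TABLE Albums (
+  singerId   INT64 NOT NULL,
+  albumId    INT64 NOT NULL,
+  albumTitle STRING(MAX) NOT NULL,
+) PRIMARY KEY (albumId);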
+
+## How to run
+cd java-docs-samples/dataflow/spanner-io
+mvn clean
+mvn compile
+mvn exec:java \
+    -Dexec.mainClass=com.example.dataflow.SpannerWrite \
+    -Dexec.args="--instanceId=my-instance-id \
+                 --databaseId=my-database-id \
+                 --singersTable=my_singers_table \
+                 --albumsTable=my_albums_table"
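+
+The input files are plain tab-delimited text, one record per line. For example
+(\t denotes a literal tab; the values are made up), data/singers.txt could contain:
+  1\tMarc\tRichards
+  2\tCatalina\tSmith
+and data/albums.txt:
+  1\t1\tTotal Junk
+  2\t1\tGreen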
+*/
+
+public class SpannerWrite {
+
+  static final String DELIMITER = "\t";
+
+  public interface Options extends PipelineOptions {
+
+    @Description("Singers filename in the format: singer_id\tfirst_name\tlast_name")
+    @Default.String("data/singers.txt")
+    String getSingersFilename();
+    void setSingersFilename(String value);
+
+    @Description("Albums filename in the format: singer_id\talbum_id\talbum_title")
+    @Default.String("data/albums.txt")
+    String getAlbumsFilename();
+    void setAlbumsFilename(String value);
+
+    @Description("Spanner instance ID to write to")
+    @Validation.Required
+    String getInstanceId();
+    void setInstanceId(String value);
+
+    @Description("Spanner database name to write to")
+    @Validation.Required
+    String getDatabaseId();
+    void setDatabaseId(String value);
+
+    @Description("Spanner singers table name to write to")
+    @Validation.Required
+    String getSingersTable();
+    void setSingersTable(String value);
+
+    @Description("Spanner albums table name to write to")
+    @Validation.Required
+    String getAlbumsTable();
+    void setAlbumsTable(String value);
+  }
+
+  @DefaultCoder(AvroCoder.class)
+  static class Singer {
+    long singerId;
+    String firstName;
+    String lastName;
+
+    Singer() {}
+
+    Singer(long singerId, String firstName, String lastName) {
+      this.singerId = singerId;
+      this.firstName = firstName;
+      this.lastName = lastName;
+    }
+  }
+
+  @DefaultCoder(AvroCoder.class)
+  static class Album {
+    long singerId;
+    long albumId;
+    String albumTitle;
+
+    Album() {}
+
+    Album(long singerId, long albumId, String albumTitle) {
+      this.singerId = singerId;
+      this.albumId = albumId;
+      this.albumTitle = albumTitle;
+    }
+  }
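+
+  // Both POJOs above declare AvroCoder via @DefaultCoder so that Beam can
+  // serialize Singer and Album elements between pipeline steps; AvroCoder
+  // inspects the classes reflectively, hence the non-final fields and the
+  // no-arg constructors.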
+
+  /**
+   * Parses each tab-delimited line into a Singer object. The line format is the following:
+   * singer_id\tfirst_name\tlast_name
+   */
+  static class ParseSinger extends DoFn<String, Singer> {
+    private static final Logger LOG = LoggerFactory.getLogger(ParseSinger.class);
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      String[] columns = c.element().split(DELIMITER);
+      try {
+        long singerId = Long.parseLong(columns[0].trim());
+        String firstName = columns[1].trim();
+        String lastName = columns[2].trim();
+        c.output(new Singer(singerId, firstName, lastName));
+      } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
+        LOG.info("ParseSinger: parse error on '" + c.element() + "': " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Parses each tab-delimited line into an Album object. The line format is the following:
+   * singer_id\talbum_id\talbum_title
+   */
+  static class ParseAlbum extends DoFn<String, Album> {
+    private static final Logger LOG = LoggerFactory.getLogger(ParseAlbum.class);
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      String[] columns = c.element().split(DELIMITER);
+      try {
+        long singerId = Long.parseLong(columns[0].trim());
+        long albumId = Long.parseLong(columns[1].trim());
+        String albumTitle = columns[2].trim();
+        c.output(new Album(singerId, albumId, albumTitle));
+      } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
+        LOG.info("ParseAlbum: parse error on '" + c.element() + "': " + e.getMessage());
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    String instanceId = options.getInstanceId();
+    String databaseId = options.getDatabaseId();
+    String singersTable = options.getSingersTable();
+    String albumsTable = options.getAlbumsTable();
+
+    // Read singers from a tab-delimited file
+    p.apply("ReadSingers", TextIO.read().from(options.getSingersFilename()))
+        // Parse the tab-delimited lines into Singer objects
+        .apply("ParseSingers", ParDo.of(new ParseSinger()))
+        // Spanner expects a Mutation object, so create one from each Singer's data
+        .apply("CreateSingerMutation", ParDo.of(new DoFn<Singer, Mutation>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Singer singer = c.element();
+            c.output(Mutation.newInsertOrUpdateBuilder(singersTable)
+                .set("singerId").to(singer.singerId)
+                .set("firstName").to(singer.firstName)
+                .set("lastName").to(singer.lastName)
+                .build());
+          }
+        }))
+        // Finally write the Mutations to Spanner
+        .apply("WriteSingers", SpannerIO.write()
+            .withInstanceId(instanceId)
+            .withDatabaseId(databaseId));
+
+    // Read albums from a tab-delimited file
+    p.apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename()))
+        // Parse the tab-delimited lines into Album objects
+        .apply("ParseAlbums", ParDo.of(new ParseAlbum()))
+        // Spanner expects a Mutation object, so create one from each Album's data
+        .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Album album = c.element();
+            c.output(Mutation.newInsertOrUpdateBuilder(albumsTable)
+                .set("singerId").to(album.singerId)
+                .set("albumId").to(album.albumId)
+                .set("albumTitle").to(album.albumTitle)
+                .build());
+          }
+        }))
+        // Finally write the Mutations to Spanner
+        .apply("WriteAlbums", SpannerIO.write()
+            .withInstanceId(instanceId)
+            .withDatabaseId(databaseId));
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/pom.xml b/pom.xml
index 881222883dc..86668e917fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,6 +49,8 @@
     <module>bigquery/cloud-client</module>
     <module>bigquery/rest</module>
 
+    <module>dataflow/spanner-io</module>
+
     <module>datastore</module>
     <module>datastore/cloud-client</module>
 