diff --git a/dataflow/spanner-io/pom.xml b/dataflow/spanner-io/pom.xml
index 65700a9645f..165efbffa65 100644
--- a/dataflow/spanner-io/pom.xml
+++ b/dataflow/spanner-io/pom.xml
@@ -48,6 +48,16 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.7.0</version>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <version>2.19.1</version>
+        <configuration>
+          <systemPropertyVariables>
+            <spanner.test.instance>default-instance</spanner.test.instance>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
@@ -84,13 +94,6 @@
       <version>${apache_beam.version}</version>
-
-    <dependency>
-      <groupId>com.google.cloud</groupId>
-      <artifactId>google-cloud-spanner</artifactId>
-      <version>0.34.0-beta</version>
-    </dependency>
-
       <groupId>org.slf4j</groupId>
diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java
new file mode 100644
index 00000000000..5f393d3ed38
--- /dev/null
+++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2018 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.dataflow;
+
+import com.google.cloud.spanner.Struct;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Estimates the size of the {@code Struct}.
+ */
+public class EstimateSize extends PTransform<PCollection<Struct>, PCollection<Long>> {
+
+ public static EstimateSize create() {
+ return new EstimateSize();
+ }
+
+ private EstimateSize() {
+ }
+
+ @Override
+ public PCollection<Long> expand(PCollection<Struct> input) {
+ return input.apply(ParDo.of(new EstimateStructSizeFn()));
+ }
+
+ /**
+ * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported.
+ */
+ public static class EstimateStructSizeFn extends DoFn<Struct, Long> {
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ Struct row = c.element();
+ long sum = 0;
+ for (int i = 0; i < row.getColumnCount(); i++) {
+ if (row.isNull(i)) {
+ continue;
+ }
+
+ switch (row.getColumnType(i).getCode()) {
+ case BOOL:
+ sum += 1;
+ break;
+ case INT64:
+ case FLOAT64:
+ sum += 8;
+ break;
+ case TIMESTAMP:
+ case DATE:
+ sum += 12;
+ break;
+ case BYTES:
+ sum += row.getBytes(i).length();
+ break;
+ case STRING:
+ sum += row.getString(i).length();
+ break;
+ case ARRAY:
+ throw new IllegalArgumentException("Arrays are not supported :(");
+ case STRUCT:
+ throw new IllegalArgumentException("Structs are not supported :(");
+ default:
+ throw new IllegalArgumentException("Unsupported type :(");
+ }
+ }
+ c.output(sum);
+ }
+ }
+
+}
diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java
new file mode 100644
index 00000000000..e1db90f1821
--- /dev/null
+++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2018 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.dataflow;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Mutation;
+import com.google.common.base.Charsets;
+import com.google.common.hash.Hashing;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * This sample demonstrates how to group together mutations when writing to the Cloud Spanner
+ * database.
+ */
+public class SpannerGroupWrite {
+ public interface Options extends PipelineOptions {
+
+ @Description("Spanner instance ID to write to")
+ @Validation.Required
+ String getInstanceId();
+
+ void setInstanceId(String value);
+
+ @Description("Spanner database name to write to")
+ @Validation.Required
+ String getDatabaseId();
+
+ void setDatabaseId(String value);
+
+ @Description("Input file with the list of suspicious user IDs, one ID per line")
+ @Validation.Required
+ String getSuspiciousUsersFile();
+
+ void setSuspiciousUsersFile(String value);
+
+ }
+
+ public static void main(String[] args) {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ Pipeline p = Pipeline.create(options);
+
+ String instanceId = options.getInstanceId();
+ String databaseId = options.getDatabaseId();
+
+ String usersIdFile = options.getSuspiciousUsersFile();
+
+ PCollection<String> suspiciousUserIds = p.apply(TextIO.read().from(usersIdFile));
+
+ final Timestamp timestamp = Timestamp.now();
+
+ // [START spanner_dataflow_writegroup]
+ PCollection<MutationGroup> mutations = suspiciousUserIds
+ .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {
+
+ @Override
+ public MutationGroup apply(String userId) {
+ // Immediately block the user.
+ Mutation userMutation = Mutation.newUpdateBuilder("Users")
+ .set("id").to(userId)
+ .set("state").to("BLOCKED")
+ .build();
+ long generatedId = Hashing.sha1().newHasher()
+ .putString(userId, Charsets.UTF_8)
+ .putLong(timestamp.getSeconds())
+ .putLong(timestamp.getNanos())
+ .hash()
+ .asLong();
+
+ // Add an entry to pending review requests.
+ Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
+ .set("id").to(generatedId) // Must be deterministically generated.
+ .set("userId").to(userId)
+ .set("action").to("REVIEW ACCOUNT")
+ .set("note").to("Suspicious activity detected.")
+ .build();
+
+ return MutationGroup.create(userMutation, pendingReview);
+ }
+ }));
+
+ mutations.apply(SpannerIO.write()
+ .withInstanceId(instanceId)
+ .withDatabaseId(databaseId)
+ .grouped());
+ // [END spanner_dataflow_writegroup]
+
+ p.run().waitUntilFinish();
+
+ }
+
+}
diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java
index 330c560cb92..439549f245f 100644
--- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java
+++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java
@@ -24,10 +24,6 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.values.PCollection;
@@ -87,49 +83,6 @@ public interface Options extends PipelineOptions {
void setOutput(String value);
}
- /**
- * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported.
- */
- public static class EstimateStructSizeFn extends DoFn<Struct, Long> {
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Struct row = c.element();
- long sum = 0;
- for (int i = 0; i < row.getColumnCount(); i++) {
- if (row.isNull(i)) {
- continue;
- }
-
- switch (row.getColumnType(i).getCode()) {
- case BOOL:
- sum += 1;
- break;
- case INT64:
- case FLOAT64:
- sum += 8;
- break;
- case TIMESTAMP:
- case DATE:
- sum += 12;
- break;
- case BYTES:
- sum += row.getBytes(i).length();
- break;
- case STRING:
- sum += row.getString(i).length();
- break;
- case ARRAY:
- throw new IllegalArgumentException("Arrays are not supported :(");
- case STRUCT:
- throw new IllegalArgumentException("Structs are not supported :(");
- default:
- throw new IllegalArgumentException("Unsupported type :(");
- }
- }
- c.output(sum);
- }
- }
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
@@ -137,23 +90,26 @@ public static void main(String[] args) {
String instanceId = options.getInstanceId();
String databaseId = options.getDatabaseId();
- String query = "SELECT * FROM " + options.getTable();
-
- PCollection<Long> tableEstimatedSize = p
- // Query for all the columns and rows in the specified Spanner table
- .apply(SpannerIO.read()
+ // [START spanner_dataflow_read]
+ // Query for all the columns and rows in the specified Spanner table
+ PCollection<Struct> records = p.apply(
+ SpannerIO.read()
.withInstanceId(instanceId)
.withDatabaseId(databaseId)
- .withQuery(query))
+ .withQuery("SELECT * FROM " + options.getTable()));
+ // [END spanner_dataflow_read]
+
+
+ PCollection<Long> tableEstimatedSize = records
// Estimate the size of every row
- .apply(ParDo.of(new EstimateStructSizeFn()))
+ .apply(EstimateSize.create())
// Sum all the row sizes to get the total estimated size of the table
.apply(Sum.longsGlobally());
// Write the total size to a file
tableEstimatedSize
.apply(ToString.elements())
- .apply(TextIO.write().to(options.getOutput()));
+ .apply(TextIO.write().to(options.getOutput()).withoutSharding());
p.run().waitUntilFinish();
}
diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java
new file mode 100644
index 00000000000..b0eb417ced5
--- /dev/null
+++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.dataflow;
+
+import com.google.cloud.spanner.Struct;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.ToString;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * This sample demonstrates how to read all data from the Cloud Spanner database.
+ */
+public class SpannerReadAll {
+
+ public interface Options extends PipelineOptions {
+
+ @Description("Spanner instance ID to query from")
+ @Validation.Required
+ String getInstanceId();
+
+ void setInstanceId(String value);
+
+ @Description("Spanner database name to query from")
+ @Validation.Required
+ String getDatabaseId();
+
+ void setDatabaseId(String value);
+
+ @Description("Output filename for records size")
+ @Validation.Required
+ String getOutput();
+
+ void setOutput(String value);
+ }
+
+ public static void main(String[] args) {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ Pipeline p = Pipeline.create(options);
+
+ SpannerConfig spannerConfig = SpannerConfig.create()
+ .withInstanceId(options.getInstanceId())
+ .withDatabaseId(options.getDatabaseId());
+ // [START spanner_dataflow_readall]
+ PCollection<Struct> allRecords = p.apply(SpannerIO.read()
+ .withSpannerConfig(spannerConfig)
+ .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
+ + ".table_catalog = '' AND t.table_schema = ''")).apply(
+ MapElements.into(TypeDescriptor.of(ReadOperation.class))
+ .via((SerializableFunction<Struct, ReadOperation>) input -> {
+ String tableName = input.getString(0);
+ return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
+ })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));
+ // [END spanner_dataflow_readall]
+
+ PCollection<Long> dbEstimatedSize = allRecords.apply(EstimateSize.create())
+ .apply(Sum.longsGlobally());
+
+ dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput())
+ .withoutSharding());
+
+ p.run().waitUntilFinish();
+ }
+
+}
diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java
index 67502c05fdc..09e9b3e301a 100644
--- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java
+++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java
@@ -22,13 +22,13 @@
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
-import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,9 +61,7 @@
mvn exec:java \
-Dexec.mainClass=com.example.dataflow.SpannerWrite \
-Dexec.args="--instanceId=my-instance-id \
- --databaseId=my-database-id \
- --singersTable=my_singers_table \
- --albumsTable=my_albums_table"
+ --databaseId=my-database-id
*/
public class SpannerWrite {
@@ -73,13 +71,11 @@ public class SpannerWrite {
public interface Options extends PipelineOptions {
@Description("Singers filename in the format: singer_id\tfirst_name\tlast_name")
- @Default.String("data/singers.txt")
String getSingersFilename();
void setSingersFilename(String value);
@Description("Albums filename in the format: singer_id\talbum_id\talbum_title")
- @Default.String("data/albums.txt")
String getAlbumsFilename();
void setAlbumsFilename(String value);
@@ -95,18 +91,6 @@ public interface Options extends PipelineOptions {
String getDatabaseId();
void setDatabaseId(String value);
-
- @Description("Spanner singers table name to write to")
- @Validation.Required
- String getSingersTable();
-
- void setSingersTable(String value);
-
- @Description("Spanner albums table name to write to")
- @Validation.Required
- String getAlbumsTable();
-
- void setAlbumsTable(String value);
}
@DefaultCoder(AvroCoder.class)
@@ -187,8 +171,6 @@ public static void main(String[] args) {
String instanceId = options.getInstanceId();
String databaseId = options.getDatabaseId();
- String singersTable = options.getSingersTable();
- String albumsTable = options.getAlbumsTable();
// Read singers from a tab-delimited file
p.apply("ReadSingers", TextIO.read().from(options.getSingersFilename()))
@@ -199,7 +181,7 @@ public static void main(String[] args) {
@ProcessElement
public void processElement(ProcessContext c) {
Singer singer = c.element();
- c.output(Mutation.newInsertOrUpdateBuilder(singersTable)
+ c.output(Mutation.newInsertOrUpdateBuilder("singers")
.set("singerId").to(singer.singerId)
.set("firstName").to(singer.firstName)
.set("lastName").to(singer.lastName)
@@ -212,25 +194,30 @@ public void processElement(ProcessContext c) {
.withDatabaseId(databaseId));
// Read albums from a tab-delimited file
- p.apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename()))
+ PCollection<Album> albums = p
+ .apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename()))
// Parse the tab-delimited lines into Album objects
- .apply("ParseAlbums", ParDo.of(new ParseAlbum()))
+ .apply("ParseAlbums", ParDo.of(new ParseAlbum()));
+
+ // [START spanner_dataflow_write]
+ albums
// Spanner expects a Mutation object, so create it using the Album's data
+ .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
@ProcessElement
public void processElement(ProcessContext c) {
Album album = c.element();
- c.output(Mutation.newInsertOrUpdateBuilder(albumsTable)
+ c.output(Mutation.newInsertOrUpdateBuilder("albums")
.set("singerId").to(album.singerId)
.set("albumId").to(album.albumId)
.set("albumTitle").to(album.albumTitle)
.build());
}
}))
- // Finally write the Mutations to Spanner
+ // Write mutations to Spanner
.apply("WriteAlbums", SpannerIO.write()
.withInstanceId(instanceId)
.withDatabaseId(databaseId));
+ // [END spanner_dataflow_write]
p.run().waitUntilFinish();
}
diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java
new file mode 100644
index 00000000000..155a9e51a13
--- /dev/null
+++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2018 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.dataflow;
+
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.TimestampBound;
+import com.google.common.base.Joiner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.Transaction;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+public class TransactionalRead {
+
+ private static final String DELIMITER = "\t";
+
+ public interface Options extends PipelineOptions {
+
+ @Description("Spanner instance ID to write to")
+ @Validation.Required
+ String getInstanceId();
+
+ void setInstanceId(String value);
+
+ @Description("Spanner database name to write to")
+ @Validation.Required
+ String getDatabaseId();
+
+ void setDatabaseId(String value);
+
+ @Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name")
+ String getSingersFilename();
+
+ void setSingersFilename(String value);
+
+ @Description("Albums output filename in the format: singer_id\talbum_id\talbum_title")
+ String getAlbumsFilename();
+
+ void setAlbumsFilename(String value);
+
+ }
+
+ @DefaultCoder(AvroCoder.class)
+ static class Singer {
+
+ long singerId;
+ String firstName;
+ String lastName;
+
+ Singer() {
+ }
+
+ Singer(long singerId, String firstName, String lastName) {
+ this.singerId = singerId;
+ this.firstName = firstName;
+ this.lastName = lastName;
+ }
+ }
+
+ @DefaultCoder(AvroCoder.class)
+ static class Album {
+
+ long singerId;
+ long albumId;
+ String albumTitle;
+
+ Album() {
+ }
+
+ Album(long singerId, long albumId, String albumTitle) {
+ this.singerId = singerId;
+ this.albumId = albumId;
+ this.albumTitle = albumTitle;
+ }
+ }
+
+ public static void main(String[] args) {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ Pipeline p = Pipeline.create(options);
+
+ String instanceId = options.getInstanceId();
+ String databaseId = options.getDatabaseId();
+
+ // [START spanner_dataflow_txread]
+ SpannerConfig spannerConfig = SpannerConfig.create()
+ .withInstanceId(instanceId)
+ .withDatabaseId(databaseId);
+ PCollectionView<Transaction> tx = p.apply(
+ SpannerIO.createTransaction()
+ .withSpannerConfig(spannerConfig)
+ .withTimestampBound(TimestampBound.strong()));
+ PCollection<Struct> singers = p.apply(SpannerIO.read()
+ .withSpannerConfig(spannerConfig)
+ .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
+ .withTransaction(tx));
+ PCollection<Struct> albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig)
+ .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
+ .withTransaction(tx));
+ // [END spanner_dataflow_txread]
+
+ singers.apply(MapElements.via(new SimpleFunction<Struct, String>() {
+
+ @Override
+ public String apply(Struct input) {
+ return Joiner.on(DELIMITER).join(input.getLong(0), input.getString(1), input.getString(2));
+ }
+ })).apply(TextIO.write().to(options.getSingersFilename()).withoutSharding());
+
+ albums.apply(MapElements.via(new SimpleFunction<Struct, String>() {
+
+ @Override
+ public String apply(Struct input) {
+ return Joiner.on(DELIMITER).join(input.getLong(0), input.getLong(1), input.getString(2));
+ }
+ })).apply(TextIO.write().to(options.getAlbumsFilename()).withoutSharding());
+
+ p.run().waitUntilFinish();
+
+ }
+
+}
diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java
new file mode 100644
index 00000000000..c35773fd8a5
--- /dev/null
+++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2018 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.dataflow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.spanner.Database;
+import com.google.cloud.spanner.DatabaseAdminClient;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Operation;
+import com.google.cloud.spanner.ReadContext;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.Statement;
+import com.google.cloud.spanner.TransactionContext;
+import com.google.cloud.spanner.TransactionRunner;
+import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings("checkstyle:abbreviationaswordinname")
+public class SpannerGroupWriteIT {
+
+ String instanceId;
+ String databaseId;
+
+ Path tempPath;
+ Spanner spanner;
+ SpannerOptions spannerOptions;
+
+ @Before
+ public void setUp() throws Exception {
+ instanceId = System.getProperty("spanner.test.instance");
+ databaseId = "df-spanner-groupwrite-it";
+
+ spannerOptions = SpannerOptions.getDefaultInstance();
+ spanner = spannerOptions.getService();
+
+ DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient();
+
+ try {
+ adminClient.dropDatabase(instanceId, databaseId);
+ } catch (SpannerException e) {
+ // Does not exist, ignore.
+ }
+
+ Operation<Database, CreateDatabaseMetadata> op = adminClient
+ .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE users ("
+ + "id STRING(MAX) NOT NULL, state STRING(MAX) NOT NULL) PRIMARY KEY (id)",
+ "CREATE TABLE PendingReviews (id INT64, action STRING(MAX), "
+ + "note STRING(MAX), userId STRING(MAX),) PRIMARY KEY (id)"));
+
+ op.waitFor();
+
+ DatabaseClient dbClient = getDbClient();
+
+ List<Mutation> mutations = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ mutations.add(
+ Mutation.newInsertBuilder("users").set("id").to(Integer.toString(i)).set("state")
+ .to("ACTIVE").build());
+ }
+ TransactionRunner runner = dbClient.readWriteTransaction();
+ runner.run(new TransactionRunner.TransactionCallable<Void>() {
+
+ @Nullable
+ @Override
+ public Void run(TransactionContext tx) throws Exception {
+ tx.buffer(mutations);
+ return null;
+ }
+ });
+
+ String content = IntStream.range(0, 10).mapToObj(Integer::toString)
+ .collect(Collectors.joining("\n"));
+ tempPath = Files.createTempFile("suspicious-ids", "txt");
+ Files.write(tempPath, content.getBytes());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient();
+ try {
+ adminClient.dropDatabase(instanceId, databaseId);
+ } catch (SpannerException e) {
+ // Failed to cleanup.
+ }
+
+ spanner.close();
+ }
+
+ @Test
+ public void testEndToEnd() throws Exception {
+ SpannerGroupWrite.main(
+ new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId,
+ "--suspiciousUsersFile=" + tempPath, "--runner=DirectRunner" });
+
+ DatabaseClient dbClient = getDbClient();
+ try (ReadContext context = dbClient.singleUse()) {
+ ResultSet rs = context.executeQuery(
+ Statement.newBuilder("SELECT COUNT(*) FROM users WHERE STATE = @state").bind("state")
+ .to("BLOCKED").build());
+ assertTrue(rs.next());
+ assertEquals(10, rs.getLong(0));
+
+ }
+ try (ReadContext context = dbClient.singleUse()) {
+ ResultSet rs = context.executeQuery(
+ Statement.newBuilder("SELECT COUNT(*) FROM PendingReviews WHERE ACTION = @action")
+ .bind("action").to("REVIEW ACCOUNT").build());
+ assertTrue(rs.next());
+ assertEquals(10, rs.getLong(0));
+ }
+ }
+
+ private DatabaseClient getDbClient() {
+ return spanner
+ .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId));
+ }
+
+}
diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java
new file mode 100644
index 00000000000..621c00b2018
--- /dev/null
+++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2018 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.dataflow;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.spanner.Database;
+import com.google.cloud.spanner.DatabaseAdminClient;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Operation;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.TransactionContext;
+import com.google.cloud.spanner.TransactionRunner;
+import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings("checkstyle:abbreviationaswordinname")
+public class SpannerReadIT {
+
+ String instanceId;
+ String databaseId;
+
+ Spanner spanner;
+ SpannerOptions spannerOptions;
+
+ @Before
+ public void setUp() throws Exception {
+ instanceId = System.getProperty("spanner.test.instance");
+ databaseId = "df-spanner-read-it";
+
+ spannerOptions = SpannerOptions.getDefaultInstance();
+ spanner = spannerOptions.getService();
+
+ DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient();
+
+ try {
+ adminClient.dropDatabase(instanceId, databaseId);
+ } catch (SpannerException e) {
+ // Does not exist, ignore.
+ }
+
+ Operation<Database, CreateDatabaseMetadata> op = adminClient
+ .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers "
+ + "(singerId INT64 NOT NULL, firstName STRING(MAX) NOT NULL, "
+ + "lastName STRING(MAX) NOT NULL,) PRIMARY KEY (singerId)",
+ "CREATE TABLE Albums (singerId INT64 NOT NULL, albumId INT64 NOT NULL, "
+ + "albumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (singerId, albumId)"));
+
+ op.waitFor();
+
+ List<Mutation> mutations = Arrays.asList(
+ Mutation.newInsertBuilder("singers")
+ .set("singerId").to(1L)
+ .set("firstName").to("John")
+ .set("lastName").to("Lennon")
+ .build(),
+ Mutation.newInsertBuilder("singers")
+ .set("singerId").to(2L)
+ .set("firstName").to("Paul")
+ .set("lastName").to("Mccartney")
+ .build(),
+ Mutation.newInsertBuilder("singers")
+ .set("singerId").to(3L)
+ .set("firstName").to("George")
+ .set("lastName").to("Harrison")
+ .build(),
+ Mutation.newInsertBuilder("singers")
+ .set("singerId").to(4L)
+ .set("firstName").to("Ringo")
+ .set("lastName").to("Starr")
+ .build(),
+
+ Mutation.newInsertBuilder("albums")
+ .set("singerId").to(1L)
+ .set("albumId").to(1L)
+ .set("albumTitle").to("Imagine")
+ .build(),
+ Mutation.newInsertBuilder("albums")
+ .set("singerId").to(2L)
+ .set("albumId").to(1L)
+ .set("albumTitle").to("Pipes of Peace")
+ .build()
+ );
+
+
+ DatabaseClient dbClient = getDbClient();
+
+ TransactionRunner runner = dbClient.readWriteTransaction();
+ runner.run(new TransactionRunner.TransactionCallable<Void>() {
+ @Nullable
+ @Override
+ public Void run(TransactionContext tx) throws Exception {
+ tx.buffer(mutations);
+ return null;
+ }
+ });
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient();
+ try {
+ adminClient.dropDatabase(instanceId, databaseId);
+ } catch (SpannerException e) {
+ // Failed to cleanup.
+ }
+
+ spanner.close();
+ }
+
+ @Test
+ public void readDbEndToEnd() throws Exception {
+ Path outPath = Files.createTempFile("out", "txt");
+ SpannerReadAll.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId,
+ "--output=" + outPath, "--runner=DirectRunner" });
+
+ String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n"));
+
+ assertEquals("132", content);
+ }
+
+ @Test
+ public void readTableEndToEnd() throws Exception {
+ Path outPath = Files.createTempFile("out", "txt");
+ SpannerRead.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId,
+ "--output=" + outPath, "--table=albums", "--runner=DirectRunner" });
+
+ String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n"));
+
+ assertEquals("53", content);
+ }
+
+ @Test
+ public void transactionalReadEndToEnd() throws Exception {
+ Path singersPath = Files.createTempFile("singers", "txt");
+ Path albumsPath = Files.createTempFile("albums", "txt");
+ TransactionalRead.main(
+ new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId,
+ "--singersFilename=" + singersPath, "--albumsFilename=" + albumsPath,
+ "--runner=DirectRunner" });
+
+ assertEquals(4, Files.readAllLines(singersPath).size());
+ assertEquals(2, Files.readAllLines(albumsPath).size());
+ }
+
+ private DatabaseClient getDbClient() {
+ return spanner
+ .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId));
+ }
+
+}
\ No newline at end of file
diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java
new file mode 100644
index 00000000000..5631ddcbd46
--- /dev/null
+++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2018 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.dataflow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.spanner.Database;
+import com.google.cloud.spanner.DatabaseAdminClient;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Operation;
+import com.google.cloud.spanner.ReadContext;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.Statement;
+import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * End-to-end test for the {@code SpannerWrite} sample: creates a scratch database,
+ * runs the write pipeline over tab-separated fixture files, and verifies row counts.
+ *
+ * <p>Requires the {@code spanner.test.instance} system property and application
+ * default credentials.
+ */
+@SuppressWarnings("checkstyle:abbreviationaswordinname")
+public class SpannerWriteIT {
+
+  // Target Spanner instance (from -Dspanner.test.instance) and scratch database.
+  private String instanceId;
+  private String databaseId;
+
+  // Temp input files consumed by the pipeline under test.
+  private Path singersPath;
+  private Path albumsPath;
+  private Spanner spanner;
+  private SpannerOptions spannerOptions;
+
+  @Before
+  public void setUp() throws Exception {
+    instanceId = System.getProperty("spanner.test.instance");
+    databaseId = "df-spanner-write-it";
+
+    spannerOptions = SpannerOptions.getDefaultInstance();
+    spanner = spannerOptions.getService();
+
+    DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient();
+
+    // Drop any database left over from a previous run so the test starts clean.
+    try {
+      adminClient.dropDatabase(instanceId, databaseId);
+    } catch (SpannerException e) {
+      // Does not exist, ignore.
+    }
+
+    // Fix: parameterize Operation (was a raw type); the Database and
+    // CreateDatabaseMetadata imports exist exactly for this.
+    Operation<Database, CreateDatabaseMetadata> op = adminClient
+        .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers "
+                + "(singerId INT64 NOT NULL, "
+                + "firstName STRING(MAX) NOT NULL, lastName STRING(MAX) NOT NULL,) "
+                + "PRIMARY KEY (singerId)",
+            "CREATE TABLE Albums (singerId INT64 NOT NULL, "
+                + "albumId INT64 NOT NULL, albumTitle STRING(MAX) NOT NULL,) "
+                + "PRIMARY KEY (singerId, albumId)"));
+    op.waitFor();
+
+    // Fix: write fixtures with Files.write(Path, Iterable), which always encodes
+    // UTF-8, instead of String.getBytes(), which used the platform default charset.
+    singersPath = Files.createTempFile("singers", "txt");
+    Files.write(singersPath, Arrays.asList(
+        "1\tJohn\tLennon", "2\tPaul\tMccartney", "3\tGeorge\tHarrison", "4\tRingo\tStarr"));
+
+    albumsPath = Files.createTempFile("albums", "txt");
+    Files.write(albumsPath, Arrays.asList(
+        "1\t1\tImagine", "2\t1\tPipes of Peace", "3\t1\tDark Horse"));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient();
+    try {
+      adminClient.dropDatabase(instanceId, databaseId);
+    } catch (SpannerException e) {
+      // Best-effort cleanup; ignore so the client below is still closed.
+    }
+
+    spanner.close();
+  }
+
+  @Test
+  public void testEndToEnd() throws Exception {
+    // Run the write pipeline, then verify row counts directly against Spanner.
+    SpannerWrite.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId,
+        "--singersFilename=" + singersPath, "--albumsFilename=" + albumsPath,
+        "--runner=DirectRunner" });
+
+    DatabaseClient dbClient = getDbClient();
+    try (ReadContext context = dbClient.singleUse()) {
+      ResultSet rs = context.executeQuery(
+          Statement.of("SELECT COUNT(*) FROM singers"));
+      assertTrue(rs.next());
+      assertEquals(4, rs.getLong(0));
+    }
+    try (ReadContext context = dbClient.singleUse()) {
+      ResultSet rs = context.executeQuery(Statement.of("SELECT COUNT(*) FROM albums"));
+      assertTrue(rs.next());
+      assertEquals(3, rs.getLong(0));
+    }
+  }
+
+  /** Returns a client bound to the scratch database in the configured project/instance. */
+  private DatabaseClient getDbClient() {
+    return spanner
+        .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId));
+  }
+
+}