diff --git a/dataflow/spanner-io/pom.xml b/dataflow/spanner-io/pom.xml index 65700a9645f..165efbffa65 100644 --- a/dataflow/spanner-io/pom.xml +++ b/dataflow/spanner-io/pom.xml @@ -48,6 +48,16 @@ maven-compiler-plugin 3.7.0 + + org.apache.maven.plugins + maven-failsafe-plugin + 2.19.1 + + + default-instance + + + @@ -84,13 +94,6 @@ ${apache_beam.version} - - - com.google.cloud - google-cloud-spanner - 0.34.0-beta - - org.slf4j diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java new file mode 100644 index 00000000000..5f393d3ed38 --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java @@ -0,0 +1,86 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import com.google.cloud.spanner.Struct; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * Estimates the size of the {@code Struct}. 
+ */ +public class EstimateSize extends PTransform, PCollection> { + + public static EstimateSize create() { + return new EstimateSize(); + } + + private EstimateSize() { + } + + @Override + public PCollection expand(PCollection input) { + return input.apply(ParDo.of(new EstimateStructSizeFn())); + } + + /** + * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported. + */ + public static class EstimateStructSizeFn extends DoFn { + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Struct row = c.element(); + long sum = 0; + for (int i = 0; i < row.getColumnCount(); i++) { + if (row.isNull(i)) { + continue; + } + + switch (row.getColumnType(i).getCode()) { + case BOOL: + sum += 1; + break; + case INT64: + case FLOAT64: + sum += 8; + break; + case TIMESTAMP: + case DATE: + sum += 12; + break; + case BYTES: + sum += row.getBytes(i).length(); + break; + case STRING: + sum += row.getString(i).length(); + break; + case ARRAY: + throw new IllegalArgumentException("Arrays are not supported :("); + case STRUCT: + throw new IllegalArgumentException("Structs are not supported :("); + default: + throw new IllegalArgumentException("Unsupported type :("); + } + } + c.output(sum); + } + } + +} diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java new file mode 100644 index 00000000000..e1db90f1821 --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java @@ -0,0 +1,115 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Mutation; +import com.google.common.base.Charsets; +import com.google.common.hash.Hashing; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; + +/** + * This sample demonstrates how to group together mutations when writing to the Cloud Spanner + * database. 
+ */ +public class SpannerGroupWrite { + public interface Options extends PipelineOptions { + + @Description("Spanner instance ID to write to") + @Validation.Required + String getInstanceId(); + + void setInstanceId(String value); + + @Description("Spanner database name to write to") + @Validation.Required + String getDatabaseId(); + + void setDatabaseId(String value); + + @Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name") + @Validation.Required + String getSuspiciousUsersFile(); + + void setSuspiciousUsersFile(String value); + + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + String instanceId = options.getInstanceId(); + String databaseId = options.getDatabaseId(); + + String usersIdFile = options.getSuspiciousUsersFile(); + + PCollection suspiciousUserIds = p.apply(TextIO.read().from(usersIdFile)); + + final Timestamp timestamp = Timestamp.now(); + + // [START spanner_dataflow_writegroup] + PCollection mutations = suspiciousUserIds + .apply(MapElements.via(new SimpleFunction() { + + @Override + public MutationGroup apply(String userId) { + // Immediately block the user. + Mutation userMutation = Mutation.newUpdateBuilder("Users") + .set("id").to(userId) + .set("state").to("BLOCKED") + .build(); + long generatedId = Hashing.sha1().newHasher() + .putString(userId, Charsets.UTF_8) + .putLong(timestamp.getSeconds()) + .putLong(timestamp.getNanos()) + .hash() + .asLong(); + + // Add an entry to pending review requests. + Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews") + .set("id").to(generatedId) // Must be deterministically generated. 
+ .set("userId").to(userId) + .set("action").to("REVIEW ACCOUNT") + .set("note").to("Suspicious activity detected.") + .build(); + + return MutationGroup.create(userMutation, pendingReview); + } + })); + + mutations.apply(SpannerIO.write() + .withInstanceId(instanceId) + .withDatabaseId(databaseId) + .grouped()); + // [END spanner_dataflow_writegroup] + + p.run().waitUntilFinish(); + + } + +} diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java index 330c560cb92..439549f245f 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java @@ -24,10 +24,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.values.PCollection; @@ -87,49 +83,6 @@ public interface Options extends PipelineOptions { void setOutput(String value); } - /** - * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported. 
- */ - public static class EstimateStructSizeFn extends DoFn { - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Struct row = c.element(); - long sum = 0; - for (int i = 0; i < row.getColumnCount(); i++) { - if (row.isNull(i)) { - continue; - } - - switch (row.getColumnType(i).getCode()) { - case BOOL: - sum += 1; - break; - case INT64: - case FLOAT64: - sum += 8; - break; - case TIMESTAMP: - case DATE: - sum += 12; - break; - case BYTES: - sum += row.getBytes(i).length(); - break; - case STRING: - sum += row.getString(i).length(); - break; - case ARRAY: - throw new IllegalArgumentException("Arrays are not supported :("); - case STRUCT: - throw new IllegalArgumentException("Structs are not supported :("); - default: - throw new IllegalArgumentException("Unsupported type :("); - } - } - c.output(sum); - } - } public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); @@ -137,23 +90,26 @@ public static void main(String[] args) { String instanceId = options.getInstanceId(); String databaseId = options.getDatabaseId(); - String query = "SELECT * FROM " + options.getTable(); - - PCollection tableEstimatedSize = p - // Query for all the columns and rows in the specified Spanner table - .apply(SpannerIO.read() + // [START spanner_dataflow_read] + // Query for all the columns and rows in the specified Spanner table + PCollection records = p.apply( + SpannerIO.read() .withInstanceId(instanceId) .withDatabaseId(databaseId) - .withQuery(query)) + .withQuery("SELECT * FROM " + options.getTable())); + // [END spanner_dataflow_read] + + + PCollection tableEstimatedSize = records // Estimate the size of every row - .apply(ParDo.of(new EstimateStructSizeFn())) + .apply(EstimateSize.create()) // Sum all the row sizes to get the total estimated size of the table - .apply(Sum.longsGlobally()); // Write the total size to a file tableEstimatedSize - .apply(ToString.elements())
- .apply(TextIO.write().to(options.getOutput())); + .apply(TextIO.write().to(options.getOutput()).withoutSharding()); p.run().waitUntilFinish(); } diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java new file mode 100644 index 00000000000..b0eb417ced5 --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java @@ -0,0 +1,90 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.dataflow; + +import com.google.cloud.spanner.Struct; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.ReadOperation; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.ToString; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * This sample demonstrates how to read all data from the Cloud Spanner database. + */ +public class SpannerReadAll { + + public interface Options extends PipelineOptions { + + @Description("Spanner instance ID to query from") + @Validation.Required + String getInstanceId(); + + void setInstanceId(String value); + + @Description("Spanner database name to query from") + @Validation.Required + String getDatabaseId(); + + void setDatabaseId(String value); + + @Description("Output filename for records size") + @Validation.Required + String getOutput(); + + void setOutput(String value); + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + SpannerConfig spannerConfig = SpannerConfig.create() + .withInstanceId(options.getInstanceId()) + .withDatabaseId(options.getDatabaseId()); + // [START spanner_dataflow_readall] + PCollection allRecords = p.apply(SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t" + + ".table_catalog = '' 
AND t.table_schema = ''")).apply( + MapElements.into(TypeDescriptor.of(ReadOperation.class)) + .via((SerializableFunction) input -> { + String tableName = input.getString(0); + return ReadOperation.create().withQuery("SELECT * FROM " + tableName); + })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig)); + // [END spanner_dataflow_readall] + + PCollection dbEstimatedSize = allRecords.apply(EstimateSize.create()) + .apply(Sum.longsGlobally()); + + dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput()) + .withoutSharding()); + + p.run().waitUntilFinish(); + } + +} diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java index 67502c05fdc..09e9b3e301a 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java @@ -22,13 +22,13 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +61,7 @@ mvn exec:java \ -Dexec.mainClass=com.example.dataflow.SpannerWrite \ -Dexec.args="--instanceId=my-instance-id \ - --databaseId=my-database-id \ - --singersTable=my_singers_table \ - --albumsTable=my_albums_table" + --databaseId=my-database-id */ public class SpannerWrite { @@ -73,13 +71,11 @@ public class SpannerWrite { public interface Options extends PipelineOptions { @Description("Singers filename in 
the format: singer_id\tfirst_name\tlast_name") - @Default.String("data/singers.txt") String getSingersFilename(); void setSingersFilename(String value); @Description("Albums filename in the format: singer_id\talbum_id\talbum_title") - @Default.String("data/albums.txt") String getAlbumsFilename(); void setAlbumsFilename(String value); @@ -95,18 +91,6 @@ public interface Options extends PipelineOptions { String getDatabaseId(); void setDatabaseId(String value); - - @Description("Spanner singers table name to write to") - @Validation.Required - String getSingersTable(); - - void setSingersTable(String value); - - @Description("Spanner albums table name to write to") - @Validation.Required - String getAlbumsTable(); - - void setAlbumsTable(String value); } @DefaultCoder(AvroCoder.class) @@ -187,8 +171,6 @@ public static void main(String[] args) { String instanceId = options.getInstanceId(); String databaseId = options.getDatabaseId(); - String singersTable = options.getSingersTable(); - String albumsTable = options.getAlbumsTable(); // Read singers from a tab-delimited file p.apply("ReadSingers", TextIO.read().from(options.getSingersFilename())) @@ -199,7 +181,7 @@ public static void main(String[] args) { @ProcessElement public void processElement(ProcessContext c) { Singer singer = c.element(); - c.output(Mutation.newInsertOrUpdateBuilder(singersTable) + c.output(Mutation.newInsertOrUpdateBuilder("singers") .set("singerId").to(singer.singerId) .set("firstName").to(singer.firstName) .set("lastName").to(singer.lastName) @@ -212,25 +194,30 @@ public void processElement(ProcessContext c) { .withDatabaseId(databaseId)); // Read albums from a tab-delimited file - p.apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename())) + PCollection albums = p + .apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename())) // Parse the tab-delimited lines into Album objects - .apply("ParseAlbums", ParDo.of(new ParseAlbum())) + .apply("ParseAlbums", ParDo.of(new 
ParseAlbum())); + + // [START spanner_dataflow_write] + albums // Spanner expects a Mutation object, so create it using the Album's data .apply("CreateAlbumMutation", ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { Album album = c.element(); - c.output(Mutation.newInsertOrUpdateBuilder(albumsTable) + c.output(Mutation.newInsertOrUpdateBuilder("albums") .set("singerId").to(album.singerId) .set("albumId").to(album.albumId) .set("albumTitle").to(album.albumTitle) .build()); } })) - // Finally write the Mutations to Spanner + // Write mutations to Spanner .apply("WriteAlbums", SpannerIO.write() .withInstanceId(instanceId) .withDatabaseId(databaseId)); + // [END spanner_dataflow_write] p.run().waitUntilFinish(); } diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java new file mode 100644 index 00000000000..155a9e51a13 --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java @@ -0,0 +1,146 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.dataflow; + +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.TimestampBound; +import com.google.common.base.Joiner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.io.gcp.spanner.Transaction; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +public class TransactionalRead { + + private static final String DELIMITER = "\t"; + + public interface Options extends PipelineOptions { + + @Description("Spanner instance ID to write to") + @Validation.Required + String getInstanceId(); + + void setInstanceId(String value); + + @Description("Spanner database name to write to") + @Validation.Required + String getDatabaseId(); + + void setDatabaseId(String value); + + @Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name") + String getSingersFilename(); + + void setSingersFilename(String value); + + @Description("Albums output filename in the format: singer_id\talbum_id\talbum_title") + String getAlbumsFilename(); + + void setAlbumsFilename(String value); + + } + + @DefaultCoder(AvroCoder.class) + static class Singer { + + long singerId; + String firstName; + String lastName; + + Singer() { + } + + Singer(long singerId, String firstName, String lastName) { + this.singerId = singerId; + this.firstName = firstName; + this.lastName = lastName; + } + } + + 
@DefaultCoder(AvroCoder.class) + static class Album { + + long singerId; + long albumId; + String albumTitle; + + Album() { + } + + Album(long singerId, long albumId, String albumTitle) { + this.singerId = singerId; + this.albumId = albumId; + this.albumTitle = albumTitle; + } + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + String instanceId = options.getInstanceId(); + String databaseId = options.getDatabaseId(); + + // [START spanner_dataflow_txread] + SpannerConfig spannerConfig = SpannerConfig.create() + .withInstanceId(instanceId) + .withDatabaseId(databaseId); + PCollectionView tx = p.apply( + SpannerIO.createTransaction() + .withSpannerConfig(spannerConfig) + .withTimestampBound(TimestampBound.strong())); + PCollection singers = p.apply(SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery("SELECT SingerID, FirstName, LastName FROM Singers") + .withTransaction(tx)); + PCollection albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig) + .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums") + .withTransaction(tx)); + // [END spanner_dataflow_txread] + + singers.apply(MapElements.via(new SimpleFunction() { + + @Override + public String apply(Struct input) { + return Joiner.on(DELIMITER).join(input.getLong(0), input.getString(1), input.getString(2)); + } + })).apply(TextIO.write().to(options.getSingersFilename()).withoutSharding()); + + albums.apply(MapElements.via(new SimpleFunction() { + + @Override + public String apply(Struct input) { + return Joiner.on(DELIMITER).join(input.getLong(0), input.getLong(1), input.getString(2)); + } + })).apply(TextIO.write().to(options.getAlbumsFilename()).withoutSharding()); + + p.run().waitUntilFinish(); + + } + +} diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java 
b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java new file mode 100644 index 00000000000..c35773fd8a5 --- /dev/null +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java @@ -0,0 +1,149 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.ReadContext; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionRunner; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.annotation.Nullable; +import org.junit.After; +import 
org.junit.Before; +import org.junit.Test; + +@SuppressWarnings("checkstyle:abbreviationaswordinname") +public class SpannerGroupWriteIT { + + String instanceId; + String databaseId; + + Path tempPath; + Spanner spanner; + SpannerOptions spannerOptions; + + @Before + public void setUp() throws Exception { + instanceId = System.getProperty("spanner.test.instance"); + databaseId = "df-spanner-groupwrite-it"; + + spannerOptions = SpannerOptions.getDefaultInstance(); + spanner = spannerOptions.getService(); + + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Does not exist, ignore. + } + + Operation op = adminClient + .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE users (" + + "id STRING(MAX) NOT NULL, state STRING(MAX) NOT NULL) PRIMARY KEY (id)", + "CREATE TABLE PendingReviews (id INT64, action STRING(MAX), " + + "note STRING(MAX), userId STRING(MAX),) PRIMARY KEY (id)")); + + op.waitFor(); + + DatabaseClient dbClient = getDbClient(); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + mutations.add( + Mutation.newInsertBuilder("users").set("id").to(Integer.toString(i)).set("state") + .to("ACTIVE").build()); + } + TransactionRunner runner = dbClient.readWriteTransaction(); + runner.run(new TransactionRunner.TransactionCallable() { + + @Nullable + @Override + public Void run(TransactionContext tx) throws Exception { + tx.buffer(mutations); + return null; + } + }); + + String content = IntStream.range(0, 10).mapToObj(Integer::toString) + .collect(Collectors.joining("\n")); + tempPath = Files.createTempFile("suspicious-ids", "txt"); + Files.write(tempPath, content.getBytes()); + } + + @After + public void tearDown() throws Exception { + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Failed to 
cleanup. + } + + spanner.close(); + } + + @Test + public void testEndToEnd() throws Exception { + SpannerGroupWrite.main( + new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--suspiciousUsersFile=" + tempPath, "--runner=DirectRunner" }); + + DatabaseClient dbClient = getDbClient(); + try (ReadContext context = dbClient.singleUse()) { + ResultSet rs = context.executeQuery( + Statement.newBuilder("SELECT COUNT(*) FROM users WHERE STATE = @state").bind("state") + .to("BLOCKED").build()); + assertTrue(rs.next()); + assertEquals(10, rs.getLong(0)); + + } + try (ReadContext context = dbClient.singleUse()) { + ResultSet rs = context.executeQuery( + Statement.newBuilder("SELECT COUNT(*) FROM PendingReviews WHERE ACTION = @action") + .bind("action").to("REVIEW ACCOUNT").build()); + assertTrue(rs.next()); + assertEquals(10, rs.getLong(0)); + } + } + + private DatabaseClient getDbClient() { + return spanner + .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId)); + } + +} diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java new file mode 100644 index 00000000000..621c00b2018 --- /dev/null +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java @@ -0,0 +1,177 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.dataflow; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionRunner; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +@SuppressWarnings("checkstyle:abbreviationaswordinname") +public class SpannerReadIT { + + String instanceId; + String databaseId; + + Spanner spanner; + SpannerOptions spannerOptions; + + @Before + public void setUp() throws Exception { + instanceId = System.getProperty("spanner.test.instance"); + databaseId = "df-spanner-read-it"; + + spannerOptions = SpannerOptions.getDefaultInstance(); + spanner = spannerOptions.getService(); + + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Does not exist, ignore. 
+ } + + Operation op = adminClient + .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers " + + "(singerId INT64 NOT NULL, firstName STRING(MAX) NOT NULL, " + + "lastName STRING(MAX) NOT NULL,) PRIMARY KEY (singerId)", + "CREATE TABLE Albums (singerId INT64 NOT NULL, albumId INT64 NOT NULL, " + + "albumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (singerId, albumId)")); + + op.waitFor(); + + List mutations = Arrays.asList( + Mutation.newInsertBuilder("singers") + .set("singerId").to(1L) + .set("firstName").to("John") + .set("lastName").to("Lennon") + .build(), + Mutation.newInsertBuilder("singers") + .set("singerId").to(2L) + .set("firstName").to("Paul") + .set("lastName").to("Mccartney") + .build(), + Mutation.newInsertBuilder("singers") + .set("singerId").to(3L) + .set("firstName").to("George") + .set("lastName").to("Harrison") + .build(), + Mutation.newInsertBuilder("singers") + .set("singerId").to(4L) + .set("firstName").to("Ringo") + .set("lastName").to("Starr") + .build(), + + Mutation.newInsertBuilder("albums") + .set("singerId").to(1L) + .set("albumId").to(1L) + .set("albumTitle").to("Imagine") + .build(), + Mutation.newInsertBuilder("albums") + .set("singerId").to(2L) + .set("albumId").to(1L) + .set("albumTitle").to("Pipes of Peace") + .build() + ); + + + DatabaseClient dbClient = getDbClient(); + + TransactionRunner runner = dbClient.readWriteTransaction(); + runner.run(new TransactionRunner.TransactionCallable() { + @Nullable + @Override + public Void run(TransactionContext tx) throws Exception { + tx.buffer(mutations); + return null; + } + }); + } + + @After + public void tearDown() throws Exception { + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Failed to cleanup. 
+ } + + spanner.close(); + } + + @Test + public void readDbEndToEnd() throws Exception { + Path outPath = Files.createTempFile("out", "txt"); + SpannerReadAll.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--output=" + outPath, "--runner=DirectRunner" }); + + String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n")); + + assertEquals("132", content); + } + + @Test + public void readTableEndToEnd() throws Exception { + Path outPath = Files.createTempFile("out", "txt"); + SpannerRead.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--output=" + outPath, "--table=albums", "--runner=DirectRunner" }); + + String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n")); + + assertEquals("53", content); + } + + @Test + public void reaTransactionalReadEndToEnd() throws Exception { + Path singersPath = Files.createTempFile("singers", "txt"); + Path albumsPath = Files.createTempFile("albums", "txt"); + TransactionalRead.main( + new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--singersFilename=" + singersPath, "--albumsFilename=" + albumsPath, + "--runner=DirectRunner" }); + + assertEquals(4, Files.readAllLines(singersPath).size()); + assertEquals(2, Files.readAllLines(albumsPath).size()); + } + + private DatabaseClient getDbClient() { + return spanner + .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId)); + } + +} \ No newline at end of file diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java new file mode 100644 index 00000000000..5631ddcbd46 --- /dev/null +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java @@ -0,0 +1,135 @@ +/* + * Copyright 2018 Google Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.ReadContext; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +@SuppressWarnings("checkstyle:abbreviationaswordinname") +public class SpannerWriteIT { + + String instanceId; + String databaseId; + + Path singersPath; + Path albumsPath; + Spanner spanner; + SpannerOptions spannerOptions; + + @Before + public void setUp() throws Exception { + + instanceId = System.getProperty("spanner.test.instance"); + databaseId = "df-spanner-write-it"; + + spannerOptions = SpannerOptions.getDefaultInstance(); + spanner = spannerOptions.getService(); + + 
DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Does not exist, ignore. + } + + Operation op = adminClient + .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers " + + "(singerId INT64 NOT NULL, " + + "firstName STRING(MAX) NOT NULL, lastName STRING(MAX) NOT NULL,) " + + "PRIMARY KEY (singerId)", + "CREATE TABLE Albums (singerId INT64 NOT NULL, " + + "albumId INT64 NOT NULL, albumTitle STRING(MAX) NOT NULL,) " + + "PRIMARY KEY (singerId, albumId)")); + + op.waitFor(); + + String singers = Stream + .of("1\tJohn\tLennon", "2\tPaul\tMccartney", "3\tGeorge\tHarrison", "4\tRingo\tStarr") + .collect(Collectors.joining("\n")); + singersPath = Files.createTempFile("singers", "txt"); + Files.write(singersPath, singers.getBytes()); + + String albums = Stream + .of("1\t1\tImagine", "2\t1\tPipes of Peace", "3\t1\tDark Horse") + .collect(Collectors.joining("\n")); + albumsPath = Files.createTempFile("albums", "txt"); + Files.write(albumsPath, albums.getBytes()); + + + } + + @After + public void tearDown() throws Exception { + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Failed to cleanup. 
+ } + + spanner.close(); + } + + @Test + public void testEndToEnd() throws Exception { + SpannerWrite.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--singersFilename=" + singersPath, "--albumsFilename=" + albumsPath, + "--runner=DirectRunner" }); + + DatabaseClient dbClient = getDbClient(); + try (ReadContext context = dbClient.singleUse()) { + ResultSet rs = context.executeQuery( + Statement.of("SELECT COUNT(*) FROM singers")); + assertTrue(rs.next()); + assertEquals(4, rs.getLong(0)); + + } + try (ReadContext context = dbClient.singleUse()) { + ResultSet rs = context.executeQuery(Statement.of("SELECT COUNT(*) FROM albums")); + assertTrue(rs.next()); + assertEquals(3, rs.getLong(0)); + } + } + + private DatabaseClient getDbClient() { + return spanner + .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId)); + } + +}