From bc0475823356028036df1c0055dd5a8f0ec56282 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 13 Mar 2018 15:51:54 -0700 Subject: [PATCH 1/9] Updated Spanner Dataflow connector samples --- .../com/example/dataflow/EstimateSize.java | 85 ++++++++++ .../example/dataflow/SpannerGroupWrite.java | 105 +++++++++++++ .../com/example/dataflow/SpannerRead.java | 60 ++------ .../com/example/dataflow/SpannerReadAll.java | 84 ++++++++++ .../com/example/dataflow/SpannerWrite.java | 12 +- .../example/dataflow/TransactionalRead.java | 145 ++++++++++++++++++ 6 files changed, 438 insertions(+), 53 deletions(-) create mode 100644 dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java create mode 100644 dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java create mode 100644 dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java create mode 100644 dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java new file mode 100644 index 00000000000..e3c0eb9b99f --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java @@ -0,0 +1,85 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.dataflow; + +import com.google.cloud.spanner.Struct; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * Estimates the size of the {@code Struct}. + */ +public class EstimateSize extends PTransform, PCollection> { + + public static EstimateSize create() { + return new EstimateSize(); + } + + private EstimateSize() { + } + + @Override + public PCollection expand(PCollection input) { + return input.apply(ParDo.of(new EstimateStructSizeFn())); + } + + /** + * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported. + */ + public static class EstimateStructSizeFn extends DoFn { + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Struct row = c.element(); + long sum = 0; + for (int i = 0; i < row.getColumnCount(); i++) { + if (row.isNull(i)) { + continue; + } + + switch (row.getColumnType(i).getCode()) { + case BOOL: + sum += 1; + break; + case INT64: + case FLOAT64: + sum += 8; + break; + case TIMESTAMP: + case DATE: + sum += 12; + break; + case BYTES: + sum += row.getBytes(i).length(); + break; + case STRING: + sum += row.getString(i).length(); + break; + case ARRAY: + throw new IllegalArgumentException("Arrays are not supported :("); + case STRUCT: + throw new IllegalArgumentException("Structs are not supported :("); + default: + throw new IllegalArgumentException("Unsupported type :("); + } + } + c.output(sum); + } + } + +} diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java new file mode 100644 index 00000000000..406cd1c7285 --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java @@ -0,0 +1,105 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.dataflow; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Mutation; +import com.google.common.base.Charsets; +import com.google.common.hash.Hashing; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.*; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; + +/** + * This sample demonstrates how to group together mutations when writing to the Cloud Spanner database. + */ +public class SpannerGroupWrite { + public interface Options extends PipelineOptions { + + @Description("Spanner instance ID to write to") + @Validation.Required + String getInstanceId(); + + void setInstanceId(String value); + + @Description("Spanner database name to write to") + @Validation.Required + String getDatabaseId(); + + void setDatabaseId(String value); + + @Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name") + @Default.String("data/usersids.txt") + String getSuspiciousUsersFile(); + + void setSuspiciousUserFile(String value); + + } + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + String instanceId = options.getInstanceId(); + String databaseId = options.getDatabaseId(); + + String usersIdFile = options.getSuspiciousUsersFile(); + + PCollection suspiciousUserIds = p.apply(TextIO.read().from(usersIdFile)); + + final Timestamp timestamp = Timestamp.now(); + + // [START spanner_dataflow_writegroup] + PCollection mutations = suspiciousUserIds.apply(MapElements.via(new SimpleFunction() { + @Override + public MutationGroup apply(String userId) { + // Immediately block the user. + Mutation userMutation = Mutation.newUpdateBuilder("Users") + .set("id").to(userId) + .set("state").to("BLOCKED") + .build(); + long generatedId = Hashing.sha1().newHasher() + .putString(userId, Charsets.UTF_8) + .putLong(timestamp.getSeconds()) + .putLong(timestamp.getNanos()) + .hash() + .asLong(); + + // Add an entry to pending review requests. + Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews") + .set("id").to(generatedId) // Must be deterministically generated. + .set("userId").to(userId) + .set("action").to("REVIEW ACCOUNT") + .set("node").to("Suspicious activity detected.") + .build(); + + return MutationGroup.create(userMutation, pendingReview); + } + })); + + mutations.apply(SpannerIO.write() + .withInstanceId(instanceId) + .withDatabaseId(databaseId) + .grouped()); + // [END spanner_dataflow_writegroup] + + } + +} diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java index 330c560cb92..78c44bac6af 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java @@ -87,49 +87,6 @@ public interface Options extends PipelineOptions { void setOutput(String value); } - /** - * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported. - */ - public static class EstimateStructSizeFn extends DoFn { - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Struct row = c.element(); - long sum = 0; - for (int i = 0; i < row.getColumnCount(); i++) { - if (row.isNull(i)) { - continue; - } - - switch (row.getColumnType(i).getCode()) { - case BOOL: - sum += 1; - break; - case INT64: - case FLOAT64: - sum += 8; - break; - case TIMESTAMP: - case DATE: - sum += 12; - break; - case BYTES: - sum += row.getBytes(i).length(); - break; - case STRING: - sum += row.getString(i).length(); - break; - case ARRAY: - throw new IllegalArgumentException("Arrays are not supported :("); - case STRUCT: - throw new IllegalArgumentException("Structs are not supported :("); - default: - throw new IllegalArgumentException("Unsupported type :("); - } - } - c.output(sum); - } - } public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); @@ -137,16 +94,19 @@ public static void main(String[] args) { String instanceId = options.getInstanceId(); String databaseId = options.getDatabaseId(); - String query = "SELECT * FROM " + options.getTable(); - - PCollection tableEstimatedSize = p - // Query for all the columns and rows in the specified Spanner table - .apply(SpannerIO.read() + // [START spanner_dataflow_read] + // Query for all the columns and rows in the specified Spanner table + PCollection records = p.apply( + SpannerIO.read() .withInstanceId(instanceId) .withDatabaseId(databaseId) - .withQuery(query)) + .withQuery("SELECT * FROM " + options.getTable())); + // [START spanner_dataflow_read] + + + PCollection tableEstimatedSize = records // Estimate the size of every row - .apply(ParDo.of(new EstimateStructSizeFn())) + .apply(EstimateSize.create()) // Sum all the row sizes to get the total estimated size of the table .apply(Sum.longsGlobally()); diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java new file mode 100644 index 00000000000..1c525eef17a --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java @@ -0,0 +1,84 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import com.google.cloud.spanner.Struct; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.ReadOperation; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.ToString; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * This sample demonstrates how to read all data from the Cloud Spanner database. + */ +public class SpannerReadAll { + + public interface Options extends PipelineOptions { + + @Description("Spanner instance ID to query from") + @Validation.Required + String getInstanceId(); + + void setInstanceId(String value); + + @Description("Spanner database name to query from") + @Validation.Required + String getDatabaseId(); + + void setDatabaseId(String value); + + @Description("Output filename for records size") + @Validation.Required + String getOutput(); + + void setOutput(String value); + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + // [START spanner_dataflow_readall] + PCollection allRecords = p.apply(SpannerIO.read().withQuery( + "SELECT t.table_name FROM information_schema.tables AS t WHERE t" + + ".table_catalog = '' AND t.table_schema = ''")).apply( + MapElements.into(TypeDescriptor.of(ReadOperation.class)) + .via((SerializableFunction) input -> { + String tableName = input.getString(0); + return ReadOperation.create().withQuery("SELECT * FROM " + tableName); + })).apply(SpannerIO.readAll()); + // [END spanner_dataflow_readall] + + PCollection dbEstimatedSize = allRecords.apply(EstimateSize.create()) + .apply(Sum.longsGlobally()); + + dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput())); + + p.run().waitUntilFinish(); + } + +} diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java index 67502c05fdc..8644d965009 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,9 +213,13 @@ public void processElement(ProcessContext c) { .withDatabaseId(databaseId)); // Read albums from a tab-delimited file - p.apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename())) + PCollection albums = p + .apply("ReadAlbums", TextIO.read().from(options.getAlbumsFilename())) // Parse the tab-delimited lines into Album objects - .apply("ParseAlbums", ParDo.of(new ParseAlbum())) + .apply("ParseAlbums", ParDo.of(new ParseAlbum())); + + // [START spanner_dataflow_write] + albums // Spanner expects a Mutation object, so create it using the Album's data .apply("CreateAlbumMutation", ParDo.of(new DoFn() { @ProcessElement @@ -227,10 +232,11 @@ public void processElement(ProcessContext c) { .build()); } })) - // Finally write the Mutations to Spanner + // Write mutations to Spanner .apply("WriteAlbums", SpannerIO.write() .withInstanceId(instanceId) .withDatabaseId(databaseId)); + // [END spanner_dataflow_write] p.run().waitUntilFinish(); } diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java new file mode 100644 index 00000000000..95d9e339edd --- /dev/null +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java @@ -0,0 +1,145 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.TimestampBound; +import com.google.common.base.Joiner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.io.gcp.spanner.Transaction; +import org.apache.beam.sdk.options.*; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +public class TransactionalRead { + + private static final String DELIMITER = "\t"; + + public interface Options extends PipelineOptions { + + @Description("Spanner instance ID to write to") + @Validation.Required + String getInstanceId(); + + void setInstanceId(String value); + + @Description("Spanner database name to write to") + @Validation.Required + String getDatabaseId(); + + void setDatabaseId(String value); + + @Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name") + @Default.String("data/singers.txt") + String getSingersFilename(); + + void setSingersFilename(String value); + + @Description("Albums output filename in the format: singer_id\talbum_id\talbum_title") + @Default.String("data/albums.txt") + String getAlbumsFilename(); + + void setAlbumsFilename(String value); + + } + + @DefaultCoder(AvroCoder.class) + static class Singer { + + long singerId; + String firstName; + String lastName; + + Singer() { + } + + Singer(long singerId, String firstName, String lastName) { + this.singerId = singerId; + this.firstName = firstName; + this.lastName = lastName; + } + } + + @DefaultCoder(AvroCoder.class) + static class Album { + + long singerId; + long albumId; + String albumTitle; + + Album() { + } + + Album(long singerId, long albumId, String albumTitle) { + this.singerId = singerId; + this.albumId = albumId; + this.albumTitle = albumTitle; + } + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + String instanceId = options.getInstanceId(); + String databaseId = options.getDatabaseId(); + + // [START spanner_dataflow_txread] + SpannerConfig spannerConfig = SpannerConfig.create() + .withInstanceId(instanceId) + .withDatabaseId(databaseId); + PCollectionView tx = p.apply( + SpannerIO.createTransaction() + .withSpannerConfig(spannerConfig) + .withTimestampBound(TimestampBound.strong())); + PCollection singers = p.apply(SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery("SELECT SingerID, FirstName, LastName FROM Singers") + .withTransaction(tx)); + PCollection albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig) + .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums") + .withTransaction(tx)); + // [END spanner_dataflow_txread] + + singers.apply(MapElements.via(new SimpleFunction() { + + @Override + public String apply(Struct input) { + return Joiner.on(DELIMITER).join(input.getLong(0), input.getString(1), input.getString(2)); + } + })).apply(TextIO.write().to(options.getSingersFilename())); + + albums.apply(MapElements.via(new SimpleFunction() { + + @Override + public String apply(Struct input) { + return Joiner.on(DELIMITER).join(input.getLong(0), input.getLong(1), input.getString(2)); + } + })).apply(TextIO.write().to(options.getAlbumsFilename())); + + p.run().waitUntilFinish(); + + } + +} From 2b40d68f4a96ac62600f63f781161f7346158fcf Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 20 Mar 2018 15:31:54 -0700 Subject: [PATCH 2/9] Make checkstyle happy --- .../com/example/dataflow/EstimateSize.java | 1 + .../example/dataflow/SpannerGroupWrite.java | 65 +++++++++++-------- .../example/dataflow/TransactionalRead.java | 6 +- 3 files changed, 43 insertions(+), 29 deletions(-) diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java index e3c0eb9b99f..5f393d3ed38 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/EstimateSize.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.example.dataflow; import com.google.cloud.spanner.Struct; diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java index 406cd1c7285..8ac32dd10df 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.example.dataflow; import com.google.cloud.Timestamp; @@ -23,13 +24,18 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; -import org.apache.beam.sdk.options.*; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; /** - * This sample demonstrates how to group together mutations when writing to the Cloud Spanner database. + * This sample demonstrates how to group together mutations when writing to the Cloud Spanner + * database. */ public class SpannerGroupWrite { public interface Options extends PipelineOptions { @@ -53,6 +59,7 @@ public interface Options extends PipelineOptions { void setSuspiciousUserFile(String value); } + public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline p = Pipeline.create(options); @@ -67,32 +74,34 @@ public static void main(String[] args) { final Timestamp timestamp = Timestamp.now(); // [START spanner_dataflow_writegroup] - PCollection mutations = suspiciousUserIds.apply(MapElements.via(new SimpleFunction() { - @Override - public MutationGroup apply(String userId) { - // Immediately block the user. - Mutation userMutation = Mutation.newUpdateBuilder("Users") - .set("id").to(userId) - .set("state").to("BLOCKED") - .build(); - long generatedId = Hashing.sha1().newHasher() - .putString(userId, Charsets.UTF_8) - .putLong(timestamp.getSeconds()) - .putLong(timestamp.getNanos()) - .hash() - .asLong(); - - // Add an entry to pending review requests. - Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews") - .set("id").to(generatedId) // Must be deterministically generated. - .set("userId").to(userId) - .set("action").to("REVIEW ACCOUNT") - .set("node").to("Suspicious activity detected.") - .build(); - - return MutationGroup.create(userMutation, pendingReview); - } - })); + PCollection mutations = suspiciousUserIds + .apply(MapElements.via(new SimpleFunction() { + + @Override + public MutationGroup apply(String userId) { + // Immediately block the user. + Mutation userMutation = Mutation.newUpdateBuilder("Users") + .set("id").to(userId) + .set("state").to("BLOCKED") + .build(); + long generatedId = Hashing.sha1().newHasher() + .putString(userId, Charsets.UTF_8) + .putLong(timestamp.getSeconds()) + .putLong(timestamp.getNanos()) + .hash() + .asLong(); + + // Add an entry to pending review requests. + Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews") + .set("id").to(generatedId) // Must be deterministically generated. + .set("userId").to(userId) + .set("action").to("REVIEW ACCOUNT") + .set("node").to("Suspicious activity detected.") + .build(); + + return MutationGroup.create(userMutation, pendingReview); + } + })); mutations.apply(SpannerIO.write() .withInstanceId(instanceId) diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java index 95d9e339edd..a09404cc6f8 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java @@ -26,7 +26,11 @@ import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; import org.apache.beam.sdk.io.gcp.spanner.Transaction; -import org.apache.beam.sdk.options.*; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; From fba6faa501bd33f8670123cb03fa6eb09045f843 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 20 Mar 2018 16:52:43 -0700 Subject: [PATCH 3/9] First test --- dataflow/spanner-io/pom.xml | 7 -- .../example/dataflow/SpannerGroupWrite.java | 7 +- .../java/com/example/dataflow/SpannerIT.java | 114 ++++++++++++++++++ 3 files changed, 119 insertions(+), 9 deletions(-) create mode 100644 dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerIT.java diff --git a/dataflow/spanner-io/pom.xml b/dataflow/spanner-io/pom.xml index 65700a9645f..8df1981b89d 100644 --- a/dataflow/spanner-io/pom.xml +++ b/dataflow/spanner-io/pom.xml @@ -84,13 +84,6 @@ ${apache_beam.version} - - - com.google.cloud - google-cloud-spanner - 0.34.0-beta - - org.slf4j diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java index 8ac32dd10df..ba3b4b06362 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java @@ -54,9 +54,10 @@ public interface Options extends PipelineOptions { @Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name") @Default.String("data/usersids.txt") + @Validation.Required String getSuspiciousUsersFile(); - void setSuspiciousUserFile(String value); + void setSuspiciousUsersFile(String value); } @@ -96,7 +97,7 @@ public MutationGroup apply(String userId) { .set("id").to(generatedId) // Must be deterministically generated. .set("userId").to(userId) .set("action").to("REVIEW ACCOUNT") - .set("node").to("Suspicious activity detected.") + .set("note").to("Suspicious activity detected.") .build(); return MutationGroup.create(userMutation, pendingReview); @@ -109,6 +110,8 @@ public MutationGroup apply(String userId) { .grouped()); // [END spanner_dataflow_writegroup] + p.run().waitUntilFinish(); + } } diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerIT.java new file mode 100644 index 00000000000..28a8d2155e3 --- /dev/null +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerIT.java @@ -0,0 +1,114 @@ +package com.example.dataflow; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.ReadContext; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionRunner; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SpannerIT { + + private String runSample(Consumer main) throws Exception { + PrintStream stdOut = System.out; + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bout); + System.setOut(out); + main.accept(null); + System.setOut(stdOut); + return bout.toString(); + } + + @Test + public void testSpannerGroupWrite() throws Exception { + Path tempPath = Files.createTempFile("suspicious-ids", "txt"); + + String instanceId = "mairbek-deleteme"; + String databaseId = "test2"; + + SpannerOptions options = SpannerOptions.getDefaultInstance(); + Spanner spanner = options.getService(); + + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Does not exist, ignore. + } + + Operation op = adminClient + .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE users (" + + "id STRING(MAX) NOT NULL, state STRING(MAX) NOT NULL) PRIMARY KEY (id)", + "CREATE TABLE PendingReviews (id INT64, action STRING(MAX), note STRING(MAX), userId STRING(MAX),) PRIMARY KEY (id)")); + + op.waitFor(); + + DatabaseClient dbClient = spanner.getDatabaseClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId)); + + List mutations = new ArrayList<>(); + for (int i = 0; i< 10; i++) { + mutations.add(Mutation.newInsertBuilder("users").set("id").to(Integer.toString(i)).set("state").to("ACTIVE").build()); + } + TransactionRunner runner = dbClient.readWriteTransaction(); + runner.run(new TransactionRunner.TransactionCallable() { + @Nullable + @Override + public Void run(TransactionContext tx) throws Exception { + tx.buffer(mutations); + return null; + } + }); + + String content = IntStream.range(0, 10).mapToObj(Integer::toString) + .collect(Collectors.joining("\n")); + Files.write(tempPath, content.getBytes()); + + + String out = runSample(v -> SpannerGroupWrite.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--suspiciousUsersFile=" + tempPath, "--runner=DirectRunner" })); + + System.out.println(out); + + try (ReadContext context = dbClient.singleUse()) { + ResultSet rs = context.executeQuery( + Statement.newBuilder("SELECT COUNT(*) FROM users WHERE STATE = @state").bind("state").to("BLOCKED") + .build()); + assertTrue(rs.next()); + assertEquals(10, rs.getLong(0)); + + } + try (ReadContext context = dbClient.singleUse()) { + ResultSet rs = context.executeQuery( + Statement.newBuilder("SELECT COUNT(*) FROM PendingReviews WHERE ACTION = @action").bind("action").to("REVIEW ACCOUNT").build()); + assertTrue(rs.next()); + assertEquals(10, rs.getLong(0)); + } + } +} \ No newline at end of file From 0a8d6919fcb2dfadff3ec9c49dd3a48d52cf1877 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 20 Mar 2018 17:03:31 -0700 Subject: [PATCH 4/9] Updated the test --- ...pannerIT.java => SpannerGroupWriteIT.java} | 76 +++++++++++-------- 1 file changed, 46 insertions(+), 30 deletions(-) rename dataflow/spanner-io/src/test/java/com/example/dataflow/{SpannerIT.java => SpannerGroupWriteIT.java} (63%) diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java similarity index 63% rename from dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerIT.java rename to dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java index 28a8d2155e3..80a0c2e3215 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java @@ -15,45 +15,36 @@ import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; -import org.junit.Assert; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import javax.annotation.Nullable; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class SpannerIT { +public class SpannerGroupWriteIT { - private String runSample(Consumer main) throws Exception { - PrintStream stdOut = System.out; - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - PrintStream out = new PrintStream(bout); - System.setOut(out); - main.accept(null); - System.setOut(stdOut); - return bout.toString(); - } + private final String instanceId = "mairbek-deleteme"; + private final String databaseId = "test2"; - @Test - public void testSpannerGroupWrite() throws Exception { - Path tempPath = Files.createTempFile("suspicious-ids", "txt"); + Path tempPath; + Spanner spanner; + SpannerOptions spannerOptions; - String instanceId = "mairbek-deleteme"; - String databaseId = "test2"; + @Before + public void setUp() throws Exception { - SpannerOptions options = SpannerOptions.getDefaultInstance(); - Spanner spanner = options.getService(); + spannerOptions = SpannerOptions.getDefaultInstance(); + spanner = spannerOptions.getService(); DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); @@ -70,14 +61,17 @@ public void testSpannerGroupWrite() throws Exception { op.waitFor(); - DatabaseClient dbClient = spanner.getDatabaseClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId)); + DatabaseClient dbClient = getDbClient(); List mutations = new ArrayList<>(); - for (int i = 0; i< 10; i++) { - mutations.add(Mutation.newInsertBuilder("users").set("id").to(Integer.toString(i)).set("state").to("ACTIVE").build()); + for (int i = 0; i < 20; i++) { + mutations.add( + Mutation.newInsertBuilder("users").set("id").to(Integer.toString(i)).set("state") + .to("ACTIVE").build()); } TransactionRunner runner = dbClient.readWriteTransaction(); runner.run(new TransactionRunner.TransactionCallable() { + @Nullable @Override public Void run(TransactionContext tx) throws Exception { @@ -88,27 +82,49 @@ public Void run(TransactionContext tx) throws Exception { String content = IntStream.range(0, 10).mapToObj(Integer::toString) .collect(Collectors.joining("\n")); + tempPath = Files.createTempFile("suspicious-ids", "txt"); Files.write(tempPath, content.getBytes()); + } + @After + public void tearDown() throws Exception { + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Failed to cleanup. + } - String out = runSample(v -> SpannerGroupWrite.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, - "--suspiciousUsersFile=" + tempPath, "--runner=DirectRunner" })); + spanner.close(); + } - System.out.println(out); + @Test + public void testEndToEnd() throws Exception { + SpannerGroupWrite.main( + new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--suspiciousUsersFile=" + tempPath, "--runner=DirectRunner" }); + DatabaseClient dbClient = getDbClient(); try (ReadContext context = dbClient.singleUse()) { ResultSet rs = context.executeQuery( - Statement.newBuilder("SELECT COUNT(*) FROM users WHERE STATE = @state").bind("state").to("BLOCKED") - .build()); + Statement.newBuilder("SELECT COUNT(*) FROM users WHERE STATE = @state").bind("state") + .to("BLOCKED").build()); assertTrue(rs.next()); assertEquals(10, rs.getLong(0)); } try (ReadContext context = dbClient.singleUse()) { ResultSet rs = context.executeQuery( - Statement.newBuilder("SELECT COUNT(*) FROM PendingReviews WHERE ACTION = @action").bind("action").to("REVIEW ACCOUNT").build()); + Statement.newBuilder("SELECT COUNT(*) FROM PendingReviews WHERE ACTION = @action") + .bind("action").to("REVIEW ACCOUNT").build()); assertTrue(rs.next()); assertEquals(10, rs.getLong(0)); } } + + private DatabaseClient getDbClient() { + return spanner + .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId)); + } + } \ No newline at end of file From d89c0fb77ba385673bfff1d332b9ae1eff7bb9e1 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 20 Mar 2018 18:28:20 -0700 Subject: [PATCH 5/9] Added tests --- .../example/dataflow/SpannerGroupWrite.java | 1 - .../com/example/dataflow/SpannerRead.java | 6 +- .../com/example/dataflow/SpannerReadAll.java | 13 +- .../com/example/dataflow/SpannerWrite.java | 27 +-- .../example/dataflow/TransactionalRead.java | 7 +- .../example/dataflow/SpannerGroupWriteIT.java | 4 +- .../com/example/dataflow/SpannerReadIT.java | 166 ++++++++++++++++++ .../com/example/dataflow/SpannerWriteIT.java | 118 +++++++++++++ 8 files changed, 302 insertions(+), 40 deletions(-) create mode 100644 dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java create mode 100644 dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java index ba3b4b06362..c127115e073 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java @@ -53,7 +53,6 @@ public interface Options extends PipelineOptions { void setDatabaseId(String value); @Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name") - @Default.String("data/usersids.txt") @Validation.Required String getSuspiciousUsersFile(); diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java index 78c44bac6af..439549f245f 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerRead.java @@ -24,10 +24,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.values.PCollection; @@ -113,7 +109,7 @@ public static void main(String[] args) { // Write the total size to a file tableEstimatedSize .apply(ToString.elements()) - .apply(TextIO.write().to(options.getOutput())); + .apply(TextIO.write().to(options.getOutput()).withoutSharding()); p.run().waitUntilFinish(); } diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java index 1c525eef17a..a96dbc46e76 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.spanner.ReadOperation; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; @@ -62,21 +63,25 @@ public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline p = Pipeline.create(options); + SpannerConfig spannerConfig = SpannerConfig.create() + .withInstanceId(options.getInstanceId()) + .withDatabaseId(options.getDatabaseId()); // [START spanner_dataflow_readall] - PCollection allRecords = p.apply(SpannerIO.read().withQuery( - "SELECT t.table_name FROM information_schema.tables AS t WHERE t" + PCollection allRecords = p.apply(SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t" + ".table_catalog = '' AND t.table_schema = ''")).apply( MapElements.into(TypeDescriptor.of(ReadOperation.class)) .via((SerializableFunction) input -> { String tableName = input.getString(0); return ReadOperation.create().withQuery("SELECT * FROM " + tableName); - })).apply(SpannerIO.readAll()); + })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig)); // [END spanner_dataflow_readall] PCollection dbEstimatedSize = allRecords.apply(EstimateSize.create()) .apply(Sum.longsGlobally()); - dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput())); + dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput()).withoutSharding()); p.run().waitUntilFinish(); } diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java index 8644d965009..fd03c88f764 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -62,9 +61,7 @@ mvn exec:java \ -Dexec.mainClass=com.example.dataflow.SpannerWrite \ -Dexec.args="--instanceId=my-instance-id \ - --databaseId=my-database-id \ - --singersTable=my_singers_table \ - --albumsTable=my_albums_table" + --databaseId=my-database-id */ public class SpannerWrite { @@ -74,13 +71,11 @@ public class SpannerWrite { public interface Options extends PipelineOptions { @Description("Singers filename in the format: singer_id\tfirst_name\tlast_name") - @Default.String("data/singers.txt") String getSingersFilename(); void setSingersFilename(String value); @Description("Albums filename in the format: singer_id\talbum_id\talbum_title") - @Default.String("data/albums.txt") String getAlbumsFilename(); void setAlbumsFilename(String value); @@ -96,19 +91,7 @@ public interface Options extends PipelineOptions { String getDatabaseId(); void setDatabaseId(String value); - - @Description("Spanner singers table name to write to") - @Validation.Required - String getSingersTable(); - - void setSingersTable(String value); - - @Description("Spanner albums table name to write to") - @Validation.Required - String getAlbumsTable(); - - void setAlbumsTable(String value); - } + } @DefaultCoder(AvroCoder.class) static class Singer { @@ -188,8 +171,6 @@ public static void main(String[] args) { String instanceId = options.getInstanceId(); String databaseId = options.getDatabaseId(); - String singersTable = options.getSingersTable(); - String albumsTable = options.getAlbumsTable(); // Read singers from a tab-delimited file p.apply("ReadSingers", TextIO.read().from(options.getSingersFilename())) @@ -200,7 +181,7 @@ public static void main(String[] args) { @ProcessElement public void processElement(ProcessContext c) { Singer singer = c.element(); - c.output(Mutation.newInsertOrUpdateBuilder(singersTable) + c.output(Mutation.newInsertOrUpdateBuilder("singers") .set("singerId").to(singer.singerId) .set("firstName").to(singer.firstName) .set("lastName").to(singer.lastName) @@ -225,7 +206,7 @@ public void processElement(ProcessContext c) { @ProcessElement public void processElement(ProcessContext c) { Album album = c.element(); - c.output(Mutation.newInsertOrUpdateBuilder(albumsTable) + c.output(Mutation.newInsertOrUpdateBuilder("albums") .set("singerId").to(album.singerId) .set("albumId").to(album.albumId) .set("albumTitle").to(album.albumTitle) diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java index a09404cc6f8..155a9e51a13 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/TransactionalRead.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; import org.apache.beam.sdk.io.gcp.spanner.Transaction; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -55,13 +54,11 @@ public interface Options extends PipelineOptions { void setDatabaseId(String value); @Description("Singers output filename in the format: singer_id\tfirst_name\tlast_name") - @Default.String("data/singers.txt") String getSingersFilename(); void setSingersFilename(String value); @Description("Albums output filename in the format: singer_id\talbum_id\talbum_title") - @Default.String("data/albums.txt") String getAlbumsFilename(); void setAlbumsFilename(String value); @@ -132,7 +129,7 @@ public static void main(String[] args) { public String apply(Struct input) { return Joiner.on(DELIMITER).join(input.getLong(0), input.getString(1), input.getString(2)); } - })).apply(TextIO.write().to(options.getSingersFilename())); + })).apply(TextIO.write().to(options.getSingersFilename()).withoutSharding()); albums.apply(MapElements.via(new SimpleFunction() { @@ -140,7 +137,7 @@ public String apply(Struct input) { public String apply(Struct input) { return Joiner.on(DELIMITER).join(input.getLong(0), input.getLong(1), input.getString(2)); } - })).apply(TextIO.write().to(options.getAlbumsFilename())); + })).apply(TextIO.write().to(options.getAlbumsFilename()).withoutSharding()); p.run().waitUntilFinish(); diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java index 80a0c2e3215..b316ff4c976 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java @@ -33,8 +33,8 @@ public class SpannerGroupWriteIT { - private final String instanceId = "mairbek-deleteme"; - private final String databaseId = "test2"; + final String instanceId = "mairbek-deleteme"; + final String databaseId = "test2"; Path tempPath; Spanner spanner; diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java new file mode 100644 index 00000000000..d16c903733a --- /dev/null +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java @@ -0,0 +1,166 @@ +package com.example.dataflow; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.ReadContext; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionRunner; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SpannerReadIT { + + final String instanceId = "mairbek-deleteme"; + final String databaseId = "test2"; + + Spanner spanner; + SpannerOptions spannerOptions; + + @Before + public void setUp() throws Exception { + + spannerOptions = SpannerOptions.getDefaultInstance(); + spanner = spannerOptions.getService(); + + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Does not exist, ignore. + } + + Operation op = adminClient + .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers (singerId INT64 NOT NULL, firstName STRING(MAX) NOT NULL, lastName STRING(MAX) NOT NULL,) PRIMARY KEY (singerId)", + "CREATE TABLE Albums (singerId INT64 NOT NULL, albumId INT64 NOT NULL, albumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (singerId, albumId)")); + + op.waitFor(); + + List mutations = Arrays.asList( + Mutation.newInsertBuilder("singers") + .set("singerId").to(1L) + .set("firstName").to("John") + .set("lastName").to("Lennon") + .build(), + Mutation.newInsertBuilder("singers") + .set("singerId").to(2L) + .set("firstName").to("Paul") + .set("lastName").to("Mccartney") + .build(), + Mutation.newInsertBuilder("singers") + .set("singerId").to(3L) + .set("firstName").to("George") + .set("lastName").to("Harrison") + .build(), + Mutation.newInsertBuilder("singers") + .set("singerId").to(4L) + .set("firstName").to("Ringo") + .set("lastName").to("Starr") + .build(), + + Mutation.newInsertBuilder("albums") + .set("singerId").to(1L) + .set("albumId").to(1L) + .set("albumTitle").to("Imagine") + .build(), + Mutation.newInsertBuilder("albums") + .set("singerId").to(2L) + .set("albumId").to(1L) + .set("albumTitle").to("Pipes of Peace") + .build() + ); + + + DatabaseClient dbClient = getDbClient(); + + TransactionRunner runner = dbClient.readWriteTransaction(); + runner.run(new TransactionRunner.TransactionCallable() { + @Nullable + @Override + public Void run(TransactionContext tx) throws Exception { + tx.buffer(mutations); + return null; + } + }); + } + + @After + public void tearDown() throws Exception { + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Failed to cleanup. + } + + spanner.close(); + } + + @Test + public void readDbEndToEnd() throws Exception { + Path outPath = Files.createTempFile("out", "txt"); + SpannerReadAll.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--output=" + outPath, + "--runner=DirectRunner" }); + + String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n")); + + assertEquals("132", content); + } + + @Test + public void readTableEndToEnd() throws Exception { + Path outPath = Files.createTempFile("out", "txt"); + SpannerRead.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--output=" + outPath, "--table=albums", + "--runner=DirectRunner" }); + + String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n")); + + assertEquals("53", content); + } + + @Test + public void reaTransactionalReadEndToEnd() throws Exception { + Path singersPath = Files.createTempFile("singers", "txt"); + Path albumsPath = Files.createTempFile("albums", "txt"); + TransactionalRead.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--singersFilename=" + singersPath, "--albumsFilename="+albumsPath, + "--runner=DirectRunner" }); + + assertEquals(4, Files.readAllLines(singersPath).size()); + assertEquals(2, Files.readAllLines(albumsPath).size()); + } + + private DatabaseClient getDbClient() { + return spanner + .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId)); + } + +} \ No newline at end of file diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java new file mode 100644 index 00000000000..3c68a1a9ee9 --- /dev/null +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java @@ -0,0 +1,118 @@ +package com.example.dataflow; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.ReadContext; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionRunner; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SpannerWriteIT { + + final String instanceId = "mairbek-deleteme"; + final String databaseId = "test2"; + + Path singersPath; + Path albumsPath; + Spanner spanner; + SpannerOptions spannerOptions; + + @Before + public void setUp() throws Exception { + + spannerOptions = SpannerOptions.getDefaultInstance(); + spanner = spannerOptions.getService(); + + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Does not exist, ignore. + } + + Operation op = adminClient + .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers (singerId INT64 NOT NULL, firstName STRING(MAX) NOT NULL, lastName STRING(MAX) NOT NULL,) PRIMARY KEY (singerId)", + "CREATE TABLE Albums (singerId INT64 NOT NULL, albumId INT64 NOT NULL, albumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (singerId, albumId)")); + + op.waitFor(); + + String singers = Stream + .of("1\tJohn\tLennon", "2\tPaul\tMccartney", "3\tGeorge\tHarrison", "4\tRingo\tStarr") + .collect(Collectors.joining("\n")); + singersPath = Files.createTempFile("singers", "txt"); + Files.write(singersPath, singers.getBytes()); + + String albums = Stream + .of("1\t1\tImagine", "2\t1\tPipes of Peace", "3\t1\tDark Horse") + .collect(Collectors.joining("\n")); + albumsPath = Files.createTempFile("albums", "txt"); + Files.write(albumsPath, albums.getBytes()); + + + } + + @After + public void tearDown() throws Exception { + DatabaseAdminClient adminClient = spanner.getDatabaseAdminClient(); + try { + adminClient.dropDatabase(instanceId, databaseId); + } catch (SpannerException e) { + // Failed to cleanup. + } + + spanner.close(); + } + + @Test + public void testEndToEnd() throws Exception { + SpannerWrite.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--singersFilename=" + singersPath, "--albumsFilename=" + albumsPath, + "--runner=DirectRunner" }); + + DatabaseClient dbClient = getDbClient(); + try (ReadContext context = dbClient.singleUse()) { + ResultSet rs = context.executeQuery( + Statement.of("SELECT COUNT(*) FROM singers")); + assertTrue(rs.next()); + assertEquals(4, rs.getLong(0)); + + } + try (ReadContext context = dbClient.singleUse()) { + ResultSet rs = context.executeQuery(Statement.of("SELECT COUNT(*) FROM albums")); + assertTrue(rs.next()); + assertEquals(3, rs.getLong(0)); + } + } + + private DatabaseClient getDbClient() { + return spanner + .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId)); + } + +} \ No newline at end of file From 7bad2e16aba3e85529cb6c409771dc4f7d51989a Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 21 Mar 2018 09:34:22 -0700 Subject: [PATCH 6/9] Makes checkstyle happy --- .../example/dataflow/SpannerGroupWrite.java | 1 - .../com/example/dataflow/SpannerReadAll.java | 3 +- .../com/example/dataflow/SpannerWrite.java | 2 +- .../example/dataflow/SpannerGroupWriteIT.java | 35 ++++++++--- .../com/example/dataflow/SpannerReadIT.java | 59 +++++++++++-------- .../com/example/dataflow/SpannerWriteIT.java | 46 ++++++++++----- 6 files changed, 93 insertions(+), 53 deletions(-) diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java index c127115e073..e1db90f1821 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerGroupWrite.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.spanner.MutationGroup; import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java index a96dbc46e76..b0eb417ced5 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerReadAll.java @@ -81,7 +81,8 @@ public static void main(String[] args) { PCollection dbEstimatedSize = allRecords.apply(EstimateSize.create()) .apply(Sum.longsGlobally()); - dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput()).withoutSharding()); + dbEstimatedSize.apply(ToString.elements()).apply(TextIO.write().to(options.getOutput()) + .withoutSharding()); p.run().waitUntilFinish(); } diff --git a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java index fd03c88f764..09e9b3e301a 100644 --- a/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java +++ b/dataflow/spanner-io/src/main/java/com/example/dataflow/SpannerWrite.java @@ -91,7 +91,7 @@ public interface Options extends PipelineOptions { String getDatabaseId(); void setDatabaseId(String value); - } + } @DefaultCoder(AvroCoder.class) static class Singer { diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java index b316ff4c976..0d49b273389 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java @@ -1,5 +1,24 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.example.dataflow; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; @@ -15,11 +34,6 @@ import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import javax.annotation.Nullable; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -27,10 +41,12 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - +@SuppressWarnings("checkstyle:abbreviationaswordinname") public class SpannerGroupWriteIT { final String instanceId = "mairbek-deleteme"; @@ -57,7 +73,8 @@ public void setUp() throws Exception { Operation op = adminClient .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE users (" + "id STRING(MAX) NOT NULL, state STRING(MAX) NOT NULL) PRIMARY KEY (id)", - "CREATE TABLE PendingReviews (id INT64, action STRING(MAX), note STRING(MAX), userId STRING(MAX),) PRIMARY KEY (id)")); + "CREATE TABLE PendingReviews (id INT64, action STRING(MAX), " + + "note STRING(MAX), userId STRING(MAX),) PRIMARY KEY (id)")); op.waitFor(); diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java index d16c903733a..f08b6a382a9 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java @@ -1,39 +1,46 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.example.dataflow; +import static org.junit.Assert.assertEquals; + import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Operation; -import com.google.cloud.spanner.ReadContext; -import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerOptions; -import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import javax.annotation.Nullable; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +@SuppressWarnings("checkstyle:abbreviationaswordinname") public class SpannerReadIT { final String instanceId = "mairbek-deleteme"; @@ -57,8 +64,11 @@ public void setUp() throws Exception { } Operation op = adminClient - .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers (singerId INT64 NOT NULL, firstName STRING(MAX) NOT NULL, lastName STRING(MAX) NOT NULL,) PRIMARY KEY (singerId)", - "CREATE TABLE Albums (singerId INT64 NOT NULL, albumId INT64 NOT NULL, albumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (singerId, albumId)")); + .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers " + + "(singerId INT64 NOT NULL, firstName STRING(MAX) NOT NULL, " + + "lastName STRING(MAX) NOT NULL,) PRIMARY KEY (singerId)", + "CREATE TABLE Albums (singerId INT64 NOT NULL, albumId INT64 NOT NULL, " + + "albumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (singerId, albumId)")); op.waitFor(); @@ -126,8 +136,7 @@ public void tearDown() throws Exception { public void readDbEndToEnd() throws Exception { Path outPath = Files.createTempFile("out", "txt"); SpannerReadAll.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, - "--output=" + outPath, - "--runner=DirectRunner" }); + "--output=" + outPath, "--runner=DirectRunner" }); String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n")); @@ -138,8 +147,7 @@ public void readDbEndToEnd() throws Exception { public void readTableEndToEnd() throws Exception { Path outPath = Files.createTempFile("out", "txt"); SpannerRead.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, - "--output=" + outPath, "--table=albums", - "--runner=DirectRunner" }); + "--output=" + outPath, "--table=albums", "--runner=DirectRunner" }); String content = Files.readAllLines(outPath).stream().collect(Collectors.joining("\n")); @@ -150,9 +158,10 @@ public void readTableEndToEnd() throws Exception { public void reaTransactionalReadEndToEnd() throws Exception { Path singersPath = Files.createTempFile("singers", "txt"); Path albumsPath = Files.createTempFile("albums", "txt"); - TransactionalRead.main(new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, - "--singersFilename=" + singersPath, "--albumsFilename="+albumsPath, - "--runner=DirectRunner" }); + TransactionalRead.main( + new String[] { "--instanceId=" + instanceId, "--databaseId=" + databaseId, + "--singersFilename=" + singersPath, "--albumsFilename=" + albumsPath, + "--runner=DirectRunner" }); assertEquals(4, Files.readAllLines(singersPath).size()); assertEquals(2, Files.readAllLines(albumsPath).size()); diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java index 3c68a1a9ee9..9fb4908c79f 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java @@ -1,10 +1,28 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.example.dataflow; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; -import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Operation; import com.google.cloud.spanner.ReadContext; import com.google.cloud.spanner.ResultSet; @@ -12,26 +30,17 @@ import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.TransactionContext; -import com.google.cloud.spanner.TransactionRunner; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import javax.annotation.Nullable; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - +@SuppressWarnings("checkstyle:abbreviationaswordinname") public class SpannerWriteIT { final String instanceId = "mairbek-deleteme"; @@ -57,8 +66,13 @@ public void setUp() throws Exception { } Operation op = adminClient - .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers (singerId INT64 NOT NULL, firstName STRING(MAX) NOT NULL, lastName STRING(MAX) NOT NULL,) PRIMARY KEY (singerId)", - "CREATE TABLE Albums (singerId INT64 NOT NULL, albumId INT64 NOT NULL, albumTitle STRING(MAX) NOT NULL,) PRIMARY KEY (singerId, albumId)")); + .createDatabase(instanceId, databaseId, Arrays.asList("CREATE TABLE Singers " + + "(singerId INT64 NOT NULL, " + + "firstName STRING(MAX) NOT NULL, lastName STRING(MAX) NOT NULL,) " + + "PRIMARY KEY (singerId)", + "CREATE TABLE Albums (singerId INT64 NOT NULL, " + + "albumId INT64 NOT NULL, albumTitle STRING(MAX) NOT NULL,) " + + "PRIMARY KEY (singerId, albumId)")); op.waitFor(); From 08c2629f782331e70d1acd8e2ade61d8fdeb1a32 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 21 Mar 2018 09:36:22 -0700 Subject: [PATCH 7/9] Use system properties --- .../java/com/example/dataflow/SpannerGroupWriteIT.java | 6 ++++-- .../src/test/java/com/example/dataflow/SpannerReadIT.java | 6 ++++-- .../src/test/java/com/example/dataflow/SpannerWriteIT.java | 7 +++++-- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java index 0d49b273389..5b49cb4addc 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java @@ -49,8 +49,8 @@ @SuppressWarnings("checkstyle:abbreviationaswordinname") public class SpannerGroupWriteIT { - final String instanceId = "mairbek-deleteme"; - final String databaseId = "test2"; + String instanceId; + String databaseId; Path tempPath; Spanner spanner; @@ -58,6 +58,8 @@ public class SpannerGroupWriteIT { @Before public void setUp() throws Exception { + instanceId = System.getProperty("spanner.test.instance"); + databaseId = "df-spanner-groupwrite-it"; spannerOptions = SpannerOptions.getDefaultInstance(); spanner = spannerOptions.getService(); diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java index f08b6a382a9..621c00b2018 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerReadIT.java @@ -43,14 +43,16 @@ @SuppressWarnings("checkstyle:abbreviationaswordinname") public class SpannerReadIT { - final String instanceId = "mairbek-deleteme"; - final String databaseId = "test2"; + String instanceId; + String databaseId; Spanner spanner; SpannerOptions spannerOptions; @Before public void setUp() throws Exception { + instanceId = System.getProperty("spanner.test.instance"); + databaseId = "df-spanner-read-it"; spannerOptions = SpannerOptions.getDefaultInstance(); spanner = spannerOptions.getService(); diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java index 9fb4908c79f..757e2529caa 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java @@ -43,8 +43,8 @@ @SuppressWarnings("checkstyle:abbreviationaswordinname") public class SpannerWriteIT { - final String instanceId = "mairbek-deleteme"; - final String databaseId = "test2"; + String instanceId; + String databaseId; Path singersPath; Path albumsPath; @@ -54,6 +54,9 @@ public class SpannerWriteIT { @Before public void setUp() throws Exception { + instanceId = System.getProperty("spanner.test.instance"); + databaseId = "df-spanner-write-it"; + spannerOptions = SpannerOptions.getDefaultInstance(); spanner = spannerOptions.getService(); From 419500f0249d393cce4cd7bc255805f791566360 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 21 Mar 2018 09:48:49 -0700 Subject: [PATCH 8/9] Add add system property --- dataflow/spanner-io/pom.xml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dataflow/spanner-io/pom.xml b/dataflow/spanner-io/pom.xml index 8df1981b89d..165efbffa65 100644 --- a/dataflow/spanner-io/pom.xml +++ b/dataflow/spanner-io/pom.xml @@ -48,6 +48,16 @@ maven-compiler-plugin 3.7.0 + + org.apache.maven.plugins + maven-failsafe-plugin + 2.19.1 + + + default-instance + + + From 56b767e41cba974a0719073d28b8e6c5b735cf42 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 21 Mar 2018 11:47:59 -0700 Subject: [PATCH 9/9] New lines --- .../src/test/java/com/example/dataflow/SpannerGroupWriteIT.java | 2 +- .../src/test/java/com/example/dataflow/SpannerWriteIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java index 5b49cb4addc..c35773fd8a5 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerGroupWriteIT.java @@ -146,4 +146,4 @@ private DatabaseClient getDbClient() { .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId)); } -} \ No newline at end of file +} diff --git a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java index 757e2529caa..5631ddcbd46 100644 --- a/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java +++ b/dataflow/spanner-io/src/test/java/com/example/dataflow/SpannerWriteIT.java @@ -132,4 +132,4 @@ private DatabaseClient getDbClient() { .getDatabaseClient(DatabaseId.of(spannerOptions.getProjectId(), instanceId, databaseId)); } -} \ No newline at end of file +}