Skip to content

Commit

Permalink
Updated Spanner Dataflow connector samples (#1059)
Browse files Browse the repository at this point in the history
* Updated Spanner Dataflow connector samples

* Make checkstyle happy

* First test

* Updated the test

* Added tests

* Makes checkstyle happy

* Use system properties

* Add system property

* New lines
  • Loading branch information
mairbek authored and jsimonweb committed Mar 21, 2018
1 parent ebdc07f commit df10b39
Show file tree
Hide file tree
Showing 10 changed files with 931 additions and 87 deletions.
17 changes: 10 additions & 7 deletions dataflow/spanner-io/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<systemPropertyVariables>
<spanner.test.instance>default-instance</spanner.test.instance>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>

Expand Down Expand Up @@ -84,13 +94,6 @@
<version>${apache_beam.version}</version>
</dependency>

<!-- Google Cloud -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>0.34.0-beta</version>
</dependency>

<!-- Misc -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

import com.google.cloud.spanner.Struct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/**
* Estimates the size of the {@code Struct}.
*/
/**
 * A {@code PTransform} that maps each input {@code Struct} to an estimate of its
 * size in bytes.
 */
public class EstimateSize extends PTransform<PCollection<Struct>, PCollection<Long>> {

  /** Factory method; instances are only created through here. */
  public static EstimateSize create() {
    return new EstimateSize();
  }

  private EstimateSize() {
  }

  @Override
  public PCollection<Long> expand(PCollection<Struct> input) {
    return input.apply(ParDo.of(new EstimateStructSizeFn()));
  }

  /**
   * Estimates the size of a Spanner row. For simplicity, arrays and structs aren't supported.
   */
  public static class EstimateStructSizeFn extends DoFn<Struct, Long> {

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      Struct struct = c.element();
      long totalBytes = 0;
      for (int field = 0; field < struct.getColumnCount(); field++) {
        // NULL values contribute nothing to the estimate.
        if (!struct.isNull(field)) {
          totalBytes += fieldSize(struct, field);
        }
      }
      c.output(totalBytes);
    }

    /**
     * Returns the estimated byte size of a single non-null column value.
     *
     * @throws IllegalArgumentException for ARRAY, STRUCT, or unrecognized column types
     */
    private static long fieldSize(Struct struct, int field) {
      switch (struct.getColumnType(field).getCode()) {
        case BOOL:
          return 1;
        case INT64:
        case FLOAT64:
          return 8;
        case TIMESTAMP:
        case DATE:
          return 12;
        case BYTES:
          return struct.getBytes(field).length();
        case STRING:
          // Note: counts characters, not encoded bytes — an estimate only.
          return struct.getString(field).length();
        case ARRAY:
          throw new IllegalArgumentException("Arrays are not supported :(");
        case STRUCT:
          throw new IllegalArgumentException("Structs are not supported :(");
        default:
          throw new IllegalArgumentException("Unsupported type :(");
      }
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Mutation;
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

/**
* This sample demonstrates how to group together mutations when writing to the Cloud Spanner
* database.
*/
/**
 * This sample demonstrates how to group together mutations when writing to the Cloud Spanner
 * database, so that related mutations are committed atomically.
 */
public class SpannerGroupWrite {

  /** Command-line options for this pipeline. */
  public interface Options extends PipelineOptions {

    @Description("Spanner instance ID to write to")
    @Validation.Required
    String getInstanceId();

    void setInstanceId(String value);

    @Description("Spanner database name to write to")
    @Validation.Required
    String getDatabaseId();

    void setDatabaseId(String value);

    // Fixed: the previous description ("Singers output filename in the format:
    // singer_id\tfirst_name\tlast_name") was copy-pasted from another sample and
    // did not describe this option.
    @Description("Text file containing the IDs of suspicious users, one user ID per line")
    @Validation.Required
    String getSuspiciousUsersFile();

    void setSuspiciousUsersFile(String value);

  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    String instanceId = options.getInstanceId();
    String databaseId = options.getDatabaseId();

    String usersIdFile = options.getSuspiciousUsersFile();

    // One element per line of the input file, i.e. one element per user ID.
    PCollection<String> suspiciousUserIds = p.apply(TextIO.read().from(usersIdFile));

    // Captured by the SimpleFunction below so every worker uses the same timestamp,
    // which keeps the generated PendingReviews IDs deterministic for this run.
    final Timestamp timestamp = Timestamp.now();

    // [START spanner_dataflow_writegroup]
    PCollection<MutationGroup> mutations = suspiciousUserIds
        .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

          @Override
          public MutationGroup apply(String userId) {
            // Immediately block the user.
            Mutation userMutation = Mutation.newUpdateBuilder("Users")
                .set("id").to(userId)
                .set("state").to("BLOCKED")
                .build();
            long generatedId = Hashing.sha1().newHasher()
                .putString(userId, Charsets.UTF_8)
                .putLong(timestamp.getSeconds())
                .putLong(timestamp.getNanos())
                .hash()
                .asLong();

            // Add an entry to pending review requests.
            Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
                .set("id").to(generatedId) // Must be deterministically generated.
                .set("userId").to(userId)
                .set("action").to("REVIEW ACCOUNT")
                .set("note").to("Suspicious activity detected.")
                .build();

            return MutationGroup.create(userMutation, pendingReview);
          }
        }));

    // grouped() makes SpannerIO commit each MutationGroup atomically.
    mutations.apply(SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .grouped());
    // [END spanner_dataflow_writegroup]

    p.run().waitUntilFinish();

  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -87,73 +83,33 @@ public interface Options extends PipelineOptions {
void setOutput(String value);
}

/**
 * Estimates the size of a Spanner row, in bytes. For simplicity, arrays and structs
 * aren't supported and cause an {@code IllegalArgumentException}.
 */
public static class EstimateStructSizeFn extends DoFn<Struct, Long> {

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Struct row = c.element();
long sum = 0;
for (int i = 0; i < row.getColumnCount(); i++) {
// NULL columns contribute nothing to the estimate.
if (row.isNull(i)) {
continue;
}

switch (row.getColumnType(i).getCode()) {
case BOOL:
sum += 1;
break;
case INT64:
case FLOAT64:
sum += 8;
break;
case TIMESTAMP:
case DATE:
sum += 12;
break;
case BYTES:
sum += row.getBytes(i).length();
break;
case STRING:
// Counts characters, not encoded bytes — an estimate only.
sum += row.getString(i).length();
break;
case ARRAY:
throw new IllegalArgumentException("Arrays are not supported :(");
case STRUCT:
throw new IllegalArgumentException("Structs are not supported :(");
default:
throw new IllegalArgumentException("Unsupported type :(");
}
}
c.output(sum);
}
}

public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);

String instanceId = options.getInstanceId();
String databaseId = options.getDatabaseId();
String query = "SELECT * FROM " + options.getTable();

PCollection<Long> tableEstimatedSize = p
// Query for all the columns and rows in the specified Spanner table
.apply(SpannerIO.read()
// [START spanner_dataflow_read]
// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
SpannerIO.read()
.withInstanceId(instanceId)
.withDatabaseId(databaseId)
.withQuery(query))
.withQuery("SELECT * FROM " + options.getTable()));
// [START spanner_dataflow_read]


PCollection<Long> tableEstimatedSize = records
// Estimate the size of every row
.apply(ParDo.of(new EstimateStructSizeFn()))
.apply(EstimateSize.create())
// Sum all the row sizes to get the total estimated size of the table
.apply(Sum.longsGlobally());

// Write the total size to a file
tableEstimatedSize
.apply(ToString.elements())
.apply(TextIO.write().to(options.getOutput()));
.apply(TextIO.write().to(options.getOutput()).withoutSharding());

p.run().waitUntilFinish();
}
Expand Down
Loading

0 comments on commit df10b39

Please sign in to comment.