Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-1542] SpannerIO sink updates #3221

Closed
wants to merge 26 commits into from
Closed
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8db6693
Initial implementation of SpannerIO.Write
gamolina May 4, 2017
491f047
Minor style, compilation, javadoc fixups
dhalperi May 4, 2017
7a2d09a
SpannerCSVLoader: update for TextIO API Change
dhalperi May 4, 2017
77be276
Fix spanner dependency management
dhalperi May 4, 2017
4049047
Introduced MutationSizeEstimator.
May 12, 2017
4392de9
Initial implementation of SpannerIO.Write
gamolina May 4, 2017
b092be8
Minor style, compilation, javadoc fixups
dhalperi May 4, 2017
e16339a
Fix spanner dependency management
dhalperi May 4, 2017
9798534
Delete SpannerCSVLoader
dhalperi May 4, 2017
f495a41
Refine Spanner API tests
dhalperi May 15, 2017
ce5d04c
SpannerIO.Write cleanup and style fixes
dhalperi May 15, 2017
81e71f6
Merge branch 'takeover-2166' of https://github.com/dhalperi/beam into…
May 15, 2017
16e4b83
Use a new batch size limit.
May 15, 2017
5a2214a
SpannerIO.Write cleanup and style fixes
dhalperi May 15, 2017
5fc07fe
Added a SpannerIO unit test.
May 15, 2017
0b7b84d
Renaming and documenting
May 15, 2017
2b5a10e
Reorganized test.
May 15, 2017
938b92c
Merge remote-tracking branch 'dhalperi/takeover-2166' into mergespanner
May 16, 2017
fd2a76c
Pre PR cleanup
May 16, 2017
e24254c
Sketch out integration test
May 16, 2017
34c7c6f
End to end test for Cloud Spanner Sink
May 16, 2017
f30b520
Make checkstyle happy
May 16, 2017
060ff58
Merge branch 'master' of https://github.com/apache/beam into mergespa…
May 24, 2017
1f256ef
Merge branch 'master' of https://github.com/apache/beam into mergespa…
May 24, 2017
dabda8d
Revert style change
May 24, 2017
c07a32f
Addressing code review comments.
May 24, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -125,7 +125,7 @@
<google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version>
<guava.version>20.0</guava.version>
<grpc.version>1.2.0</grpc.version>
<grpc-google-common-protos.version>0.1.0</grpc-google-common-protos.version>
<grpc-google-common-protos.version>0.1.9</grpc-google-common-protos.version>
<hamcrest.version>1.3</hamcrest.version>
<jackson.version>2.8.8</jackson.version>
<findbugs.version>3.0.1</findbugs.version>
@@ -175,8 +175,8 @@
</modules>

<profiles>
<!-- A global profile defined for all modules for release-level verification.
Optional processes such as building source and javadoc should be limited
<!-- A global profile defined for all modules for release-level verification.
Optional processes such as building source and javadoc should be limited
to this profile. -->
<profile>
<id>release</id>
@@ -493,7 +493,7 @@
<artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-construction-java</artifactId>
@@ -737,13 +737,13 @@
<artifactId>google-auth-library-credentials</artifactId>
<version>${google-auth.version}</version>
</dependency>

<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>${google-auth.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled in by a transitive
<!-- Exclude an old version of guava that is being pulled in by a transitive
dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
@@ -808,12 +808,24 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-spanner-admin-database-v1</artifactId>
<version>${grpc-google-common-protos.version}</version>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
<version>${grpc-google-common-protos.version}</version>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-storage</artifactId>
<version>${storage.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled in by a transitive
<!-- Exclude an old version of guava that is being pulled in by a transitive
dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
@@ -900,7 +912,7 @@
<artifactId>google-api-services-dataflow</artifactId>
<version>${dataflow.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled in by a transitive
<!-- Exclude an old version of guava that is being pulled in by a transitive
dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
@@ -914,7 +926,7 @@
<artifactId>google-api-services-clouddebugger</artifactId>
<version>${clouddebugger.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled in by a transitive
<!-- Exclude an old version of guava that is being pulled in by a transitive
dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
@@ -1015,7 +1027,7 @@
<artifactId>byte-buddy</artifactId>
<version>1.6.8</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
@@ -1122,7 +1134,7 @@
<artifactId>maven-antrun-plugin</artifactId>
<version>1.8</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
@@ -1393,7 +1405,7 @@
</configuration>
</plugin>

<!-- This plugin's configuration tells the m2e plugin how to import this
<!-- This plugin's configuration tells the m2e plugin how to import this
Maven project into the Eclipse environment. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
@@ -1730,7 +1742,7 @@
</goals>
<configuration>
<outputDirectory>${basedir}/sdks/python</outputDirectory>
<resources>
<resources>
<resource>
<directory>${basedir}</directory>
<includes>
@@ -1739,8 +1751,8 @@
<include>README.md</include>
</includes>
</resource>
</resources>
</configuration>
</resources>
</configuration>
</execution>
</executions>
</plugin>
20 changes: 15 additions & 5 deletions sdks/java/io/google-cloud-platform/pom.xml
Original file line number Diff line number Diff line change
@@ -86,11 +86,6 @@
<artifactId>grpc-core</artifactId>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-common-protos</artifactId>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-bigquery</artifactId>
@@ -248,6 +243,21 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-spanner-admin-database-v1</artifactId>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.cloud.ByteArray;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Value;

/**
 * Estimates the logical size of a {@link com.google.cloud.spanner.Mutation}.
 *
 * <p>The estimate is the sum of per-value sizes computed from each value's type code; it is a
 * heuristic, not the serialized size of the mutation.
 */
class MutationSizeEstimator {

  // Prevent construction.
  private MutationSizeEstimator() {}

  /**
   * Estimates the size of a mutation in bytes.
   *
   * @param m the mutation whose values are measured
   * @return the sum of the estimated sizes of every value returned by {@code m.getValues()}
   * @throws IllegalArgumentException if a value is a STRUCT or has an unsupported type
   */
  static long sizeOf(Mutation m) {
    // NOTE(review): Mutation#getValues() appears to reject DELETE mutations with an
    // IllegalStateException; if deletes can reach this method they need separate handling
    // (e.g. sizing the key set) - confirm against the Spanner client in use.
    long result = 0;
    for (Value v : m.getValues()) {
      switch (v.getType().getCode()) {
        case ARRAY:
          result += estimateArrayValue(v);
          break;
        case STRUCT:
          throw new IllegalArgumentException("Structs are not supported in mutation.");
        default:
          result += estimatePrimitiveValue(v);
      }
    }
    return result;
  }

  /**
   * Estimates the size of a single non-array, non-struct value.
   *
   * <p>Fixed-width types contribute a constant regardless of nullness; STRING and BYTES
   * contribute their length, or 0 when the value is NULL.
   *
   * @throws IllegalArgumentException if the value's type code is not handled here
   */
  private static long estimatePrimitiveValue(Value v) {
    switch (v.getType().getCode()) {
      case BOOL:
        return 1;
      case INT64:
      case FLOAT64:
        return 8;
      case DATE:
      case TIMESTAMP:
        return 12;
      case STRING:
        // Length in chars (UTF-16 code units), used as an approximation of the byte size.
        return v.isNull() ? 0 : v.getString().length();
      case BYTES:
        return v.isNull() ? 0 : v.getBytes().length();
      default:
        throw new IllegalArgumentException("Unsupported type " + v.getType());
    }
  }

  /**
   * Estimates the size of an array value as the sum of its elements' estimated sizes.
   *
   * <p>A NULL array contributes 0. NULL elements inside STRING and BYTES arrays are skipped.
   *
   * @throws IllegalArgumentException if the array element type is not handled here
   */
  private static long estimateArrayValue(Value v) {
    // A NULL array has no elements to read; the typed array getters below reject NULL values,
    // so short-circuit here, mirroring the isNull() guards used for scalar STRING/BYTES.
    if (v.isNull()) {
      return 0;
    }
    switch (v.getType().getArrayElementType().getCode()) {
      case BOOL:
        return v.getBoolArray().size();
      case INT64:
        // Multiply as long so the product cannot overflow int before widening.
        return 8L * v.getInt64Array().size();
      case FLOAT64:
        return 8L * v.getFloat64Array().size();
      case STRING: {
        long totalLength = 0;
        for (String s : v.getStringArray()) {
          if (s != null) {
            totalLength += s.length();
          }
        }
        return totalLength;
      }
      case BYTES: {
        long totalLength = 0;
        for (ByteArray bytes : v.getBytesArray()) {
          if (bytes != null) {
            totalLength += bytes.length();
          }
        }
        return totalLength;
      }
      case DATE:
        return 12L * v.getDateArray().size();
      case TIMESTAMP:
        return 12L * v.getTimestampArray().size();
      default:
        throw new IllegalArgumentException("Unsupported type " + v.getType());
    }
  }
}
Loading