-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-1542] SpannerIO sink updates #3221
Conversation
This closes apache#2166.
Also minor cleanup alphabetization in root pom.xml
This closes apache#2166.
Also minor cleanup alphabetization in root pom.xml
This is not appropriate for examples. SpannerIO should be well-javadoced and integration tested.
And remove outdated Bigtable comment
* Rename to Write to match the rest of the SDK. * Convert to AutoValue, delete toString. * Drop .writeTo(), instead use .write() as default constructor. * Temporarily drop withBatchSize, as its existence is not clearly justified.
… mergespanner # Conflicts: # sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java # sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
* Rename to Write to match the rest of the SDK. * Convert to AutoValue, delete toString. * Drop .writeTo(), instead use .write() as default constructor. * Temporarily drop withBatchSize, as its existence is not clearly justified.
Better formatting
…nner # Conflicts: # pom.xml # sdks/java/io/google-cloud-platform/pom.xml # sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
…nner # Conflicts: # pom.xml # sdks/java/io/google-cloud-platform/pom.xml # sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good. I'll run the postcommit to make sure the integration test actually passes.
*/ | ||
@Experimental(Experimental.Kind.SOURCE_SINK) | ||
public class SpannerIO { | ||
|
||
@VisibleForTesting | ||
static final int SPANNER_MUTATIONS_PER_COMMIT_LIMIT = 20000; | ||
private static final long DEFAULT_BATCH_SIZE = 1024 * 1024; // 1 MB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add _BYTES
here and to the relevant builder.
@@ -72,21 +75,32 @@ | |||
* mutations.apply( | |||
* "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database")); | |||
* }</pre> | |||
* | |||
* <p>The default size of the batch is set to 1MB, to override this use {@link | |||
* Write#withBatchSize(long)}. Setting batch size to a small value or zero practically disables |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withBatchSizeBytes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -217,20 +282,19 @@ public void finishBundle() throws Exception { | |||
@Teardown | |||
public void teardown() throws Exception { | |||
if (spanner == null) { | |||
return; | |||
return; | |||
} | |||
spanner.closeAsync().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set spanner to null here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.addIfNotNull(DisplayData.item("databaseId", databaseId) | ||
.withLabel("Database")); | ||
.addIfNotNull(DisplayData.item("instanceId", spec.getInstanceId()).withLabel("Instance")) | ||
.addIfNotNull(DisplayData.item("databaseId", spec.getDatabaseId()).withLabel("Database")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add if not default batchsize
add the service factory or the class of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@Description("Instance ID to write to in Spanner") | ||
@Default.String("beam-test") | ||
String getInstanceId(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please delete empty lines between getter and setter in options.
+ " (" | ||
+ " Key INT64," | ||
+ " Value STRING(MAX)," | ||
+ ") PRIMARY KEY (Key)")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a way in the spanner client to defend against injection attacks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not with DDL...
https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/3916/ is a run of the postcommit against this PR |
R: @dhalperi |
flushBatch(); | ||
} | ||
} | ||
|
||
private String projectId() { | ||
return spec.getProjectId() == null | ||
? ServiceOptions.getDefaultProjectId() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lukecwik you know a lot about default projects -- does this do the right thing generally? Should we switch to it at a larger scale?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to swap to use ServiceOptions.getDefaultProjectId() a while ago and have not been able to perform that migration for GcpOptions. If we can include it here great. This would remove all our hand-coded detect default project code. Note that by doing this we would remove support for gcloud users who have old gcloud SDK installations.
I requested the original change in gcloud-java-core to have this exposed:
googleapis/google-cloud-java#1380
spanner = SpannerOptions.newBuilder().setProjectId(options.getProjectId()).build().getService(); | ||
|
||
databaseAdminClient = spanner.getDatabaseAdminClient(); | ||
Operation<Database, CreateDatabaseMetadata> op = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imagining failure modes, let's say for some reason the tearDown
step does not run. We should try dropping the database here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, added a line to to drop the DB.
In case if DB is not found it's a noop.
https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/3919/ is a run of the ITs against the current HEAD |
retest this please |
Can you rebase and squash to a single commit? I'm still not sure why the build is failing |
7af3a76
to
bd3912b
Compare
https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/3935/ is a run of postcommit against current branch |
DisplayData.item("instanceId", getInstanceId()).withLabel("Output Instance")) | ||
.addIfNotNull( | ||
DisplayData.item("databaseId", getDatabaseId()).withLabel("Output Database")) | ||
.addIfNotNull( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- this can't be null, it's a
long
. - Use
addIfNotDefault
so it doesn't show up when left to default settings. https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java#L212
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is valuable to keep it. -> using add
.
The IT passed, so LGTM. One minor recommendation left. |
Unit and integration test.
Logical mutation size estimation.
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
.<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.