Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into trace-id
Browse files Browse the repository at this point in the history
  • Loading branch information
yayi-google committed Mar 2, 2021
2 parents 320c679 + 3b60f44 commit bcbeb1b
Show file tree
Hide file tree
Showing 23 changed files with 275 additions and 184 deletions.
2 changes: 1 addition & 1 deletion .github/readme/synth.metadata/synth.metadata
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-bigquerystorage.git",
"sha": "fb369c79db634ed8e457b9cbb3246af239ae95b1"
"sha": "68fcb1936ccc745d4cdf8fdf1c84eaad1e97dca5"
}
},
{
Expand Down
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# Changelog

## [1.13.0](https://www.github.com/googleapis/java-bigquerystorage/compare/v1.12.0...v1.13.0) (2021-03-01)


### Features

* **generator:** update protoc to v3.15.3 ([#898](https://www.github.com/googleapis/java-bigquerystorage/issues/898)) ([2f277d6](https://www.github.com/googleapis/java-bigquerystorage/commit/2f277d650e8f617c6253843baf73d5d220713a61))
* in StreamWriterV2, supports new append, which takes rows and offset ([#894](https://www.github.com/googleapis/java-bigquerystorage/issues/894)) ([f3865b0](https://www.github.com/googleapis/java-bigquerystorage/commit/f3865b06ea7c61e95d3ee9bc7b46857d9d3080cc))
* StreamWriterV2 will handle schema/streamName attachment ([#877](https://www.github.com/googleapis/java-bigquerystorage/issues/877)) ([c54bcfe](https://www.github.com/googleapis/java-bigquerystorage/commit/c54bcfec1706eef58eaf9dad8b49dc79fc8da133))


### Dependencies

* update dependency com.google.cloud:google-cloud-bigquery to v1.127.5 ([#896](https://www.github.com/googleapis/java-bigquerystorage/issues/896)) ([d211c76](https://www.github.com/googleapis/java-bigquerystorage/commit/d211c76dff747121d4560b55818c10bf595ef1c3))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.20.0 ([#892](https://www.github.com/googleapis/java-bigquerystorage/issues/892)) ([438f1c3](https://www.github.com/googleapis/java-bigquerystorage/commit/438f1c3b551e6b97a3241c69f2006a5a6be78c4f))

## [1.12.0](https://www.github.com/googleapis/java-bigquerystorage/compare/v1.11.0...v1.12.0) (2021-02-25)


Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ If you are using Maven without BOM, add this to your dependencies:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<version>1.12.0</version>
<version>1.13.0</version>
</dependency>

```
Expand All @@ -51,12 +51,12 @@ compile 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies
```Groovy
compile 'com.google.cloud:google-cloud-bigquerystorage:1.12.0'
compile 'com.google.cloud:google-cloud-bigquerystorage:1.13.0'
```

If you are using SBT, add this to your dependencies
```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "1.12.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "1.13.0"
```

## Authentication
Expand Down
20 changes: 10 additions & 10 deletions google-cloud-bigquerystorage-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage-bom</artifactId>
<version>1.12.1-SNAPSHOT</version><!-- {x-version-update:google-cloud-bigquerystorage:current} -->
<version>1.13.0</version><!-- {x-version-update:google-cloud-bigquerystorage:current} -->
<packaging>pom</packaging>
<parent>
<groupId>com.google.cloud</groupId>
Expand Down Expand Up @@ -63,48 +63,48 @@
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-bigquerystorage-v1alpha2</artifactId>
<version>0.112.1-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-bigquerystorage-v1alpha2:current} -->
<version>0.113.0</version><!-- {x-version-update:proto-google-cloud-bigquerystorage-v1alpha2:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-bigquerystorage-v1beta1</artifactId>
<version>0.112.1-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-bigquerystorage-v1beta1:current} -->
<version>0.113.0</version><!-- {x-version-update:proto-google-cloud-bigquerystorage-v1beta1:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-bigquerystorage-v1beta2</artifactId>
<version>0.112.1-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-bigquerystorage-v1beta2:current} -->
<version>0.113.0</version><!-- {x-version-update:proto-google-cloud-bigquerystorage-v1beta2:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-bigquerystorage-v1</artifactId>
<version>1.12.1-SNAPSHOT</version><!-- {x-version-update:proto-google-cloud-bigquerystorage-v1:current} -->
<version>1.13.0</version><!-- {x-version-update:proto-google-cloud-bigquerystorage-v1:current} -->
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-bigquerystorage-v1alpha2</artifactId>
<version>0.112.1-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-bigquerystorage-v1alpha2:current} -->
<version>0.113.0</version><!-- {x-version-update:grpc-google-cloud-bigquerystorage-v1alpha2:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-bigquerystorage-v1beta1</artifactId>
<version>0.112.1-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-bigquerystorage-v1beta1:current} -->
<version>0.113.0</version><!-- {x-version-update:grpc-google-cloud-bigquerystorage-v1beta1:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-bigquerystorage-v1beta2</artifactId>
<version>0.112.1-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-bigquerystorage-v1beta2:current} -->
<version>0.113.0</version><!-- {x-version-update:grpc-google-cloud-bigquerystorage-v1beta2:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-bigquerystorage-v1</artifactId>
<version>1.12.1-SNAPSHOT</version><!-- {x-version-update:grpc-google-cloud-bigquerystorage-v1:current} -->
<version>1.13.0</version><!-- {x-version-update:grpc-google-cloud-bigquerystorage-v1:current} -->
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<version>1.12.1-SNAPSHOT</version><!-- {x-version-update:google-cloud-bigquerystorage:current} -->
<version>1.13.0</version><!-- {x-version-update:google-cloud-bigquerystorage:current} -->
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
4 changes: 2 additions & 2 deletions google-cloud-bigquerystorage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<version>1.12.1-SNAPSHOT</version><!-- {x-version-update:google-cloud-bigquerystorage:current} -->
<version>1.13.0</version><!-- {x-version-update:google-cloud-bigquerystorage:current} -->
<packaging>jar</packaging>
<name>BigQuery Storage</name>
<url>https://github.com/googleapis/java-bigquerystorage</url>
<description>BigQuery Storage</description>
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage-parent</artifactId>
<version>1.12.1-SNAPSHOT</version><!-- {x-version-update:google-cloud-bigquerystorage:current} -->
<version>1.13.0</version><!-- {x-version-update:google-cloud-bigquerystorage:current} -->
</parent>
<properties>
<site.installationModule>google-cloud-bigquerystorage</site.installationModule>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
Expand All @@ -39,8 +41,6 @@
/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Attach schema.
*
* <p>TODO: Support batching.
*
* <p>TODO: Support schema change.
Expand All @@ -57,6 +57,11 @@ public class StreamWriterV2 implements AutoCloseable {
*/
private final String streamName;

/*
* The proto schema of rows to write.
*/
private final ProtoSchema writerSchema;

/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
*/
Expand Down Expand Up @@ -138,6 +143,7 @@ private StreamWriterV2(Builder builder) throws IOException {
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.maxInflightRequests = builder.maxInflightRequest;
this.maxInflightBytes = builder.maxInflightBytes;
this.traceId = builder.traceId;
Expand Down Expand Up @@ -192,10 +198,52 @@ public void run() {
* ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
* ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
* public void onSuccess(AppendRowsResponse response) {
* if (response.hasOffset()) {
* System.out.println("written with offset: " + response.getOffset());
* if (!response.hasError()) {
* System.out.println("written with offset: " + response.getAppendResult().getOffset());
* } else {
* System.out.println("received an in stream error: " + response.getError().toString());
* }
* }
*
* public void onFailure(Throwable t) {
* System.out.println("failed to write: " + t);
* }
* }, MoreExecutors.directExecutor());
* }</pre>
*
* @param rows the rows in serialized format to write to BigQuery.
* @param offset the offset of the first row.
* @return the append response wrapped in a future.
*/
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
// TODO: Move this check to builder after the other append is removed.
if (this.writerSchema == null) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
if (offset >= 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
return append(requestBuilder.build());
}

/**
* Schedules the writing of a message.
*
* <p>Example of writing a message.
*
* <pre>{@code
* AppendRowsRequest message;
* ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
* ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
* public void onSuccess(AppendRowsResponse response) {
* if (!response.hasError()) {
* System.out.println("written with offset: " + response.getAppendResult().getOffset());
* } else {
* System.out.println("received an in stream error: " + response.error().toString());
* System.out.println("received an in stream error: " + response.getError().toString());
* }
* }
*
Expand All @@ -206,8 +254,9 @@ public void run() {
* }</pre>
*
* @param message the message in serialized format to write to BigQuery.
* @return the message ID wrapped in a future.
* @return the append response wrapped in a future.
*/
@Deprecated
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
Expand Down Expand Up @@ -384,6 +433,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
AppendRowsRequest original, boolean isFirstRequest) {
AppendRowsRequest.Builder requestBuilder = original.toBuilder();
if (isFirstRequest) {
if (this.writerSchema != null) {
requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
}
requestBuilder.setWriteStream(this.streamName);
if (this.traceId != null) {
requestBuilder.setTraceId(this.traceId);
Expand Down Expand Up @@ -480,6 +532,8 @@ public static final class Builder {

private BigQueryWriteClient client;

private ProtoSchema writerSchema = null;

private long maxInflightRequest = DEFAULT_MAX_INFLIGHT_REQUESTS;

private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;
Expand All @@ -504,6 +558,12 @@ private Builder(String streamName, BigQueryWriteClient client) {
this.client = Preconditions.checkNotNull(client);
}

/** Sets the proto schema of the rows. */
public Builder setWriterSchema(ProtoSchema writerSchema) {
this.writerSchema = writerSchema;
return this;
}

public Builder setMaxInflightRequests(long value) {
this.maxInflightRequest = value;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -834,68 +834,4 @@ public void testExistingClient() throws Exception {
client.shutdown();
client.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testFlushAll() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100000))
.build())
.build();

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

writer.flushAll(100000);

assertTrue(appendFuture3.isDone());

writer.close();
}

@Test
public void testFlushAllFailed() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100000))
.build())
.build();

testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertFalse(appendFuture3.isDone());
try {
writer.flushAll(100000);
fail("Should have thrown an Exception");
} catch (Exception expected) {
if (expected.getCause() instanceof com.google.api.gax.rpc.DataLossException) {
LOG.info("got: " + expected.toString());
} else {
fail("Unexpected exception:" + expected.toString());
}
}

assertTrue(appendFuture3.isDone());

writer.close();
}
}
Loading

0 comments on commit bcbeb1b

Please sign in to comment.