From ab3f386dad21ffa0bad69be551f537e134e48252 Mon Sep 17 00:00:00 2001 From: yirutang Date: Mon, 19 Apr 2021 17:46:04 -0700 Subject: [PATCH 01/11] repro --- .../it/ITBigQueryWriteManualClientTest.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index 8d60dd2e96..b3c4dcda35 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -49,6 +49,7 @@ public class ITBigQueryWriteManualClientTest { private static final Logger LOG = Logger.getLogger(ITBigQueryWriteManualClientTest.class.getName()); private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; private static final String TABLE2 = "complicatedtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; @@ -56,8 +57,10 @@ public class ITBigQueryWriteManualClientTest { private static BigQueryWriteClient client; private static TableInfo tableInfo; private static TableInfo tableInfo2; + private static TableInfo tableInfoEU; private static String tableId; private static String tableId2; + private static String tableIdEU; private static BigQuery bigquery; @BeforeClass @@ -110,6 +113,21 @@ public static void beforeClass() throws IOException { String.format( "projects/%s/datasets/%s/tables/%s", ServiceOptions.getDefaultProjectId(), DATASET, TABLE2); + DatasetInfo datasetInfoEU = + DatasetInfo.newBuilder(/* datasetId = */ DATASET_EU).setLocation("EU").setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfoEU); + tableInfoEU = + TableInfo.newBuilder( + TableId.of(DATASET_EU, TABLE), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING).build()))) + .build(); + tableIdEU = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE); + bigquery.create(tableInfoEU); } @AfterClass @@ -206,6 +224,53 @@ public void testBatchWriteWithCommittedStream() } } + ProtoRows CreateProtoRows(String[] messages) { + ProtoRows.Builder rows = ProtoRows.newBuilder(); + for (String message : messages) { + FooType foo = FooType.newBuilder().setFoo(message).build(); + rows.addSerializedRows(foo.toByteString()); + } + return rows.build(); + } + + @Test + public void testBatchWriteWithCommittedStreamEU() + throws IOException, InterruptedException, ExecutionException { + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(tableIdEU) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + StreamWriterV2 streamWriter = + StreamWriterV2.newBuilder(writeStream.getName()).setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build(); + LOG.info("Sending one message"); + + ApiFuture response = + streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 0); + assertEquals(0, response.get().getAppendResult().getOffset().getValue()); + + LOG.info("Sending two more messages"); + ApiFuture response1 = + streamWriter.append(CreateProtoRows(new String[] {"bbb", "ccc"}), 1); + ApiFuture response2 = + streamWriter.append(CreateProtoRows(new String[] {"ddd"}), 3); + assertEquals(1, response1.get().getAppendResult().getOffset().getValue()); + assertEquals(3, response2.get().getAppendResult().getOffset().getValue()); + + TableResult result = + bigquery.listTableData( + tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + assertEquals("aaa", iter.next().get(0).getStringValue()); + assertEquals("bbb", iter.next().get(0).getStringValue()); + assertEquals("ccc", iter.next().get(0).getStringValue()); + assertEquals("ddd", iter.next().get(0).getStringValue()); + assertEquals(false, iter.hasNext()); + } + @Test public void testJsonStreamWriterCommittedStream() throws IOException, InterruptedException, ExecutionException, From 81d44b3582c14d716d7d6fd5a8a786e66a145bfa Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 10:53:58 -0700 Subject: [PATCH 02/11] . --- .../google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 95696a4574..f50ac41a9b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -22,6 +22,7 @@ import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback; +import com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Int64Value; @@ -160,6 +161,7 @@ private StreamWriterV2(Builder builder) throws IOException { .setCredentialsProvider(builder.credentialsProvider) .setTransportChannelProvider(builder.channelProvider) .setEndpoint(builder.endpoint) + .setHeaderProvider(BigQueryWriteStubSettings.defaultApiClientHeaderProviderBuilder().build()) .build(); this.client = BigQueryWriteClient.create(stubSettings); this.ownsBigQueryWriteClient = true; From a851d0abf292db3c7a81c46fc0e19934393423c7 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 12:13:04 -0700 Subject: [PATCH 03/11] . --- .../storage/v1beta2/StreamWriterV2.java | 5 +++- .../v1beta2/stub/ResourceHeaderTest.java | 30 +++++++++++++++---- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index f50ac41a9b..9aeea88869 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; @@ -156,12 +157,13 @@ private StreamWriterV2(Builder builder) throws IOException { this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); if (builder.client == null) { + log.info("here!!!!"); BigQueryWriteSettings stubSettings = BigQueryWriteSettings.newBuilder() .setCredentialsProvider(builder.credentialsProvider) .setTransportChannelProvider(builder.channelProvider) .setEndpoint(builder.endpoint) - .setHeaderProvider(BigQueryWriteStubSettings.defaultApiClientHeaderProviderBuilder().build()) + .setHeaderProvider(FixedHeaderProvider.create("write_stream", this.streamName)) .build(); this.client = BigQueryWriteClient.create(stubSettings); this.ownsBigQueryWriteClient = true; @@ -223,6 +225,7 @@ public void run() { * @return the append response wrapped in a future. */ public ApiFuture append(ProtoRows rows, long offset) { + log.info(client.getSettings().getHeaderProvider().toString()); AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build()); if (offset >= 0) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index abfca7b61c..cba28a093b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -22,12 +22,9 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.UnimplementedException; -import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1beta2.*; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase; -import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; -import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; -import com.google.cloud.bigquery.storage.v1beta2.ReadSession; -import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest; + import java.util.regex.Pattern; import org.junit.After; import org.junit.AfterClass; @@ -52,6 +49,11 @@ public class ResourceHeaderTest { private static final Pattern READ_SESSION_NAME_PATTERN = Pattern.compile( ".*" + "read_session\\.table=projects/project/datasets/dataset/tables/table" + ".*"); + + private static final Pattern PARENT_PATTERN = + Pattern.compile( + ".*" + "parent=projects/project/datasets/dataset/tables/table" + ".*"); + private static final Pattern READ_STREAM_PATTERN = Pattern.compile(".*" + "read_stream=streamName" + ".*"); private static final Pattern STREAM_NAME_PATTERN = @@ -65,6 +67,7 @@ public class ResourceHeaderTest { private LocalChannelProvider channelProvider; private BigQueryReadClient client; + private BigQueryWriteClient writeClient; @BeforeClass public static void setUpClass() throws Exception { @@ -81,6 +84,11 @@ public void setUp() throws Exception { .setHeaderProvider(FixedHeaderProvider.create(TEST_HEADER_NAME, TEST_HEADER_VALUE)) .setTransportChannelProvider(channelProvider); client = BigQueryReadClient.create(settingsBuilder.build()); + BigQueryWriteSettings.Builder writeSettingsBuilder = + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(channelProvider); + writeClient = BigQueryWriteClient.create(writeSettingsBuilder.build()); } @After @@ -129,6 +137,18 @@ public void splitReadStreamTest() { verifyHeaderSent(STREAM_NAME_PATTERN); } + @Test + public void createWriteStreamTest() { + try { + writeClient.createWriteStream("projects/project/datasets/dataset/tables/table", + WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build()); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, PARENT_PATTERN); + assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + } + private void verifyHeaderSent(Pattern... patterns) { for (Pattern pattern : patterns) { boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, pattern); From 033e63fd02cf114ee90e009d3d3359a6e3484e6b Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 12:47:14 -0700 Subject: [PATCH 04/11] . --- .../storage/v1beta2/StreamWriterV2.java | 3 +- .../v1beta2/stub/ResourceHeaderTest.java | 62 +++++++++++++++++-- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 9aeea88869..1c1d95561f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -163,7 +163,7 @@ private StreamWriterV2(Builder builder) throws IOException { .setCredentialsProvider(builder.credentialsProvider) .setTransportChannelProvider(builder.channelProvider) .setEndpoint(builder.endpoint) - .setHeaderProvider(FixedHeaderProvider.create("write_stream", this.streamName)) + // .setHeaderProvider(FixedHeaderProvider.create("write_stream", this.streamName)) .build(); this.client = BigQueryWriteClient.create(stubSettings); this.ownsBigQueryWriteClient = true; @@ -225,7 +225,6 @@ public void run() { * @return the append response wrapped in a future. */ public ApiFuture append(ProtoRows rows, long offset) { - log.info(client.getSettings().getHeaderProvider().toString()); AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build()); if (offset >= 0) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index cba28a093b..0b4ebce692 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -20,11 +20,13 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.testing.InProcessServer; import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.BidiStream; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.UnimplementedException; import com.google.cloud.bigquery.storage.v1beta2.*; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase; +import java.io.IOException; import java.util.regex.Pattern; import org.junit.After; import org.junit.AfterClass; @@ -40,6 +42,9 @@ public class ResourceHeaderTest { private static final String TEST_TABLE_REFERENCE = "projects/project/datasets/dataset/tables/table"; + private static final String WRITE_STREAM_NAME = + "projects/project/datasets/dataset/tables/table/streams/stream"; + private static final String TEST_STREAM_NAME = "streamName"; private static final String NAME = "resource-header-test:123"; @@ -54,6 +59,14 @@ public class ResourceHeaderTest { Pattern.compile( ".*" + "parent=projects/project/datasets/dataset/tables/table" + ".*"); + private static final Pattern NAME_PATTERN = + Pattern.compile( + ".*" + "name=projects/project/datasets/dataset/tables/table/streams/stream" + ".*"); + + private static final Pattern WRITE_STREAM_PATTERN = + Pattern.compile( + ".*" + "write_stream=projects/project/datasets/dataset/tables/table/streams/stream" + ".*"); + private static final Pattern READ_STREAM_PATTERN = Pattern.compile(".*" + "read_stream=streamName" + ".*"); private static final Pattern STREAM_NAME_PATTERN = @@ -66,6 +79,7 @@ public class ResourceHeaderTest { private static InProcessServer server; private LocalChannelProvider channelProvider; + private LocalChannelProvider channelProvider2; private BigQueryReadClient client; private BigQueryWriteClient writeClient; @@ -84,10 +98,11 @@ public void setUp() throws Exception { .setHeaderProvider(FixedHeaderProvider.create(TEST_HEADER_NAME, TEST_HEADER_VALUE)) .setTransportChannelProvider(channelProvider); client = BigQueryReadClient.create(settingsBuilder.build()); + channelProvider2 = LocalChannelProvider.create(NAME); BigQueryWriteSettings.Builder writeSettingsBuilder = - BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(NoCredentialsProvider.create()) - .setTransportChannelProvider(channelProvider); + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(channelProvider2); writeClient = BigQueryWriteClient.create(writeSettingsBuilder.build()); } @@ -145,7 +160,46 @@ public void createWriteStreamTest() { } catch (UnimplementedException e) { // Ignore the error: none of the methods are actually implemented. } - boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, PARENT_PATTERN); + boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, PARENT_PATTERN); + assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + } + + @Test + public void getWriteStreamTest() { + try { + writeClient.getWriteStream(WRITE_STREAM_NAME); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, NAME_PATTERN); + assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + } + + @Test + public void appendRowsTest() { + try { + AppendRowsRequest req = AppendRowsRequest.newBuilder().setWriteStream(WRITE_STREAM_NAME).build(); + BidiStream bidiStream = + writeClient.appendRowsCallable().call(); + bidiStream.send(req); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN); + assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + } + + @Test + public void appendRowsManualTest() { + try { + StreamWriterV2 streamWriter = StreamWriterV2.newBuilder(WRITE_STREAM_NAME, writeClient).setWriterSchema(ProtoSchema.newBuilder().build()).build(); + streamWriter.append(ProtoRows.newBuilder().build(), 1); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } catch (IOException e) { + // Ignore the error: none of the methods are actually implemented. + } + boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN); assertWithMessage("Generated header was sent").that(headerSent).isTrue(); } From e1568673ffadb599e09800992e4cf8cf58a6bbee Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 12:47:54 -0700 Subject: [PATCH 05/11] . --- .../storage/v1beta2/StreamWriterV2.java | 2 - .../it/ITBigQueryWriteManualClientTest.java | 69 ++++++++++--------- .../v1beta2/stub/ResourceHeaderTest.java | 27 +++++--- 3 files changed, 53 insertions(+), 45 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 1c1d95561f..3f39fad981 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -18,12 +18,10 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback; -import com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Int64Value; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index b3c4dcda35..cfb5570b73 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -114,19 +114,23 @@ public static void beforeClass() throws IOException { "projects/%s/datasets/%s/tables/%s", ServiceOptions.getDefaultProjectId(), DATASET, TABLE2); DatasetInfo datasetInfoEU = - DatasetInfo.newBuilder(/* datasetId = */ DATASET_EU).setLocation("EU").setDescription(DESCRIPTION).build(); + DatasetInfo.newBuilder(/* datasetId = */ DATASET_EU) + .setLocation("EU") + .setDescription(DESCRIPTION) + .build(); bigquery.create(datasetInfoEU); tableInfoEU = TableInfo.newBuilder( - TableId.of(DATASET_EU, TABLE), - StandardTableDefinition.of( - Schema.of( - com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING).build()))) + TableId.of(DATASET_EU, TABLE), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING) + .build()))) .build(); tableIdEU = String.format( - "projects/%s/datasets/%s/tables/%s", - ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE); + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE); bigquery.create(tableInfoEU); } @@ -244,31 +248,32 @@ public void testBatchWriteWithCommittedStreamEU() WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) .build()); StreamWriterV2 streamWriter = - StreamWriterV2.newBuilder(writeStream.getName()).setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) - .build(); - LOG.info("Sending one message"); - - ApiFuture response = - streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 0); - assertEquals(0, response.get().getAppendResult().getOffset().getValue()); - - LOG.info("Sending two more messages"); - ApiFuture response1 = - streamWriter.append(CreateProtoRows(new String[] {"bbb", "ccc"}), 1); - ApiFuture response2 = - streamWriter.append(CreateProtoRows(new String[] {"ddd"}), 3); - assertEquals(1, response1.get().getAppendResult().getOffset().getValue()); - assertEquals(3, response2.get().getAppendResult().getOffset().getValue()); - - TableResult result = - bigquery.listTableData( - tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); - Iterator iter = result.getValues().iterator(); - assertEquals("aaa", iter.next().get(0).getStringValue()); - assertEquals("bbb", iter.next().get(0).getStringValue()); - assertEquals("ccc", iter.next().get(0).getStringValue()); - assertEquals("ddd", iter.next().get(0).getStringValue()); - assertEquals(false, iter.hasNext()); + StreamWriterV2.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build(); + LOG.info("Sending one message"); + + ApiFuture response = + streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 0); + assertEquals(0, response.get().getAppendResult().getOffset().getValue()); + + LOG.info("Sending two more messages"); + ApiFuture response1 = + streamWriter.append(CreateProtoRows(new String[] {"bbb", "ccc"}), 1); + ApiFuture response2 = + streamWriter.append(CreateProtoRows(new String[] {"ddd"}), 3); + assertEquals(1, response1.get().getAppendResult().getOffset().getValue()); + assertEquals(3, response2.get().getAppendResult().getOffset().getValue()); + + TableResult result = + bigquery.listTableData( + tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + assertEquals("aaa", iter.next().get(0).getStringValue()); + assertEquals("bbb", iter.next().get(0).getStringValue()); + assertEquals("ccc", iter.next().get(0).getStringValue()); + assertEquals("ddd", iter.next().get(0).getStringValue()); + assertEquals(false, iter.hasNext()); } @Test diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index 0b4ebce692..d7e87f882b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -25,7 +25,6 @@ import com.google.api.gax.rpc.UnimplementedException; import com.google.cloud.bigquery.storage.v1beta2.*; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase; - import java.io.IOException; import java.util.regex.Pattern; import org.junit.After; @@ -56,8 +55,7 @@ public class ResourceHeaderTest { ".*" + "read_session\\.table=projects/project/datasets/dataset/tables/table" + ".*"); private static final Pattern PARENT_PATTERN = - Pattern.compile( - ".*" + "parent=projects/project/datasets/dataset/tables/table" + ".*"); + Pattern.compile(".*" + "parent=projects/project/datasets/dataset/tables/table" + ".*"); private static final Pattern NAME_PATTERN = Pattern.compile( @@ -65,7 +63,9 @@ public class ResourceHeaderTest { private static final Pattern WRITE_STREAM_PATTERN = Pattern.compile( - ".*" + "write_stream=projects/project/datasets/dataset/tables/table/streams/stream" + ".*"); + ".*" + + "write_stream=projects/project/datasets/dataset/tables/table/streams/stream" + + ".*"); private static final Pattern READ_STREAM_PATTERN = Pattern.compile(".*" + "read_stream=streamName" + ".*"); @@ -100,9 +100,9 @@ public void setUp() throws Exception { client = BigQueryReadClient.create(settingsBuilder.build()); channelProvider2 = LocalChannelProvider.create(NAME); BigQueryWriteSettings.Builder writeSettingsBuilder = - BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(NoCredentialsProvider.create()) - .setTransportChannelProvider(channelProvider2); + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(channelProvider2); writeClient = BigQueryWriteClient.create(writeSettingsBuilder.build()); } @@ -155,14 +155,15 @@ public void splitReadStreamTest() { @Test public void createWriteStreamTest() { try { - writeClient.createWriteStream("projects/project/datasets/dataset/tables/table", + writeClient.createWriteStream( + "projects/project/datasets/dataset/tables/table", WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build()); } catch (UnimplementedException e) { // Ignore the error: none of the methods are actually implemented. } boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, PARENT_PATTERN); assertWithMessage("Generated header was sent").that(headerSent).isTrue(); - } + } @Test public void getWriteStreamTest() { @@ -178,7 +179,8 @@ public void getWriteStreamTest() { @Test public void appendRowsTest() { try { - AppendRowsRequest req = AppendRowsRequest.newBuilder().setWriteStream(WRITE_STREAM_NAME).build(); + AppendRowsRequest req = + AppendRowsRequest.newBuilder().setWriteStream(WRITE_STREAM_NAME).build(); BidiStream bidiStream = writeClient.appendRowsCallable().call(); bidiStream.send(req); @@ -192,7 +194,10 @@ public void appendRowsTest() { @Test public void appendRowsManualTest() { try { - StreamWriterV2 streamWriter = StreamWriterV2.newBuilder(WRITE_STREAM_NAME, writeClient).setWriterSchema(ProtoSchema.newBuilder().build()).build(); + StreamWriterV2 streamWriter = + StreamWriterV2.newBuilder(WRITE_STREAM_NAME, writeClient) + .setWriterSchema(ProtoSchema.newBuilder().build()) + .build(); streamWriter.append(ProtoRows.newBuilder().build(), 1); } catch (UnimplementedException e) { // Ignore the error: none of the methods are actually implemented. From 7a1ab1218f753e58cc03d00e90cdf90e7e0f247c Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 13:00:59 -0700 Subject: [PATCH 06/11] . --- .../cloud/bigquery/storage/v1beta2/StreamWriterV2.java | 7 +++++-- .../bigquery/storage/v1beta2/stub/ResourceHeaderTest.java | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 3f39fad981..87ac73ccfa 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; @@ -155,13 +156,15 @@ private StreamWriterV2(Builder builder) throws IOException { this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); if (builder.client == null) { - log.info("here!!!!"); BigQueryWriteSettings stubSettings = BigQueryWriteSettings.newBuilder() .setCredentialsProvider(builder.credentialsProvider) .setTransportChannelProvider(builder.channelProvider) .setEndpoint(builder.endpoint) - // .setHeaderProvider(FixedHeaderProvider.create("write_stream", this.streamName)) + // (b/185842996): Temporily fix this explicitly providing the header. + .setHeaderProvider( + FixedHeaderProvider.create( + "x-goog-request-params", "write_stream=" + this.streamName)) .build(); this.client = BigQueryWriteClient.create(stubSettings); this.ownsBigQueryWriteClient = true; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index d7e87f882b..b335b3c58c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -102,6 +102,7 @@ public void setUp() throws Exception { BigQueryWriteSettings.Builder writeSettingsBuilder = BigQueryWriteSettings.newBuilder() .setCredentialsProvider(NoCredentialsProvider.create()) + .setHeaderProvider(FixedHeaderProvider.create("write_stream", WRITE_STREAM_NAME)) .setTransportChannelProvider(channelProvider2); writeClient = BigQueryWriteClient.create(writeSettingsBuilder.build()); } From a481106ab5b2e57198fc6605b40878cf275f50c0 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 13:02:25 -0700 Subject: [PATCH 07/11] . --- .../google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 87ac73ccfa..5d39c04d08 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -161,7 +161,7 @@ private StreamWriterV2(Builder builder) throws IOException { .setCredentialsProvider(builder.credentialsProvider) .setTransportChannelProvider(builder.channelProvider) .setEndpoint(builder.endpoint) - // (b/185842996): Temporily fix this explicitly providing the header. + // (b/185842996): Temporily fix this by explicitly providing the header. .setHeaderProvider( FixedHeaderProvider.create( "x-goog-request-params", "write_stream=" + this.streamName)) From e7f6990dd80e638aee95df9a10cc06b34b70f812 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 13:06:05 -0700 Subject: [PATCH 08/11] . --- .../v1beta2/stub/ResourceHeaderTest.java | 65 +++++++++---------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index b335b3c58c..28e74cff05 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -20,12 +20,10 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.testing.InProcessServer; import com.google.api.gax.grpc.testing.LocalChannelProvider; -import com.google.api.gax.rpc.BidiStream; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.UnimplementedException; import com.google.cloud.bigquery.storage.v1beta2.*; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase; -import java.io.IOException; import java.util.regex.Pattern; import org.junit.After; import org.junit.AfterClass; @@ -177,37 +175,38 @@ public void getWriteStreamTest() { assertWithMessage("Generated header was sent").that(headerSent).isTrue(); } - @Test - public void appendRowsTest() { - try { - AppendRowsRequest req = - AppendRowsRequest.newBuilder().setWriteStream(WRITE_STREAM_NAME).build(); - BidiStream bidiStream = - writeClient.appendRowsCallable().call(); - bidiStream.send(req); - } catch (UnimplementedException e) { - // Ignore the error: none of the methods are actually implemented. - } - boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN); - assertWithMessage("Generated header was sent").that(headerSent).isTrue(); - } - - @Test - public void appendRowsManualTest() { - try { - StreamWriterV2 streamWriter = - StreamWriterV2.newBuilder(WRITE_STREAM_NAME, writeClient) - .setWriterSchema(ProtoSchema.newBuilder().build()) - .build(); - streamWriter.append(ProtoRows.newBuilder().build(), 1); - } catch (UnimplementedException e) { - // Ignore the error: none of the methods are actually implemented. - } catch (IOException e) { - // Ignore the error: none of the methods are actually implemented. - } - boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN); - assertWithMessage("Generated header was sent").that(headerSent).isTrue(); - } + // Following tests will work after b/185842996 is fixed. + // @Test + // public void appendRowsTest() { + // try { + // AppendRowsRequest req = + // AppendRowsRequest.newBuilder().setWriteStream(WRITE_STREAM_NAME).build(); + // BidiStream bidiStream = + // writeClient.appendRowsCallable().call(); + // bidiStream.send(req); + // } catch (UnimplementedException e) { + // // Ignore the error: none of the methods are actually implemented. + // } + // boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN); + // assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + // } + // + // @Test + // public void appendRowsManualTest() { + // try { + // StreamWriterV2 streamWriter = + // StreamWriterV2.newBuilder(WRITE_STREAM_NAME, writeClient) + // .setWriterSchema(ProtoSchema.newBuilder().build()) + // .build(); + // streamWriter.append(ProtoRows.newBuilder().build(), 1); + // } catch (UnimplementedException e) { + // // Ignore the error: none of the methods are actually implemented. + // } catch (IOException e) { + // // Ignore the error: none of the methods are actually implemented. + // } + // boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN); + // assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + // } private void verifyHeaderSent(Pattern... patterns) { for (Pattern pattern : patterns) { From dd7791b7f24653cc54e43cdbae8d0c22674ad9e2 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 13:09:39 -0700 Subject: [PATCH 09/11] . --- .../storage/v1beta2/stub/ResourceHeaderTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index 28e74cff05..ba44707e11 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -22,8 +22,15 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.UnimplementedException; -import com.google.cloud.bigquery.storage.v1beta2.*; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1beta2.ReadSession; +import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1beta2.WriteStream; import java.util.regex.Pattern; import org.junit.After; import org.junit.AfterClass; From c58f330042ec5cb05b810c1591eac5de4e99cfa0 Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 13:14:47 -0700 Subject: [PATCH 10/11] format --- .../bigquery/storage/v1beta2/stub/ResourceHeaderTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index ba44707e11..06e69ab5ad 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -23,13 +23,13 @@ import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.UnimplementedException; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadSession; import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest; -import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase; -import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings; import com.google.cloud.bigquery.storage.v1beta2.WriteStream; import java.util.regex.Pattern; import org.junit.After; From 62f3d5c6b0c0cfb598d0b18ae010f0cdac6edafa Mon Sep 17 00:00:00 2001 From: yirutang Date: Tue, 20 Apr 2021 13:15:37 -0700 Subject: [PATCH 11/11] . --- .../cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java index 06e69ab5ad..f3471d910f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/stub/ResourceHeaderTest.java @@ -107,7 +107,6 @@ public void setUp() throws Exception { BigQueryWriteSettings.Builder writeSettingsBuilder = BigQueryWriteSettings.newBuilder() .setCredentialsProvider(NoCredentialsProvider.create()) - .setHeaderProvider(FixedHeaderProvider.create("write_stream", WRITE_STREAM_NAME)) .setTransportChannelProvider(channelProvider2); writeClient = BigQueryWriteClient.create(writeSettingsBuilder.build()); }