diff --git a/README.md b/README.md index ee2d3069f1..08735dc791 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.8.0') +implementation platform('com.google.cloud:libraries-bom:26.9.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.32.0' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.32.1' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.32.0" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.32.1" ``` ## Authentication diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index ff965f0477..ffc1290a78 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -308,17 +308,37 @@ static boolean isDefaultStream(String streamName) { return streamMatcher.find(); } - private BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException { + static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException { + BigQueryWriteSettings.Builder settingsBuilder = null; if (builder.client != null) { - return builder.client.getSettings(); + settingsBuilder = builder.client.getSettings().toBuilder(); } else { - return BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(builder.credentialsProvider) - .setTransportChannelProvider(builder.channelProvider) - .setBackgroundExecutorProvider(builder.executorProvider) - .setEndpoint(builder.endpoint) - .build(); + settingsBuilder = + new BigQueryWriteSettings.Builder() + .setTransportChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setChannelsPerCpu(1) + .build()) + .setCredentialsProvider( + BigQueryWriteSettings.defaultCredentialsProviderBuilder().build()) + .setBackgroundExecutorProvider( + BigQueryWriteSettings.defaultExecutorProviderBuilder().build()) + .setEndpoint(BigQueryWriteSettings.getDefaultEndpoint()); } + if (builder.channelProvider != null) { + settingsBuilder.setTransportChannelProvider(builder.channelProvider); + } + if (builder.credentialsProvider != null) { + settingsBuilder.setCredentialsProvider(builder.credentialsProvider); + } + if (builder.executorProvider != null) { + settingsBuilder.setBackgroundExecutorProvider(builder.executorProvider); + } + if (builder.endpoint != null) { + settingsBuilder.setEndpoint(builder.endpoint); + } + + return settingsBuilder.build(); } // Validate whether the fetched connection pool matched certain properties. @@ -542,16 +562,13 @@ public static final class Builder { private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES; - private String endpoint = BigQueryWriteSettings.getDefaultEndpoint(); + private String endpoint = null; - private TransportChannelProvider channelProvider = - BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); + private TransportChannelProvider channelProvider = null; - private CredentialsProvider credentialsProvider = - BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + private CredentialsProvider credentialsProvider = null; - private ExecutorProvider executorProvider = - BigQueryWriteSettings.defaultExecutorProviderBuilder().build(); + private ExecutorProvider executorProvider = null; private FlowController.LimitExceededBehavior limitExceededBehavior = FlowController.LimitExceededBehavior.Block; @@ -633,7 +650,8 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { /** {@code ExecutorProvider} to use to create Executor to run background jobs. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { - this.executorProvider = executorProvider; + this.executorProvider = + Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null."); return this; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 43c5fd2bea..af36273102 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -25,7 +25,10 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.GoogleCredentialsProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.api.gax.rpc.AbortedException; @@ -1366,4 +1369,79 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception { assertTrue(ex.getCause() instanceof InvalidArgumentException); assertFalse(writer.isUserClosed()); } + + @Test(timeout = 10000) + public void testBuilderDefaultSetting() throws Exception { + StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(TEST_STREAM_1); + BigQueryWriteSettings writeSettings = StreamWriter.getBigQueryWriteSettings(writerBuilder); + assertEquals( + BigQueryWriteSettings.defaultExecutorProviderBuilder().build().toString(), + writeSettings.getBackgroundExecutorProvider().toString()); + assertEquals( + BigQueryWriteSettings.defaultCredentialsProviderBuilder().build().toString(), + writeSettings.getCredentialsProvider().toString()); + assertTrue( + writeSettings.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider); + assertEquals( + BigQueryWriteSettings.getDefaultEndpoint(), writeSettings.getEndpoint().toString()); + } + + @Test(timeout = 10000) + public void testBuilderExplicitSetting() throws Exception { + // Client has special seetings. + BigQueryWriteSettings clientSettings = + BigQueryWriteSettings.newBuilder() + .setEndpoint("xxx:345") + .setBackgroundExecutorProvider( + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build(); + BigQueryWriteClient client = BigQueryWriteClient.create(clientSettings); + StreamWriter.Builder writerWithClient = StreamWriter.newBuilder(TEST_STREAM_1, client); + BigQueryWriteSettings writerSettings = StreamWriter.getBigQueryWriteSettings(writerWithClient); + assertEquals("xxx:345", writerSettings.getEndpoint()); + assertTrue( + writerSettings.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider); + assertEquals( + 4, + ((InstantiatingExecutorProvider) writerSettings.getBackgroundExecutorProvider()) + .getExecutorThreadCount()); + + // Explicit setting on StreamWriter is respected. + StreamWriter.Builder writerWithClientWithOverrides = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setEndpoint("yyy:345") + .setExecutorProvider( + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build()) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTimeout(Duration.ofSeconds(500)) + .build()) + .setCredentialsProvider( + BigQueryWriteSettings.defaultCredentialsProviderBuilder() + .setScopesToApply(Arrays.asList("A", "B")) + .build()); + BigQueryWriteSettings writerSettings2 = + StreamWriter.getBigQueryWriteSettings(writerWithClientWithOverrides); + assertEquals("yyy:345", writerSettings2.getEndpoint()); + assertTrue( + writerSettings2.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider); + assertEquals( + 14, + ((InstantiatingExecutorProvider) writerSettings2.getBackgroundExecutorProvider()) + .getExecutorThreadCount()); + assertTrue( + writerSettings2.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider); + assertEquals( + Duration.ofSeconds(500), + ((InstantiatingGrpcChannelProvider) writerSettings2.getTransportChannelProvider()) + .getKeepAliveTimeout()); + assertTrue(writerSettings2.getCredentialsProvider() instanceof GoogleCredentialsProvider); + assertEquals( + 2, + ((GoogleCredentialsProvider) writerSettings2.getCredentialsProvider()) + .getScopesToApply() + .size()); + } }