Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allow StreamWriter settings to override passed in BQ client setting #2001

Merged
merged 8 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,17 +302,37 @@ static boolean isDefaultStream(String streamName) {
return streamMatcher.find();
}

private BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
BigQueryWriteSettings.Builder settingsBuilder = null;
if (builder.client != null) {
return builder.client.getSettings();
settingsBuilder = builder.client.getSettings().toBuilder();
} else {
return BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setBackgroundExecutorProvider(builder.executorProvider)
.setEndpoint(builder.endpoint)
.build();
settingsBuilder =
new BigQueryWriteSettings.Builder()
.setTransportChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
.setChannelsPerCpu(1)
.build())
.setCredentialsProvider(
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build())
.setBackgroundExecutorProvider(
BigQueryWriteSettings.defaultExecutorProviderBuilder().build())
.setEndpoint(BigQueryWriteSettings.getDefaultEndpoint());
}
if (builder.channelProvider != null) {
settingsBuilder.setTransportChannelProvider(builder.channelProvider);
}
if (builder.credentialsProvider != null) {
settingsBuilder.setCredentialsProvider(builder.credentialsProvider);
}
if (builder.executorProvider != null) {
settingsBuilder.setBackgroundExecutorProvider(builder.executorProvider);
}
if (builder.endpoint != null) {
GaoleMeng marked this conversation as resolved.
Show resolved Hide resolved
settingsBuilder.setEndpoint(builder.endpoint);
}

return settingsBuilder.build();
}

// Validate whether the fetched connection pool matched certain properties.
Expand Down Expand Up @@ -518,16 +538,13 @@ public static final class Builder {

private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;

private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
private String endpoint = null;

private TransportChannelProvider channelProvider =
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
private TransportChannelProvider channelProvider = null;

private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
private CredentialsProvider credentialsProvider = null;

private ExecutorProvider executorProvider =
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
private ExecutorProvider executorProvider = null;

private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;
Expand Down Expand Up @@ -609,7 +626,8 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {

/** {@code ExecutorProvider} to use to create Executor to run background jobs. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
this.executorProvider =
Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.AbortedException;
Expand Down Expand Up @@ -1318,4 +1321,79 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception {
assertTrue(ex.getCause() instanceof InvalidArgumentException);
assertFalse(writer.isUserClosed());
}

@Test(timeout = 10000)
public void testBuilderDefaultSetting() throws Exception {
StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(TEST_STREAM_1);
BigQueryWriteSettings writeSettings = StreamWriter.getBigQueryWriteSettings(writerBuilder);
assertEquals(
BigQueryWriteSettings.defaultExecutorProviderBuilder().build().toString(),
writeSettings.getBackgroundExecutorProvider().toString());
assertEquals(
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build().toString(),
writeSettings.getCredentialsProvider().toString());
assertTrue(
writeSettings.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider);
assertEquals(
BigQueryWriteSettings.getDefaultEndpoint(), writeSettings.getEndpoint().toString());
}

@Test(timeout = 10000)
public void testBuilderExplicitSetting() throws Exception {
// Client has special seetings.
BigQueryWriteSettings clientSettings =
BigQueryWriteSettings.newBuilder()
.setEndpoint("xxx:345")
.setBackgroundExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build())
.setTransportChannelProvider(serviceHelper.createChannelProvider())
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
BigQueryWriteClient client = BigQueryWriteClient.create(clientSettings);
StreamWriter.Builder writerWithClient = StreamWriter.newBuilder(TEST_STREAM_1, client);
BigQueryWriteSettings writerSettings = StreamWriter.getBigQueryWriteSettings(writerWithClient);
assertEquals("xxx:345", writerSettings.getEndpoint());
assertTrue(
writerSettings.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider);
assertEquals(
4,
((InstantiatingExecutorProvider) writerSettings.getBackgroundExecutorProvider())
.getExecutorThreadCount());

// Explicit setting on StreamWriter is respected.
StreamWriter.Builder writerWithClientWithOverrides =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setEndpoint("yyy:345")
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build())
.setChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
.setKeepAliveTimeout(Duration.ofSeconds(500))
.build())
.setCredentialsProvider(
BigQueryWriteSettings.defaultCredentialsProviderBuilder()
.setScopesToApply(Arrays.asList("A", "B"))
.build());
BigQueryWriteSettings writerSettings2 =
StreamWriter.getBigQueryWriteSettings(writerWithClientWithOverrides);
assertEquals("yyy:345", writerSettings2.getEndpoint());
assertTrue(
writerSettings2.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider);
assertEquals(
14,
((InstantiatingExecutorProvider) writerSettings2.getBackgroundExecutorProvider())
.getExecutorThreadCount());
assertTrue(
writerSettings2.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider);
assertEquals(
Duration.ofSeconds(500),
((InstantiatingGrpcChannelProvider) writerSettings2.getTransportChannelProvider())
.getKeepAliveTimeout());
assertTrue(writerSettings2.getCredentialsProvider() instanceof GoogleCredentialsProvider);
assertEquals(
2,
((GoogleCredentialsProvider) writerSettings2.getCredentialsProvider())
.getScopesToApply()
.size());
}
}