From a08d3fdf2b28eb49d4003bbfe77b4b730be2812a Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 25 May 2017 15:34:11 +1000 Subject: [PATCH] pubsub: add Publisher.Builder.setCredentialsProvider (#2087) --- .../google/cloud/pubsub/spi/v1/Publisher.java | 29 +++++++++++++++++-- .../pubsub/spi/v1/PublisherImplTest.java | 23 +++++++++++---- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index f776634ee432..8d36d413fb84 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -22,10 +22,12 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.gax.retrying.RetrySettings; +import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -35,10 +37,13 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; +import io.grpc.CallCredentials; import io.grpc.ManagedChannel; import io.grpc.Status; +import io.grpc.auth.MoreCallCredentials; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -53,6 +58,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.threeten.bp.Duration; /** @@ -95,6 +101,7 @@ public class Publisher { private final FlowController flowController; private final ManagedChannel[] channels; private final AtomicRoundRobin channelIndex; + @Nullable private final CallCredentials callCredentials; private final ScheduledExecutorService executor; private final AtomicBoolean shutdown; @@ -155,6 +162,10 @@ public void close() { }); } channelIndex = new AtomicRoundRobin(channels.length); + + Credentials credentials = builder.credentialsProvider.getCredentials(); + callCredentials = credentials == null ? null : MoreCallCredentials.from(credentials); + shutdown = new AtomicBoolean(false); messagesWaiter = new MessageWaiter(); } @@ -328,10 +339,15 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { * Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBatch.attempt - 1)); rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().toMillis()); - Futures.addCallback( + PublisherFutureStub stub = PublisherGrpc.newFutureStub(channels[currentChannel]) - .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS) - .publish(publishRequest.build()), + .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS); + if (callCredentials != null) { + stub = stub.withCallCredentials(callCredentials); + } + + Futures.addCallback( + stub.publish(publishRequest.build()), new FutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -582,6 +598,8 @@ public long nextLong(long least, long bound) { ChannelProvider channelProvider = TopicAdminSettings.defaultChannelProviderBuilder().build(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; + CredentialsProvider credentialsProvider = + TopicAdminSettings.defaultCredentialsProviderBuilder().build(); private Builder(TopicName topic) { this.topicName = Preconditions.checkNotNull(topic); @@ -600,6 +618,11 @@ public Builder setChannelProvider(ChannelProvider channelProvider) { return this; } + public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider); + return this; + } + // Batching options public Builder setBatchingSettings(BatchingSettings batchingSettings) { Preconditions.checkNotNull(batchingSettings); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index 5d101f01fbab..56ea6b81a120 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -16,14 +16,18 @@ package com.google.cloud.pubsub.spi.v1; +import static org.junit.Assert.*; + import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; +import com.google.auth.Credentials; import com.google.cloud.pubsub.spi.v1.Publisher.Builder; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PublishResponse; @@ -35,6 +39,8 @@ import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.ServerImpl; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,11 +48,6 @@ import org.junit.runners.JUnit4; import org.threeten.bp.Duration; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; - -import static org.junit.Assert.*; - @RunWith(JUnit4.class) public class PublisherImplTest { @@ -78,6 +79,16 @@ public ManagedChannel getChannel(Executor executor) { } }; + // Gax declares a similar type, which can be used after gax is upgraded. + @Deprecated + private static final CredentialsProvider NO_CREDENTIALS_PROVIDER = + new CredentialsProvider() { + @Override + public Credentials getCredentials() { + return null; + } + }; + private FakeScheduledExecutorService fakeExecutor; private FakePublisherServiceImpl testPublisherServiceImpl; @@ -427,6 +438,7 @@ public void testPublisherGetters() throws Exception { .setDelayThreshold(Duration.ofMillis(11)) .setElementCountThreshold(12L) .build()); + builder.setCredentialsProvider(NO_CREDENTIALS_PROVIDER); builder.setFlowControlSettings( FlowControlSettings.newBuilder() .setMaxOutstandingRequestBytes(13) @@ -661,6 +673,7 @@ private Builder getTestPublisherBuilder() { return Publisher.defaultBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setChannelProvider(TEST_CHANNEL_PROVIDER) + .setCredentialsProvider(NO_CREDENTIALS_PROVIDER) .setLongRandom( new Publisher.LongRandom() { @Override