Skip to content

Commit

Permalink
pubsub: add Publisher.Builder.setCredentialsProvider (#2087)
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad authored May 25, 2017
1 parent 31f044b commit a08d3fd
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -35,10 +37,13 @@
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -53,6 +58,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
Expand Down Expand Up @@ -95,6 +101,7 @@ public class Publisher {
private final FlowController flowController;
private final ManagedChannel[] channels;
private final AtomicRoundRobin channelIndex;
@Nullable private final CallCredentials callCredentials;

private final ScheduledExecutorService executor;
private final AtomicBoolean shutdown;
Expand Down Expand Up @@ -155,6 +162,10 @@ public void close() {
});
}
channelIndex = new AtomicRoundRobin(channels.length);

Credentials credentials = builder.credentialsProvider.getCredentials();
callCredentials = credentials == null ? null : MoreCallCredentials.from(credentials);

shutdown = new AtomicBoolean(false);
messagesWaiter = new MessageWaiter();
}
Expand Down Expand Up @@ -328,10 +339,15 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
* Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBatch.attempt - 1));
rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().toMillis());

Futures.addCallback(
PublisherFutureStub stub =
PublisherGrpc.newFutureStub(channels[currentChannel])
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS)
.publish(publishRequest.build()),
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS);
if (callCredentials != null) {
stub = stub.withCallCredentials(callCredentials);
}

Futures.addCallback(
stub.publish(publishRequest.build()),
new FutureCallback<PublishResponse>() {
@Override
public void onSuccess(PublishResponse result) {
Expand Down Expand Up @@ -582,6 +598,8 @@ public long nextLong(long least, long bound) {

ChannelProvider channelProvider = TopicAdminSettings.defaultChannelProviderBuilder().build();
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
CredentialsProvider credentialsProvider =
TopicAdminSettings.defaultCredentialsProviderBuilder().build();

private Builder(TopicName topic) {
this.topicName = Preconditions.checkNotNull(topic);
Expand All @@ -600,6 +618,11 @@ public Builder setChannelProvider(ChannelProvider channelProvider) {
return this;
}

public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider);
return this;
}

// Batching options
public Builder setBatchingSettings(BatchingSettings batchingSettings) {
Preconditions.checkNotNull(batchingSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package com.google.cloud.pubsub.spi.v1;

import static org.junit.Assert.*;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.spi.v1.Publisher.Builder;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PublishResponse;
Expand All @@ -35,18 +39,15 @@
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ServerImpl;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import static org.junit.Assert.*;

@RunWith(JUnit4.class)
public class PublisherImplTest {

Expand Down Expand Up @@ -78,6 +79,16 @@ public ManagedChannel getChannel(Executor executor) {
}
};

// Gax declares a similar type, which can be used after gax is upgraded.
@Deprecated
private static final CredentialsProvider NO_CREDENTIALS_PROVIDER =
new CredentialsProvider() {
@Override
public Credentials getCredentials() {
return null;
}
};

private FakeScheduledExecutorService fakeExecutor;

private FakePublisherServiceImpl testPublisherServiceImpl;
Expand Down Expand Up @@ -427,6 +438,7 @@ public void testPublisherGetters() throws Exception {
.setDelayThreshold(Duration.ofMillis(11))
.setElementCountThreshold(12L)
.build());
builder.setCredentialsProvider(NO_CREDENTIALS_PROVIDER);
builder.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingRequestBytes(13)
Expand Down Expand Up @@ -661,6 +673,7 @@ private Builder getTestPublisherBuilder() {
return Publisher.defaultBuilder(TEST_TOPIC)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
.setChannelProvider(TEST_CHANNEL_PROVIDER)
.setCredentialsProvider(NO_CREDENTIALS_PROVIDER)
.setLongRandom(
new Publisher.LongRandom() {
@Override
Expand Down

0 comments on commit a08d3fd

Please sign in to comment.