Skip to content

Commit

Permalink
Make PubSub and PubSubRpc extends AutoCloseable, fix grpc settings
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed May 11, 2016
1 parent 215a5df commit 0093ac3
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*
* @see <a href="https://cloud.google.com/pubsub/">Google Cloud Pub/Sub</a>
*/
public interface PubSub extends Service<PubSubOptions> {
public interface PubSub extends AutoCloseable, Service<PubSubOptions> {

/**
* Class for specifying options for listing topics and subscriptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,9 @@ public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, Ti
Iterable<String> ackIds) {
return null;
}

@Override
public void close() throws Exception {
rpc.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiException;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.AuthCredentials;
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.PubSubException;
import com.google.cloud.pubsub.PubSubOptions;
Expand All @@ -27,8 +29,10 @@
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
Expand All @@ -50,48 +54,78 @@
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import org.joda.time.Duration;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Future;

import autovalue.shaded.com.google.common.common.collect.Sets;
import io.grpc.Status.Code;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

public class DefaultPubSubRpc implements PubSubRpc {

private final PublisherApi publisherApi;
private final SubscriberApi subscriberApi;
private final ScheduledExecutorService executor =
MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(8));

public DefaultPubSubRpc(PubSubOptions options) throws IOException {
try {
// Provide (and use a common thread-pool).
// This depends on https://github.com/googleapis/gax-java/issues/73
PublisherSettings.Builder pbuilder =
PublisherSettings.defaultBuilder()
.provideChannelWith(options.authCredentials().credentials())
.applyToAllApiMethods(apiCallSettings(options));
publisherApi = PublisherApi.create(pbuilder.build());
SubscriberSettings.Builder sBuilder =
SubscriberSettings.defaultBuilder()
.provideChannelWith(options.authCredentials().credentials())
.applyToAllApiMethods(apiCallSettings(options));
subscriberApi = SubscriberApi.create(sBuilder.build());
PublisherSettings.Builder pubBuilder =
PublisherSettings.defaultBuilder().provideExecutorWith(executor, false);
SubscriberSettings.Builder subBuilder =
SubscriberSettings.defaultBuilder().provideExecutorWith(executor, false);
// todo(mziccard): PublisherSettings should support null/absent credentials for testing
if (options.host().contains("localhost")
|| options.authCredentials().equals(AuthCredentials.noAuth())) {
ManagedChannel channel = NettyChannelBuilder.forTarget(options.host())
.negotiationType(NegotiationType.PLAINTEXT)
.build();
pubBuilder.provideChannelWith(channel, true);
subBuilder.provideChannelWith(channel, true);
} else {
GoogleCredentials credentials = options.authCredentials().credentials();
pubBuilder.provideChannelWith(
credentials.createScoped(PublisherSettings.DEFAULT_SERVICE_SCOPES));
subBuilder.provideChannelWith(
credentials.createScoped(SubscriberSettings.DEFAULT_SERVICE_SCOPES));
}
pubBuilder.applyToAllApiMethods(apiCallSettings(options));
subBuilder.applyToAllApiMethods(apiCallSettings(options));
publisherApi = PublisherApi.create(pubBuilder.build());
subscriberApi = SubscriberApi.create(subBuilder.build());
} catch (Exception ex) {
throw new IOException(ex);
}
}

private static long translateTimeout(long timeout) {
if (timeout < 0) {
return 20000;
} else if (timeout == 0) {
return Long.MAX_VALUE;
}
return timeout;
}

private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
// TODO: specify timeout these settings:
// retryParams.retryMaxAttempts(), retryParams.retryMinAttempts()
RetryParams retryParams = options.retryParams();
long connectTimeout = translateTimeout(options.connectTimeout());
long readTimeout = translateTimeout(options.readTimeout());
long maxTimeout = connectTimeout == Long.MAX_VALUE || readTimeout == Long.MAX_VALUE
? Long.MAX_VALUE : connectTimeout + readTimeout;
final RetrySettings.Builder builder = RetrySettings.newBuilder()
.setTotalTimeout(Duration.millis(retryParams.totalRetryPeriodMillis()))
.setInitialRpcTimeout(Duration.millis(options.connectTimeout()))
.setInitialRpcTimeout(Duration.millis(connectTimeout))
.setRpcTimeoutMultiplier(1.5)
.setMaxRpcTimeout(Duration.millis(options.connectTimeout() + options.readTimeout()))
.setMaxRpcTimeout(Duration.millis(maxTimeout))
.setInitialRetryDelay(Duration.millis(retryParams.initialRetryDelayMillis()))
.setRetryDelayMultiplier(retryParams.retryDelayBackoffFactor())
.setMaxRetryDelay(Duration.millis(retryParams.maxRetryDelayMillis()));
Expand All @@ -117,7 +151,7 @@ public V apply(ApiException exception) {

@Override
public Future<Topic> create(Topic topic) {
// TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings
// TODO: it would be nice if we can get the idempotent information from the ApiCallSettings
// or from the exception
return translate(publisherApi.createTopicCallable().futureCall(topic), true);
}
Expand Down Expand Up @@ -149,7 +183,6 @@ public Future<ListTopicSubscriptionsResponse> list(ListTopicSubscriptionsRequest

@Override
public Future<Empty> delete(DeleteTopicRequest request) {
// TODO: check if null is not going to work for Empty
return translate(publisherApi.deleteTopicCallable().futureCall(request), true,
Code.NOT_FOUND.value());
}
Expand Down Expand Up @@ -195,4 +228,16 @@ public Future<PullResponse> pull(PullRequest request) {
public Future<Empty> modify(ModifyPushConfigRequest request) {
return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false);
}

@Override
public ScheduledExecutorService executor() {
return executor;
}

@Override
public void close() throws Exception {
subscriberApi.close();
publisherApi.close();
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@
import com.google.pubsub.v1.Topic;

import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

public interface PubSubRpc {
public interface PubSubRpc extends AutoCloseable {

// in all cases root cause of ExecutionException is PubSubException
Future<Topic> create(Topic topic);
Expand Down Expand Up @@ -69,4 +70,6 @@ public interface PubSubRpc {
Future<PullResponse> pull(PullRequest request);

Future<Empty> modify(ModifyPushConfigRequest request);

ScheduledExecutorService executor();
}

0 comments on commit 0093ac3

Please sign in to comment.