Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
PubsubTemplate subscribe in other projects (#1880)
Browse files Browse the repository at this point in the history
This change to the DefaultSubscriberFactory allows the project id of the
subscription to be overwitten directly from the subscription specification in the
subscription name string.

This allows use-cases like
pubSubTemplate.subscribe("projects/other-project/subscriptions/the-subscription",
messageReceiver).

This change is applied across the board to allow  subscribing to,
creating, and deleting subscriptions using fully-qualified subscription names.

Fixes #1877.
Related to #1678.
  • Loading branch information
meltsufin authored Sep 12, 2019
1 parent 131f08e commit 703c8a0
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 43 deletions.
6 changes: 3 additions & 3 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ By default, the `SimplePubSubMessageConverter` is used to convert payloads of ty

Google Cloud Pub/Sub allows many subscriptions to be associated to the same topic.
`PubSubTemplate` allows you to listen to subscriptions via the `subscribe()` method.
When listening to a subscription, messages will be pulled from Google Cloud Pub/Sub
asynchronously and passed to a user provided message handler.
When listening to a subscription, messages will be pulled from Google Cloud Pub/Sub asynchronously and passed to a user provided message handler.
The subscription name could either be a canonical subscription name within the current project, or the fully-qualified name referring to a subscription in a different project using the `projects/<project_name>/subscriptions/<subscription_name>` format.

===== Example
Subscribe to a subscription with a message handler:
Expand Down Expand Up @@ -193,7 +193,7 @@ flux.doOnNext(AcknowledgeablePubsubMessage::ack);
`PubSubAdmin` is the abstraction provided by Spring Cloud GCP to manage Google Cloud Pub/Sub resources.
It allows for the creation, deletion and listing of topics and subscriptions.

NOTE: Generally when referring to topics, you can either use the short canonical topic name within the current project, or the fully-qualified name referring to a topic in a different project using the `projects/<project_name>/topics/<topic_name>` format.
NOTE: Generally when referring to topics and subscriptions, you can either use the short canonical name within the current project, or the fully-qualified name referring to a topic or subscription in a different project using the `projects/<project_name>/(topics|subscriptions)/<name>` format.

`PubSubAdmin` depends on `GcpProjectIdProvider` and either a `CredentialsProvider` or a `TopicAdminClient` and a `SubscriptionAdminClient`.
If given a `CredentialsProvider`, it creates a `TopicAdminClient` and a `SubscriptionAdminClient` with the Google Cloud Java Library for Pub/Sub default settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.firestore.v1.Value;
import io.grpc.stub.StreamObserver;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -50,7 +49,6 @@
* @author Dmitry Solomakha
* @since 1.2
*/
@Ignore
public class FirestoreTemplateTests {

private FirestoreTemplate firestoreTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
import org.springframework.cloud.gcp.pubsub.support.PubSubSubscriptionUtils;
import org.springframework.cloud.gcp.pubsub.support.PubSubTopicUtils;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -98,7 +98,7 @@ public PubSubAdmin(GcpProjectIdProvider projectIdProvider, TopicAdminClient topi
* Create a new topic on Google Cloud Pub/Sub.
*
* @param topicName the name for the new topic within the current project, or the
* fully-qualified topic name in the projects/&lt;project_name&gt;/topics/&lt;topic_name&gt; format
* fully-qualified topic name in the {@code projects/<project_name>/topics/<topic_name>} format
* @return the created topic
*/
public Topic createTopic(String topicName) {
Expand All @@ -111,7 +111,7 @@ public Topic createTopic(String topicName) {
* Get the configuration of a Google Cloud Pub/Sub topic.
*
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
* {@code projects/<project_name>/topics/<topic_name>} format
* @return topic configuration or {@code null} if topic doesn't exist
*/
public Topic getTopic(String topicName) {
Expand All @@ -133,7 +133,7 @@ public Topic getTopic(String topicName) {
* Delete a topic from Google Cloud Pub/Sub.
*
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic
* name in the "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
* name in the {@code projects/<project_name>/topics/<topic_name>} format
*/
public void deleteTopic(String topicName) {
Assert.hasText(topicName, "No topic name was specified.");
Expand All @@ -158,8 +158,10 @@ public List<Topic> listTopics() {
/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName) {
Expand All @@ -169,8 +171,10 @@ public Subscription createSubscription(String subscriptionName, String topicName
/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param ackDeadline deadline in seconds before a message is resent, must be between 10
* and 600 seconds. If not provided, set to default of 10 seconds
* @return the created subscription
Expand All @@ -183,8 +187,10 @@ public Subscription createSubscription(String subscriptionName, String topicName
/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param pushEndpoint the URL of the service receiving the push messages. If not provided, uses
* message pulling by default
* @return the created subscription
Expand All @@ -197,9 +203,10 @@ public Subscription createSubscription(String subscriptionName, String topicName
/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
* {@code projects/<project_name>/topics/<topic_name>} format
* @param ackDeadline deadline in seconds before a message is resent, must be between 10
* and 600 seconds. If not provided, set to default of 10 seconds
* @param pushEndpoint the URL of the service receiving the push messages. If not
Expand All @@ -223,7 +230,7 @@ public Subscription createSubscription(String subscriptionName, String topicName
}

return this.subscriptionAdminClient.createSubscription(
ProjectSubscriptionName.of(this.projectId, subscriptionName),
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId),
PubSubTopicUtils.toProjectTopicName(topicName, this.projectId),
pushConfigBuilder.build(),
finalAckDeadline);
Expand All @@ -232,15 +239,16 @@ public Subscription createSubscription(String subscriptionName, String topicName
/**
* Get the configuration of a Google Cloud Pub/Sub subscription.
*
* @param subscriptionName canonical subscription name, e.g., "subscriptionName"
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @return subscription configuration or {@code null} if subscription doesn't exist
*/
public Subscription getSubscription(String subscriptionName) {
Assert.hasText(subscriptionName, "No subscription name was specified");

try {
return this.subscriptionAdminClient.getSubscription(
ProjectSubscriptionName.of(this.projectId, subscriptionName));
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId));
}
catch (ApiException aex) {
if (aex.getStatusCode().getCode() == StatusCode.Code.NOT_FOUND) {
Expand All @@ -254,13 +262,14 @@ public Subscription getSubscription(String subscriptionName) {
/**
* Delete a subscription from Google Cloud Pub/Sub.
*
* @param subscriptionName canonical subscription name, e.g., "subscriptionName"
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
*/
public void deleteSubscription(String subscriptionName) {
Assert.hasText(subscriptionName, "No subscription name was specified");

this.subscriptionAdminClient.deleteSubscription(
ProjectSubscriptionName.of(this.projectId, subscriptionName));
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public interface PubSubPublisherOperations {

/**
* Send a message to Pub/Sub.
* @param topic the name of an existing topic
* @param topic canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param payload an object that will be serialized and sent
* @param headers the headers to publish
* @param <T> the type of the payload to publish
Expand All @@ -47,7 +48,8 @@ public interface PubSubPublisherOperations {

/**
* Send a message to Pub/Sub.
* @param topic the name of an existing topic
* @param topic canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param payload an object that will be serialized and sent
* @param <T> the type of the payload to publish
* @return the listenable future of the call
Expand All @@ -56,7 +58,8 @@ public interface PubSubPublisherOperations {

/**
* Send a message to Pub/Sub.
* @param topic the name of an existing topic
* @param topic canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param pubsubMessage a Google Cloud Pub/Sub API message
* @return the listenable future of the call
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,20 @@ public interface PubSubSubscriberOperations {
/**
* Subscribe to a subscription with a given message receiver.
*
* @deprecated as of 1.1, use {@link #subscribe(String, Consumer)} instead.
* @param messageReceiver the message receiver with which to subscribe
* @param subscription the subscription to subscribe to
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @return the subscriber
* @deprecated as of 1.1, use {@link #subscribe(String, Consumer)} instead.
*/
@Deprecated
Subscriber subscribe(String subscription, MessageReceiver messageReceiver);

/**
* Add a callback method to an existing subscription.
* <p>The created {@link Subscriber} is returned so it can be stopped.
* @param subscription the name of an existing subscription
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param messageConsumer the callback method triggered when new messages arrive
* @return subscriber listening to new messages
* @since 1.1
Expand All @@ -68,7 +70,8 @@ public interface PubSubSubscriberOperations {
* Add a callback method to an existing subscription that receives Pub/Sub messages converted to the requested
* payload type.
* <p>The created {@link Subscriber} is returned so it can be stopped.
* @param subscription the name of an existing subscription
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param messageConsumer the callback method triggered when new messages arrive
* @param payloadType the type to which the payload of the Pub/Sub message should be converted
* @param <T> the type of the payload
Expand All @@ -80,7 +83,8 @@ <T> Subscriber subscribeAndConvert(String subscription,

/**
* Pull and auto-acknowledge a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription the subscription name
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
Expand All @@ -90,7 +94,8 @@ <T> Subscriber subscribeAndConvert(String subscription,

/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription the subscription name
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
Expand All @@ -101,7 +106,8 @@ <T> Subscriber subscribeAndConvert(String subscription,
/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
* @param subscription the subscription name
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
Expand All @@ -116,7 +122,8 @@ <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscri

/**
* Pull and auto-acknowledge a message from a Google Cloud Pub/Sub subscription.
* @param subscription the subscription name
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @return a received message, or {@code null} if none exists in the subscription
*/
PubsubMessage pullNext(String subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.PubSubSubscriptionUtils;
import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory;
import org.springframework.cloud.gcp.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.converter.ConvertedBasicAcknowledgeablePubsubMessage;
Expand Down Expand Up @@ -135,7 +136,8 @@ public Subscriber subscribe(String subscription,
this.subscriberFactory.createSubscriber(subscription,
(message, ackReplyConsumer) -> messageConsumer.accept(
new PushedAcknowledgeablePubsubMessage(
ProjectSubscriptionName.of(this.subscriberFactory.getProjectId(), subscription),
PubSubSubscriptionUtils.toProjectSubscriptionName(subscription,
this.subscriberFactory.getProjectId()),
message,
ackReplyConsumer)));
subscriber.startAsync();
Expand All @@ -151,7 +153,8 @@ public <T> Subscriber subscribeAndConvert(String subscription,
this.subscriberFactory.createSubscriber(subscription,
(message, ackReplyConsumer) -> messageConsumer.accept(
new ConvertedPushedAcknowledgeablePubsubMessage<>(
ProjectSubscriptionName.of(this.subscriberFactory.getProjectId(), subscription),
PubSubSubscriptionUtils.toProjectSubscriptionName(subscription,
this.subscriberFactory.getProjectId()),
message,
this.getMessageConverter().fromPubSubMessage(message, payloadType),
ackReplyConsumer)));
Expand All @@ -172,8 +175,8 @@ private List<AcknowledgeablePubsubMessage> pull(PullRequest pullRequest) {
PullResponse pullResponse = this.subscriberStub.pullCallable().call(pullRequest);
return pullResponse.getReceivedMessagesList().stream()
.map((message) -> new PulledAcknowledgeablePubsubMessage(
ProjectSubscriptionName.of(
this.subscriberFactory.getProjectId(), pullRequest.getSubscription()),
PubSubSubscriptionUtils.toProjectSubscriptionName(pullRequest.getSubscription(),
this.subscriberFactory.getProjectId()),
message.getMessage(),
message.getAckId()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -73,7 +72,7 @@ public class DefaultSubscriberFactory implements SubscriberFactory {

/**
* Default {@link DefaultSubscriberFactory} constructor.
* @param projectIdProvider provides the GCP project ID
* @param projectIdProvider provides the default GCP project ID for selecting the subscriptions
*/
public DefaultSubscriberFactory(GcpProjectIdProvider projectIdProvider) {
Assert.notNull(projectIdProvider, "The project ID provider can't be null.");
Expand Down Expand Up @@ -181,7 +180,7 @@ public void setSubscriberStubRetrySettings(RetrySettings subscriberStubRetrySett
@Override
public Subscriber createSubscriber(String subscriptionName, MessageReceiver receiver) {
Subscriber.Builder subscriberBuilder = Subscriber.newBuilder(
ProjectSubscriptionName.of(this.projectId, subscriptionName), receiver);
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId), receiver);

if (this.channelProvider != null) {
subscriberBuilder.setChannelProvider(this.channelProvider);
Expand Down Expand Up @@ -225,7 +224,7 @@ public PullRequest createPullRequest(String subscriptionName, Integer maxMessage

PullRequest.Builder pullRequestBuilder =
PullRequest.newBuilder().setSubscription(
ProjectSubscriptionName.of(this.projectId, subscriptionName).toString());
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId).toString());

if (maxMessages != null) {
pullRequestBuilder.setMaxMessages(maxMessages);
Expand Down
Loading

0 comments on commit 703c8a0

Please sign in to comment.