Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

PubsubTemplate subscribe in other projects #1880

Merged
merged 4 commits into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ By default, the `SimplePubSubMessageConverter` is used to convert payloads of ty

Google Cloud Pub/Sub allows many subscriptions to be associated to the same topic.
`PubSubTemplate` allows you to listen to subscriptions via the `subscribe()` method.
When listening to a subscription, messages will be pulled from Google Cloud Pub/Sub
asynchronously and passed to a user provided message handler.
When listening to a subscription, messages will be pulled from Google Cloud Pub/Sub asynchronously and passed to a user provided message handler.
The subscription name could either be a canonical subscription name within the current project, or the fully-qualified name referring to a subscription in a different project using the `projects/<project_name>/subscriptions/<subscription_name>` format.

===== Example
Subscribe to a subscription with a message handler:
Expand Down Expand Up @@ -193,7 +193,7 @@ flux.doOnNext(AcknowledgeablePubsubMessage::ack);
`PubSubAdmin` is the abstraction provided by Spring Cloud GCP to manage Google Cloud Pub/Sub resources.
It allows for the creation, deletion and listing of topics and subscriptions.

NOTE: Generally when referring to topics, you can either use the short canonical topic name within the current project, or the fully-qualified name referring to a topic in a different project using the `projects/<project_name>/topics/<topic_name>` format.
NOTE: Generally when referring to topics and subscriptions, you can either use the short canonical name within the current project, or the fully-qualified name referring to a topic or subscription in a different project using the `projects/<project_name>/(topics|subscriptions)/<name>` format.

`PubSubAdmin` depends on `GcpProjectIdProvider` and either a `CredentialsProvider` or a `TopicAdminClient` and a `SubscriptionAdminClient`.
If given a `CredentialsProvider`, it creates a `TopicAdminClient` and a `SubscriptionAdminClient` with the Google Cloud Java Library for Pub/Sub default settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.firestore.v1.Value;
import io.grpc.stub.StreamObserver;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -50,7 +49,6 @@
* @author Dmitry Solomakha
* @since 1.2
*/
@Ignore
public class FirestoreTemplateTests {

private FirestoreTemplate firestoreTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
import org.springframework.cloud.gcp.pubsub.support.PubSubSubscriptionUtils;
import org.springframework.cloud.gcp.pubsub.support.PubSubTopicUtils;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -98,7 +98,7 @@ public PubSubAdmin(GcpProjectIdProvider projectIdProvider, TopicAdminClient topi
* Create a new topic on Google Cloud Pub/Sub.
*
* @param topicName the name for the new topic within the current project, or the
* fully-qualified topic name in the projects/&lt;project_name&gt;/topics/&lt;topic_name&gt; format
* fully-qualified topic name in the {@code projects/<project_name>/topics/<topic_name>} format
* @return the created topic
*/
public Topic createTopic(String topicName) {
Expand All @@ -111,7 +111,7 @@ public Topic createTopic(String topicName) {
* Get the configuration of a Google Cloud Pub/Sub topic.
*
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
* {@code projects/<project_name>/topics/<topic_name>} format
* @return topic configuration or {@code null} if topic doesn't exist
*/
public Topic getTopic(String topicName) {
Expand All @@ -133,7 +133,7 @@ public Topic getTopic(String topicName) {
* Delete a topic from Google Cloud Pub/Sub.
*
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic
* name in the "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
* name in the {@code projects/<project_name>/topics/<topic_name>} format
*/
public void deleteTopic(String topicName) {
Assert.hasText(topicName, "No topic name was specified.");
Expand All @@ -158,8 +158,10 @@ public List<Topic> listTopics() {
/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName) {
Expand All @@ -169,8 +171,10 @@ public Subscription createSubscription(String subscriptionName, String topicName
/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param ackDeadline deadline in seconds before a message is resent, must be between 10
* and 600 seconds. If not provided, set to default of 10 seconds
* @return the created subscription
Expand All @@ -183,8 +187,10 @@ public Subscription createSubscription(String subscriptionName, String topicName
/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param pushEndpoint the URL of the service receiving the push messages. If not provided, uses
* message pulling by default
* @return the created subscription
Expand All @@ -197,9 +203,10 @@ public Subscription createSubscription(String subscriptionName, String topicName
/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
* {@code projects/<project_name>/topics/<topic_name>} format
* @param ackDeadline deadline in seconds before a message is resent, must be between 10
* and 600 seconds. If not provided, set to default of 10 seconds
* @param pushEndpoint the URL of the service receiving the push messages. If not
Expand All @@ -223,7 +230,7 @@ public Subscription createSubscription(String subscriptionName, String topicName
}

return this.subscriptionAdminClient.createSubscription(
ProjectSubscriptionName.of(this.projectId, subscriptionName),
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId),
PubSubTopicUtils.toProjectTopicName(topicName, this.projectId),
pushConfigBuilder.build(),
finalAckDeadline);
Expand All @@ -232,15 +239,16 @@ public Subscription createSubscription(String subscriptionName, String topicName
/**
* Get the configuration of a Google Cloud Pub/Sub subscription.
*
* @param subscriptionName canonical subscription name, e.g., "subscriptionName"
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @return subscription configuration or {@code null} if subscription doesn't exist
*/
public Subscription getSubscription(String subscriptionName) {
Assert.hasText(subscriptionName, "No subscription name was specified");

try {
return this.subscriptionAdminClient.getSubscription(
ProjectSubscriptionName.of(this.projectId, subscriptionName));
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId));
}
catch (ApiException aex) {
if (aex.getStatusCode().getCode() == StatusCode.Code.NOT_FOUND) {
Expand All @@ -254,13 +262,14 @@ public Subscription getSubscription(String subscriptionName) {
/**
* Delete a subscription from Google Cloud Pub/Sub.
*
* @param subscriptionName canonical subscription name, e.g., "subscriptionName"
* @param subscriptionName canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
*/
public void deleteSubscription(String subscriptionName) {
Assert.hasText(subscriptionName, "No subscription name was specified");

this.subscriptionAdminClient.deleteSubscription(
ProjectSubscriptionName.of(this.projectId, subscriptionName));
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public interface PubSubPublisherOperations {

/**
* Send a message to Pub/Sub.
* @param topic the name of an existing topic
* @param topic canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param payload an object that will be serialized and sent
* @param headers the headers to publish
* @param <T> the type of the payload to publish
Expand All @@ -47,7 +48,8 @@ public interface PubSubPublisherOperations {

/**
* Send a message to Pub/Sub.
* @param topic the name of an existing topic
* @param topic canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param payload an object that will be serialized and sent
* @param <T> the type of the payload to publish
* @return the listenable future of the call
Expand All @@ -56,7 +58,8 @@ public interface PubSubPublisherOperations {

/**
* Send a message to Pub/Sub.
* @param topic the name of an existing topic
* @param topic canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
* {@code projects/<project_name>/topics/<topic_name>} format
* @param pubsubMessage a Google Cloud Pub/Sub API message
* @return the listenable future of the call
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,20 @@ public interface PubSubSubscriberOperations {
/**
* Subscribe to a subscription with a given message receiver.
*
* @deprecated as of 1.1, use {@link #subscribe(String, Consumer)} instead.
* @param messageReceiver the message receiver with which to subscribe
* @param subscription the subscription to subscribe to
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @return the subscriber
* @deprecated as of 1.1, use {@link #subscribe(String, Consumer)} instead.
*/
@Deprecated
Subscriber subscribe(String subscription, MessageReceiver messageReceiver);

/**
* Add a callback method to an existing subscription.
* <p>The created {@link Subscriber} is returned so it can be stopped.
* @param subscription the name of an existing subscription
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param messageConsumer the callback method triggered when new messages arrive
* @return subscriber listening to new messages
* @since 1.1
Expand All @@ -68,7 +70,8 @@ public interface PubSubSubscriberOperations {
* Add a callback method to an existing subscription that receives Pub/Sub messages converted to the requested
* payload type.
* <p>The created {@link Subscriber} is returned so it can be stopped.
* @param subscription the name of an existing subscription
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param messageConsumer the callback method triggered when new messages arrive
* @param payloadType the type to which the payload of the Pub/Sub message should be converted
* @param <T> the type of the payload
Expand All @@ -80,7 +83,8 @@ <T> Subscriber subscribeAndConvert(String subscription,

/**
* Pull and auto-acknowledge a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription the subscription name
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
Expand All @@ -90,7 +94,8 @@ <T> Subscriber subscribeAndConvert(String subscription,

/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription the subscription name
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
Expand All @@ -101,7 +106,8 @@ <T> Subscriber subscribeAndConvert(String subscription,
/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
* @param subscription the subscription name
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
Expand All @@ -116,7 +122,8 @@ <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscri

/**
* Pull and auto-acknowledge a message from a Google Cloud Pub/Sub subscription.
* @param subscription the subscription name
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @return a received message, or {@code null} if none exists in the subscription
*/
PubsubMessage pullNext(String subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.PubSubSubscriptionUtils;
import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory;
import org.springframework.cloud.gcp.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.converter.ConvertedBasicAcknowledgeablePubsubMessage;
Expand Down Expand Up @@ -135,7 +136,8 @@ public Subscriber subscribe(String subscription,
this.subscriberFactory.createSubscriber(subscription,
(message, ackReplyConsumer) -> messageConsumer.accept(
new PushedAcknowledgeablePubsubMessage(
ProjectSubscriptionName.of(this.subscriberFactory.getProjectId(), subscription),
PubSubSubscriptionUtils.toProjectSubscriptionName(subscription,
this.subscriberFactory.getProjectId()),
message,
ackReplyConsumer)));
subscriber.startAsync();
Expand All @@ -151,7 +153,8 @@ public <T> Subscriber subscribeAndConvert(String subscription,
this.subscriberFactory.createSubscriber(subscription,
(message, ackReplyConsumer) -> messageConsumer.accept(
new ConvertedPushedAcknowledgeablePubsubMessage<>(
ProjectSubscriptionName.of(this.subscriberFactory.getProjectId(), subscription),
PubSubSubscriptionUtils.toProjectSubscriptionName(subscription,
this.subscriberFactory.getProjectId()),
message,
this.getMessageConverter().fromPubSubMessage(message, payloadType),
ackReplyConsumer)));
Expand All @@ -172,8 +175,8 @@ private List<AcknowledgeablePubsubMessage> pull(PullRequest pullRequest) {
PullResponse pullResponse = this.subscriberStub.pullCallable().call(pullRequest);
return pullResponse.getReceivedMessagesList().stream()
.map((message) -> new PulledAcknowledgeablePubsubMessage(
ProjectSubscriptionName.of(
this.subscriberFactory.getProjectId(), pullRequest.getSubscription()),
PubSubSubscriptionUtils.toProjectSubscriptionName(pullRequest.getSubscription(),
this.subscriberFactory.getProjectId()),
message.getMessage(),
message.getAckId()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -73,7 +72,7 @@ public class DefaultSubscriberFactory implements SubscriberFactory {

/**
* Default {@link DefaultSubscriberFactory} constructor.
* @param projectIdProvider provides the GCP project ID
* @param projectIdProvider provides the default GCP project ID for selecting the subscriptions
*/
public DefaultSubscriberFactory(GcpProjectIdProvider projectIdProvider) {
Assert.notNull(projectIdProvider, "The project ID provider can't be null.");
Expand Down Expand Up @@ -181,7 +180,7 @@ public void setSubscriberStubRetrySettings(RetrySettings subscriberStubRetrySett
@Override
public Subscriber createSubscriber(String subscriptionName, MessageReceiver receiver) {
Subscriber.Builder subscriberBuilder = Subscriber.newBuilder(
ProjectSubscriptionName.of(this.projectId, subscriptionName), receiver);
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId), receiver);

if (this.channelProvider != null) {
subscriberBuilder.setChannelProvider(this.channelProvider);
Expand Down Expand Up @@ -225,7 +224,7 @@ public PullRequest createPullRequest(String subscriptionName, Integer maxMessage

PullRequest.Builder pullRequestBuilder =
PullRequest.newBuilder().setSubscription(
ProjectSubscriptionName.of(this.projectId, subscriptionName).toString());
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId).toString());

if (maxMessages != null) {
pullRequestBuilder.setMaxMessages(maxMessages);
Expand Down
Loading