Spring Cloud GCP provides an abstraction layer to publish to and subscribe from Google Cloud Pub/Sub topics and to create, list or delete Google Cloud Pub/Sub topics and subscriptions.
A Spring Boot starter is provided to auto-configure the various required Pub/Sub components.
Maven coordinates, using Spring Cloud GCP BOM:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
Gradle coordinates:
dependencies {
    implementation("org.springframework.cloud:spring-cloud-gcp-starter-pubsub")
}
This starter is also available from Spring Initializr through the GCP Messaging entry.
The Spring Boot starter for Google Cloud Pub/Sub provides the following configuration options.
This section describes options for enabling the integration, specifying the GCP project and credentials, and setting whether the APIs should connect to an emulator for local testing.
| Name | Description | Required | Default value |
|---|---|---|---|
| | Enables or disables Pub/Sub auto-configuration | No | |
| | GCP project ID where the Google Cloud Pub/Sub API is hosted, if different from the one in the Spring Cloud GCP Core Module | No | |
| | OAuth2 credentials for authenticating with the Google Cloud Pub/Sub API, if different from the ones in the Spring Cloud GCP Core Module | No | |
| | The host and port of the locally running emulator. If provided, this sets up the client to connect to a running Google Cloud Pub/Sub Emulator. | No | |
| | Base64-encoded contents of the OAuth2 account private key for authenticating with the Google Cloud Pub/Sub API, if different from the ones in the Spring Cloud GCP Core Module | No | |
| | OAuth2 scope for Spring Cloud GCP Pub/Sub credentials | No | |
This section describes configuration options to customize the behavior of the application’s Pub/Sub publishers and subscribers.
| Name | Description | Required | Default value |
|---|---|---|---|
| | The number of pull workers | No | 1 |
| | The maximum period a message ack deadline will be extended, in seconds | No | 0 |
| | The endpoint for synchronously pulling messages | No | pubsub.googleapis.com:443 |
| | Number of threads used by | No | 4 |
| | Maximum number of outstanding elements to keep in memory before enforcing flow control. | No | unlimited |
| | Maximum number of outstanding bytes to keep in memory before enforcing flow control. | No | unlimited |
| | The behavior when the specified limits are exceeded. | No | Block |
| | The element count threshold to use for batching. | No | 1 (batching off) |
| | The request byte threshold to use for batching. | No | 1 byte (batching off) |
| | The delay threshold to use for batching. After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. | No | 1 ms (batching off) |
| | Enables batching. | No | false |
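The interplay of the element count and request byte thresholds can be modelled in a few lines. The following is a minimal sketch under stated assumptions, not the client library's implementation: the class `BatchingSketch` and its methods are hypothetical, it assumes a batch is sent as soon as either threshold is reached, and it leaves out the delay threshold entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of threshold-based batching; not part of Spring Cloud GCP.
class BatchingSketch {
    private final int elementCountThreshold;
    private final long requestByteThreshold;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int batchesSent = 0;

    public BatchingSketch(int elementCountThreshold, long requestByteThreshold) {
        this.elementCountThreshold = elementCountThreshold;
        this.requestByteThreshold = requestByteThreshold;
    }

    // Buffers a message and sends the batch once either threshold is reached.
    public void add(byte[] message) {
        pending.add(message);
        pendingBytes += message.length;
        if (pending.size() >= elementCountThreshold || pendingBytes >= requestByteThreshold) {
            flush();
        }
    }

    // "Sends" the buffered messages; here this only counts the batch and resets the buffer.
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        batchesSent++;
        pending.clear();
        pendingBytes = 0;
    }

    public int getBatchesSent() {
        return batchesSent;
    }

    public int getPendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BatchingSketch batcher = new BatchingSketch(3, 1000);
        for (int i = 0; i < 7; i++) {
            batcher.add(new byte[10]);
        }
        System.out.println(batcher.getBatchesSent() + " batches sent, "
                + batcher.getPendingCount() + " message(s) still buffered");
    }
}
```

With an element count threshold of 1, every call to `add` sends immediately in this model, which is why the default values in the table are described as "batching off".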
The Pub/Sub API uses the gRPC protocol to send API requests to the Pub/Sub service. This section describes configuration options for customizing the gRPC behavior.
Note: The properties that refer to retry control the RPC retries for transient failures during the gRPC call to the Cloud Pub/Sub server.
They do not control message redelivery; only the message acknowledgement deadline can be used to extend or shorten the amount of time until Pub/Sub attempts redelivery.
| Name | Description | Required | Default value |
|---|---|---|---|
| | Determines frequency of keepalive gRPC ping | No | |
| | TotalTimeout has ultimate control over how long the logic should keep trying the remote call until it gives up completely. The higher the total timeout, the more retries can be attempted. | No | 0 |
| | InitialRetryDelay controls the delay before the first retry. Subsequent retries will use this value adjusted according to the RetryDelayMultiplier. | No | 0 |
| | RetryDelayMultiplier controls the change in retry delay. The retry delay of the previous call is multiplied by the RetryDelayMultiplier to calculate the retry delay for the next call. | No | 1 |
| | MaxRetryDelay puts a limit on the value of the retry delay, so that the RetryDelayMultiplier can’t increase the retry delay higher than this amount. | No | 0 |
| | MaxAttempts defines the maximum number of attempts to perform. If this value is greater than 0, and the number of attempts reaches this limit, the logic will give up retrying even if the total retry time is still lower than TotalTimeout. | No | 0 |
| | Jitter determines if the delay time should be randomized. | No | true |
| | InitialRpcTimeout controls the timeout for the initial RPC. Subsequent calls will use this value adjusted according to the RpcTimeoutMultiplier. | No | 0 |
| | RpcTimeoutMultiplier controls the change in RPC timeout. The timeout of the previous call is multiplied by the RpcTimeoutMultiplier to calculate the timeout for the next call. | No | 1 |
| | MaxRpcTimeout puts a limit on the value of the RPC timeout, so that the RpcTimeoutMultiplier can’t increase the RPC timeout higher than this amount. | No | 0 |
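To make the relationship between InitialRetryDelay, RetryDelayMultiplier, MaxRetryDelay and MaxAttempts concrete, the sketch below computes the sequence of retry delays they would produce. It is an illustration only: the class `RetryDelaySketch` and its method are hypothetical, jitter and TotalTimeout are ignored, and it assumes that the first call counts as one attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the retry delay schedule; not the client library's code.
class RetryDelaySketch {

    // Returns the un-jittered delay, in milliseconds, before each retry.
    public static List<Long> retryDelays(long initialDelayMs, double multiplier,
                                         long maxDelayMs, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        long delay = Math.min(initialDelayMs, maxDelayMs);
        // Assumption: the initial call is attempt 1, so at most maxAttempts - 1 retries follow.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            delays.add(delay);
            // Each delay is the previous one times the multiplier, capped at the maximum.
            delay = Math.min((long) (delay * multiplier), maxDelayMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Initial delay 100 ms, multiplier 2.0, cap 1000 ms, 6 attempts in total.
        System.out.println(retryDelays(100, 2.0, 1000, 6)); // prints [100, 200, 400, 800, 1000]
    }
}
```

The last delay shows the cap at work: doubling 800 ms would give 1600 ms, but MaxRetryDelay limits it to 1000 ms.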
If you are using Spring Boot Actuator, you can take advantage of the Cloud Pub/Sub health indicator called pubsub.
The health indicator will verify whether Cloud Pub/Sub is up and accessible by your application.
To enable it, all you need to do is add the Spring Boot Actuator to your project.
The pubsub indicator will then roll up to the overall application status visible at http://localhost:8080/actuator/health (use the management.endpoint.health.show-details property to view per-indicator details).
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Note: If your application already has actuator and Cloud Pub/Sub starters, this health indicator is enabled by default.
To disable the Cloud Pub/Sub indicator, set management.health.pubsub.enabled to false.
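As a configuration sketch, the two Actuator properties mentioned above could be set in `application.properties` as follows. The value `always` is one of the settings Spring Boot accepts for `show-details`; whether to expose details, and whether to disable the indicator at all, depends on the application.

```properties
# Show per-indicator details (including pubsub) on /actuator/health
management.endpoint.health.show-details=always

# Uncomment to switch the Cloud Pub/Sub health indicator off
# management.health.pubsub.enabled=false
```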
PubSubOperations is an abstraction that allows Spring users to use Google Cloud Pub/Sub without depending on any Google Cloud Pub/Sub API semantics.
It provides the common set of operations needed to interact with Google Cloud Pub/Sub.
PubSubTemplate is the default implementation of PubSubOperations and it uses the Google Cloud Java Client for Pub/Sub to interact with Google Cloud Pub/Sub.
PubSubTemplate provides asynchronous methods to publish messages to a Google Cloud Pub/Sub topic.
The publish() method takes in a topic name to post the message to, a payload of a generic type and, optionally, a map with the message headers.
The topic name could either be a canonical topic name within the current project, or the fully-qualified name referring to a topic in a different project using the projects/<project_name>/topics/<topic_name> format.
Here is an example of how to publish a message to a Google Cloud Pub/Sub topic:
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
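The two accepted forms of a topic name can be illustrated with a small helper. This is a hypothetical sketch, not part of Spring Cloud GCP: the class `TopicNames`, the method `qualifyTopic`, and the project IDs `my-project` and `other-project` are assumptions made purely for illustration.

```java
// Hypothetical helper showing how a short or fully-qualified topic name maps to a full name.
class TopicNames {

    // Returns the name unchanged if it is already fully qualified,
    // otherwise places the topic in the given default project.
    public static String qualifyTopic(String defaultProjectId, String topic) {
        if (topic.startsWith("projects/")) {
            return topic;
        }
        return "projects/" + defaultProjectId + "/topics/" + topic;
    }

    public static void main(String[] args) {
        // Short name: resolved against the current project.
        System.out.println(qualifyTopic("my-project", "orders"));
        // Fully-qualified name: refers to a topic in a different project and is kept as-is.
        System.out.println(qualifyTopic("my-project", "projects/other-project/topics/orders"));
    }
}
```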
By default, the SimplePubSubMessageConverter is used to convert payloads of type byte[], ByteString, ByteBuffer, and String to Pub/Sub messages.
Google Cloud Pub/Sub allows many subscriptions to be associated with the same topic.
PubSubTemplate allows you to listen to subscriptions via the subscribe() method.
When listening to a subscription, messages will be pulled from Google Cloud Pub/Sub asynchronously and passed to a user-provided message handler.
The subscription name could either be a canonical subscription name within the current project, or the fully-qualified name referring to a subscription in a different project using the projects/<project_name>/subscriptions/<subscription_name> format.
Subscribe to a subscription with a message handler:
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
PubSubTemplate provides the following subscribe methods:
subscribe(String subscription, Consumer<BasicAcknowledgeablePubsubMessage> messageConsumer) |
asynchronously pulls messages and passes them to |
---|---|
subscribeAndConvert(String subscription, Consumer<ConvertedBasicAcknowledgeablePubsubMessage<T>> messageConsumer, Class<T> payloadType) |
same as |
Note: As of version 1.2, subscribing by itself is not enough to keep an application running.
For a command-line application, you may want to provide your own ThreadPoolTaskScheduler bean named pubsubSubscriberThreadPool, which by default creates non-daemon threads that will keep an application from stopping.
This default behavior has been overridden in Spring Cloud GCP for consistency with the Cloud Pub/Sub client library, and to avoid holding up command-line applications that would like to shut down once their work is done.
Google Cloud Pub/Sub supports synchronous pulling of messages from a subscription. This is different from subscribing to a subscription, in the sense that subscribing is an asynchronous task.
Pull up to 10 messages:
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
PubSubTemplate provides the following pull methods:
pull(String subscription, Integer maxMessages, Boolean returnImmediately) |
Pulls a number of messages from a subscription, allowing for the retry settings to be configured.
Any messages received by The If |
---|---|
pullAndAck |
Works the same as the |
pullNext |
Allows for a single message to be pulled and automatically acknowledged from a subscription. |
pullAndConvert |
Works the same as the |
There are two ways to acknowledge messages.
- To acknowledge multiple messages at once, you can use the PubSubTemplate.ack() method. You can also use the PubSubTemplate.nack() method for negatively acknowledging messages. Using these methods for acknowledging messages in batches is more efficient than acknowledging messages individually, but they require the collection of messages to be from the same project.
- To acknowledge messages individually, you can use the ack() or nack() method on each of them (to acknowledge or negatively acknowledge, respectively).
Note: All ack(), nack(), and modifyAckDeadline() methods on messages, as well as PubSubSubscriberTemplate, are implemented asynchronously, returning a ListenableFuture<Void> to enable asynchronous processing.
For serialization and deserialization of POJOs using Jackson JSON, configure a PubSubMessageConverter bean, and the Spring Boot starter for GCP Pub/Sub will automatically wire it into the PubSubTemplate.
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
Note: Alternatively, you can set it directly by calling the setMessageConverter() method on the PubSubTemplate.
Other implementations of the PubSubMessageConverter can also be configured in the same manner.
Assuming you have the following class defined:
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
You can serialize objects to JSON on publish automatically:
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
And that’s how you convert messages to objects on pull:
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
Please refer to our Pub/Sub JSON Payload Sample App as a reference for using this functionality.
It is also possible to acquire a reactive stream backed by a subscription.
To do so, a Project Reactor dependency (io.projectreactor:reactor-core) must be added to the project.
The combination of the Pub/Sub starter and the Project Reactor dependencies will then make a PubSubReactiveFactory bean available, which can then be used to get a Publisher.
@Autowired
PubSubReactiveFactory reactiveFactory;
// ...
Flux<AcknowledgeablePubsubMessage> flux
= reactiveFactory.poll("exampleSubscription", 1000);
The Flux then represents an infinite stream of GCP Pub/Sub messages coming in through the specified subscription.
For unlimited demand, the Pub/Sub subscription will be polled regularly, at intervals determined by the pollingPeriodMs parameter passed in when creating the Flux.
For bounded demand, the pollingPeriodMs parameter is unused.
Instead, as many messages as possible (up to the requested number) are delivered immediately, with the remaining messages delivered as they become available.
Any exceptions thrown by the underlying message retrieval logic will be passed as an error to the stream.
The error handling operators (Flux#retry(), Flux#onErrorResume(), etc.) can be used to recover.
The full range of Project Reactor operations can be applied to the stream.
For example, if you only want to fetch 5 messages, you can use the limitRequest operation to turn the infinite stream into a finite one:
Flux<AcknowledgeablePubsubMessage> fiveMessageFlux = flux.limitRequest(5);
Messages flowing through the Flux should be manually acknowledged.
flux.doOnNext(AcknowledgeablePubsubMessage::ack);
PubSubAdmin is the abstraction provided by Spring Cloud GCP to manage Google Cloud Pub/Sub resources.
It allows for the creation, deletion and listing of topics and subscriptions.
Note: Generally when referring to topics and subscriptions, you can either use the short canonical name within the current project, or the fully-qualified name referring to a topic or subscription in a different project using the projects/<project_name>/(topics|subscriptions)/<name> format.
PubSubAdmin depends on GcpProjectIdProvider and either a CredentialsProvider or a TopicAdminClient and a SubscriptionAdminClient.
If given a CredentialsProvider, it creates a TopicAdminClient and a SubscriptionAdminClient with the Google Cloud Java Library for Pub/Sub default settings.
The Spring Boot starter for GCP Pub/Sub auto-configures a PubSubAdmin object using the GcpProjectIdProvider and the CredentialsProvider auto-configured by the Spring Boot GCP Core starter.
PubSubAdmin implements a method to create topics:
public Topic createTopic(String topicName)
Here is an example of how to create a Google Cloud Pub/Sub topic:
public void newTopic() {
    pubSubAdmin.createTopic("topicName");
}
PubSubAdmin implements a method to delete topics:
public void deleteTopic(String topicName)
Here is an example of how to delete a Google Cloud Pub/Sub topic:
public void deleteTopic() {
    pubSubAdmin.deleteTopic("topicName");
}
PubSubAdmin implements a method to list topics:
public List<Topic> listTopics()
Here is an example of how to list every Google Cloud Pub/Sub topic name in a project:
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
PubSubAdmin implements a method to create subscriptions to existing topics:
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline, String pushEndpoint)
Here is an example of how to create a Google Cloud Pub/Sub subscription:
public void newSubscription() {
    pubSubAdmin.createSubscription("subscriptionName", "topicName", 10, "https://my.endpoint/push");
}
Alternative methods with default settings are provided for ease of use.
The default value for ackDeadline is 10 seconds.
If pushEndpoint isn’t specified, the subscription uses message pulling instead.
public Subscription createSubscription(String subscriptionName, String topicName)
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline)
public Subscription createSubscription(String subscriptionName, String topicName, String pushEndpoint)
PubSubAdmin implements a method to delete subscriptions:
public void deleteSubscription(String subscriptionName)
Here is an example of how to delete a Google Cloud Pub/Sub subscription:
public void deleteSubscription() {
    pubSubAdmin.deleteSubscription("subscriptionName");
}
PubSubAdmin implements a method to list subscriptions:
public List<Subscription> listSubscriptions()
Here is an example of how to list every subscription name in a project:
link:{project-root}/docs/src/test/java/org/springframework/cloud/gcp/autoconfigure/pubsub/it/PubSubTemplateDocumentationTests.java[role=include]
Sample applications for using the template and using a subscription-backed reactive stream are available.