We use a {@link PublishSubscribeChannel} which broadcasts messages to every subscriber. In + * this case, every service activator. + */ @Bean public MessageChannel pubsubInputChannel() { return new PublishSubscribeChannel(); } + /** + * Inbound channel adapter that gets activated whenever a new message arrives at a Google Cloud + * Pub/Sub subscription. + * + *
Messages get posted to the specified input channel, which activates the service activators + * below. + * + * @param inputChannel Spring channel that receives messages and triggers attached service + * activators + * @param subscriberFactory creates the subscriber that listens to messages from Google Cloud + * Pub/Sub + * @return the inbound channel adapter for a Google Cloud Pub/Sub subscription + */ @Bean public PubsubInboundChannelAdapter messageChannelAdapter( @Qualifier("pubsubInputChannel") MessageChannel inputChannel, @@ -104,30 +81,46 @@ public PubsubInboundChannelAdapter messageChannelAdapter( return adapter; } + /** + * Message handler that gets triggered whenever a new message arrives at the attached Spring + * channel. + * + *
Just logs the received message. Message acknowledgement mode set to manual above, so the
+ * consumer that allows us to (n)ack is extracted from the message headers and used to ack.
+ */
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver1() {
return message -> {
LOGGER.info("Message arrived! Payload: "
+ ((ByteString) message.getPayload()).toStringUtf8());
- AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(
- GcpHeaders.ACKNOWLEDGEMENT);
+ AckReplyConsumer consumer =
+ (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT);
consumer.ack();
};
}
+ /**
+ * Second message handler that also gets messages from the same subscription as above.
+ */
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver2() {
return message -> {
LOGGER.info("Message also arrived here! Payload: "
+ ((ByteString) message.getPayload()).toStringUtf8());
- AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(
- GcpHeaders.ACKNOWLEDGEMENT);
+ AckReplyConsumer consumer =
+ (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT);
consumer.ack();
};
}
+ /**
+ * The outbound channel adapter to write messages from a Spring channel to a Google Cloud Pub/Sub
+ * topic.
+ *
+ * @param pubsubTemplate Spring abstraction to send messages to Google Cloud Pub/Sub topics
+ */
@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubsubTemplate pubsubTemplate) {
@@ -136,6 +129,9 @@ public MessageHandler messageSender(PubsubTemplate pubsubTemplate) {
return outboundAdapter;
}
+ /**
+ * A Spring mechanism to write messages to a channel.
+ */
@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
public interface PubsubOutboundGateway {
diff --git a/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java b/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java
new file mode 100644
index 00000000000..9c1283c40e6
--- /dev/null
+++ b/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2017 original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.spring.pubsub;
+
+import com.example.spring.pubsub.PubsubApplication.PubsubOutboundGateway;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.Topic;
+import com.google.pubsub.v1.TopicName;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.gcp.pubsub.PubsubAdmin;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.view.RedirectView;
+
+@RestController
+public class WebAppController {
+
+ @Autowired
+ private PubsubOutboundGateway messagingGateway;
+
+ @Autowired private PubsubAdmin admin;
+
+ /**
+ * Lists every topic in the project.
+ *
+ * @return a list of the names of every topic in the project
+ */
+ @GetMapping("/listTopics")
+ public List