Skip to content

Commit

Permalink
Code review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoandremartins committed Jul 13, 2017
1 parent 431123d commit a06cd45
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
<module>pubsub/cloud-client</module>
<module>spanner/cloud-client</module>
<module>speech/cloud-client</module>
<module>spring/integration/pubsub</module>
<module>spring/pubsub</module>
<module>storage/cloud-client</module>
<module>storage/json-api</module>
<module>storage/storage-transfer</module>
Expand Down
11 changes: 0 additions & 11 deletions spring/integration/pubsub/src/main/resources/static/js/pubsub.js

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>doc-samples</artifactId>
<groupId>com.google.cloud</groupId>
<version>1.0.0</version>
<relativePath>../../..</relativePath>
<relativePath>../..</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package com.example.spring.pubsub;
/*
* Copyright 2017 original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
package com.example.spring.pubsub;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.PubsubAdmin;
import org.springframework.cloud.gcp.pubsub.core.PubsubTemplate;
import org.springframework.cloud.gcp.pubsub.support.GcpHeaders;
import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory;
Expand All @@ -30,68 +36,39 @@
import org.springframework.integration.gcp.outbound.PubsubMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
@RestController
public class PubsubApplication {

private static final Log LOGGER = LogFactory.getLog(PubsubApplication.class);

@Autowired
private PubsubOutboundGateway messagingGateway;

@Autowired
private PubsubAdmin admin;

public static void main(String[] args) throws IOException {
SpringApplication.run(PubsubApplication.class, args);
}

@GetMapping("/listTopics")
public List<String> listTopics() {
return admin.listTopics().stream()
.map(Topic::getNameAsTopicName)
.map(TopicName::getTopic)
.collect(Collectors.toList());
}

@GetMapping("/listSubscriptions")
public List<String> listSubscriptions() {
return admin.listSubscriptions().stream()
.map(Subscription::getNameAsSubscriptionName)
.map(SubscriptionName::getSubscription)
.collect(Collectors.toList());
}

@PostMapping("/postMessage")
public RedirectView addMessage(@RequestParam("message") String message) {
messagingGateway.sendToPubsub(message);
return new RedirectView("/");
}

@PostMapping("/newTopic")
public RedirectView newTopic(@RequestParam("name") String topicName) {
admin.createTopic(topicName);
return new RedirectView("/");
}

@PostMapping("/newSubscription")
public RedirectView newSubscription(@RequestParam("name") String subscriptionName,
@RequestParam("topic") String topicName) {
admin.createSubscription(subscriptionName, topicName);
return new RedirectView("/");
}

/**
* Spring channel for incoming messages from Google Cloud Pub/Sub.
*
* <p>We use a {@link PublishSubscribeChannel} which broadcasts messages to every subscriber. In
* this case, every service activator.
*/
@Bean
public MessageChannel pubsubInputChannel() {
return new PublishSubscribeChannel();
}

/**
* Inbound channel adapter that gets activated whenever a new message arrives at a Google Cloud
* Pub/Sub subscription.
*
* <p>Messages get posted to the specified input channel, which activates the service activators
* below.
*
* @param inputChannel Spring channel that receives messages and triggers attached service
* activators
* @param subscriberFactory creates the subscriber that listens to messages from Google Cloud
* Pub/Sub
* @return the inbound channel adapter for a Google Cloud Pub/Sub subscription
*/
@Bean
public PubsubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
Expand All @@ -104,30 +81,46 @@ public PubsubInboundChannelAdapter messageChannelAdapter(
return adapter;
}

/**
* Message handler that gets triggered whenever a new message arrives at the attached Spring
* channel.
*
* <p>Just logs the received message. Message acknowledgement mode set to manual above, so the
* consumer that allows us to (n)ack is extracted from the message headers and used to ack.
*/
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver1() {
return message -> {
LOGGER.info("Message arrived! Payload: "
+ ((ByteString) message.getPayload()).toStringUtf8());
AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(
GcpHeaders.ACKNOWLEDGEMENT);
AckReplyConsumer consumer =
(AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT);
consumer.ack();
};
}

/**
* Second message handler that also gets messages from the same subscription as above.
*/
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver2() {
return message -> {
LOGGER.info("Message also arrived here! Payload: "
+ ((ByteString) message.getPayload()).toStringUtf8());
AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(
GcpHeaders.ACKNOWLEDGEMENT);
AckReplyConsumer consumer =
(AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT);
consumer.ack();
};
}

/**
* The outbound channel adapter to write messages from a Spring channel to a Google Cloud Pub/Sub
* topic.
*
* @param pubsubTemplate Spring abstraction to send messages to Google Cloud Pub/Sub topics
*/
@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubsubTemplate pubsubTemplate) {
Expand All @@ -136,6 +129,9 @@ public MessageHandler messageSender(PubsubTemplate pubsubTemplate) {
return outboundAdapter;
}

/**
* A Spring mechanism to write messages to a channel.
*/
@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
public interface PubsubOutboundGateway {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2017 original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.spring.pubsub;

import com.example.spring.pubsub.PubsubApplication.PubsubOutboundGateway;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gcp.pubsub.PubsubAdmin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;

@RestController
public class WebAppController {

@Autowired
private PubsubOutboundGateway messagingGateway;

@Autowired private PubsubAdmin admin;

/**
* Lists every topic in the project.
*
* @return a list of the names of every topic in the project
*/
@GetMapping("/listTopics")
public List<String> listTopics() {
return admin
.listTopics()
.stream()
.map(Topic::getNameAsTopicName)
.map(TopicName::getTopic)
.collect(Collectors.toList());
}

/**
* Lists every subscription in the project.
*
* @return a list of the names of every subscription in the project
*/
@GetMapping("/listSubscriptions")
public List<String> listSubscriptions() {
return admin
.listSubscriptions()
.stream()
.map(Subscription::getNameAsSubscriptionName)
.map(SubscriptionName::getSubscription)
.collect(Collectors.toList());
}

/**
* Posts a message to a Google Cloud Pub/Sub topic, through Spring's messaging gateway, and
* redirects the user to the home page.
*
* @param message the message posted to the Pub/Sub topic
*/
@PostMapping("/postMessage")
public RedirectView addMessage(@RequestParam("message") String message) {
messagingGateway.sendToPubsub(message);
return new RedirectView("/");
}

/**
* Creates a new topic on Google Cloud Pub/Sub, through Spring's Pub/Sub admin class, and
* redirects the user to the home page.
*
* @param topicName the name of the new topic
*/
@PostMapping("/newTopic")
public RedirectView newTopic(@RequestParam("name") String topicName) {
admin.createTopic(topicName);
return new RedirectView("/");
}

/**
* Creates a new subscription on Google Cloud Pub/Sub, through Spring's Pub/Sub admin class, and
* redirects the user to the home page.
*
* @param topicName the name of the new subscription
*/
@PostMapping("/newSubscription")
public RedirectView newSubscription(
@RequestParam("name") String subscriptionName, @RequestParam("topic") String topicName) {
admin.createSubscription(subscriptionName, topicName);
return new RedirectView("/");
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
<!DOCTYPE html>
<!--
~ Copyright 2017 original author or authors.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<html lang="en">
<head>
<meta charset="UTF-8">
Expand Down
27 changes: 27 additions & 0 deletions spring/pubsub/src/main/resources/static/js/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2017 original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

angular.module('pubsub', [])
.controller('listTopics', function($scope, $http) {
$http.get('/listTopics').success(function(data) {
$scope.topics = data;
});
})
.controller('listSubscriptions', function($scope, $http) {
$http.get('/listSubscriptions').success(function(data) {
$scope.subscriptions = data;
})
});
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit a06cd45

Please sign in to comment.