From 375e867ea5822a0c4d3abd1cfaa45d24cfa38b4a Mon Sep 17 00:00:00 2001 From: MregXN <46479059+MregXN@users.noreply.github.com> Date: Mon, 22 May 2023 22:55:37 +0800 Subject: [PATCH] Add PubSub subscriber examples over gPRC (#833) * add grpc subscriber Signed-off-by: MregXN * modify README.md Signed-off-by: MregXN * modify README.md in examples Signed-off-by: MregXN * Modify DaprApplication to support examples where protocol is not specified. Signed-off-by: MregXN * modify formatter to pass checkstyle Signed-off-by: MregXN * Update springboot to latest minor.patch version. (#826) Signed-off-by: MregXN * Use runtime 1.10.0-rc.X and CLI 1.10.0-rc.X (#827) Signed-off-by: Artur Souza Signed-off-by: MregXN * Upgrade the version to 1.9.0-SNAPSHOT (#829) Signed-off-by: Artur Souza Signed-off-by: MregXN * Generate updated javadocs for 1.8.0 (#836) Signed-off-by: Artur Souza Signed-off-by: MregXN * Update Dapr runtime and CLI to 1.10. (#837) Signed-off-by: Artur Souza Signed-off-by: MregXN * Inject autoconfiguration in the Spring Boot 3 style (#831) * Bump from spring boot 2.3.5.RELEASE to 2.7.8 Signed-off-by: Sergio (cherry picked from commit 9152c91bc1f08ecf2dd3bccf8159fd5d0500e351) * Ensure old versions of spring boot are still compatible Signed-off-by: Sergio --------- Signed-off-by: champel Signed-off-by: Sergio Signed-off-by: MregXN * Bump from reactor 2.3.5.RELEASE to 2.7.8 (#830) * Bump from reactor 2.3.5.RELEASE to 2.7.8 Signed-off-by: Sergio * Simplification Signed-off-by: Sergio --------- Signed-off-by: Sergio Signed-off-by: MregXN * rerun checks Signed-off-by: MregXN * modify the way of grpc server starts Signed-off-by: MregXN * modify README Signed-off-by: MregXN * Update pom.xml Signed-off-by: MregXN <46479059+MregXN@users.noreply.github.com> --------- Signed-off-by: MregXN Signed-off-by: Artur Souza Signed-off-by: champel Signed-off-by: Sergio Signed-off-by: MregXN <46479059+MregXN@users.noreply.github.com> Co-authored-by: Artur Souza Co-authored-by: champel Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com> --- README.md | 2 +- .../io/dapr/examples/DaprApplication.java | 29 ++++++- .../java/io/dapr/examples/pubsub/README.md | 71 +++++++++++++++-- .../dapr/examples/pubsub/grpc/Subscriber.java | 60 +++++++++++++++ .../pubsub/grpc/SubscriberGrpcService.java | 77 +++++++++++++++++++ .../pubsub/{ => http}/Subscriber.java | 6 +- .../{ => http}/SubscriberController.java | 2 +- 7 files changed, 234 insertions(+), 13 deletions(-) create mode 100644 examples/src/main/java/io/dapr/examples/pubsub/grpc/Subscriber.java create mode 100644 examples/src/main/java/io/dapr/examples/pubsub/grpc/SubscriberGrpcService.java rename examples/src/main/java/io/dapr/examples/pubsub/{ => http}/Subscriber.java (91%) rename examples/src/main/java/io/dapr/examples/pubsub/{ => http}/SubscriberController.java (96%) diff --git a/README.md b/README.md index c4dbd7a52..9c0dfb5ec 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Try the following examples to learn more about Dapr's Java SDK: * [Invoking a Http service](./examples/src/main/java/io/dapr/examples/invoke/http) * [Invoking a Grpc service](./examples/src/main/java/io/dapr/examples/invoke/grpc) * [State management](./examples/src/main/java/io/dapr/examples/state) -* [PubSub with subscriber over Http](./examples/src/main/java/io/dapr/examples/pubsub/http) +* [PubSub with subscriber](./examples/src/main/java/io/dapr/examples/pubsub/) * [Binding with input over Http](./examples/src/main/java/io/dapr/examples/bindings/http) * [Actors](./examples/src/main/java/io/dapr/examples/actors/) * [Secrets management](./examples/src/main/java/io/dapr/examples/secrets) diff --git a/examples/src/main/java/io/dapr/examples/DaprApplication.java b/examples/src/main/java/io/dapr/examples/DaprApplication.java index b98575a5b..3b9657f0c 100644 --- a/examples/src/main/java/io/dapr/examples/DaprApplication.java +++ b/examples/src/main/java/io/dapr/examples/DaprApplication.java @@ -17,17 +17,42 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; /** - * Dapr's HTTP callback implementation via SpringBoot. + * Dapr's callback implementation via SpringBoot. */ @SpringBootApplication public class DaprApplication { /** - * Starts Dapr's callback in a given port. + * Starts Dapr's callback in a given port and specified protocal. + * + * @param port Port to listen to. + * @param protocal select Http or gRPC to run. + */ + public static void start(String protocal, int port) { + SpringApplication app = new SpringApplication(DaprApplication.class); + + String args; + if (protocal.equals("grpc")) { + args = String.format("--grpc.server.port=%d", port); + } else if (protocal.equals("http")) { + args = String.format("--server.port=%d", port); + } else { + System.out.println("please select protocal in grpc or http."); + return; + } + + app.run(args); + } + + /** + * Starts Dapr's callback in a given port. HTTP is used by default. + * * @param port Port to listen to. */ public static void start(int port) { + SpringApplication app = new SpringApplication(DaprApplication.class); + app.run(String.format("--server.port=%d", port)); } diff --git a/examples/src/main/java/io/dapr/examples/pubsub/README.md b/examples/src/main/java/io/dapr/examples/pubsub/README.md index 91cb6b56f..4c713af45 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/README.md @@ -40,7 +40,7 @@ cd examples ### Running the subscriber -The first is the subscriber. It will subscribe to the topic to be used by the publisher and read the messages published. The Subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below: +The first is the subscriber. It will subscribe to the topic to be used by the publisher and read the messages published. The Subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. There are gRPC version and HTTP version of subscriber in grpc and http folders. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below: ```java public class Subscriber { @@ -48,13 +48,15 @@ public class Subscriber { public static void main(String[] args) throws Exception { ///... // Start Dapr's callback endpoint. - DaprApplication.start(port); + DaprApplication.start([PROTOCAL],port); } } ``` -`DaprApplication.start()` Method will run an Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request. The Dapr's sidecar is the one that performs the actual call to the controller, based on the pubsub features. +`DaprApplication.start()` Method will run an Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request, or the `SubscriberGrpcService`, which implemente the grpc methods that sidecar will call. -This Spring Controller handles the message endpoint, printing the message which is received as the POST body. +**HTTP Version** + +The Dapr's sidecar is the one that performs the actual call to the controller, based on the pubsub features. This Spring Controller handles the message endpoint, printing the message which is received as the POST body. The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in [Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations). @@ -116,7 +118,9 @@ public class SubscriberController { } ``` -Execute the follow script in order to run the Subscriber example: + + +Execute the follow script in order to run the HTTP Subscriber example: ```bash -dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Subscriber -p 3000 +dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol http -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000 ``` +**gRPC Version** + +The Spring GrpcService implements the methods required for gRPC communication with Dapr\`s sidecar. + +The `SubscriberGrpcService.java` snippet below shows the details. Dapr\`s sidecar will call `listTopicSubscriptions` to get topic and pubsubname that are contained in response before subscription starts. After the pubsub component in sidecar subscribes successfully from the specified topic, message will be sent to the method `onTopicEvent` in request parameter. + +```java +@GrpcService +public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase { + private final List topicSubscriptionList = new ArrayList<>(); + private final DaprObjectSerializer objectSerializer = new DefaultObjectSerializer(); + + @Override + public void listTopicSubscriptions(Empty request, + StreamObserver responseObserver) { + registerConsumer("messagebus","testingtopic"); + try { + DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos.ListTopicSubscriptionsResponse + .newBuilder(); + topicSubscriptionList.forEach(builder::addSubscriptions); + DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build(); + responseObserver.onNext(response); + } catch (Throwable e) { + responseObserver.onError(e); + } finally { + responseObserver.onCompleted(); + } + } + + @Override + public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request, + StreamObserver responseObserver) { + try { + System.out.println("Subscriber got: " + request.getData()); + DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder() + .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable e) { + responseObserver.onError(e); + } + } + ///... +} +``` + + +Execute the follow script in order to run the gRPC Subscriber example: + +```bash +dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000 +``` + + ### Running the publisher Another component is the publisher. It is a simple java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. diff --git a/examples/src/main/java/io/dapr/examples/pubsub/grpc/Subscriber.java b/examples/src/main/java/io/dapr/examples/pubsub/grpc/Subscriber.java new file mode 100644 index 000000000..e2df82b00 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/pubsub/grpc/Subscriber.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.pubsub.grpc; + +import io.dapr.examples.DaprApplication; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; + +/** + * Service for subscriber. + * 1. Build and install jars: + * mvn clean install + * 2. cd [repo root]/examples + * 3. Run the server: + * dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- \ + * java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000 + */ +public class Subscriber { + + /** + * This is the entry point for this example app, which subscribes to a topic. + * @param args The port this app will listen on. + * @throws Exception An Exception on startup. + */ + public static void main(String[] args) throws Exception { + Options options = new Options(); + options.addRequiredOption("p", "port", true, "The port this app will listen on"); + + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + + // If port string is not valid, it will throw an exception. + int port = Integer.parseInt(cmd.getOptionValue("port")); + + //start a grpc server + Server server = ServerBuilder.forPort(port) + .addService(new SubscriberGrpcService()) + .build(); + server.start(); + server.awaitTermination(); + + // Start Dapr's callback endpoint. + DaprApplication.start("grpc",port); + } +} diff --git a/examples/src/main/java/io/dapr/examples/pubsub/grpc/SubscriberGrpcService.java b/examples/src/main/java/io/dapr/examples/pubsub/grpc/SubscriberGrpcService.java new file mode 100644 index 000000000..dc8f86924 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/pubsub/grpc/SubscriberGrpcService.java @@ -0,0 +1,77 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.pubsub.grpc; + +import com.google.protobuf.Empty; +import io.dapr.v1.AppCallbackGrpc; +import io.dapr.v1.DaprAppCallbackProtos; +import io.grpc.stub.StreamObserver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Class that encapsulates all client-side logic for Grpc. + */ +public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase { + private final List topicSubscriptionList = new ArrayList<>(); + + @Override + public void listTopicSubscriptions(Empty request, + StreamObserver responseObserver) { + registerConsumer("messagebus","testingtopic"); + try { + DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos + .ListTopicSubscriptionsResponse.newBuilder(); + topicSubscriptionList.forEach(builder::addSubscriptions); + DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build(); + responseObserver.onNext(response); + } catch (Throwable e) { + responseObserver.onError(e); + } finally { + responseObserver.onCompleted(); + } + } + + @Override + public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request, + StreamObserver responseObserver) { + try { + String data = request.getData().toStringUtf8().replace("\"", ""); + System.out.println("Subscriber got: " + data); + DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder() + .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable e) { + responseObserver.onError(e); + } + } + + /** + * Add pubsub name and topic to topicSubscriptionList. + * + * @param topic the topic + * @param pubsubName the pubsub name + */ + public void registerConsumer(String pubsubName, String topic) { + topicSubscriptionList.add(DaprAppCallbackProtos.TopicSubscription + .newBuilder() + .setPubsubName(pubsubName) + .setTopic(topic) + .build()); + } +} + diff --git a/examples/src/main/java/io/dapr/examples/pubsub/Subscriber.java b/examples/src/main/java/io/dapr/examples/pubsub/http/Subscriber.java similarity index 91% rename from examples/src/main/java/io/dapr/examples/pubsub/Subscriber.java rename to examples/src/main/java/io/dapr/examples/pubsub/http/Subscriber.java index cfd0bbd20..b5e9ab3c7 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/Subscriber.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/Subscriber.java @@ -11,7 +11,7 @@ limitations under the License. */ -package io.dapr.examples.pubsub; +package io.dapr.examples.pubsub.http; import io.dapr.examples.DaprApplication; import org.apache.commons.cli.CommandLine; @@ -26,7 +26,7 @@ * 2. cd [repo root]/examples * 3. Run the server: * dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- \ - * java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Subscriber -p 3000 + * java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000 */ public class Subscriber { @@ -46,6 +46,6 @@ public static void main(String[] args) throws Exception { int port = Integer.parseInt(cmd.getOptionValue("port")); // Start Dapr's callback endpoint. - DaprApplication.start(port); + DaprApplication.start("http",port); } } diff --git a/examples/src/main/java/io/dapr/examples/pubsub/SubscriberController.java b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java similarity index 96% rename from examples/src/main/java/io/dapr/examples/pubsub/SubscriberController.java rename to examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java index cbfd4f7bc..e096402de 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/SubscriberController.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java @@ -11,7 +11,7 @@ limitations under the License. */ -package io.dapr.examples.pubsub; +package io.dapr.examples.pubsub.http; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.Rule;