Skip to content

Commit

Permalink
Add PubSub subscriber examples over gPRC (#833)
Browse files Browse the repository at this point in the history
* add grpc subscriber

Signed-off-by: MregXN <[email protected]>

* modify README.md

Signed-off-by: MregXN <[email protected]>

* modify README.md in examples

Signed-off-by: MregXN <[email protected]>

* Modify DaprApplication to support examples where protocol is not specified.

Signed-off-by: MregXN <[email protected]>

* modify formatter to pass checkstyle

Signed-off-by: MregXN <[email protected]>

* Update springboot to latest minor.patch version. (#826)

Signed-off-by: MregXN <[email protected]>

* Use runtime 1.10.0-rc.X and CLI 1.10.0-rc.X (#827)

Signed-off-by: Artur Souza <[email protected]>
Signed-off-by: MregXN <[email protected]>

* Upgrade the version to 1.9.0-SNAPSHOT (#829)

Signed-off-by: Artur Souza <[email protected]>
Signed-off-by: MregXN <[email protected]>

* Generate updated javadocs for 1.8.0 (#836)

Signed-off-by: Artur Souza <[email protected]>
Signed-off-by: MregXN <[email protected]>

* Update Dapr runtime and CLI to 1.10. (#837)

Signed-off-by: Artur Souza <[email protected]>
Signed-off-by: MregXN <[email protected]>

* Inject autoconfiguration in the Spring Boot 3 style (#831)

* Bump from spring boot 2.3.5.RELEASE to 2.7.8

Signed-off-by: Sergio <[email protected]>
(cherry picked from commit 9152c91)

* Ensure old versions of spring boot are still compatible

Signed-off-by: Sergio <[email protected]>

---------

Signed-off-by: champel <[email protected]>
Signed-off-by: Sergio <[email protected]>
Signed-off-by: MregXN <[email protected]>

* Bump from reactor 2.3.5.RELEASE to 2.7.8 (#830)

* Bump from reactor 2.3.5.RELEASE to 2.7.8

Signed-off-by: Sergio <[email protected]>

* Simplification

Signed-off-by: Sergio <[email protected]>

---------

Signed-off-by: Sergio <[email protected]>
Signed-off-by: MregXN <[email protected]>

* rerun checks

Signed-off-by: MregXN <[email protected]>

* modify the way of grpc server starts

Signed-off-by: MregXN <[email protected]>

* modify README

Signed-off-by: MregXN <[email protected]>

* Update pom.xml

Signed-off-by: MregXN <[email protected]>

---------

Signed-off-by: MregXN <[email protected]>
Signed-off-by: Artur Souza <[email protected]>
Signed-off-by: champel <[email protected]>
Signed-off-by: Sergio <[email protected]>
Signed-off-by: MregXN <[email protected]>
Co-authored-by: Artur Souza <[email protected]>
Co-authored-by: champel <[email protected]>
Co-authored-by: Mukundan Sundararajan <[email protected]>
  • Loading branch information
4 people authored May 22, 2023
1 parent aaa5f7b commit 375e867
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Try the following examples to learn more about Dapr's Java SDK:
* [Invoking a Http service](./examples/src/main/java/io/dapr/examples/invoke/http)
* [Invoking a Grpc service](./examples/src/main/java/io/dapr/examples/invoke/grpc)
* [State management](./examples/src/main/java/io/dapr/examples/state)
* [PubSub with subscriber over Http](./examples/src/main/java/io/dapr/examples/pubsub/http)
* [PubSub with subscriber](./examples/src/main/java/io/dapr/examples/pubsub/)
* [Binding with input over Http](./examples/src/main/java/io/dapr/examples/bindings/http)
* [Actors](./examples/src/main/java/io/dapr/examples/actors/)
* [Secrets management](./examples/src/main/java/io/dapr/examples/secrets)
Expand Down
29 changes: 27 additions & 2 deletions examples/src/main/java/io/dapr/examples/DaprApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,42 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* Dapr's HTTP callback implementation via SpringBoot.
* Dapr's callback implementation via SpringBoot.
*/
@SpringBootApplication
public class DaprApplication {

/**
* Starts Dapr's callback in a given port.
* Starts Dapr's callback in a given port and specified protocal.
*
* @param port Port to listen to.
* @param protocal select Http or gRPC to run.
*/
public static void start(String protocal, int port) {
SpringApplication app = new SpringApplication(DaprApplication.class);

String args;
if (protocal.equals("grpc")) {
args = String.format("--grpc.server.port=%d", port);
} else if (protocal.equals("http")) {
args = String.format("--server.port=%d", port);
} else {
System.out.println("please select protocal in grpc or http.");
return;
}

app.run(args);
}

/**
* Starts Dapr's callback in a given port. HTTP is used by default.
*
* @param port Port to listen to.
*/
public static void start(int port) {

SpringApplication app = new SpringApplication(DaprApplication.class);

app.run(String.format("--server.port=%d", port));
}

Expand Down
71 changes: 65 additions & 6 deletions examples/src/main/java/io/dapr/examples/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,23 @@ cd examples

### Running the subscriber

The first is the subscriber. It will subscribe to the topic to be used by the publisher and read the messages published. The Subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below:
The first is the subscriber. It will subscribe to the topic to be used by the publisher and read the messages published. The Subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. There are gRPC version and HTTP version of subscriber in grpc and http folders. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below:

```java
public class Subscriber {

public static void main(String[] args) throws Exception {
///...
// Start Dapr's callback endpoint.
DaprApplication.start(port);
DaprApplication.start([PROTOCAL],port);
}
}
```
`DaprApplication.start()` Method will run an Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request. The Dapr's sidecar is the one that performs the actual call to the controller, based on the pubsub features.
`DaprApplication.start()` Method will run an Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request, or the `SubscriberGrpcService`, which implemente the grpc methods that sidecar will call.

This Spring Controller handles the message endpoint, printing the message which is received as the POST body.
**HTTP Version**

The Dapr's sidecar is the one that performs the actual call to the controller, based on the pubsub features. This Spring Controller handles the message endpoint, printing the message which is received as the POST body.

The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in
[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations).
Expand Down Expand Up @@ -116,7 +118,9 @@ public class SubscriberController {
}
```

Execute the follow script in order to run the Subscriber example:


Execute the follow script in order to run the HTTP Subscriber example:

<!-- STEP
name: Run Subscriber
Expand All @@ -133,11 +137,66 @@ sleep: 5
-->

```bash
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Subscriber -p 3000
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol http -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000
```

<!-- END_STEP -->

**gRPC Version**

The Spring GrpcService implements the methods required for gRPC communication with Dapr\`s sidecar.

The `SubscriberGrpcService.java` snippet below shows the details. Dapr\`s sidecar will call `listTopicSubscriptions` to get topic and pubsubname that are contained in response before subscription starts. After the pubsub component in sidecar subscribes successfully from the specified topic, message will be sent to the method `onTopicEvent` in request parameter.

```java
@GrpcService
public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
private final List<DaprAppCallbackProtos.TopicSubscription> topicSubscriptionList = new ArrayList<>();
private final DaprObjectSerializer objectSerializer = new DefaultObjectSerializer();

@Override
public void listTopicSubscriptions(Empty request,
StreamObserver<DaprAppCallbackProtos.ListTopicSubscriptionsResponse> responseObserver) {
registerConsumer("messagebus","testingtopic");
try {
DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos.ListTopicSubscriptionsResponse
.newBuilder();
topicSubscriptionList.forEach(builder::addSubscriptions);
DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build();
responseObserver.onNext(response);
} catch (Throwable e) {
responseObserver.onError(e);
} finally {
responseObserver.onCompleted();
}
}

@Override
public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request,
StreamObserver<DaprAppCallbackProtos.TopicEventResponse> responseObserver) {
try {
System.out.println("Subscriber got: " + request.getData());
DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder()
.setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable e) {
responseObserver.onError(e);
}
}
///...
}
```


Execute the follow script in order to run the gRPC Subscriber example:

```bash
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000
```


### Running the publisher

Another component is the publisher. It is a simple java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.grpc;

import io.dapr.examples.DaprApplication;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;

/**
* Service for subscriber.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the server:
* dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000
*/
public class Subscriber {

/**
* This is the entry point for this example app, which subscribes to a topic.
* @param args The port this app will listen on.
* @throws Exception An Exception on startup.
*/
public static void main(String[] args) throws Exception {
Options options = new Options();
options.addRequiredOption("p", "port", true, "The port this app will listen on");

CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);

// If port string is not valid, it will throw an exception.
int port = Integer.parseInt(cmd.getOptionValue("port"));

//start a grpc server
Server server = ServerBuilder.forPort(port)
.addService(new SubscriberGrpcService())
.build();
server.start();
server.awaitTermination();

// Start Dapr's callback endpoint.
DaprApplication.start("grpc",port);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.grpc;

import com.google.protobuf.Empty;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.DaprAppCallbackProtos;
import io.grpc.stub.StreamObserver;

import java.util.ArrayList;
import java.util.List;

/**
* Class that encapsulates all client-side logic for Grpc.
*/
public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
private final List<DaprAppCallbackProtos.TopicSubscription> topicSubscriptionList = new ArrayList<>();

@Override
public void listTopicSubscriptions(Empty request,
StreamObserver<DaprAppCallbackProtos.ListTopicSubscriptionsResponse> responseObserver) {
registerConsumer("messagebus","testingtopic");
try {
DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos
.ListTopicSubscriptionsResponse.newBuilder();
topicSubscriptionList.forEach(builder::addSubscriptions);
DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build();
responseObserver.onNext(response);
} catch (Throwable e) {
responseObserver.onError(e);
} finally {
responseObserver.onCompleted();
}
}

@Override
public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request,
StreamObserver<DaprAppCallbackProtos.TopicEventResponse> responseObserver) {
try {
String data = request.getData().toStringUtf8().replace("\"", "");
System.out.println("Subscriber got: " + data);
DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder()
.setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable e) {
responseObserver.onError(e);
}
}

/**
* Add pubsub name and topic to topicSubscriptionList.
*
* @param topic the topic
* @param pubsubName the pubsub name
*/
public void registerConsumer(String pubsubName, String topic) {
topicSubscriptionList.add(DaprAppCallbackProtos.TopicSubscription
.newBuilder()
.setPubsubName(pubsubName)
.setTopic(topic)
.build());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
limitations under the License.
*/

package io.dapr.examples.pubsub;
package io.dapr.examples.pubsub.http;

import io.dapr.examples.DaprApplication;
import org.apache.commons.cli.CommandLine;
Expand All @@ -26,7 +26,7 @@
* 2. cd [repo root]/examples
* 3. Run the server:
* dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Subscriber -p 3000
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000
*/
public class Subscriber {

Expand All @@ -46,6 +46,6 @@ public static void main(String[] args) throws Exception {
int port = Integer.parseInt(cmd.getOptionValue("port"));

// Start Dapr's callback endpoint.
DaprApplication.start(port);
DaprApplication.start("http",port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
limitations under the License.
*/

package io.dapr.examples.pubsub;
package io.dapr.examples.pubsub.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
Expand Down

0 comments on commit 375e867

Please sign in to comment.