Skip to content

Commit

Permalink
Bi-direction subscription (#1124)
Browse files Browse the repository at this point in the history
* Bi-di subscription.

Add bidi subscription to validate workflow.

Signed-off-by: Artur Souza <[email protected]>

* Make bi-di subscriber to use Mono

Signed-off-by: Artur Souza <[email protected]>

---------

Signed-off-by: Artur Souza <[email protected]>
  • Loading branch information
artursouza authored Oct 8, 2024
1 parent 7490434 commit cb552ba
Show file tree
Hide file tree
Showing 19 changed files with 838 additions and 613 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,7 @@ jobs:
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/workflows/README.md
- name: Validate streaming subscription example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ Try the following examples to learn more about Dapr's Java SDK:
* [Invoking a Grpc service](./examples/src/main/java/io/dapr/examples/invoke/grpc)
* [State management](./examples/src/main/java/io/dapr/examples/state)
* [PubSub with subscriber](./examples/src/main/java/io/dapr/examples/pubsub/)
* [PubSub with streaming subscription](./examples/src/main/java/io/dapr/examples/pubsub/stream/)
* [Binding with input over Http](./examples/src/main/java/io/dapr/examples/bindings/http)
* [Actors](./examples/src/main/java/io/dapr/examples/actors/)
* [Workflows](./examples/src/main/java/io/dapr/examples/workflows/)
* [Secrets management](./examples/src/main/java/io/dapr/examples/secrets)
* [Configuration](./examples/src/main/java/io/dapr/examples/configuration)
* [Distributed tracing with OpenTelemetry SDK](./examples/src/main/java/io/dapr/examples/tracing)
Expand Down
541 changes: 0 additions & 541 deletions examples/README.md

This file was deleted.

122 changes: 122 additions & 0 deletions examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Dapr Streaming Subscription Sample

In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while a subscriber will listen for messages of a specific topic via a bi-directional stream. All is abstracted by the SDK. See the [Dapr Pub-Sub docs](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) to understand when this pattern might be a good choice for your software architecture.

Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub.

## Pub-Sub Sample using the Java-SDK

This sample shows how the subscription to events no longer requires the application to listen to an HTTP or gRPC port. This example uses Redis Streams (enabled in Redis versions => 5).
## Pre-requisites

* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/).
* Java JDK 11 (or greater):
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
* [OpenJDK 11](https://jdk.java.net/11/)
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.

### Checking out the code

Clone this repository:

```sh
git clone https://github.com/dapr/java-sdk.git
cd java-sdk
```

Then build the Maven project:

```sh
# make sure you are in the `java-sdk` directory.
mvn install
```

Then get into the examples directory:

```sh
cd examples
```
### Initialize Dapr

Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.

### Running the subscriber

The subscriber uses the `DaprPreviewClient` interface to use a new feature where events are subscribed via a streaming and processed via a callback interface.



The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.

In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription.

```java
public class Subscriber {

// ...

public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
var subscription = client.subscribeToEvents(
PUBSUB_NAME,
topicName,
new SubscriptionListener<>() {

@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
},
TypeRef.STRING);

subscription.awaitTermination();
}
}

// ...
}
```

Execute the following command to run the Subscriber example:

<!-- STEP
name: Run Subscriber
expected_stdout_lines:
- '== APP == Subscriber got: This is message #0'
- '== APP == Subscriber got: This is message #1'
background: true
sleep: 30
-->

```bash
dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber
```

<!-- END_STEP -->

Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side:

<!-- STEP
name: Run Publisher
expected_stdout_lines:
- '== APP == Published message: This is message #0'
- '== APP == Published message: This is message #1'
background: true
sleep: 15
-->

```bash
dapr run --resources-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher
```

<!-- END_STEP -->


Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.stream;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;

/**
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the subscriber:
* dapr run --resources-path ./components/pubsub --app-id subscriber -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber
*/
public class Subscriber {

//The title of the topic to be used for publishing
private static final String DEFAULT_TOPIC_NAME = "testingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";

/**
* This is the entry point for this example app, which subscribes to a topic.
* @param args Used to optionally pass a topic name.
* @throws Exception An Exception on startup.
*/
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
var subscription = client.subscribeToEvents(
PUBSUB_NAME,
topicName,
new SubscriptionListener<>() {

@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
},
TypeRef.STRING);

subscription.awaitTermination();
}
}

/**
* If a topic is specified in args, use that.
* Else, fallback to the default topic.
* @param args program arguments
* @return name of the topic to publish messages to.
*/
private static String getTopicName(String[] args) {
if (args.length >= 1) {
return args[0];
}
return DEFAULT_TOPIC_NAME;
}
}
57 changes: 18 additions & 39 deletions examples/src/main/java/io/dapr/examples/workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ This example contains the follow classes:
* DemoWorkflow: An example of a Dapr Workflow.
* DemoWorkflowClient: This application will start workflows using Dapr.
* DemoWorkflowWorker: An application that registers a workflow to the Dapr workflow runtime engine. It also executes the workflow instance.

## Pre-requisites

* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/).
* Java JDK 11 (or greater):
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
* [OpenJDK 11](https://jdk.java.net/11/)
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
* [OpenJDK 11](https://jdk.java.net/11/)
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.

### Checking out the code
Expand Down Expand Up @@ -54,8 +54,8 @@ Those examples contain the following workflow patterns:
5. [Sub-workflow Pattern](#sub-workflow-pattern)

### Chaining Pattern
In the chaining pattern, a sequence of activities executes in a specific order.
In this pattern, the output of one activity is applied to the input of another activity.
In the chaining pattern, a sequence of activities executes in a specific order.
In this pattern, the output of one activity is applied to the input of another activity.
The chaining pattern is useful when you need to execute a sequence of activities in a specific order.

The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below:
Expand Down Expand Up @@ -149,6 +149,7 @@ Execute the following script in order to run DemoChainWorker:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker
```

Once running, the logs will start displaying the different steps: First, you can see workflow is starting:
```text
== APP == Start workflow runtime
Expand All @@ -162,6 +163,8 @@ java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chai
```
<!-- END_STEP -->



Now you can see the worker logs showing the acitvity is invoked in sequnce and the status of each activity:
```text
== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow
Expand Down Expand Up @@ -237,7 +240,7 @@ public class CountWordsActivity implements WorkflowActivity {
}
```
<!-- STEP
name: Run Fan-in/Fan-out Pattern workflow
name: Run Chaining Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
Expand All @@ -255,7 +258,9 @@ Execute the following script in order to run DemoFanInOutWorker:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker
```

Execute the following script in order to run DemoFanInOutClient:

```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient
```
Expand Down Expand Up @@ -343,7 +348,7 @@ public class CleanUpActivity implements WorkflowActivity {

Once you start the workflow and client using the following commands:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient
Expand Down Expand Up @@ -406,36 +411,21 @@ public class DemoExternalEventWorkflow extends Workflow {
}
```
In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity.
In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity.
```java
client.raiseEvent(instanceId, "Approval", true);
```
Start the workflow and client using the following commands:
<!-- STEP
name: Run Wait External Event Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow'
- 'Waiting for approval...'
- 'approval granted - do the approved action'
- 'Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity'
- 'Running approval activity...'
- 'approval-activity finished'
background: true
sleep: 60
timeout_seconds: 60
-->
ex
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient
```
<!-- END_STEP -->
The worker logs:
```text
Expand Down Expand Up @@ -521,25 +511,14 @@ public class ReverseActivity implements WorkflowActivity {
Start the workflow and client using the following commands:
<!-- STEP
name: Run Sub-workflow Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'calling subworkflow with input: Hello Dapr Workflow!'
- 'SubWorkflow finished with: !wolfkroW rpaD olleH'
background: true
sleep: 60
timeout_seconds: 60
-->
ex
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient
```
<!-- END_STEP -->
The log from worker:
```text
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<grpc.version>1.64.0</grpc.version>
<protobuf.version>3.25.0</protobuf.version>
<protocCommand>protoc</protocCommand>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.14.0-rc.2/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.14.4/dapr/proto</dapr.proto.baseurl>
<dapr.sdk.version>1.13.0-SNAPSHOT</dapr.sdk.version>
<dapr.sdk.alpha.version>0.13.0-SNAPSHOT</dapr.sdk.alpha.version>
<os-maven-plugin.version>1.7.1</os-maven-plugin.version>
Expand Down
1 change: 1 addition & 0 deletions sdk-autogen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<description>Auto-generated SDK for Dapr</description>

<properties>
<java.version>17</java.version>
<protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory>
<protobuf.input.directory>${project.build.directory}/proto</protobuf.input.directory>
<maven.deploy.skip>false</maven.deploy.skip>
Expand Down
Loading

0 comments on commit cb552ba

Please sign in to comment.