Skip to content

Commit

Permalink
Add example to Spring Integration with Reactive Streams
Browse files Browse the repository at this point in the history
* Add outbound channel adapter example

* Add an example for the `CustomReactiveMessageHandler` usage

* Emphasize that the examples are for Reactive Streams

* Fix a missing letter that caused the text to get inside the code block

* Emphasize that the first example is an event driven inbound channel adapter

* Fix a redundant question mark from the example code
  • Loading branch information
migroskub authored Feb 14, 2022
1 parent f6bb91c commit 285c380
Showing 1 changed file with 139 additions and 4 deletions.
143 changes: 139 additions & 4 deletions src/reference/asciidoc/reactive-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,147 @@ When the target protocol for integration provides a Reactive Streams solution, i
An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred `Mono` or `Flux` and perform a send (and produce reply, if any) only when a protocol component initiates a subscription into a `Mono` returned from the listener method.
This way we have a reactive stream solution encapsulated exactly in this component.
Of course, downstream integration flow subscribed on the output channel should honor Reactive Streams specification and be performed in the on demand, back-pressure ready manner.
This is not always available by the nature (or the current implementation) of `MessageHandler` processor used in the integration flow.

This is not always available by the nature (or with the current implementation) of `MessageHandler` processor used in the integration flow.
This limitation can be handled using thread pools and queues or `FluxMessageChannel` (see above) before and after integration endpoints when there is no reactive implementation.

A reactive outbound channel adapter implementation is about initiation (or continuation) of a reactive stream to interaction with an external system according provided reactive API for the target protocol.
An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of reactive stream on top.
A returned reactive type can be subscribed immediately if we are in one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
An example for a reactive **event-driven** inbound channel adapter:
```java
public class CustomReactiveMessageProducer extends MessageProducerSupport {

private final CustomReactiveSource customReactiveSource;

public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}

@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());

subscribeToPublisher(messageFlux);
}
}
```

Usage would look like:

```java
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;

@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
```
Or in a declarative way:

```java
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
```
Or even without a channel adapter, we can always use the Java DSL in the following way:
```java
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlows.from(myFlux)
.handle(outputChannel)
.get();
}
}
```

A reactive outbound channel adapter implementation is about the initiation (or continuation) of a reactive stream to interaction with an external system according to the provided reactive API for the target protocol.
An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of the reactive stream on top.
A returned reactive type can be subscribed immediately if we are in a one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.

An example for a reactive outbound channel adapter:
```java
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

private final CustomEntityOperations customEntityOperations;

public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}

@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}

private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}

private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}

public enum Type {
INSERT,
UPDATE,
}
}
```

We will be able to use both of the channel adatpers:
```java
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;

@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;

@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
```


Currently Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>>, <<./mongodb.adoc#mongodb,MongoDb>> and <<./r2dbc.adoc#r2dbc,R2DBC>>.
The <<./redis.adoc#redis-stream-outbound,Redis Stream Channel Adapters>> are also reactive and uses `ReactiveStreamOperations` from Spring Data.
Expand Down

0 comments on commit 285c380

Please sign in to comment.