Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactive Messaging codestart #21068

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{#include readme-header /}

{#each input.selected-extensions-ga}
{#switch it}
{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'}
[Related Apache Kafka guide section...](https://quarkus.io/guides/kafka-reactive-getting-started)

{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-amqp'}
[Related Apache AMQP 1.0 guide section...](https://quarkus.io/guides/amqp)

{/switch}
{/each}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{#each input.selected-extensions-ga}
{#switch it}
{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'}
mp:
messaging:
outgoing:
source-out:
connector: smallrye-kafka
topic: word
uppercase-out:
connector: smallrye-kafka
topic: uppercase-word
incoming:
source-in:
connector: smallrye-kafka
topic: word
uppercase-in:
connector: smallrye-kafka
topic: uppercase-word

{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-mqtt'}
mp:
messaging:
outgoing:
source-out:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: word
uppercase-out:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: uppercase-word
incoming:
source-in:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: word
uppercase-in:
connector: smallrye-mqtt
host: localhost
port: '1883'
topic: uppercase-word

{#case 'io.quarkus:quarkus-smallrye-reactive-messaging-amqp'}
mp:
messaging:
outgoing:
source-out:
address: word
connector: smallrye-amqp
uppercase-out:
connector: smallrye-amqp
address: uppercase-word
incoming:
source-in:
connector: smallrye-amqp
address: word
uppercase-in:
address: uppercase-word
connector: smallrye-amqp

{/switch}
{/each}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: reactive-messaging-codestart
ref: reactive-messaging
tags: extension-codestart
type: code
metadata:
title: Reactive Messaging codestart
description: Use SmallRye Reactive Messaging
language:
base:
dependencies:
test-dependencies:
- io.smallrye.reactive:smallrye-reactive-messaging-in-memory
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.acme;

import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.reactive.messaging.*;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.stream.Stream;

@ApplicationScoped
public class MyReactiveMessagingApplication {

@Inject
@Channel("source-out")
Emitter<String> emitter;

/** Sends message to the source channel, can be used from a JAX-RS resource or any bean of your application **/
void onStart(@Observes StartupEvent ev) {
Stream.of("Hello", "with", "SmallRye", "reactive", "message").forEach(string -> emitter.send(string));
}

/** Consume the message from the source channel, uppercase it and send it to the uppercase channel **/
@Incoming("source-in")
@Outgoing("uppercase-out")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}

/** Consume the uppercase channel and print the message **/
@Incoming("uppercase-in")
public void sink(String word) {
System.out.println(">> " + word);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.acme;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
import io.smallrye.reactive.messaging.connectors.InMemorySink;
import io.smallrye.reactive.messaging.connectors.InMemorySource;
import org.junit.jupiter.api.Test;

import javax.enterprise.inject.Any;
import javax.inject.Inject;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@QuarkusTest
@QuarkusTestResource(MyReactiveMessagingApplicationTest.InMemoryChannelTestResource.class)
class MyReactiveMessagingApplicationTest {

@Inject
@Any
InMemoryConnector connector;

@Test
void test() {
InMemorySource<String> source = connector.source("source-in");
InMemorySink<String> uppercase = connector.sink("uppercase-out");

source.send("Hello");
source.send("In-memory");
source.send("Connectors");

assertEquals(3, uppercase.received().size());
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("HELLO")));
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("IN-MEMORY")));
assertTrue(uppercase.received().stream().anyMatch(message -> message.getPayload().equals("CONNECTORS")));

}

public static class InMemoryChannelTestResource implements QuarkusTestResourceLifecycleManager {

@Override
public Map<String, String> start() {
Map<String, String> env = new HashMap<>();
env.putAll(InMemoryConnector.switchIncomingChannelsToInMemory("source-in"));
env.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase-out"));
return env;
}

@Override
public void stop() {
InMemoryConnector.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ metadata:
- "web"
- "reactive"
- "messaging"
status: "experimental"
status: "experimental"
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ metadata:
- "mp.messaging."
- "quarkus.reactive-messaging."
- "quarkus.amqp."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ metadata:
- "mp.messaging."
- "quarkus.reactive-messaging."
- "quarkus.kafka."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ metadata:
config:
- "mp.messaging."
- "quarkus.reactive-messaging."
codestart:
name: "reactive-messaging"
languages:
- "java"
artifact: "io.quarkus:quarkus-project-core-extension-codestarts"
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractPathAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
Expand Down Expand Up @@ -279,8 +283,12 @@ private AbstractPathAssert<?> checkGeneratedSource(String sourceDir, Language la
}

private String getTestId() {
String tool = buildTool != null ? buildTool.getKey() + "-" : "";
return tool + String.join("-", codestarts);
final String id = Stream.of(
buildTool != null ? Stream.of(buildTool.getKey()) : Stream.<String> empty(),
this.extensions.stream().map(ArtifactCoords::getArtifactId),
this.codestarts.stream(),
Stream.of(UUID.randomUUID().toString())).flatMap(Function.identity()).collect(Collectors.joining("-"));
return id;
}

private void generateRealDataProjectIfNeeded(Path path, Language language) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ class QuarkusCodestartBuildIT extends PlatformAwareTestBase {

private static final Path testDirPath = Paths.get("target/quarkus-codestart-build-test");

private static final Set<String> EXCLUDED = Sets.newHashSet("spring-web-codestart", "picocli-codestart",
"hibernate-orm-codestart");
private static final Set<String> EXCLUDED = Sets.newHashSet(
"spring-web-codestart",
"picocli-codestart",
"hibernate-orm-codestart",
"reactive-messaging-codestart");

@BeforeAll
static void setUp() throws IOException {
Expand Down Expand Up @@ -135,7 +138,7 @@ private String genName(String buildtool, String language, List<String> codestart
if (codestarts.isEmpty()) {
name += "-default";
} else if (codestarts.size() > 2) {
name += "-" + UUID.randomUUID().toString();
name += "-" + UUID.randomUUID();
} else {
name += "-" + String.join("-", codestarts);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.quarkus.devtools.codestarts.quarkus;

import static io.quarkus.devtools.codestarts.quarkus.QuarkusCodestartCatalog.Language.JAVA;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.devtools.testing.codestarts.QuarkusCodestartTest;
import io.quarkus.maven.ArtifactKey;

public class ReactiveMessagingCodestartIT {

@RegisterExtension
public static QuarkusCodestartTest kafkaCodestartTest = QuarkusCodestartTest.builder()
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-kafka"))
.languages(JAVA)
.build();

@RegisterExtension
public static QuarkusCodestartTest amqpCodestartTest = QuarkusCodestartTest.builder()
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-amqp"))
.languages(JAVA)
.build();

@RegisterExtension
public static QuarkusCodestartTest mqttCodestartTest = QuarkusCodestartTest.builder()
.extension(ArtifactKey.fromString("io.quarkus:quarkus-smallrye-reactive-messaging-mqtt"))
.languages(JAVA)
.build();

@Test
void testKafkaContent() throws Throwable {
kafkaCodestartTest.checkGeneratedSource("org.acme.MyReactiveMessagingApplication");
kafkaCodestartTest.assertThatGeneratedFileMatchSnapshot(JAVA, "src/main/resources/application.properties");
kafkaCodestartTest.checkGeneratedTestSource("org.acme.MyReactiveMessagingApplicationTest");
}

@Test
void testMQTTContent() throws Throwable {
mqttCodestartTest.assertThatGeneratedFileMatchSnapshot(JAVA, "src/main/resources/application.properties");
}

@Test
void testAMQPContent() throws Throwable {
amqpCodestartTest.assertThatGeneratedFileMatchSnapshot(JAVA, "src/main/resources/application.properties");
}

@Test
@DisabledOnOs(OS.WINDOWS)
void buildKafka() throws Throwable {
kafkaCodestartTest.buildAllProjects();
}

@Test
@DisabledOnOs(OS.WINDOWS)
void buildMQTT() throws Throwable {
mqttCodestartTest.buildAllProjects();
}

@Test
@DisabledOnOs(OS.WINDOWS)
void buildAMQP() throws Throwable {
amqpCodestartTest.buildAllProjects();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mp.messaging.outgoing.uppercase-out.connector=smallrye-amqp
mp.messaging.outgoing.uppercase-out.address=uppercase-word
mp.messaging.incoming.source-in.connector=smallrye-amqp
mp.messaging.incoming.uppercase-in.address=uppercase-word
mp.messaging.outgoing.source-out.address=word
mp.messaging.incoming.source-in.address=word
mp.messaging.outgoing.source-out.connector=smallrye-amqp
mp.messaging.incoming.uppercase-in.connector=smallrye-amqp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ilove.quark.us;

import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.reactive.messaging.*;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.stream.Stream;

@ApplicationScoped
public class MyReactiveMessagingApplication {

@Inject
@Channel("source-out")
Emitter<String> emitter;

/** Sends message to the source channel, can be used from a JAX-RS resource or any bean of your application **/
void onStart(@Observes StartupEvent ev) {
Stream.of("Hello", "with", "SmallRye", "reactive", "message").forEach(string -> emitter.send(string));
}

/** Consume the message from the source channel, uppercase it and send it to the uppercase channel **/
@Incoming("source-in")
@Outgoing("uppercase-out")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}

/** Consume the uppercase channel and print the message **/
@Incoming("uppercase-in")
public void sink(String word) {
System.out.println(">> " + word);
}
}
Loading