Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncAPI v3 #184

Merged
merged 11 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/modules/ROOT/pages/scanner.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ For instance, with Maven, add the following dependency to your POM file:
For more information about the extension configuration please refer to the <<configuration-reference, Configuration Reference>>.

include::includes/quarkus-asyncapi-annotation-scanner.adoc[leveloffset=+1, opts=optional]

== Migration to V1 (switch from AsyncApi v2.6.0 to v3.0.0)
The structure of asyncApi has multiple breaking changes. See https://www.asyncapi.com/docs/migration/migrating-to-v3[migration to V3] for details.
To apply to these changes following changed in the extension-configuration:

* `server.url` is divided into `server.host` and `server.pathname` (see https://www.asyncapi.com/docs/migration/migrating-to-v3#server-url-splitting-up[server-url splitting up])
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<quarkus.version>3.10.0</quarkus.version>
<version.org.assertj>3.25.3</version.org.assertj>
<version.org.slf4j>2.0.13</version.org.slf4j>
<version.asyncapi.core>1.0.0-EAP-2</version.asyncapi.core>
<version.asyncapi.core>1.0.0-RC</version.asyncapi.core>
<version.org.mockito>5.11.0</version.org.mockito>
<version.org.projectlombok>1.18.32</version.org.projectlombok>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.logging.Logger;

import org.eclipse.microprofile.config.ConfigProvider;

import com.asyncapi.v2._0_0.model.AsyncAPI;
import com.asyncapi.v3._0_0.model.AsyncAPI;

import io.quarkiverse.asyncapi.annotation.scanner.config.AsyncApiRuntimeConfig;
import io.quarkus.deployment.annotations.BuildProducer;
Expand All @@ -34,9 +35,19 @@ void scanAsyncAPIs(
AsyncApiRuntimeConfig aConfig,
RecorderContext aRecorderContext) {
aRecorderContext.registerSubstitution(BigDecimal.class, Double.class, BigDecimalSubstitution.class);
AsyncApiBuilder builder = new AsyncApiBuilder();
AsyncAPI asyncAPI = builder.build(aIndex.getIndex(), aConfig);
aRecorder.store(asyncAPI, aConfig);
AsyncApiConfigResolver configResolver = new AsyncApiConfigResolver(aConfig);
AsyncApiAnnotationScanner scanner = new AsyncApiAnnotationScanner(aIndex.getIndex(), configResolver);
AsyncAPI.AsyncAPIBuilder builder = AsyncAPI.builder()
.asyncapi(aConfig.version)
.id(configResolver.getConfiguredKafkaBootstrapServer())
.info(configResolver.getInfo())
.defaultContentType(aConfig.defaultContentType);
builder = scanner.setData(builder);
Map<String, Object> servers = configResolver.getServers();
if (servers != null) {
builder.servers(servers);
}
aRecorder.store(new MyAsyncAPI(builder.build()), aConfig);
}

static class IsEnabled implements BooleanSupplier {
Expand Down

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

import org.eclipse.microprofile.config.ConfigProvider;

import com.asyncapi.v2._0_0.model.info.Contact;
import com.asyncapi.v2._0_0.model.info.Info;
import com.asyncapi.v2._0_0.model.info.License;
import com.asyncapi.v2._0_0.model.server.Server;
import com.asyncapi.v3._0_0.model.info.Contact;
import com.asyncapi.v3._0_0.model.info.Info;
import com.asyncapi.v3._0_0.model.info.License;
import com.asyncapi.v3._0_0.model.server.Server;

import io.quarkiverse.asyncapi.annotation.scanner.config.AsyncApiRuntimeConfig;
import io.quarkiverse.asyncapi.annotation.scanner.config.Channel;
Expand Down Expand Up @@ -63,11 +63,16 @@ public String getTopic(boolean aIsEmitter, String aChannel) {
return ConfigProvider.getConfig().getOptionalValue(configKey, String.class).orElse(aChannel);
}

public Optional<String> getGroupId(boolean aIsEmitter, String aChannel) {
String configKey = "mp.messaging." + (aIsEmitter ? "outgoing" : "incoming") + "." + aChannel + ".group.id";
return ConfigProvider.getConfig().getOptionalValue(configKey, String.class);
}

public Channel getChannel(String aChannel) {
return config.channels.get(aChannel);
}

public Map<String, Server> getServers() {
public Map<String, Object> getServers() {
if (config.servers.isEmpty()) {
return null;
}
Expand All @@ -76,10 +81,10 @@ public Map<String, Server> getServers() {
}

Server toAsyncApiServer(io.quarkiverse.asyncapi.annotation.scanner.config.Server aConfigServer) {
return Server.builder()
//TODO
Server.ServerBuilder builder = Server.builder()
.protocol(aConfigServer.protocol)
.url(aConfigServer.url)
.build();
.host(aConfigServer.host);
aConfigServer.pathname.ifPresent(builder::pathname);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
@ApplicationScoped
public class DummyController {

@Inject
@Channel("MyOut")
@Broadcast
@org.eclipse.microprofile.openapi.annotations.media.Schema(description = "my inOut emitter description")
Emitter<TestMessage<TestMessageData>> inOutEmitter;

@Inject
@Channel("channel-x")
@Broadcast
Expand All @@ -40,6 +46,12 @@ public class DummyController {
@Schema(description = "transferEmitter description2")
Emitter<TransferMessage<String>> transferEmitter2;

@Incoming("MyIn")
@Schema(implementation = { GecMessage.class, Part.class })
public void receiveMyIn(String aData) {
//Do nothing
}

@Incoming("incoming-channel-string")
@Schema(implementation = { GecMessage.class, Part.class })
public void receiveMessageString(String aData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
import java.util.Map;
import java.util.stream.Collectors;

import com.asyncapi.v2._0_0.model.channel.ChannelItem;
import com.asyncapi.v2._0_0.model.channel.message.Message;
import com.asyncapi.v2._0_0.model.channel.operation.Operation;
import com.asyncapi.v2.schema.Schema;
import com.asyncapi.v2.schema.Type;
import com.asyncapi.v3._0_0.model.channel.Channel;
import com.asyncapi.v3._0_0.model.channel.message.Message;
import com.asyncapi.v3.schema.AsyncAPISchema;
import com.asyncapi.v3.schema.Type;
import com.fasterxml.jackson.annotation.JsonView;

import io.quarkus.runtime.annotations.RegisterForReflection;
Expand All @@ -23,32 +22,32 @@
public class MyAsyncApiFilter implements AsyncApiFilter {

@Override
public ChannelItem filterChannelItem(String aChannel, ChannelItem aChannelItem) {
if (aChannel.contains("transfer")) {
Operation operation = aChannelItem.getPublish();

Message message = (Message) operation.getMessage();
Class<?> messageClass = getMessageClass(operation);
public Channel filterChannel(String aName, Channel aChannel) {
if (aName.contains("transfer")) {
Map<String, Object> messages = aChannel.getMessages();
Map.Entry<String, Object> firstMessage = messages.entrySet().iterator().next();
Message message = (Message) firstMessage.getValue();
Class<?> messageClass = getMessageClass(firstMessage.getKey());
if (messageClass != null) {
Schema transferMessagePayload = (Schema) message.getPayload();
recurse(messageClass, (Schema) transferMessagePayload.getProperties().get("value"));
AsyncAPISchema transferMessagePayload = (AsyncAPISchema) message.getPayload();
recurse(messageClass, (AsyncAPISchema) transferMessagePayload.getProperties().get("value"));
}
}
return aChannelItem;
return aChannel;
}

void recurse(Class aClass, Schema aSchema) {
void recurse(Class aClass, AsyncAPISchema aSchema) {
if (aSchema.getProperties() == null) {
return;
}
//get over all fields
Map<String, Schema> filteredPayload = aSchema.getProperties().entrySet().stream()
Map<String, Object> filteredPayload = aSchema.getProperties().entrySet().stream()
.filter(e -> isClassTransferRelevant(aClass) || isFieldTransferRelevant(aClass, e.getKey()))
.peek(e -> {
if (Type.OBJECT.equals(e.getValue().getType())) {
if (Type.OBJECT.equals(((AsyncAPISchema) e.getValue()).getType())) {
Field field = getFieldRecursiv(aClass, e.getKey());
if (field != null) {
recurse(field.getType(), e.getValue());
recurse(field.getType(), (AsyncAPISchema) e.getValue());
}
}
})
Expand All @@ -66,11 +65,10 @@ Field getFieldRecursiv(Class aClass, String aFieldName) {
}
}

Class<?> getMessageClass(Operation aOperation) {
Class<?> getMessageClass(String aOperationId) {
try {
String operationId = aOperation.getOperationId();
Class<?> clazz = Class.forName(operationId.substring(0, operationId.lastIndexOf('.')));
Field field = clazz.getDeclaredField(operationId.substring(operationId.lastIndexOf(".") + 1));
Class<?> clazz = Class.forName(aOperationId.substring(0, aOperationId.lastIndexOf('.')));
Field field = clazz.getDeclaredField(aOperationId.substring(aOperationId.lastIndexOf(".") + 1));
ParameterizedType outerGenericType = (ParameterizedType) field.getGenericType();
ParameterizedType innerGenericType = (ParameterizedType) outerGenericType.getActualTypeArguments()[0];
return (Class<?>) innerGenericType.getActualTypeArguments()[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ mp.messaging.outgoing.outgoing-channel-reactive-part.value.serializer=org.apache

mp.messaging.incoming.prices.connector=smallrye-kafka

#in&out
mp.messaging.outgoing.MyOut.topic=inOutTopic
mp.messaging.outgoing.MyOut.connector=smallrye-kafka
mp.messaging.outgoing.MyOut.value.serializer=org.apache.kafka.common.serialization.StringSerializer
quarkus.asyncapi.annotation.scanner.channel.MyOut.description=description of MyOut from application.properties
mp.messaging.incoming.MyIn.topic=inOutTopic
mp.messaging.incoming.MyIn.connector=smallrye-kafka
mp.messaging.incoming.MyIn.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.MyIn.auto.offset.reset=latest
mp.messaging.incoming.MyIn.health-readiness-enabled=false
mp.messaging.incoming.MyIn.broadcast=true
mp.messaging.incoming.MyIn.group.id=inOutGroupId
quarkus.asyncapi.annotation.scanner.channel.MyIn.description=description of MyIn from application.properties

# Set root path to / (all resources - inclusive html pages are)
quarkus.http.root-path=/test-svc
kafka.bootstrap.servers=PLAINTEXT://localhost:9092
Expand Down Expand Up @@ -87,6 +101,7 @@ quarkus.asyncapi.annotation.scanner.channel.transfer-channel2.description=descri
#quarkus.asyncapi.annotation.scanner.channel."channel-name".subscribe.bindings."kafka".groupId

#AsyncApi server
quarkus.asyncapi.annotation.scanner.server.testServer.url=${kafka.bootstrap.servers}
quarkus.asyncapi.annotation.scanner.server.testServer.host=${kafka.bootstrap.servers}
#quarkus.asyncapi.annotation.scanner.server.testServer.pathname
quarkus.asyncapi.annotation.scanner.server.testServer.protocol=kafka

Loading
Loading