Skip to content

Commit

Permalink
refactor(core): remove usage of ConsumerData in AsyncAnnotationChanne…
Browse files Browse the repository at this point in the history
…lsScanner
  • Loading branch information
timonback committed Dec 16, 2023
1 parent 18abcbc commit 45bb8ab
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import io.github.stavshamir.springwolf.asyncapi.scanners.bindings.MessageBindingProcessor;
import io.github.stavshamir.springwolf.asyncapi.scanners.bindings.OperationBindingProcessor;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelPriority;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.AsyncAnnotationChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ConsumerOperationDataScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.ProducerOperationDataScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.annotation.AsyncAnnotationChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncListener;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncPublisher;
Expand Down Expand Up @@ -95,7 +95,7 @@ public AsyncAnnotationChannelsScanner<AsyncListener> asyncListenerAnnotationScan
List<OperationBindingProcessor> operationBindingProcessors,
List<MessageBindingProcessor> messageBindingProcessors) {
return new AsyncAnnotationChannelsScanner<>(
buidAsyncListenerAnnotationProvider(),
buildAsyncListenerAnnotationProvider(),
springwolfClassScanner,
schemasService,
asyncApiDocketService,
Expand Down Expand Up @@ -127,7 +127,8 @@ public AsyncAnnotationChannelsScanner<AsyncPublisher> asyncPublisherAnnotationSc
messageBindingProcessors);
}

private static AsyncAnnotationChannelsScanner.AsyncAnnotationProvider<AsyncListener> buidAsyncListenerAnnotationProvider() {
private static AsyncAnnotationChannelsScanner.AsyncAnnotationProvider<AsyncListener>
buildAsyncListenerAnnotationProvider() {
return new AsyncAnnotationChannelsScanner.AsyncAnnotationProvider<>() {
@Override
public Class<AsyncListener> getAnnotation() {
Expand All @@ -146,7 +147,8 @@ public OperationData.OperationType getOperationType() {
};
}

private static AsyncAnnotationChannelsScanner.AsyncAnnotationProvider<AsyncPublisher> buildAsyncPublisherAnnotationProvider() {
private static AsyncAnnotationChannelsScanner.AsyncAnnotationProvider<AsyncPublisher>
buildAsyncPublisherAnnotationProvider() {
return new AsyncAnnotationChannelsScanner.AsyncAnnotationProvider<>() {
@Override
public Class<AsyncPublisher> getAnnotation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@
import com.asyncapi.v2._6_0.model.channel.ChannelItem;
import com.asyncapi.v2._6_0.model.channel.operation.Operation;
import com.asyncapi.v2._6_0.model.server.Server;
import com.asyncapi.v2.binding.channel.ChannelBinding;
import com.asyncapi.v2.binding.message.MessageBinding;
import com.asyncapi.v2.binding.operation.OperationBinding;
import io.github.stavshamir.springwolf.asyncapi.scanners.bindings.MessageBindingProcessor;
import io.github.stavshamir.springwolf.asyncapi.scanners.bindings.OperationBindingProcessor;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelMerger;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.ChannelsScanner;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.payload.PayloadClassExtractor;
import io.github.stavshamir.springwolf.asyncapi.scanners.classes.ClassScanner;
import io.github.stavshamir.springwolf.asyncapi.types.ConsumerData;
import io.github.stavshamir.springwolf.asyncapi.types.OperationData;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.HeaderReference;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocketService;
import io.github.stavshamir.springwolf.schemas.SchemasService;
Expand All @@ -33,13 +33,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import static io.github.stavshamir.springwolf.asyncapi.MessageHelper.toMessageObjectOrComposition;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toList;

@Slf4j
@RequiredArgsConstructor
Expand All @@ -62,189 +58,120 @@ public void setEmbeddedValueResolver(StringValueResolver resolver) {

@Override
public Map<String, ChannelItem> scan() {
Map<String, List<OperationData>> operationDataGroupedByChannelName = this.getOperationData().stream()
.filter(this::allFieldsAreNonNull)
.collect(groupingBy(OperationData::getChannelName));
List<Map.Entry<String, ChannelItem>> channels = classScanner.scan().stream()
.flatMap(this::getAnnotatedMethods)
.map(this::buildChannelItem)
.filter(this::isInvalidChannelItem)
.collect(toList());

return operationDataGroupedByChannelName.entrySet().stream()
.collect(toMap(Map.Entry::getKey, entry -> buildChannel(entry.getValue())));
return ChannelMerger.merge(channels);
}

private boolean allFieldsAreNonNull(OperationData operationData) {
boolean allNonNull = operationData.getChannelName() != null
&& operationData.getPayloadType() != null
&& operationData.getOperationBinding() != null;
private Stream<MethodAndAnnotation<A>> getAnnotatedMethods(Class<?> type) {
Class<A> annotationClass = this.asyncAnnotationProvider.getAnnotation();
log.debug("Scanning class \"{}\" for @\"{}\" annotated methods", type.getName(), annotationClass.getName());

return Arrays.stream(type.getDeclaredMethods())
.filter(method -> !method.isBridge())
.filter(method -> AnnotationUtil.findAnnotation(annotationClass, method) != null)
.peek(method -> log.debug("Mapping method \"{}\" to channels", method.getName()))
.flatMap(method -> AnnotationUtil.findAnnotations(annotationClass, method).stream()
.map(annotation -> new MethodAndAnnotation<>(method, annotation)));
}

private boolean isInvalidChannelItem(Map.Entry<String, ChannelItem> entry) {
Operation publish = entry.getValue().getPublish();
boolean publishBindingExists = publish != null && publish.getBindings() != null;

Operation subscribe = entry.getValue().getSubscribe();
boolean subscribeBindingExists = subscribe != null && subscribe.getBindings() != null;

boolean allNonNull = entry.getKey() != null && (publishBindingExists || subscribeBindingExists);

if (!allNonNull) {
log.warn("Some data fields are null - this method will not be documented: {}", operationData);
log.warn(
"Some data fields are null - method (channel={}) will not be documented: {}",
entry.getKey(),
entry.getValue());
}

return allNonNull;
}

/**
* Creates an asyncapi {@link ChannelItem} using the given list of {@link OperationData}. Expects, that all {@link OperationData}
* items belong to the same channel. Most properties of the resulting {@link ChannelItem} are extracted from the
* first {@link OperationData} item in the list, assuming that all {@link OperationData} contains the same channel
* informations.
*
* @param operationDataList List of all {@link OperationData} items for a single channel.
* @return the resulting {@link ChannelItem}
*/
private ChannelItem buildChannel(List<OperationData> operationDataList) {
// All bindings in the group are assumed to be the same
// AsyncApi does not support multiple bindings on a single channel
Map<String, ? extends ChannelBinding> channelBinding =
operationDataList.get(0).getChannelBinding();
Map<String, ? extends OperationBinding> operationBinding =
operationDataList.get(0).getOperationBinding();
Map<String, Object> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
Map<String, Object> chBinding = channelBinding != null ? new HashMap<>(channelBinding) : null;
String operationId = operationDataList.get(0).getChannelName() + "_"
+ this.asyncAnnotationProvider.getOperationType().operationName;
String description = operationDataList.get(0).getDescription();
List<String> servers = operationDataList.get(0).getServers();

if (description.isEmpty()) {
description = "Auto-generated description";
}
private Map.Entry<String, ChannelItem> buildChannelItem(MethodAndAnnotation<A> methodAndAnnotation) {
ChannelItem.ChannelItemBuilder channelBuilder = ChannelItem.builder();

Operation operation = Operation.builder()
.description(description)
.operationId(operationId)
.message(getMessageObject(operationDataList))
.bindings(opBinding)
.build();
AsyncOperation operationAnnotation =
this.asyncAnnotationProvider.getAsyncOperation(methodAndAnnotation.annotation());
String channelName = resolver.resolveStringValue(operationAnnotation.channelName());

ChannelItem.ChannelItemBuilder channelBuilder = ChannelItem.builder().bindings(chBinding);
channelBuilder = switch (this.asyncAnnotationProvider.getOperationType()) {
Operation operation = buildOperation(operationAnnotation, methodAndAnnotation.method(), channelName);
switch (this.asyncAnnotationProvider.getOperationType()) {
case PUBLISH -> channelBuilder.publish(operation);
case SUBSCRIBE -> channelBuilder.subscribe(operation);};
case SUBSCRIBE -> channelBuilder.subscribe(operation);
}
;

// Only set servers if servers are defined. Avoid setting an emtpy list
// because this would generate empty server entries for each channel in the resulting
// async api.
List<String> servers = AsyncAnnotationScannerUtil.getServers(operationAnnotation, resolver);
if (servers != null && !servers.isEmpty()) {
validateServers(servers, operationId);
validateServers(servers, operation.getOperationId());
channelBuilder.servers(servers);
}
return channelBuilder.build();

ChannelItem channelItem = channelBuilder.build();
return Map.entry(channelName, channelItem);
}

private Object getMessageObject(List<OperationData> operationDataList) {
Set<Message> messages =
operationDataList.stream().map(this::buildMessage).collect(toSet());
private Operation buildOperation(AsyncOperation asyncOperation, Method method, String channelName) {
String description = this.resolver.resolveStringValue(asyncOperation.description());
if (!StringUtils.hasText(description)) {
description = "Auto-generated description";
}

String operationId = channelName + "_" + this.asyncAnnotationProvider.getOperationType().operationName;

return toMessageObjectOrComposition(messages);
Map<String, OperationBinding> operationBinding =
AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors);
Map<String, Object> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;

return Operation.builder()
.description(description)
.operationId(operationId)
.message(buildMessage(asyncOperation, method))
.bindings(opBinding)
.build();
}

private Message buildMessage(OperationData operationData) {
Class<?> payloadType = operationData.getPayloadType();
String modelName = this.schemasService.register(payloadType);
String headerModelName = this.schemasService.register(operationData.getHeaders());
private Message buildMessage(AsyncOperation operationData, Method method) {
Class<?> payloadType = operationData.payloadType() != Object.class
? operationData.payloadType()
: payloadClassExtractor.extractFrom(method);

/*
* Message information can be obtained via a @AsyncMessage annotation on the method parameter, the Payload
* itself or via the Swagger @Schema annotation on the Payload.
*/
String modelName = this.schemasService.register(payloadType);
AsyncHeaders asyncHeaders = AsyncAnnotationScannerUtil.getAsyncHeaders(operationData, resolver);
String headerModelName = this.schemasService.register(asyncHeaders);

var schema = payloadType.getAnnotation(Schema.class);
String description = schema != null ? schema.description() : null;

Map<String, MessageBinding> messageBinding =
AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors);

var builder = Message.builder()
.name(payloadType.getName())
.title(payloadType.getSimpleName())
.description(description)
.payload(PayloadReference.fromModelName(modelName))
.headers(HeaderReference.fromModelName(headerModelName))
.bindings(operationData.getMessageBinding());
.bindings(messageBinding);

// Retrieve the Message information obtained from the @AsyncMessage annotation. These values have higher
// priority
// so if we find them, we need to override the default values.
processAsyncMessageAnnotation(operationData.getMessage(), builder);
// priority so if we find them, we need to override the default values.
AsyncAnnotationScannerUtil.processAsyncMessageAnnotation(builder, operationData.message(), this.resolver);

return builder.build();
}

private void processAsyncMessageAnnotation(Message annotationMessage, Message.MessageBuilder builder) {
if (annotationMessage != null) {
builder.messageId(annotationMessage.getMessageId());

var schemaFormat = annotationMessage.getSchemaFormat() != null
? annotationMessage.getSchemaFormat()
: Message.DEFAULT_SCHEMA_FORMAT;
builder.schemaFormat(schemaFormat);

var annotationMessageDescription = annotationMessage.getDescription();
if (StringUtils.hasText(annotationMessageDescription)) {
builder.description(annotationMessageDescription);
}

var name = annotationMessage.getName();
if (StringUtils.hasText(name)) {
builder.name(name);
}

var title = annotationMessage.getTitle();
if (StringUtils.hasText(title)) {
builder.title(title);
}
}
}

protected List<OperationData> getOperationData() {
return classScanner.scan().stream()
.flatMap(this::getAnnotatedMethods)
.flatMap(this::toOperationData)
.toList();
}

private Stream<Method> getAnnotatedMethods(Class<?> type) {
Class<A> annotationClass = this.asyncAnnotationProvider.getAnnotation();
log.debug("Scanning class \"{}\" for @\"{}\" annotated methods", type.getName(), annotationClass.getName());

return Arrays.stream(type.getDeclaredMethods())
.filter(method -> !method.isBridge())
.filter(method -> AnnotationUtil.findAnnotation(annotationClass, method) != null);
}

private Stream<OperationData> toOperationData(Method method) {
Class<A> annotationClass = this.asyncAnnotationProvider.getAnnotation();
log.debug("Mapping method \"{}\" to channels", method.getName());

Map<String, OperationBinding> operationBindings =
AsyncAnnotationScannerUtil.processOperationBindingFromAnnotation(method, operationBindingProcessors);
Map<String, MessageBinding> messageBindings =
AsyncAnnotationScannerUtil.processMessageBindingFromAnnotation(method, messageBindingProcessors);
Message message = AsyncAnnotationScannerUtil.processMessageFromAnnotation(method);

Set<A> annotations = AnnotationUtil.findAnnotations(annotationClass, method);
return annotations.stream()
.map(annotation -> toOperationData(method, operationBindings, messageBindings, message, annotation));
}

private OperationData toOperationData(
Method method,
Map<String, OperationBinding> operationBindings,
Map<String, MessageBinding> messageBindings,
Message message,
A annotation) {
AsyncOperation op = this.asyncAnnotationProvider.getAsyncOperation(annotation);
Class<?> payloadType =
op.payloadType() != Object.class ? op.payloadType() : payloadClassExtractor.extractFrom(method);
return ConsumerData.builder() // temporarily reuse this data type
.channelName(resolver.resolveStringValue(op.channelName()))
.description(resolver.resolveStringValue(op.description()))
.servers(AsyncAnnotationScannerUtil.getServers(op, resolver))
.headers(AsyncAnnotationScannerUtil.getAsyncHeaders(op, resolver))
.payloadType(payloadType)
.operationBinding(operationBindings)
.messageBinding(messageBindings)
.message(message)
.build();
}

/**
* validates the given list of server names (for a specific operation) with the servers defined in the 'servers' part of
* the current AsyncApi.
Expand Down Expand Up @@ -279,4 +206,6 @@ public interface AsyncAnnotationProvider<A> {

OperationData.OperationType getOperationType();
}

private record MethodAndAnnotation<A>(Method method, A annotation) {}
}
Loading

0 comments on commit 45bb8ab

Please sign in to comment.