Skip to content

Commit

Permalink
fix: create new methods in BindingFactory instead of changing the exi…
Browse files Browse the repository at this point in the history
…sting ones, move bean ref logic to KafkaBeanRefHelper, implement BindingContext for passing Method and Class context to factories
  • Loading branch information
ruskaof committed Dec 2, 2024
1 parent c0dc484 commit 3667cef
Show file tree
Hide file tree
Showing 26 changed files with 289 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,27 @@
import io.github.springwolf.asyncapi.v3.bindings.OperationBinding;
import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;

import java.util.Map;

public interface BindingFactory<T> {
default String getChannelId(T annotation, Class<?> component) {
return ReferenceUtil.toValidId(getChannelName(annotation, component));

// maintainer note: replaced by #getChannelId(T, BindingContext)
default String getChannelId(T annotation) {
return ReferenceUtil.toValidId(getChannelName(annotation));
}

String getChannelName(T annotation, Class<?> component);
// maintainer note: replaced by #getChannelName(T, BindingContext)
String getChannelName(T annotation);

default String getChannelId(T annotation, BindingContext bindingContext) {
return getChannelId(annotation);
}

default String getChannelName(T annotation, BindingContext bindingContext) {
return getChannelName(annotation);
}

Map<String, ChannelBinding> buildChannelBinding(T annotation);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.bindings.common;

import java.lang.reflect.Method;

public record BindingContext(Class<?> annotatedClass, Method annotatedMethod) {
public BindingContext {
if (annotatedClass == null && annotatedMethod == null) {
throw new IllegalArgumentException("Either annotatedClass or annotatedMethod must be non-null");
}
}

public Class<?> getClassContext() {
if (annotatedClass != null) {
return annotatedClass;
}
if (annotatedMethod != null) {
return annotatedMethod.getDeclaringClass();
}

throw new IllegalStateException("Either annotatedClass or annotatedMethod must be non-null");
}

public static BindingContext ofAnnotatedMethod(Method annotatedMethod) {
return new BindingContext(null, annotatedMethod);
}

public static BindingContext ofAnnotatedClass(Class<?> annotatedClass) {
return new BindingContext(annotatedClass, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import io.github.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.springwolf.asyncapi.v3.model.channel.message.Message;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import io.github.springwolf.core.asyncapi.scanners.channels.ChannelsInClassScanner;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationScannerUtil;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationUtil;
Expand Down Expand Up @@ -42,16 +43,17 @@ public List<ChannelObject> scan(Class<?> clazz) {
private Stream<ChannelObject> mapClassToChannel(
Class<?> component, Set<MethodAndAnnotation<MethodAnnotation>> annotatedMethods) {
ClassAnnotation classAnnotation = AnnotationUtil.findFirstAnnotationOrThrow(classAnnotationClass, component);
BindingContext bindingContext = BindingContext.ofAnnotatedClass(component);
Set<Method> methods =
annotatedMethods.stream().map(MethodAndAnnotation::method).collect(Collectors.toSet());
Map<String, Message> messages = new HashMap<>(springAnnotationMessagesService.buildMessages(
classAnnotation, component, methods, SpringAnnotationMessagesService.MessageType.CHANNEL));
classAnnotation, bindingContext, methods, SpringAnnotationMessagesService.MessageType.CHANNEL));

return mapClassToChannel(classAnnotation, component, messages);
return mapClassToChannel(classAnnotation, bindingContext, messages);
}

private Stream<ChannelObject> mapClassToChannel(
ClassAnnotation classAnnotation, Class<?> component, Map<String, Message> messages) {
return Stream.of(springAnnotationChannelService.buildChannel(classAnnotation, component, messages));
ClassAnnotation classAnnotation, BindingContext bindingContext, Map<String, Message> messages) {
return Stream.of(springAnnotationChannelService.buildChannel(classAnnotation, bindingContext, messages));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageObject;
import io.github.springwolf.asyncapi.v3.model.channel.message.MessageReference;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import io.github.springwolf.core.asyncapi.scanners.channels.ChannelsInClassScanner;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationScannerUtil;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationUtil;
Expand Down Expand Up @@ -48,7 +49,8 @@ private ChannelObject mapMethodToChannel(MethodAndAnnotation<MethodAnnotation> m
MessageObject message = springAnnotationMessageService.buildMessage(annotation, payloadSchema, headerSchema);
Map<String, Message> messages = Map.of(message.getMessageId(), MessageReference.toComponentMessage(message));

return springAnnotationChannelService.buildChannel(
annotation, method.method().getDeclaringClass(), messages);
BindingContext bindingContext = BindingContext.ofAnnotatedMethod(method.method());

return springAnnotationChannelService.buildChannel(annotation, bindingContext, messages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.github.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.springwolf.asyncapi.v3.model.channel.message.Message;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -18,10 +19,11 @@ public class SpringAnnotationChannelService<Annotation extends java.lang.annotat

private final BindingFactory<Annotation> bindingFactory;

public ChannelObject buildChannel(Annotation annotation, Class<?> component, Map<String, Message> messages) {
public ChannelObject buildChannel(
Annotation annotation, BindingContext bindingContext, Map<String, Message> messages) {
Map<String, ChannelBinding> channelBinding = bindingFactory.buildChannelBinding(annotation);
Map<String, ChannelBinding> chBinding = channelBinding != null ? new HashMap<>(channelBinding) : null;
String channelName = bindingFactory.getChannelName(annotation, component);
String channelName = bindingFactory.getChannelName(annotation, bindingContext);
return ChannelObject.builder()
.channelId(ReferenceUtil.toValidId(channelName))
.address(channelName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;
import io.github.springwolf.core.asyncapi.components.ComponentsService;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import io.github.springwolf.core.asyncapi.scanners.common.headers.AsyncHeadersBuilder;
import io.github.springwolf.core.asyncapi.scanners.common.headers.HeaderClassExtractor;
import io.github.springwolf.core.asyncapi.scanners.common.headers.HeaderSchemaObjectMerger;
Expand Down Expand Up @@ -43,13 +44,16 @@ public enum MessageType {
}

public Map<String, MessageReference> buildMessages(
ClassAnnotation classAnnotation, Class<?> component, Set<Method> methods, MessageType messageType) {
ClassAnnotation classAnnotation,
BindingContext bindingContext,
Set<Method> methods,
MessageType messageType) {
Set<MessageObject> messages = methods.stream()
.map(method -> buildMessage(classAnnotation, method))
.collect(toSet());

if (messageType == MessageType.OPERATION) {
String channelId = bindingFactory.getChannelName(classAnnotation, component);
String channelId = bindingFactory.getChannelName(classAnnotation, bindingContext);
return toOperationsMessagesMap(channelId, messages);
}
return toMessagesMap(messages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import io.github.springwolf.core.asyncapi.scanners.common.message.SpringAnnotationMessageService;
import io.github.springwolf.core.asyncapi.scanners.common.payload.PayloadSchemaObject;
import lombok.RequiredArgsConstructor;
Expand All @@ -26,13 +27,13 @@ public class SpringAnnotationOperationService<MethodAnnotation extends Annotatio

public Operation buildOperation(
MethodAnnotation annotation,
Class<?> component,
BindingContext bindingContext,
PayloadSchemaObject payloadType,
SchemaObject headerSchema) {
MessageObject message = springAnnotationMessageService.buildMessage(annotation, payloadType, headerSchema);
Map<String, OperationBinding> operationBinding = bindingFactory.buildOperationBinding(annotation);
Map<String, OperationBinding> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
String channelId = bindingFactory.getChannelId(annotation, component);
String channelId = bindingFactory.getChannelId(annotation, bindingContext);

return Operation.builder()
.action(OperationAction.RECEIVE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.github.springwolf.asyncapi.v3.model.operation.Operation;
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import io.github.springwolf.core.asyncapi.scanners.common.message.SpringAnnotationMessagesService;
import lombok.RequiredArgsConstructor;

Expand All @@ -23,17 +24,18 @@ public class SpringAnnotationOperationsService<ClassAnnotation extends Annotatio
private final BindingFactory<ClassAnnotation> bindingFactory;
private final SpringAnnotationMessagesService<ClassAnnotation> springAnnotationMessagesService;

public Operation buildOperation(ClassAnnotation classAnnotation, Class<?> component, Set<Method> methods) {
public Operation buildOperation(
ClassAnnotation classAnnotation, BindingContext bindingContext, Set<Method> methods) {
var messages = springAnnotationMessagesService.buildMessages(
classAnnotation, component, methods, SpringAnnotationMessagesService.MessageType.OPERATION);
return buildOperation(classAnnotation, component, messages);
classAnnotation, bindingContext, methods, SpringAnnotationMessagesService.MessageType.OPERATION);
return buildOperation(classAnnotation, bindingContext, messages);
}

private Operation buildOperation(
ClassAnnotation classAnnotation, Class<?> component, Map<String, MessageReference> messages) {
ClassAnnotation classAnnotation, BindingContext bindingContext, Map<String, MessageReference> messages) {
Map<String, OperationBinding> operationBinding = bindingFactory.buildOperationBinding(classAnnotation);
Map<String, OperationBinding> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
String channelName = bindingFactory.getChannelName(classAnnotation, component);
String channelName = bindingFactory.getChannelName(classAnnotation, bindingContext);
String channelId = ReferenceUtil.toValidId(channelName);

return Operation.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.github.springwolf.asyncapi.v3.model.operation.Operation;
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationScannerUtil;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationUtil;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.MethodAndAnnotation;
Expand Down Expand Up @@ -42,14 +43,16 @@ public Stream<Map.Entry<String, Operation>> scan(Class<?> clazz) {
private Stream<Map.Entry<String, Operation>> mapClassToOperation(
Class<?> component, Set<MethodAndAnnotation<MethodAnnotation>> annotatedMethods) {
ClassAnnotation classAnnotation = AnnotationUtil.findFirstAnnotationOrThrow(classAnnotationClass, component);
BindingContext bindingContext = BindingContext.ofAnnotatedClass(component);

String channelId = bindingFactory.getChannelId(classAnnotation, component);
String channelId = bindingFactory.getChannelId(classAnnotation, bindingContext);
String operationId =
StringUtils.joinWith("_", channelId, OperationAction.RECEIVE.type, component.getSimpleName());

Set<Method> methods =
annotatedMethods.stream().map(MethodAndAnnotation::method).collect(Collectors.toSet());
Operation operation = springAnnotationOperationsService.buildOperation(classAnnotation, component, methods);
Operation operation =
springAnnotationOperationsService.buildOperation(classAnnotation, bindingContext, methods);
annotatedMethods.forEach(
method -> customizers.forEach(customizer -> customizer.customize(operation, method.method())));
return Stream.of(Map.entry(operationId, operation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationScannerUtil;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationUtil;
import io.github.springwolf.core.asyncapi.scanners.common.annotation.MethodAndAnnotation;
Expand Down Expand Up @@ -42,17 +43,17 @@ public Stream<Map.Entry<String, Operation>> scan(Class<?> clazz) {

private Map.Entry<String, Operation> mapMethodToOperation(MethodAndAnnotation<MethodAnnotation> method) {
MethodAnnotation annotation = AnnotationUtil.findFirstAnnotationOrThrow(methodAnnotationClass, method.method());
BindingContext bindingContext = BindingContext.ofAnnotatedMethod(method.method());

String channelId =
bindingFactory.getChannelId(annotation, method.method().getDeclaringClass());
String channelId = bindingFactory.getChannelId(annotation, bindingContext);
String operationId = StringUtils.joinWith(
"_", channelId, OperationAction.RECEIVE.type, method.method().getName());

PayloadSchemaObject payloadSchema = payloadMethodParameterService.extractSchema(method.method());
SchemaObject headerSchema = headerClassExtractor.extractHeader(method.method(), payloadSchema);

Operation operation = springAnnotationOperationService.buildOperation(
annotation, method.method().getDeclaringClass(), payloadSchema, headerSchema);
annotation, bindingContext, payloadSchema, headerSchema);
customizers.forEach(customizer -> customizer.customize(operation, method.method()));
return Map.entry(operationId, operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ static class TestBindingFactory implements BindingFactory<TestClassListener> {
Map.of(CHANNEL_ID, new TestBindingFactory.TestOperationBinding());

@Override
public String getChannelName(TestClassListener annotation, Class<?> component) {
public String getChannelName(TestClassListener annotation) {
return CHANNEL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ static class TestBindingFactory implements BindingFactory<TestChannelListener> {
Map.of(CHANNEL_ID, new TestBindingFactory.TestOperationBinding());

@Override
public String getChannelName(TestChannelListener annotation, Class<?> component) {
public String getChannelName(TestChannelListener annotation) {
return CHANNEL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.github.springwolf.asyncapi.v3.model.operation.OperationAction;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingFactory;
import io.github.springwolf.core.asyncapi.scanners.bindings.common.BindingContext;
import io.github.springwolf.core.asyncapi.scanners.common.message.SpringAnnotationMessageService;
import io.github.springwolf.core.asyncapi.scanners.common.payload.PayloadSchemaObject;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -60,7 +61,10 @@ void scan_componentHasTestListenerMethods() throws NoSuchMethodException {

// when
Operation operations = springAnnotationOperationService.buildOperation(
annotation, ClassWithTestListenerAnnotation.class, payloadSchemaName, headerSchema);
annotation,
BindingContext.ofAnnotatedClass(ClassWithTestListenerAnnotation.class),
payloadSchemaName,
headerSchema);

// then
Operation expectedOperation = Operation.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.examples.kafka.consumers;

import io.github.springwolf.examples.kafka.dtos.AnotherPayloadDto;
import io.github.springwolf.examples.kafka.dtos.ExamplePayloadDto;
import io.github.springwolf.examples.kafka.producers.AnotherProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@RequiredArgsConstructor
@Slf4j
public class ExampleBeanRefKafkaListener {

@SuppressWarnings("unused")
public final String TOPIC_NAME = "example-topic-from-bean-ref";

private final AnotherProducer anotherProducer;

@KafkaListener(topics = "#{myListener.TOPIC_NAME}", beanRef = "myListener")
public void receiveExamplePayload(
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header(KafkaHeaders.OFFSET) Integer offset,
@Payload ExamplePayloadDto payload) {
log.info("Received new message in example-topic: {}", payload.toString());

AnotherPayloadDto example = new AnotherPayloadDto();
example.setExample(payload);
example.setFoo("foo");

anotherProducer.sendMessage(example);
}

@KafkaListener(topicPattern = "another-topic", groupId = "example-group-id", batch = "true")
public void receiveAnotherPayloadBatched(List<AnotherPayloadDto> payloads) {
log.info("Received new message in another-topic: {}", payloads.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public AmqpBindingFactory(
}

@Override
public String getChannelName(RabbitListener annotation, Class<?> component) {
public String getChannelName(RabbitListener annotation) {
return RabbitListenerUtil.getChannelName(annotation, stringValueResolver);
}

@Override
public String getChannelId(RabbitListener annotation, Class<?> component) {
public String getChannelId(RabbitListener annotation) {
return RabbitListenerUtil.getChannelId(annotation, stringValueResolver);
}

Expand Down
Loading

0 comments on commit 3667cef

Please sign in to comment.