Skip to content

Commit

Permalink
GH-3844: Rework messaging annotation with @bean (#3877)
Browse files Browse the repository at this point in the history
* GH-3844: Rework messaging annotation with @bean

Fixes #3844

* Make `MessagingAnnotationPostProcessor` as a `BeanDefinitionRegistryPostProcessor`
to process bean definitions as early as possible and register respective messaging
components at that early phase
* Make bean definitions parsing logic optional for AOT and native mode since beans
have bean parsed during AOT building phase
* Introduce a `BeanDefinitionPropertiesMapper` for easier mapping
of the annotation attributes to the target `BeanDefinition`
* Remove `@Bean`-related logic from method parsing process
* Change the logic for `@Bean`-based endpoint bean names:
since we don't deal with methods on the bean definition phase, then method name
does not make sense.
It even may mislead if we `@Bean` name is based on a method by default, so we end up
with duplicated word in the target endpoint bean name.
Now we don't
* Fix `configuration.adoc` respectively for a new endpoint bean name logic
* In the end the new logic in the `AbstractMethodAnnotationPostProcessor`
is similar to XML parsers: we feed annotation attributes to the
`AbstractStandardMessageHandlerFactoryBean` impls

* * Fix language in docs and exception message
  • Loading branch information
artembilan authored Aug 22, 2022
1 parent c1dbb02 commit ca138c0
Show file tree
Hide file tree
Showing 33 changed files with 1,121 additions and 804 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
import java.util.concurrent.TimeUnit;

import org.aopalliance.intercept.MethodInterceptor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
Expand All @@ -36,7 +33,7 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -59,39 +56,40 @@
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
*
* @since 5.0.1
*
*/
@RunWith(SpringRunner.class)
@SpringJUnitConfig
@DirtiesContext
@RabbitAvailable({
AmqpMessageSourceIntegrationTests.DSL_QUEUE,
AmqpMessageSourceIntegrationTests.INTERCEPT_QUEUE,
AmqpMessageSourceIntegrationTests.DLQ,
AmqpMessageSourceIntegrationTests.NOAUTOACK_QUEUE })
public class AmqpMessageSourceIntegrationTests {

private static final String DSL_QUEUE = "AmqpMessageSourceIntegrationTests";
static final String DSL_QUEUE = "AmqpMessageSourceIntegrationTests";

private static final String QUEUE_WITH_DLQ = "AmqpMessageSourceIntegrationTests.withDLQ";
static final String QUEUE_WITH_DLQ = "AmqpMessageSourceIntegrationTests.withDLQ";

private static final String DLQ = QUEUE_WITH_DLQ + ".dlq";
static final String DLQ = QUEUE_WITH_DLQ + ".dlq";

private static final String INTERCEPT_QUEUE = "AmqpMessageSourceIntegrationTests.channel";
static final String INTERCEPT_QUEUE = "AmqpMessageSourceIntegrationTests.channel";

private static final String NOAUTOACK_QUEUE = "AmqpMessageSourceIntegrationTests.noAutoAck";

@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues(DSL_QUEUE, INTERCEPT_QUEUE, DLQ,
NOAUTOACK_QUEUE);
static final String NOAUTOACK_QUEUE = "AmqpMessageSourceIntegrationTests.noAutoAck";

@Autowired
private Config config;

@Autowired
private ConfigurableApplicationContext context;

@Before
@BeforeEach
public void before() {
RabbitAdmin admin = new RabbitAdmin(this.config.connectionFactory());
Queue queue = QueueBuilder.nonDurable(QUEUE_WITH_DLQ)
Expand All @@ -103,16 +101,11 @@ public void before() {
this.context.start();
}

@After
@AfterEach
public void after() {
this.context.stop();
}

@AfterClass
public static void afterClass() {
brokerRunning.removeTestQueues(QUEUE_WITH_DLQ);
}

@Test
public void testImplicitNackThenAck() throws Exception {
RabbitTemplate template = new RabbitTemplate(this.config.connectionFactory());
Expand Down Expand Up @@ -256,7 +249,7 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw

@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory(brokerRunning.getConnectionFactory());
return new CachingConnectionFactory("localhost");
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,8 +95,7 @@ private void annotated(ConfigurableListableBeanFactory beanFactory,
List<Map<String, String>> idempotentEndpointsMapping, String beanName, BeanDefinition beanDefinition)
throws LinkageError {

if (beanDefinition.getSource() instanceof MethodMetadata) {
MethodMetadata beanMethod = (MethodMetadata) beanDefinition.getSource();
if (beanDefinition.getSource() instanceof MethodMetadata beanMethod) {
String annotationType = IdempotentReceiver.class.getName();
if (beanMethod.isAnnotated(annotationType)) { // NOSONAR never null
Object value = beanMethod.getAnnotationAttributes(annotationType).get("value"); // NOSONAR
Expand All @@ -120,13 +119,7 @@ private void annotated(ConfigurableListableBeanFactory beanFactory,

String endpoint = beanName;
if (!MessageHandler.class.isAssignableFrom(returnType)) {
/*
MessageHandler beans, populated from @Bean methods, have a complex id,
including @Configuration bean name, method name and the Messaging annotation name.
The following pattern matches the bean name, regardless of the annotation name.
*/
endpoint = beanDefinition.getFactoryBeanName() + "." + beanName +
".*" + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX;
endpoint = beanName + ".*" + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX;
}

String[] interceptors = (String[]) value;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Map;

import org.springframework.expression.Expression;
import org.springframework.integration.JavaUtils;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.router.AbstractMappingMessageRouter;
Expand Down Expand Up @@ -49,6 +50,10 @@ public class RouterFactoryBean extends AbstractStandardMessageHandlerFactoryBean

private String defaultOutputChannelName;

private String prefix;

private String suffix;

private Boolean resolutionRequired;

private Boolean applySequence;
Expand All @@ -63,6 +68,14 @@ public void setDefaultOutputChannelName(String defaultOutputChannelName) {
this.defaultOutputChannelName = defaultOutputChannelName;
}

public void setPrefix(String prefix) {
this.prefix = prefix;
}

public void setSuffix(String suffix) {
this.suffix = suffix;
}

public void setResolutionRequired(Boolean resolutionRequired) {
this.resolutionRequired = resolutionRequired;
}
Expand Down Expand Up @@ -106,7 +119,7 @@ protected MessageHandler createMethodInvokingHandler(Object targetObject, String

@Override
protected MessageHandler createExpressionEvaluatingHandler(Expression expression) {
return this.configureRouter(new ExpressionEvaluatingRouter(expression));
return configureRouter(new ExpressionEvaluatingRouter(expression));
}

protected AbstractMappingMessageRouter createMethodInvokingRouter(Object targetObject, String targetMethodName) {
Expand All @@ -116,34 +129,25 @@ protected AbstractMappingMessageRouter createMethodInvokingRouter(Object targetO
}

protected AbstractMessageRouter configureRouter(AbstractMessageRouter router) {
if (this.defaultOutputChannel != null) {
router.setDefaultOutputChannel(this.defaultOutputChannel);
}
if (this.defaultOutputChannelName != null) {
router.setDefaultOutputChannelName(this.defaultOutputChannelName);
}
if (getSendTimeout() != null) {
router.setSendTimeout(getSendTimeout());
}
if (this.applySequence != null) {
router.setApplySequence(this.applySequence);
}
if (this.ignoreSendFailures != null) {
router.setIgnoreSendFailures(this.ignoreSendFailures);
}
JavaUtils.INSTANCE
.acceptIfNotNull(this.defaultOutputChannel, router::setDefaultOutputChannel)
.acceptIfNotNull(this.defaultOutputChannelName, router::setDefaultOutputChannelName)
.acceptIfNotNull(getSendTimeout(), router::setSendTimeout)
.acceptIfNotNull(this.applySequence, router::setApplySequence)
.acceptIfNotNull(this.ignoreSendFailures, router::setIgnoreSendFailures);

if (router instanceof AbstractMappingMessageRouter) {
this.configureMappingRouter((AbstractMappingMessageRouter) router);
configureMappingRouter((AbstractMappingMessageRouter) router);
}
return router;
}

protected void configureMappingRouter(AbstractMappingMessageRouter router) {
if (this.channelMappings != null) {
router.setChannelMappings(this.channelMappings);
}
if (this.resolutionRequired != null) {
router.setResolutionRequired(this.resolutionRequired);
}
JavaUtils.INSTANCE
.acceptIfNotNull(this.channelMappings, router::setChannelMappings)
.acceptIfNotNull(this.resolutionRequired, router::setResolutionRequired)
.acceptIfHasText(this.prefix, router::setPrefix)
.acceptIfHasText(this.suffix, router::setSuffix);
}

@Override
Expand Down
Loading

0 comments on commit ca138c0

Please sign in to comment.