diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/log4j2/AmqpAppender.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/log4j2/AmqpAppender.java
index 0904f2fe7a..a9c844b765 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/log4j2/AmqpAppender.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/log4j2/AmqpAppender.java
@@ -126,7 +126,7 @@ public class AmqpAppender extends AbstractAppender {
/**
* The template.
*/
- private final RabbitTemplate rabbitTemplate = new RabbitTemplate();
+ private RabbitTemplate rabbitTemplate;
/**
* Where LoggingEvents are queued to send.
@@ -250,18 +250,12 @@ public static Builder newBuilder() {
* Submit the required number of senders into the pool.
*/
private void startSenders() {
- this.rabbitTemplate.setConnectionFactory(this.manager.connectionFactory);
if (this.manager.async) {
+ this.manager.senderPool = Executors.newCachedThreadPool();
for (int i = 0; i < this.manager.senderPoolSize; i++) {
this.manager.senderPool.submit(new EventSender());
}
}
- else if (this.manager.maxSenderRetries > 0) {
- RetryTemplate retryTemplate = new RetryTemplate();
- RetryPolicy retryPolicy = new SimpleRetryPolicy(this.manager.maxSenderRetries);
- retryTemplate.setRetryPolicy(retryPolicy);
- this.rabbitTemplate.setRetryTemplate(retryTemplate);
- }
}
@Override
@@ -287,6 +281,22 @@ protected Message postProcessMessageBeforeSend(Message message, Event event) {
}
protected void sendEvent(Event event, Map, ?> properties) {
+ synchronized (this) {
+ if (this.rabbitTemplate == null) {
+ if (this.manager.activateOptions()) {
+ this.rabbitTemplate = new RabbitTemplate(this.manager.connectionFactory);
+ if (!this.manager.async && this.manager.maxSenderRetries > 0) {
+ RetryTemplate retryTemplate = new RetryTemplate();
+ RetryPolicy retryPolicy = new SimpleRetryPolicy(this.manager.maxSenderRetries);
+ retryTemplate.setRetryPolicy(retryPolicy);
+ this.rabbitTemplate.setRetryTemplate(retryTemplate);
+ }
+ }
+ else {
+ throw new AmqpException("Cannot create template");
+ }
+ }
+ }
LogEvent logEvent = event.getEvent();
String name = logEvent.getLoggerName();
Level level = logEvent.getLevel();
@@ -645,7 +655,7 @@ protected AmqpManager(LoggerContext loggerContext, String name) {
super(loggerContext, name);
}
- private boolean activateOptions() {
+ boolean activateOptions() {
ConnectionFactory rabbitConnectionFactory = createRabbitConnectionFactory();
if (rabbitConnectionFactory != null) {
Assert.state(this.applicationId != null, "applicationId is required");
@@ -667,7 +677,6 @@ private boolean activateOptions() {
this.clientConnectionProperties);
}
setUpExchangeDeclaration();
- this.senderPool = Executors.newCachedThreadPool();
return true;
}
return false;
@@ -1182,11 +1191,8 @@ public AmqpAppender build() {
}
AmqpAppender appender = buildInstance(this.name, this.filter, theLayout, this.ignoreExceptions, manager, eventQueue);
- if (manager.activateOptions()) {
- appender.startSenders();
- return appender;
- }
- return null;
+ appender.startSenders();
+ return appender;
}
/**
diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/log4j2/AmqpAppenderTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/log4j2/AmqpAppenderTests.java
index fc9e7a1295..9c2321b2ec 100644
--- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/log4j2/AmqpAppenderTests.java
+++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/log4j2/AmqpAppenderTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.LogManager;
@@ -45,6 +46,7 @@
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
+import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
@@ -173,31 +175,32 @@ public void testSaslConfig() {
Logger logger = LogManager.getLogger("sasl");
AmqpAppender appender = (AmqpAppender) TestUtils.getPropertyValue(logger, "context.configuration.appenders",
Map.class).get("sasl1");
- assertThat(TestUtils.getPropertyValue(appender, "manager.connectionFactory.rabbitConnectionFactory",
- ConnectionFactory.class).getSaslConfig())
+ assertThat(RabbitUtils.stringToSaslConfig(TestUtils.getPropertyValue(appender, "manager.saslConfig",
+ String.class), mock(ConnectionFactory.class)))
.isInstanceOf(DefaultSaslConfig.class)
.hasFieldOrPropertyWithValue("mechanism", "PLAIN");
appender = (AmqpAppender) TestUtils.getPropertyValue(logger, "context.configuration.appenders",
Map.class).get("sasl2");
- assertThat(TestUtils.getPropertyValue(appender, "manager.connectionFactory.rabbitConnectionFactory",
- ConnectionFactory.class).getSaslConfig())
+ assertThat(RabbitUtils.stringToSaslConfig(TestUtils.getPropertyValue(appender, "manager.saslConfig",
+ String.class), mock(ConnectionFactory.class)))
.isInstanceOf(DefaultSaslConfig.class)
.hasFieldOrPropertyWithValue("mechanism", "EXTERNAL");
appender = (AmqpAppender) TestUtils.getPropertyValue(logger, "context.configuration.appenders",
Map.class).get("sasl3");
- assertThat(TestUtils.getPropertyValue(appender, "manager.connectionFactory.rabbitConnectionFactory",
- ConnectionFactory.class).getSaslConfig())
+ assertThat(RabbitUtils.stringToSaslConfig(TestUtils.getPropertyValue(appender, "manager.saslConfig",
+ String.class), mock(ConnectionFactory.class)))
.isInstanceOf(JDKSaslConfig.class);
appender = (AmqpAppender) TestUtils.getPropertyValue(logger, "context.configuration.appenders",
Map.class).get("sasl4");
- assertThat(TestUtils.getPropertyValue(appender, "manager.connectionFactory.rabbitConnectionFactory",
- ConnectionFactory.class).getSaslConfig())
+ assertThat(RabbitUtils.stringToSaslConfig(TestUtils.getPropertyValue(appender, "manager.saslConfig",
+ String.class), mock(ConnectionFactory.class)))
.isInstanceOf(CRDemoMechanism.CRDemoSaslConfig.class);
}
@Test
- public void testAmqpAppenderEventQueueTypeDefaultsToLinkedBlockingQueue() {
+ public void testAmqpAppenderEventQueueTypeDefaultsToLinkedBlockingQueue() throws InterruptedException {
Logger logger = LogManager.getLogger("default_queue_logger");
+ logger.info("test");
AmqpAppender appender = (AmqpAppender) TestUtils.getPropertyValue(logger, "context.configuration.appenders",
Map.class).get("rabbitmq_default_queue");
@@ -207,6 +210,12 @@ public void testAmqpAppenderEventQueueTypeDefaultsToLinkedBlockingQueue() {
assertThat(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class)).isTrue();
assertThat(events.getClass()).isEqualTo(LinkedBlockingQueue.class);
+ BlockingQueue> queue = (BlockingQueue>) events;
+ int n = 0;
+ while (n++ < 100 && queue.size() > 0) {
+ Thread.sleep(100);
+ }
+ assertThat(queue).hasSize(0);
}
@Test
diff --git a/spring-rabbit/src/test/resources/log4j2-amqp-appender.xml b/spring-rabbit/src/test/resources/log4j2-amqp-appender.xml
index f826e48a61..61de69eb20 100644
--- a/spring-rabbit/src/test/resources/log4j2-amqp-appender.xml
+++ b/spring-rabbit/src/test/resources/log4j2-amqp-appender.xml
@@ -14,7 +14,8 @@
connectionName="log4j2Appender"
clientConnectionProperties="foo:bar,baz:qux"
async="false"
- senderPoolSize="3" maxSenderRetries="5"
+ senderPoolSize="3"
+ maxSenderRetries="5"
bufferSize="10">
@@ -23,6 +24,8 @@
host="localhost" port="5672" user="guest" password="guest" applicationId="testAppId" charset="UTF-8"
routingKeyPattern="%X{applicationId}.%c.%p"
exchange="log4j2Test_default_queue" deliveryMode="NON_PERSISTENT"
+ async="true"
+ senderPoolSize="2"
addMdcAsHeaders="true">