From c7d97813481e42cd3552980a59c063a758ef748b Mon Sep 17 00:00:00 2001
From: Bertrand Renuart
Date: Wed, 14 Jul 2021 21:11:14 +0200
Subject: [PATCH] First implementation of Async append blocking until an optional timeout (#565)

* Add ability to block with optional timeout when appending to async appenders

* Wait for the buffer to drain before calling Disruptor#shutdown

  Calling Disruptor#shutdown(timeout) while the buffer is not empty causes the disruptor
  to wait in a busy-loop consuming a lot of CPU. Instead, wait during the grace period
  before asking the disruptor to shut down immediately.

Fixes #559
Fixes #566
Fixes #569
---
 README.md                                    |  32 ++-
 pom.xml                                      |   6 +
 .../appender/AsyncDisruptorAppender.java     | 208 +++++++++++++---
 .../appender/AsyncDisruptorAppenderTest.java | 227 +++++++++++++++++-
 4 files changed, 431 insertions(+), 42 deletions(-)

diff --git a/README.md b/README.md
index e8d3c470..ad630033 100644
--- a/README.md
+++ b/README.md
@@ -625,9 +625,38 @@ For example:
 ```
 
-The async appenders will never block the logging thread.
+#### RingBuffer Full
+
+The async appenders will by default never block the logging thread.
 If the RingBuffer is full (e.g. due to slow network, etc), then events will be dropped.
+Alternatively, you can configure the appender to wait until space becomes available instead of dropping the events immediately. This may come in handy when you want to rely on the buffering and the async nature of the appender but don't want to lose any events during large logging bursts that exceed the size of the RingBuffer.
+
+The behaviour of the appender when the RingBuffer is full is controlled by the `appendTimeout` configuration property:
+
+| `appendTimeout` | Behaviour when RingBuffer is full                                          |
+|-----------------|----------------------------------------------------------------------------|
+| `< 0`           | disable timeout and wait until space is available                          |
+| `0`             | no timeout, give up immediately and drop the event (this is the *default*) |
+| `> 0`           | retry for up to the specified amount of time, then drop the event          |
+
+Logging threads waiting for space in the RingBuffer wake up periodically at a frequency defined by `appendRetryFrequency` (default `50ms`). You may increase this frequency for a faster reaction time at the expense of higher CPU usage.
+
+When the appender drops an event, it emits a warning status message every `droppedWarnFrequency` consecutive dropped events. Another status message is emitted when the drop period is over and an event is successfully enqueued again, reporting the total number of events that were dropped.
+
+
+#### Graceful Shutdown
+
+In order to guarantee that logged messages have had a chance to be processed by asynchronous appenders (including the TCP appender) and to ensure background threads have been stopped, you'll need to [cleanly shut down logback](http://logback.qos.ch/manual/configuration.html#stopContext) when your application exits.
+
+When gracefully stopped, async appenders wait until all events in the buffer are processed and the buffer is empty.
+The maximum time to wait is configured by the `shutdownGracePeriod` parameter and is set to `1 minute` by default.
+Events still in the buffer after this period has elapsed are dropped and the appender is stopped.
+
+
+#### Wait Strategy
+
 By default, the [`BlockingWaitStrategy`](https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/BlockingWaitStrategy.html) is used by the worker thread spawned by this appender.
 The `BlockingWaitStrategy` minimizes CPU utilization, but results in slower latency and throughput.
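As an illustration of the settings described above (this example is not part of the patch), here is a minimal sketch configuring the new properties programmatically. The setters `setAppendTimeout`, `setAppendRetryFrequency` and `setShutdownGracePeriod` come from this PR; the appender class (`LogstashTcpSocketAppender`), the destination and the chosen values are assumptions picked for the example. In an XML configuration the same properties would normally be set with elements of the same name (e.g. an `appendTimeout` element).

```java
import ch.qos.logback.core.util.Duration;
import net.logstash.logback.appender.LogstashTcpSocketAppender;

public class BlockingAppendConfigSketch {
    public static void main(String[] args) {
        // Illustrative values only; pick what fits your own latency/CPU trade-off.
        LogstashTcpSocketAppender appender = new LogstashTcpSocketAppender();
        appender.addDestination("localhost:5000");                          // example destination
        appender.setRingBufferSize(8192);                                   // must be a power of 2
        appender.setAppendTimeout(Duration.buildByMilliseconds(100));       // > 0: retry for up to 100ms, then drop
        appender.setAppendRetryFrequency(Duration.buildByMilliseconds(5));  // blocked threads retry every 5ms
        appender.setShutdownGracePeriod(Duration.buildByMinutes(1));        // wait up to 1 minute for the buffer to drain on stop()
    }
}
```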
@@ -743,7 +772,6 @@ e.g.
phasedBackoff{10,60,seconds,blocking} See [AsyncDisruptorAppender](/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java) for other configuration parameters (such as `ringBufferSize`, `producerType`, `threadNamePrefix`, `daemon`, and `droppedWarnFrequency`) -In order to guarantees that logged messages have had a chance to be processed by asynchronous appenders (including the TCP appender) and ensure background threads have been stopped, you'll need to [cleanly shut down logback](http://logback.qos.ch/manual/configuration.html#stopContext) when your application exits. ### Appender Listeners diff --git a/pom.xml b/pom.xml index 66e8de35..5f78408d 100644 --- a/pom.xml +++ b/pom.xml @@ -211,6 +211,12 @@ ${org.mockito.version} test + + org.awaitility + awaitility + 4.1.0 + test + diff --git a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java index e246111a..0d6525c1 100644 --- a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java +++ b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Formatter; import java.util.List; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import net.logstash.logback.appender.listener.AppenderListener; @@ -38,6 +40,7 @@ import ch.qos.logback.core.spi.DeferredProcessingAware; import ch.qos.logback.core.status.OnConsoleStatusListener; import ch.qos.logback.core.status.Status; +import ch.qos.logback.core.util.Duration; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; @@ -47,7 +50,6 @@ import com.lmax.disruptor.PhasedBackoffWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SleepingWaitStrategy; -import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; @@ -62,14 +64,11 @@ * for more information about the advantages of using a {@link RingBuffer} over a {@link BlockingQueue}. *

* - * This appender will never block the logging thread, since it uses - * {@link RingBuffer#tryPublishEvent(EventTranslatorOneArg, Object)} - * to publish events (rather than {@link RingBuffer#publishEvent(EventTranslatorOneArg, Object)}). - *

- * - * If the RingBuffer is full, and the event cannot be published, - * the event will be dropped. A warning message will be logged to - * logback's context every {@link #droppedWarnFrequency} consecutive dropped events. + * The behavior of the appender when the RingBuffer is full and the event cannot be published + * is controlled by the {@link #appendTimeout} configuration parameter. + * By default the appender drops the event immediately, and emits a warning message every + * {@link #droppedWarnFrequency} consecutive dropped events. + * It can also be configured to wait until some space is available, with or without timeout. *

* * A single handler thread will be used to handle the actual handling of the event. @@ -97,6 +96,10 @@ * @param type of event ({@link ILoggingEvent} or {@link IAccessEvent}). */ public abstract class AsyncDisruptorAppender> extends UnsynchronizedAppenderBase { + /** + * Time in nanos to wait between drain attempts during the shutdown phase + */ + private static final long SLEEP_TIME_DURING_SHUTDOWN = 50 * 1_000_000L; // 50ms protected static final String APPENDER_NAME_FORMAT = "%1$s"; protected static final String THREAD_INDEX_FORMAT = "%2$d"; @@ -251,6 +254,29 @@ public abstract class AsyncDisruptorAppender listeners = new ArrayList<>(); + /** + * Maximum time to wait when appending events to the ring buffer when full before the event + * is dropped. Use the following values: + *

+ */ + private Duration appendTimeout = Duration.buildByMilliseconds(0); + + /** + * Delay between consecutive attempts to append an event in the ring buffer when + * full. + */ + private Duration appendRetryFrequency = Duration.buildByMilliseconds(50); + + /** + * How long to wait for in-flight events during shutdown. + */ + private Duration shutdownGracePeriod = Duration.buildByMinutes(1); + + /** * Event wrapper object used for each element of the {@link RingBuffer}. */ @@ -364,9 +390,30 @@ public void onShutdown() { } - @SuppressWarnings("unchecked") @Override public void start() { + int errorCount = 0; + + if (this.eventHandler == null) { + addError("No eventHandler was configured for appender " + getName()); + errorCount++; + } + if (this.appendRetryFrequency.getMilliseconds() <= 0) { + addError(" must be > 0"); + errorCount++; + } + if (this.ringBufferSize <= 0) { + addError(" must be > 0"); + errorCount++; + } + if (!isPowerOfTwo(this.ringBufferSize)) { + addError(" must be a power of 2"); + errorCount++; + } + if (errorCount > 0) { + return; + } + if (addDefaultStatusListener && getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) { LevelFilteringStatusListener statusListener = new LevelFilteringStatusListener(); statusListener.setLevelValue(Status.WARN); @@ -376,11 +423,6 @@ public void start() { getStatusManager().add(statusListener); } - if (this.eventHandler == null) { - addError("No eventHandler was configured for appender " + name + "."); - return; - } - this.executorService = new ScheduledThreadPoolExecutor( getThreadPoolCoreSize(), this.threadFactory); @@ -423,58 +465,123 @@ public void stop() { if (!super.isStarted()) { return; } + /* * Don't allow any more events to be appended. */ super.stop(); - try { - this.disruptor.shutdown(1, TimeUnit.MINUTES); - } catch (TimeoutException e) { + + + /* + * Shutdown Disruptor + * + * Calling Disruptor#shutdown() will wait until all enqueued events are fully processed, + * but this waiting happens in a busy-spin. To avoid wasting CPU we wait for at most the configured + * grace period before asking the Disruptor for an immediate shutdown. + */ + long deadline = getShutdownGracePeriod().getMilliseconds() < 0 ? 
Long.MAX_VALUE : System.currentTimeMillis() + getShutdownGracePeriod().getMilliseconds(); + while (!isRingBufferEmpty() && (System.currentTimeMillis() < deadline)) { + LockSupport.parkNanos(SLEEP_TIME_DURING_SHUTDOWN); + } + + this.disruptor.halt(); + + + if (!isRingBufferEmpty()) { addWarn("Some queued events have not been logged due to requested shutdown"); } + + /* + * Shutdown executor service + */ this.executorService.shutdown(); try { - if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) { - addWarn("Some queued events have not been logged due to requested shutdown"); - } + this.executorService.awaitTermination(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - addWarn("Some queued events have not been logged due to requested shutdown", e); + // ignored } + + + /* + * Notify listeners + */ fireAppenderStopped(); } + + /** + * Test whether the ring buffer is empty or not + * + * @return {@code true} if the ring buffer is empty, {@code false} otherwise + */ + protected boolean isRingBufferEmpty() { + return this.disruptor.getRingBuffer().hasAvailableCapacity(this.getRingBufferSize()); + } + @Override protected void append(Event event) { long startTime = System.nanoTime(); + try { prepareForDeferredProcessing(event); } catch (RuntimeException e) { - addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e); + addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e); } - if (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) { - long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet(); - if ((consecutiveDropped) % this.droppedWarnFrequency == 1) { - addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]"); + + // Add event to the buffer, retrying as many times as allowed by the configuration + // + long deadline = this.appendTimeout.getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.appendTimeout.getMilliseconds(); + + while (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) { + // Wait before retrying + // + long waitDuration = Math.min(this.appendRetryFrequency.getMilliseconds(), deadline - System.currentTimeMillis()); + if (waitDuration > 0) { + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(waitDuration)); + + } else { + // Log a warning status about the failure + // + long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet(); + if ((consecutiveDropped % this.droppedWarnFrequency) == 1) { + addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]"); + } + + // Notify listeners + // + fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION); + return; } - fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION); - } else { - long endTime = System.nanoTime(); - long consecutiveDropped = this.consecutiveDroppedCount.get(); - if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) { - addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]"); + + // Give up if appender is stopped meanwhile + // + if (!isStarted()) { + // Same message as if Appender#append is called after the appender is stopped... 
+ addWarn("Attempted to append to non started appender [" + this.getName() + "]."); + return; } - fireEventAppended(event, endTime - startTime); } + + // Enqueue success - notify end of error period + // + long consecutiveDropped = this.consecutiveDroppedCount.get(); + if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) { + addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]"); + } + + // Notify listeners + // + fireEventAppended(event, System.nanoTime() - startTime); } protected void prepareForDeferredProcessing(Event event) { event.prepareForDeferredProcessing(); } - - + + protected String calculateThreadName() { List threadNameFormatParams = getThreadNameFormatParams(); return String.format(threadNameFormat, threadNameFormatParams.toArray()); @@ -586,7 +693,28 @@ public void setWaitStrategy(WaitStrategy waitStrategy) { public void setWaitStrategyType(String waitStrategyType) { setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType)); } - + + public Duration getAppendRetryFrequency() { + return appendRetryFrequency; + } + public void setAppendRetryFrequency(Duration appendRetryFrequency) { + this.appendRetryFrequency = Objects.requireNonNull(appendRetryFrequency); + } + + public Duration getAppendTimeout() { + return appendTimeout; + } + public void setAppendTimeout(Duration appendTimeout) { + this.appendTimeout = Objects.requireNonNull(appendTimeout); + } + + public void setShutdownGracePeriod(Duration shutdownGracePeriod) { + this.shutdownGracePeriod = Objects.requireNonNull(shutdownGracePeriod); + } + public Duration getShutdownGracePeriod() { + return shutdownGracePeriod; + } + public ThreadFactory getThreadFactory() { return threadFactory; } @@ -629,4 +757,10 @@ public boolean isAddDefaultStatusListener() { public void setAddDefaultStatusListener(boolean addDefaultStatusListener) { this.addDefaultStatusListener = addDefaultStatusListener; } + + + private static boolean isPowerOfTwo(int x) { + /* First x in the below expression is for the case when x is 0 */ + return x != 0 && ((x & (x - 1)) == 0); + } } diff --git a/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java b/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java index a060caf9..ea97615f 100644 --- a/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java +++ b/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java @@ -16,6 +16,7 @@ package net.logstash.logback.appender; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; @@ -23,10 +24,17 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -34,6 +42,7 @@ import 
net.logstash.logback.appender.listener.AppenderListener; import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.BasicStatusManager; import ch.qos.logback.core.Context; import ch.qos.logback.core.status.Status; import ch.qos.logback.core.status.StatusManager; @@ -46,6 +55,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; @@ -64,8 +74,8 @@ public class AsyncDisruptorAppenderTest { @Mock(lenient = true) private Context context; - @Mock - private StatusManager statusManager; + @Spy + private StatusManager statusManager = new BasicStatusManager(); @Mock private ILoggingEvent event1; @@ -76,6 +86,9 @@ public class AsyncDisruptorAppenderTest { @Mock private AppenderListener listener; + private ExecutorService executorService = Executors.newCachedThreadPool(); + + @BeforeEach public void setup() { when(context.getStatusManager()).thenReturn(statusManager); @@ -87,6 +100,7 @@ public void setup() { @AfterEach public void tearDown() { appender.stop(); + executorService.shutdownNow(); } @SuppressWarnings("unchecked") @@ -198,7 +212,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { Assertions.assertTrue(statusCaptor.getValue().getMessage().startsWith("Dropped")); eventHandlerWaiter.countDown(); - } @SuppressWarnings("unchecked") @@ -220,4 +233,212 @@ public void testEventHandlerThrowsException() throws Exception { } + /* + * Appender is configured to block indefinitely when ring buffer is full + */ + @Test + public void appendBlockingWhenFull() { + CountDownLatch eventHandlerWaiter = new CountDownLatch(1); + + try { + TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); + appender.setRingBufferSize(1); + appender.setAppendTimeout(toLogback(Duration.ofMillis(-1))); // block until space is available + appender.setEventHandler(eventHandler); + appender.start(); + + /* + * First event blocks the ring buffer until eventHandlerWaiter is released + */ + appender.append(event1); + await().until(() -> eventHandlerWaiter.getCount() == 1); // wait until the handler is actually invoked before going any further + + /* + * Publishing the second event is blocked until the first is released (buffer full) + */ + Future future = execute(() -> appender.append(event2)); + + /* + * Release the handler -> both events are now unblocked + */ + eventHandlerWaiter.countDown(); + + await().untilAsserted(() -> assertThat(eventHandler.getEvents()).containsExactly(event1, event2)); + assertThat(future).isDone(); + assertThat(statusManager.getCopyOfStatusList()).isEmpty(); + + } finally { + eventHandlerWaiter.countDown(); + } + } + + + /* + * Appender configured to block with a timeout -> assert appending threads are blocked for the + * configured timeout. 
+ */ + @Test + public void appendBlockingWithTimeout() throws Exception { + // Block for the specified timeout + final Duration timeout = Duration.ofMillis(150); + + final CountDownLatch eventHandlerWaiter = new CountDownLatch(1); + + try { + TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); + appender.setRingBufferSize(1); + appender.setAppendTimeout(toLogback(timeout)); + appender.setEventHandler(eventHandler); + appender.start(); + + /* + * First event blocks the ring buffer until eventHandlerWaiter is released + */ + appender.append(event1); + await().until(() -> eventHandlerWaiter.getCount() == 1); // wait until the handler is actually invoked before going any further + + + /* + * Second event is blocked until the first is released (buffer full) - but no more than the configured timeout + */ + Future future = execute(() -> appender.append(event2)); + + // wait for the timeout + await().atLeast(timeout).and().atMost(timeout.plusMillis(100)).until(future::isDone); + + // a WARN status is logged + assertThat(statusManager.getCopyOfStatusList()) + .hasSize(1) + .allMatch(s -> s.getMessage().startsWith("Dropped")); + + // listeners invoked with appendFailed + verify(listener).eventAppendFailed(eq(appender), eq(event2), any()); + + + /* + * Unlock the handler and assert only the first event went through + */ + eventHandlerWaiter.countDown(); + await().untilAsserted(() -> assertThat(eventHandler.getEvents()).containsExactly(event1)); + + } finally { + eventHandlerWaiter.countDown(); + } + } + + + /* + * Appender configured to block until space is available -> assert threads blocked waiting for free space are + * released when the appender is stopped + */ + @Test + public void appendBlockingReleasedOnStop() { + final CountDownLatch eventHandlerWaiter = new CountDownLatch(1); + + try { + TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); + appender.setRingBufferSize(1); + appender.setAppendTimeout(toLogback(Duration.ofMillis(-1))); // block until space is available + appender.setShutdownGracePeriod(toLogback(Duration.ofMillis(0))); // don't want to wait for inflight events... 
+ appender.setEventHandler(eventHandler); + appender.start(); + + /* + * First event will block the ring buffer until eventHandlerWaiter is released + */ + appender.append(event1); + await().until(() -> eventHandlerWaiter.getCount() == 1); // wait until the handler is actually invoked before going any further + + /* + * Publishing the second event is blocked until the first is released (buffer full) + */ + Future future = execute(() -> appender.append(event2)); + + /* + * Stop appender + */ + appender.stop(); + + // appending thread is released + await().until(future::isDone); + + // no events handled + assertThat(eventHandler.getEvents()).isEmpty(); + + // no listener invoked + verify(listener, times(0)).eventAppendFailed(eq(appender), eq(event2), any()); + + } finally { + eventHandlerWaiter.countDown(); + } + } + + + @Test + public void configRingBufferSize_negative() { + appender.setRingBufferSize(-1); + appender.start(); + + assertThat(appender.isStarted()).isFalse(); + + assertThat(statusManager.getCopyOfStatusList()) + .anyMatch(s -> s.getMessage().startsWith(" must be > 0") && s.getLevel() == Status.ERROR); + } + + + @Test + public void configRingBufferSize_powerOfTwo() { + appender.setRingBufferSize(3); + appender.start(); + + assertThat(appender.isStarted()).isFalse(); + + assertThat(statusManager.getCopyOfStatusList()) + .anyMatch(s -> s.getMessage().startsWith(" must be a power of 2") && s.getLevel() == Status.ERROR); + } + + + @Test + public void configAppendRetryFrequency() { + appender.setAppendRetryFrequency(toLogback(Duration.ofMillis(-1))); + appender.start(); + + assertThat(appender.isStarted()).isFalse(); + + assertThat(statusManager.getCopyOfStatusList()) + .anyMatch(s -> s.getMessage().startsWith(" must be > 0") && s.getLevel() == Status.ERROR); + } + + + + // -------------------------------------------------------------------------------------------- + + + private Future execute(Runnable runnable) { + return executorService.submit(runnable); + } + + private static class TestEventHandler implements EventHandler> { + private final List events = new ArrayList<>(); + private final CountDownLatch waiter; + + TestEventHandler(CountDownLatch waiter) { + this.waiter = waiter; + } + @Override + public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception { + if (waiter != null) { + waiter.await(); + } + this.events.add(event.event); + } + + public List getEvents() { + return events; + } + } + + private static ch.qos.logback.core.util.Duration toLogback(Duration duration) { + return ch.qos.logback.core.util.Duration.buildByMilliseconds(duration.toMillis()); + } }
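
To summarize the semantics introduced by this patch in one place, the following is a simplified, standalone sketch (not code from the patch; the class and field names are invented for illustration) of the retry-until-deadline publish loop, modeled with an `ArrayBlockingQueue` standing in for the Disruptor `RingBuffer`:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class AppendTimeoutSketch {
    // Stand-in for the Disruptor RingBuffer used by the real appender.
    private final ArrayBlockingQueue<String> buffer = new ArrayBlockingQueue<>(8);
    private final long appendTimeoutMillis;        // < 0: wait forever, 0: drop immediately, > 0: bounded wait
    private final long appendRetryFrequencyMillis; // how often a blocked producer wakes up to retry

    AppendTimeoutSketch(long appendTimeoutMillis, long appendRetryFrequencyMillis) {
        this.appendTimeoutMillis = appendTimeoutMillis;
        this.appendRetryFrequencyMillis = appendRetryFrequencyMillis;
    }

    /** Returns true if the event was enqueued, false if it was dropped. */
    boolean append(String event) {
        long deadline = appendTimeoutMillis < 0
                ? Long.MAX_VALUE
                : System.currentTimeMillis() + appendTimeoutMillis;

        while (!buffer.offer(event)) {  // plays the role of RingBuffer#tryPublishEvent
            long wait = Math.min(appendRetryFrequencyMillis, deadline - System.currentTimeMillis());
            if (wait <= 0) {
                return false;           // deadline reached: drop the event (the real appender also warns and notifies listeners)
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(wait));
        }
        return true;
    }
}
```

With `appendTimeoutMillis = 0` the deadline is already reached on the first failed offer, so the event is dropped immediately, which matches the default behaviour documented in the README changes above.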