From c02ab5f653d628be919e86cc89cbd54f88c7d055 Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Wed, 7 Jul 2021 18:09:35 +0200 Subject: [PATCH] First implementation of Async append blocking until an optional timeout See #559 --- pom.xml | 6 + .../appender/AsyncDisruptorAppender.java | 181 ++++++++++++++-- .../appender/AsyncDisruptorAppenderTest.java | 201 +++++++++++++++++- 3 files changed, 362 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index 8be022a0..d8dc95f3 100644 --- a/pom.xml +++ b/pom.xml @@ -211,6 +211,12 @@ ${org.mockito.version} test + + org.awaitility + awaitility + 4.1.0 + test + diff --git a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java index a4a5730b..bef2d01c 100644 --- a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java +++ b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Formatter; import java.util.List; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -26,9 +27,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import ch.qos.logback.core.status.OnConsoleStatusListener; import ch.qos.logback.core.status.Status; +import ch.qos.logback.core.util.Duration; + import net.logstash.logback.appender.listener.AppenderListener; import ch.qos.logback.access.spi.IAccessEvent; import ch.qos.logback.classic.AsyncAppender; @@ -250,6 +254,41 @@ public abstract class AsyncDisruptorAppender listeners = new ArrayList<>(); + public enum AsyncMode { + /** + * Appender thread is blocked until space is available in the ring buffer + * or the retry timeout expires. + */ + BLOCK, + + /** + * Event is dropped when the ring buffer is full + */ + DROP + } + private AsyncMode asyncMode = AsyncMode.DROP; + + /** + * Delay (in millis) between consecutive attempts to append an event in the ring buffer when full. + * Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}. + */ + private long retryMillis = 100; + + /** + * Maximum time to wait for space in the ring buffer before dropping the event. + * Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}. + * + *

Use {@code -1} for no timeout, i.e. block until space is available. + */ + private Duration retryTimeout = Duration.buildByMilliseconds(1000); + + /** + * How long to wait for in-flight events during shutdown. + */ + private Duration shutdownGracePeriod = Duration.buildByMinutes(1); + + + /** * Event wrapper object used for each element of the {@link RingBuffer}. */ @@ -422,50 +461,82 @@ public void stop() { if (!super.isStarted()) { return; } + /* * Don't allow any more events to be appended. */ super.stop(); + + + /* + * Shutdown disruptor and executorService + */ + boolean errorDuringShutdown = false; + long remainingTime = Math.max(0, getShutdownGracePeriod().getMilliseconds()); + long startTime = System.currentTimeMillis(); + try { - this.disruptor.shutdown(1, TimeUnit.MINUTES); + this.disruptor.shutdown(remainingTime, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { - addWarn("Some queued events have not been logged due to requested shutdown"); + errorDuringShutdown = true; } this.executorService.shutdown(); try { - if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) { - addWarn("Some queued events have not been logged due to requested shutdown"); + remainingTime = Math.max(0, remainingTime - (System.currentTimeMillis() - startTime)); + if (!this.executorService.awaitTermination(remainingTime, TimeUnit.MILLISECONDS)) { + errorDuringShutdown = true; } } catch (InterruptedException e) { - addWarn("Some queued events have not been logged due to requested shutdown", e); + errorDuringShutdown = true; } + + if (errorDuringShutdown) { + addWarn("Some queued events have not been logged due to requested shutdown"); + } + + + /* + * Notify listeners + */ fireAppenderStopped(); } + @Override protected void append(Event event) { long startTime = System.nanoTime(); + try { prepareForDeferredProcessing(event); } catch (RuntimeException e) { - addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e); + addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e); } - if (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) { - long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet(); - if ((consecutiveDropped) % this.droppedWarnFrequency == 1) { - addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]"); - } - fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION); - } else { - long endTime = System.nanoTime(); + if (enqueueEvent(event)) { + // Enqueue success - notify if we had errors previously + // long consecutiveDropped = this.consecutiveDroppedCount.get(); if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) { addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]"); } - fireEventAppended(event, endTime - startTime); + + // Notify parties + // + fireEventAppended(event, System.nanoTime() - startTime); + + } else { + // Log a warning status about the failure + // + long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet(); + if ((consecutiveDropped % this.droppedWarnFrequency) == 1) { + addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]"); + } + + // Notify parties + // + fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION); } } @@ -473,6 +544,58 @@ protected void prepareForDeferredProcessing(Event event) { event.prepareForDeferredProcessing(); } + /** + * Enqueue the given {@code event} in the ring buffer according to the configured {@link #asyncMode}. + * + * @param event the {@link Event} to enqueue + * @return {@code true} when the even is successfully enqueued in the ring buffer + */ + protected boolean enqueueEvent(Event event) { + if (this.asyncMode == AsyncMode.BLOCK) { + return enqueueEventBlock(event); + } else { + return enqueueEventDrop(event); + } + } + + /** + * Enqueue the given {@code event} in the ring buffer, blocking until enough space + * is available or the {@link #retryTimeout} expires (if configured). + * + * @param event the {@link Event} to enqueue + * @return {@code true} when the even is successfully enqueued in the ring buffer + */ + private boolean enqueueEventBlock(Event event) { + long timeout = this.retryTimeout.getMilliseconds() <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.retryTimeout.getMilliseconds(); + + while (isStarted() && !this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) { + // Check for timeout + // + if (System.currentTimeMillis() >= timeout) { + return false; + } + + // Wait before retry + // + long waitDuration = Math.min(this.retryMillis, System.currentTimeMillis() - timeout); + if (waitDuration > 0) { + LockSupport.parkNanos(waitDuration * 1_000_000L); + } + } + + return true; + } + + /** + * Attempt to enqueue the given {@code event} in the ring buffer without blocking. Drop the event + * if the ring buffer is full. + * + * @param event the {@link Event} to enqueue + * @return {@code true} when the even is successfully enqueued in the ring buffer + */ + private boolean enqueueEventDrop(Event event) { + return this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event); + } protected String calculateThreadName() { List threadNameFormatParams = getThreadNameFormatParams(); @@ -581,6 +704,34 @@ public void setWaitStrategyType(String waitStrategyType) { setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType)); } + public AsyncMode getAsyncMode() { + return asyncMode; + } + public void setAsyncMode(AsyncMode asyncMode) { + this.asyncMode = asyncMode; + } + + public long getRetryMillis() { + return retryMillis; + } + public void setRetryMillis(long retryMillis) { + this.retryMillis = retryMillis; + } + + public Duration getRetryTimeout() { + return retryTimeout; + } + public void setRetryTimeout(Duration retryTimeout) { + this.retryTimeout = Objects.requireNonNull(retryTimeout); + } + + public void setShutdownGracePeriod(Duration shutdownGracePeriod) { + this.shutdownGracePeriod = Objects.requireNonNull(shutdownGracePeriod); + } + public Duration getShutdownGracePeriod() { + return shutdownGracePeriod; + } + public ThreadFactory getThreadFactory() { return threadFactory; } diff --git a/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java b/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java index af92ceb4..7d024760 100644 --- a/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java +++ b/src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java @@ -16,6 +16,7 @@ package net.logstash.logback.appender; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; @@ -23,16 +24,30 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import net.logstash.logback.appender.AsyncDisruptorAppender.AsyncMode; import net.logstash.logback.appender.AsyncDisruptorAppender.LogEvent; import net.logstash.logback.appender.listener.AppenderListener; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.BasicStatusManager; +import ch.qos.logback.core.Context; +import ch.qos.logback.core.status.Status; +import ch.qos.logback.core.status.StatusManager; +import com.lmax.disruptor.EventHandler; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -41,24 +56,18 @@ import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.Context; -import ch.qos.logback.core.status.Status; -import ch.qos.logback.core.status.StatusManager; - -import com.lmax.disruptor.EventHandler; - @ExtendWith(MockitoExtension.class) public class AsyncDisruptorAppenderTest { private static final int VERIFICATION_TIMEOUT = 1000 * 30; @InjectMocks - private AsyncDisruptorAppender> appender = new AsyncDisruptorAppender>() {}; + private AsyncDisruptorAppender> appender = new AsyncDisruptorAppender>() { }; @Mock private EventHandler> eventHandler; @@ -66,8 +75,8 @@ public class AsyncDisruptorAppenderTest { @Mock(lenient = true) private Context context; - @Mock - private StatusManager statusManager; + @Spy + private StatusManager statusManager = new BasicStatusManager(); @Mock private ILoggingEvent event1; @@ -200,7 +209,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { Assertions.assertTrue(statusCaptor.getValue().getMessage().startsWith("Dropped")); eventHandlerWaiter.countDown(); - } @SuppressWarnings("unchecked") @@ -222,4 +230,175 @@ public void testEventHandlerThrowsException() throws Exception { } + /* + * Appender is configured to block indefinitely when ring buffer is full + */ + @Test + public void appendBlockingWhenFull() { + CountDownLatch eventHandlerWaiter = new CountDownLatch(1); + + try { + TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); + appender.setRingBufferSize(1); + appender.setAsyncMode(AsyncMode.BLOCK); + appender.setEventHandler(eventHandler); + appender.start(); + + /* + * First event blocks the ring buffer until eventHandlerWaiter is released + */ + appender.append(event1); + await().until(() -> eventHandlerWaiter.getCount()==1); // wait until the handler is actually invoked before going any further + + /* + * Publishing the second event is blocked until the first is released (buffer full) + */ + Future future = execute(() -> appender.append(event2)); + + /* + * Release the handler -> both events are now unblocked + */ + eventHandlerWaiter.countDown(); + + await().untilAsserted(() -> assertThat(eventHandler.getEvents()).containsExactly(event1, event2)); + assertThat(future).isDone(); + assertThat(statusManager.getCopyOfStatusList()).isEmpty(); + + } finally { + eventHandlerWaiter.countDown(); + } + } + + + /* + * Appender configured in async "block" mode -> assert appending threads are blocked for the + * configured timeout. + */ + @Test + public void appendBlockingWithTimeout() throws Exception { + // Block for the specified timeout + final Duration timeout = Duration.ofMillis(150); + + final CountDownLatch eventHandlerWaiter = new CountDownLatch(1); + + try { + TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); + appender.setRingBufferSize(1); + appender.setAsyncMode(AsyncMode.BLOCK); + appender.setRetryTimeout(toLogback(timeout)); + appender.setEventHandler(eventHandler); + appender.start(); + + /* + * First event blocks the ring buffer until eventHandlerWaiter is released + */ + appender.append(event1); + await().until(() -> eventHandlerWaiter.getCount()==1); // wait until the handler is actually invoked before going any further + + + /* + * Second event is blocked until the first is released (buffer full) - but no more than the configured timeout + */ + Future future = execute(() -> appender.append(event2)); + + // wait for the timeout + await().atLeast(timeout).and().atMost(timeout.plusMillis(100)).until(future::isDone); + + // a WARN status is logged + assertThat(statusManager.getCopyOfStatusList()) + .hasSize(1) + .allMatch(s -> s.getMessage().startsWith("Dropped")); + + // listeners invoked with appendFailed + verify(listener).eventAppendFailed(eq(appender), eq(event2), any()); + + + /* + * Unlock the handler and assert only the first event went through + */ + eventHandlerWaiter.countDown(); + await().untilAsserted( () -> assertThat(eventHandler.getEvents()).containsExactly(event1) ); + + } finally { + eventHandlerWaiter.countDown(); + } + } + + + /* + * Appender configured in async "block" mode -> assert threads blocked waiting for free space are + * released when the appender is stopped + */ + @Test + public void appendBlockingReleasedOnStop() { + final CountDownLatch eventHandlerWaiter = new CountDownLatch(1); + + try { + TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter); + appender.setRingBufferSize(1); + appender.setAsyncMode(AsyncMode.BLOCK); + appender.setShutdownGracePeriod(toLogback(Duration.ofMillis(0))); // don't want to wait for inflight events... + appender.setEventHandler(eventHandler); + appender.start(); + + /* + * First event will block the ring buffer until eventHandlerWaiter is released + */ + appender.append(event1); + await().until(() -> eventHandlerWaiter.getCount()==1); // wait until the handler is actually invoked before going any further + + /* + * Publishing the second event is blocked until the first is released (buffer full) + */ + Future future = execute(() -> appender.append(event2)); + + /* + * Stop appender + */ + appender.stop(); + + // appending thread is released + await().until(future::isDone); + + // no events handled + assertThat(eventHandler.getEvents()).isEmpty(); + + // no listener invoked + verify(listener, times(0)).eventAppendFailed(eq(appender), eq(event2), any()); + + } finally { + eventHandlerWaiter.countDown(); + } + } + + + private ExecutorService executorService = Executors.newCachedThreadPool(); + + private Future execute(Runnable runnable) { + return executorService.submit(runnable); + } + + private static class TestEventHandler implements EventHandler> { + private final List events = new ArrayList<>(); + private final CountDownLatch waiter; + + public TestEventHandler(CountDownLatch waiter) { + this.waiter = waiter; + } + @Override + public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception { + if (waiter != null ) { + waiter.await(); + } + this.events.add(event.event); + } + + public List getEvents() { + return events; + } + } + + private static ch.qos.logback.core.util.Duration toLogback(Duration duration) { + return ch.qos.logback.core.util.Duration.buildByMilliseconds(duration.toMillis()); + } }