First implementation of Async append blocking until an optional timeout (#565)

* Add ability to block with optional timeout when appending to async appenders
* Wait for the buffer to drain before calling Disruptor#shutdown

Calling Disruptor#shutdown(timeout) while the buffer is not empty causes the disruptor to wait in a busy-loop, consuming a lot of CPU. Instead, wait during the grace period before asking the disruptor to shut down immediately.

Fixes #559 
Fixes #566 
Fixes #569
brenuart authored Jul 14, 2021
1 parent 903b77a commit c7d9781
Showing 4 changed files with 431 additions and 42 deletions.
32 changes: 30 additions & 2 deletions README.md
@@ -625,9 +625,38 @@ For example:
</appender>
```

The async appenders will never block the logging thread.
#### RingBuffer Full

The async appenders will by default never block the logging thread.
If the RingBuffer is full (e.g. due to slow network, etc), then events will be dropped.

Alternatively, you can configure the appender to wait until space becomes available instead of dropping the events immediately. This may come in handy when you want to rely on the buffering and the async nature of the appender but don't want to lose any events in case of large logging bursts that exceed the size of the RingBuffer.

The behaviour of the appender when the RingBuffer is full is controlled by the `appendTimeout` configuration property:

| `appendTimeout` | Behaviour when RingBuffer is full |
|-----------------|------------------------------------------------------------------------|
| `< 0` | disable timeout and wait until space is available |
| `0` | no timeout, give up immediately and drop event (this is the *default*) |
| `> 0` | retry during the specified amount of time |


Logging threads waiting for space in the RingBuffer wake up periodically, at an interval defined by `appendRetryFrequency` (default `50ms`). Lowering this value gives a faster reaction time at the expense of higher CPU usage.
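
To illustrate how `appendTimeout` and `appendRetryFrequency` interact, here is a minimal, self-contained sketch. It is not the appender's code: the class `AppendTimeoutSketch`, its method names, and the use of an `ArrayBlockingQueue` as a stand-in for the RingBuffer are all assumptions made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration only; a bounded queue stands in for the RingBuffer.
class AppendTimeoutSketch {

    /** A negative timeout means "wait forever"; otherwise the deadline is now + timeout. */
    static long computeDeadline(long appendTimeoutMillis, long nowMillis) {
        return appendTimeoutMillis < 0 ? Long.MAX_VALUE : nowMillis + appendTimeoutMillis;
    }

    /** Returns true if the event was enqueued, false if it was dropped. */
    static <E> boolean tryAppend(BlockingQueue<E> buffer, E event,
                                 long appendTimeoutMillis, long retryFrequencyMillis) {
        long deadline = computeDeadline(appendTimeoutMillis, System.currentTimeMillis());
        while (!buffer.offer(event)) {
            // Never sleep past the deadline, and never longer than the retry interval.
            long waitMillis = Math.min(retryFrequencyMillis, deadline - System.currentTimeMillis());
            if (waitMillis <= 0) {
                return false; // timeout reached (or timeout is 0): drop the event
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(waitMillis));
        }
        return true;
    }

    public static void main(String[] args) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1);
        System.out.println(tryAppend(buffer, "first", 0, 50));   // buffer has room
        System.out.println(tryAppend(buffer, "second", 0, 50));  // full, timeout 0: dropped at once
        System.out.println(tryAppend(buffer, "third", 100, 10)); // full, retried for about 100 ms
    }
}
```

With a timeout of `0` the deadline is already reached on the first failed attempt, so the event is dropped without waiting, which matches the default row of the table.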

When the appender drops an event, it emits a warning status message every `droppedWarnFrequency` consecutive dropped events. Another status message is emitted when the drop period is over and a first event is successfully enqueued, reporting the total number of events that were dropped.
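
The warning cadence can be sketched with a single atomic counter. This is an illustrative example under stated assumptions: the class `DroppedWarningSketch` and its methods `onDropped` and `onAppended` are hypothetical, and the messages are returned instead of being sent to the status manager.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of the "warn every N consecutive drops" bookkeeping.
class DroppedWarningSketch {
    private final AtomicLong consecutiveDropped = new AtomicLong();
    private final int droppedWarnFrequency;

    DroppedWarningSketch(int droppedWarnFrequency) {
        this.droppedWarnFrequency = droppedWarnFrequency;
    }

    /** Records a dropped event; returns a warning message, or null when no warning is due. */
    String onDropped() {
        long dropped = consecutiveDropped.incrementAndGet();
        // Warn on the 1st drop, then again on every droppedWarnFrequency-th drop after it.
        if (dropped % droppedWarnFrequency == 1) {
            return "Dropped " + dropped + " events (and counting...)";
        }
        return null;
    }

    /** Records a successful append; returns a summary if a drop period just ended, else null. */
    String onAppended() {
        long dropped = consecutiveDropped.get();
        // compareAndSet ensures only one thread reports the end of the drop period.
        if (dropped != 0 && consecutiveDropped.compareAndSet(dropped, 0L)) {
            return "Dropped " + dropped + " total events";
        }
        return null;
    }

    public static void main(String[] args) {
        DroppedWarningSketch sketch = new DroppedWarningSketch(3);
        for (int i = 0; i < 4; i++) {
            System.out.println(sketch.onDropped());
        }
        System.out.println(sketch.onAppended());
    }
}
```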


#### Graceful Shutdown

In order to guarantee that logged messages have had a chance to be processed by asynchronous appenders (including the TCP appender) and ensure background threads have been stopped, you'll need to [cleanly shut down logback](http://logback.qos.ch/manual/configuration.html#stopContext) when your application exits.

When gracefully stopped, async appenders wait until all events in the buffer are processed and the buffer is empty.
The maximum time to wait is configured by the `shutdownGracePeriod` parameter and is set to `1 minute` by default.
Events still in the buffer after this period has elapsed are dropped and the appender is stopped.
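
The grace-period wait can be sketched as a sleep-and-poll loop rather than a busy-spin. The example below is a simplified illustration, not the appender's implementation: the class `DrainBeforeShutdownSketch`, the method `awaitDrained`, and the `ConcurrentLinkedQueue` standing in for the RingBuffer are assumptions.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of waiting for a buffer to drain within a grace period.
class DrainBeforeShutdownSketch {
    /** Pause between two emptiness checks, so the waiting thread does not spin. */
    static final long SLEEP_NANOS = TimeUnit.MILLISECONDS.toNanos(50);

    /**
     * Waits until the buffer is empty or the grace period has elapsed.
     * A negative grace period means "wait as long as it takes".
     * Returns true if the buffer was empty when the wait ended.
     */
    static boolean awaitDrained(BooleanSupplier isEmpty, long gracePeriodMillis) {
        long deadline = gracePeriodMillis < 0
                ? Long.MAX_VALUE
                : System.currentTimeMillis() + gracePeriodMillis;
        while (!isEmpty.getAsBoolean() && System.currentTimeMillis() < deadline) {
            LockSupport.parkNanos(SLEEP_NANOS);
        }
        return isEmpty.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();
        buffer.add("event-1");
        buffer.add("event-2");

        // A slow consumer that stops once it finds the queue empty.
        Thread consumer = new Thread(() -> {
            while (buffer.poll() != null) {
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(20));
            }
        });
        consumer.start();

        boolean drained = awaitDrained(buffer::isEmpty, 1_000);
        consumer.join();
        System.out.println("drained = " + drained);
    }
}
```

When `awaitDrained` returns `false`, the caller knows that some events were still queued at the end of the grace period and can report them as lost before stopping.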


#### Wait Strategy

By default, the [`BlockingWaitStrategy`](https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/BlockingWaitStrategy.html)
is used by the worker thread spawned by this appender.
The `BlockingWaitStrategy` minimizes CPU utilization, but results in higher latency and lower throughput.
@@ -743,7 +772,6 @@ e.g.<br/><tt>phasedBackoff{10,60,seconds,blocking}</tt></td>
See [AsyncDisruptorAppender](/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java)
for other configuration parameters (such as `ringBufferSize`, `producerType`, `threadNamePrefix`, `daemon`, and `droppedWarnFrequency`)

In order to guarantee that logged messages have had a chance to be processed by asynchronous appenders (including the TCP appender) and ensure background threads have been stopped, you'll need to [cleanly shut down logback](http://logback.qos.ch/manual/configuration.html#stopContext) when your application exits.

### Appender Listeners

6 changes: 6 additions & 0 deletions pom.xml
@@ -211,6 +211,12 @@
<version>${org.mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
208 changes: 171 additions & 37 deletions src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java
@@ -19,13 +19,15 @@
import java.util.Arrays;
import java.util.Formatter;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

import net.logstash.logback.appender.listener.AppenderListener;
@@ -38,6 +40,7 @@
import ch.qos.logback.core.spi.DeferredProcessingAware;
import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.Status;
import ch.qos.logback.core.util.Duration;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
@@ -47,7 +50,6 @@
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
@@ -62,14 +64,11 @@
* for more information about the advantages of using a {@link RingBuffer} over a {@link BlockingQueue}.
* <p>
*
* This appender will never block the logging thread, since it uses
* {@link RingBuffer#tryPublishEvent(EventTranslatorOneArg, Object)}
* to publish events (rather than {@link RingBuffer#publishEvent(EventTranslatorOneArg, Object)}).
* <p>
*
* If the RingBuffer is full, and the event cannot be published,
* the event will be dropped. A warning message will be logged to
* logback's context every {@link #droppedWarnFrequency} consecutive dropped events.
* The behavior of the appender when the RingBuffer is full and the event cannot be published
* is controlled by the {@link #appendTimeout} configuration parameter.
* By default the appender drops the event immediately, and emits a warning message every
* {@link #droppedWarnFrequency} consecutive dropped events.
* It can also be configured to wait until some space is available, with or without timeout.
* <p>
*
* A single handler thread will be used to handle the actual handling of the event.
@@ -97,6 +96,10 @@
* @param <Event> type of event ({@link ILoggingEvent} or {@link IAccessEvent}).
*/
public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>> extends UnsynchronizedAppenderBase<Event> {
/**
* Time in nanos to wait between drain attempts during the shutdown phase
*/
private static final long SLEEP_TIME_DURING_SHUTDOWN = 50 * 1_000_000L; // 50ms

protected static final String APPENDER_NAME_FORMAT = "%1$s";
protected static final String THREAD_INDEX_FORMAT = "%2$d";
@@ -251,6 +254,29 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
*/
protected final List<Listener> listeners = new ArrayList<>();

/**
* Maximum time to wait when appending events to the ring buffer when full before the event
* is dropped. Use the following values:
* <ul>
* <li>{@code -1} to disable timeout and wait until space becomes available.
* <li>{@code 0} for no timeout and drop the event immediately when the buffer is full.
* <li>{@code > 0} to retry during the specified amount of time.
* </ul>
*/
private Duration appendTimeout = Duration.buildByMilliseconds(0);

/**
* Delay between consecutive attempts to append an event in the ring buffer when
* full.
*/
private Duration appendRetryFrequency = Duration.buildByMilliseconds(50);

/**
* How long to wait for in-flight events during shutdown.
*/
private Duration shutdownGracePeriod = Duration.buildByMinutes(1);


/**
* Event wrapper object used for each element of the {@link RingBuffer}.
*/
@@ -364,9 +390,30 @@ public void onShutdown() {

}

@SuppressWarnings("unchecked")
@Override
public void start() {
int errorCount = 0;

if (this.eventHandler == null) {
addError("No eventHandler was configured for appender " + getName());
errorCount++;
}
if (this.appendRetryFrequency.getMilliseconds() <= 0) {
addError("<appendRetryFrequency> must be > 0");
errorCount++;
}
if (this.ringBufferSize <= 0) {
addError("<ringBufferSize> must be > 0");
errorCount++;
}
if (!isPowerOfTwo(this.ringBufferSize)) {
addError("<ringBufferSize> must be a power of 2");
errorCount++;
}
if (errorCount > 0) {
return;
}

if (addDefaultStatusListener && getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) {
LevelFilteringStatusListener statusListener = new LevelFilteringStatusListener();
statusListener.setLevelValue(Status.WARN);
@@ -376,11 +423,6 @@ public void start() {
getStatusManager().add(statusListener);
}

if (this.eventHandler == null) {
addError("No eventHandler was configured for appender " + name + ".");
return;
}

this.executorService = new ScheduledThreadPoolExecutor(
getThreadPoolCoreSize(),
this.threadFactory);
@@ -423,58 +465,123 @@ public void stop() {
if (!super.isStarted()) {
return;
}

/*
* Don't allow any more events to be appended.
*/
super.stop();
try {
this.disruptor.shutdown(1, TimeUnit.MINUTES);
} catch (TimeoutException e) {


/*
* Shutdown Disruptor
*
* Calling Disruptor#shutdown() will wait until all enqueued events are fully processed,
* but this waiting happens in a busy-spin. To avoid wasting CPU we wait for at most the configured
* grace period before asking the Disruptor for an immediate shutdown.
*/
long deadline = getShutdownGracePeriod().getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + getShutdownGracePeriod().getMilliseconds();
while (!isRingBufferEmpty() && (System.currentTimeMillis() < deadline)) {
LockSupport.parkNanos(SLEEP_TIME_DURING_SHUTDOWN);
}

this.disruptor.halt();


if (!isRingBufferEmpty()) {
addWarn("Some queued events have not been logged due to requested shutdown");
}


/*
* Shutdown executor service
*/
this.executorService.shutdown();

try {
if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
addWarn("Some queued events have not been logged due to requested shutdown");
}
this.executorService.awaitTermination(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
addWarn("Some queued events have not been logged due to requested shutdown", e);
// ignored
}


/*
* Notify listeners
*/
fireAppenderStopped();
}


/**
* Test whether the ring buffer is empty or not
*
* @return {@code true} if the ring buffer is empty, {@code false} otherwise
*/
protected boolean isRingBufferEmpty() {
return this.disruptor.getRingBuffer().hasAvailableCapacity(this.getRingBufferSize());
}

@Override
protected void append(Event event) {
long startTime = System.nanoTime();

try {
prepareForDeferredProcessing(event);
} catch (RuntimeException e) {
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
}

if (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
if ((consecutiveDropped) % this.droppedWarnFrequency == 1) {
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");

// Add event to the buffer, retrying as many times as allowed by the configuration
//
long deadline = this.appendTimeout.getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.appendTimeout.getMilliseconds();

while (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
// Wait before retrying
//
long waitDuration = Math.min(this.appendRetryFrequency.getMilliseconds(), deadline - System.currentTimeMillis());
if (waitDuration > 0) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(waitDuration));

} else {
// Log a warning status about the failure
//
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
if ((consecutiveDropped % this.droppedWarnFrequency) == 1) {
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}

// Notify listeners
//
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
return;
}
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
} else {
long endTime = System.nanoTime();
long consecutiveDropped = this.consecutiveDroppedCount.get();
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");

// Give up if appender is stopped meanwhile
//
if (!isStarted()) {
// Same message as if Appender#append is called after the appender is stopped...
addWarn("Attempted to append to non started appender [" + this.getName() + "].");
return;
}
fireEventAppended(event, endTime - startTime);
}

// Enqueue success - notify end of error period
//
long consecutiveDropped = this.consecutiveDroppedCount.get();
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}

// Notify listeners
//
fireEventAppended(event, System.nanoTime() - startTime);
}

protected void prepareForDeferredProcessing(Event event) {
event.prepareForDeferredProcessing();
}


protected String calculateThreadName() {
List<Object> threadNameFormatParams = getThreadNameFormatParams();
return String.format(threadNameFormat, threadNameFormatParams.toArray());
@@ -586,7 +693,28 @@ public void setWaitStrategy(WaitStrategy waitStrategy) {
public void setWaitStrategyType(String waitStrategyType) {
setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType));
}


public Duration getAppendRetryFrequency() {
return appendRetryFrequency;
}
public void setAppendRetryFrequency(Duration appendRetryFrequency) {
this.appendRetryFrequency = Objects.requireNonNull(appendRetryFrequency);
}

public Duration getAppendTimeout() {
return appendTimeout;
}
public void setAppendTimeout(Duration appendTimeout) {
this.appendTimeout = Objects.requireNonNull(appendTimeout);
}

public void setShutdownGracePeriod(Duration shutdownGracePeriod) {
this.shutdownGracePeriod = Objects.requireNonNull(shutdownGracePeriod);
}
public Duration getShutdownGracePeriod() {
return shutdownGracePeriod;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
@@ -629,4 +757,10 @@ public boolean isAddDefaultStatusListener() {
public void setAddDefaultStatusListener(boolean addDefaultStatusListener) {
this.addDefaultStatusListener = addDefaultStatusListener;
}


private static boolean isPowerOfTwo(int x) {
/* First x in the below expression is for the case when x is 0 */
return x != 0 && ((x & (x - 1)) == 0);
}
}