Skip to content

Commit

Permalink
Wait for the buffer to drain before calling Disruptor#shutdown
Browse files Browse the repository at this point in the history
Calling Disruptor#shutdown(timeout) while the buffer is not empty causes the disruptor to wait in a busy-loop consommuing a lot of CPU. Instead, wait during the grace period before asking the disruptor to shutdown immediately.

Related issue: logfellow#566
  • Loading branch information
brenuart committed Jul 10, 2021
1 parent c02ab5f commit d80f1e9
Showing 1 changed file with 35 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
* @param <Event> type of event ({@link ILoggingEvent} or {@link IAccessEvent}).
*/
public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>> extends UnsynchronizedAppenderBase<Event> {
/**
* Time in nanos to wait between drain attempts during the shutdown phase
*/
private static final long SLEEP_TIME_DURING_SHUTDOWN = 50 * 1_000_000L; // 50ms

protected static final String APPENDER_NAME_FORMAT = "%1$s";
protected static final String THREAD_INDEX_FORMAT = "%2$d";
Expand Down Expand Up @@ -469,41 +473,56 @@ public void stop() {


/*
* Shutdown disruptor and executorService
* Shutdown Disruptor
*
* Calling Disruptor#shutdown() will wait until all enqueued events are fully processed,
* but this waiting happens in a busy-spin. To avoid wasting CPU we wait for at most the configured
* grace period before asking the Disruptor for an immediate shutdown.
*/
boolean errorDuringShutdown = false;
long remainingTime = Math.max(0, getShutdownGracePeriod().getMilliseconds());
long startTime = System.currentTimeMillis();
long deadline = getShutdownGracePeriod().getMilliseconds() < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + getShutdownGracePeriod().getMilliseconds();
while( !isRingBufferEmpty() && (System.currentTimeMillis()<deadline)) {
LockSupport.parkNanos(SLEEP_TIME_DURING_SHUTDOWN);
}

try {
this.disruptor.shutdown(remainingTime, TimeUnit.MILLISECONDS);
this.disruptor.shutdown(0, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
errorDuringShutdown = true;
// ignored
}

if (!isRingBufferEmpty()) {
addWarn("Some queued events have not been logged due to requested shutdown");
}


/*
* Shutdown executor service
*/
this.executorService.shutdown();

try {
remainingTime = Math.max(0, remainingTime - (System.currentTimeMillis() - startTime));
if (!this.executorService.awaitTermination(remainingTime, TimeUnit.MILLISECONDS)) {
errorDuringShutdown = true;
}
this.executorService.awaitTermination(System.currentTimeMillis()-deadline, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
errorDuringShutdown = true;
// ignored
}

if (errorDuringShutdown) {
addWarn("Some queued events have not been logged due to requested shutdown");
}



/*
* Notify listeners
*/
fireAppenderStopped();
}


/**
* Test whether the ring buffer is empty or not
*
* @return {@code true} if the ring buffer is empty, {@code false} otherwise
*/
protected boolean isRingBufferEmpty() {
return this.disruptor.getRingBuffer().hasAvailableCapacity(this.getRingBufferSize());
}

@Override
protected void append(Event event) {
long startTime = System.nanoTime();
Expand Down

0 comments on commit d80f1e9

Please sign in to comment.