Skip to content

Commit

Permalink
First implementation of Async append blocking until an optional timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
brenuart committed Jul 8, 2021
1 parent b1ec1db commit c02ab5f
Show file tree
Hide file tree
Showing 3 changed files with 362 additions and 26 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@
<version>${org.mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
181 changes: 166 additions & 15 deletions src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
import java.util.Arrays;
import java.util.Formatter;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.Status;
import ch.qos.logback.core.util.Duration;

import net.logstash.logback.appender.listener.AppenderListener;
import ch.qos.logback.access.spi.IAccessEvent;
import ch.qos.logback.classic.AsyncAppender;
Expand Down Expand Up @@ -250,6 +254,41 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
*/
protected final List<Listener> listeners = new ArrayList<>();

public enum AsyncMode {
/**
* Appender thread is blocked until space is available in the ring buffer
* or the retry timeout expires.
*/
BLOCK,

/**
* Event is dropped when the ring buffer is full
*/
DROP
}
private AsyncMode asyncMode = AsyncMode.DROP;

/**
* Delay (in millis) between consecutive attempts to append an event in the ring buffer when full.
* Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}.
*/
private long retryMillis = 100;

/**
* Maximum time to wait for space in the ring buffer before dropping the event.
* Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}.
*
* <p>Use {@code -1} for no timeout, i.e. block until space is available.
*/
private Duration retryTimeout = Duration.buildByMilliseconds(1000);

/**
* How long to wait for in-flight events during shutdown.
*/
private Duration shutdownGracePeriod = Duration.buildByMinutes(1);



/**
* Event wrapper object used for each element of the {@link RingBuffer}.
*/
Expand Down Expand Up @@ -422,57 +461,141 @@ public void stop() {
if (!super.isStarted()) {
return;
}

/*
* Don't allow any more events to be appended.
*/
super.stop();


/*
* Shutdown disruptor and executorService
*/
boolean errorDuringShutdown = false;
long remainingTime = Math.max(0, getShutdownGracePeriod().getMilliseconds());
long startTime = System.currentTimeMillis();

try {
this.disruptor.shutdown(1, TimeUnit.MINUTES);
this.disruptor.shutdown(remainingTime, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
addWarn("Some queued events have not been logged due to requested shutdown");
errorDuringShutdown = true;
}

this.executorService.shutdown();

try {
if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
addWarn("Some queued events have not been logged due to requested shutdown");
remainingTime = Math.max(0, remainingTime - (System.currentTimeMillis() - startTime));
if (!this.executorService.awaitTermination(remainingTime, TimeUnit.MILLISECONDS)) {
errorDuringShutdown = true;
}
} catch (InterruptedException e) {
addWarn("Some queued events have not been logged due to requested shutdown", e);
errorDuringShutdown = true;
}

if (errorDuringShutdown) {
addWarn("Some queued events have not been logged due to requested shutdown");
}


/*
* Notify listeners
*/
fireAppenderStopped();
}


@Override
protected void append(Event event) {
long startTime = System.nanoTime();

try {
prepareForDeferredProcessing(event);
} catch (RuntimeException e) {
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
}

if (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
if ((consecutiveDropped) % this.droppedWarnFrequency == 1) {
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
} else {
long endTime = System.nanoTime();
if (enqueueEvent(event)) {
// Enqueue success - notify if we had errors previously
//
long consecutiveDropped = this.consecutiveDroppedCount.get();
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}
fireEventAppended(event, endTime - startTime);

// Notify parties
//
fireEventAppended(event, System.nanoTime() - startTime);

} else {
// Log a warning status about the failure
//
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
if ((consecutiveDropped % this.droppedWarnFrequency) == 1) {
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
}

// Notify parties
//
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
}
}

protected void prepareForDeferredProcessing(Event event) {
event.prepareForDeferredProcessing();
}

/**
* Enqueue the given {@code event} in the ring buffer according to the configured {@link #asyncMode}.
*
* @param event the {@link Event} to enqueue
* @return {@code true} when the even is successfully enqueued in the ring buffer
*/
protected boolean enqueueEvent(Event event) {
if (this.asyncMode == AsyncMode.BLOCK) {
return enqueueEventBlock(event);
} else {
return enqueueEventDrop(event);
}
}

/**
* Enqueue the given {@code event} in the ring buffer, blocking until enough space
* is available or the {@link #retryTimeout} expires (if configured).
*
* @param event the {@link Event} to enqueue
* @return {@code true} when the even is successfully enqueued in the ring buffer
*/
private boolean enqueueEventBlock(Event event) {
long timeout = this.retryTimeout.getMilliseconds() <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.retryTimeout.getMilliseconds();

while (isStarted() && !this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
// Check for timeout
//
if (System.currentTimeMillis() >= timeout) {
return false;
}

// Wait before retry
//
long waitDuration = Math.min(this.retryMillis, System.currentTimeMillis() - timeout);
if (waitDuration > 0) {
LockSupport.parkNanos(waitDuration * 1_000_000L);
}
}

return true;
}

/**
* Attempt to enqueue the given {@code event} in the ring buffer without blocking. Drop the event
* if the ring buffer is full.
*
* @param event the {@link Event} to enqueue
* @return {@code true} when the even is successfully enqueued in the ring buffer
*/
private boolean enqueueEventDrop(Event event) {
return this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event);
}

protected String calculateThreadName() {
List<Object> threadNameFormatParams = getThreadNameFormatParams();
Expand Down Expand Up @@ -581,6 +704,34 @@ public void setWaitStrategyType(String waitStrategyType) {
setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType));
}

public AsyncMode getAsyncMode() {
return asyncMode;
}
public void setAsyncMode(AsyncMode asyncMode) {
this.asyncMode = asyncMode;
}

public long getRetryMillis() {
return retryMillis;
}
public void setRetryMillis(long retryMillis) {
this.retryMillis = retryMillis;
}

public Duration getRetryTimeout() {
return retryTimeout;
}
public void setRetryTimeout(Duration retryTimeout) {
this.retryTimeout = Objects.requireNonNull(retryTimeout);
}

public void setShutdownGracePeriod(Duration shutdownGracePeriod) {
this.shutdownGracePeriod = Objects.requireNonNull(shutdownGracePeriod);
}
public Duration getShutdownGracePeriod() {
return shutdownGracePeriod;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
Expand Down
Loading

0 comments on commit c02ab5f

Please sign in to comment.