Skip to content

Commit

Permalink
Create Disruptor with a ThreadFactory instead of ExecutorService (#570)
Browse files Browse the repository at this point in the history
* Create Disruptor with a ThreadFactory instead of ExecutorService
* Remove ExecutorService from AsyncDisruptorAppender and move it inside the AbstractLogstashTcpSocketAppender
  AsyncDisruptorAppender does use the ExecutorService anymore - move it inside classes that need it

See #567
  • Loading branch information
brenuart authored Jul 25, 2021
1 parent 036f0cc commit 6386d2a
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.net.SocketFactory;
Expand Down Expand Up @@ -273,6 +275,11 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
*/
private volatile CountDownLatch shutdownLatch;

/**
* The {@link ScheduledExecutorService} used to execute house keeping tasks.
*/
private ScheduledThreadPoolExecutor executorService;

/**
* Event handler responsible for performing the TCP transmission.
*/
Expand Down Expand Up @@ -443,7 +450,7 @@ public Void call() throws Exception {
}
} finally {
if (!Thread.currentThread().isInterrupted()) {
getExecutorService().submit(() -> {
executorService.submit(() -> {
/*
* https://github.com/logstash/logstash-logback-encoder/issues/341
*
Expand Down Expand Up @@ -805,7 +812,7 @@ private synchronized void scheduleKeepAlive(long basedOnNanoTime) {
}
long delay = TimeUnit.MILLISECONDS.toNanos(keepAliveDuration.getMilliseconds()) - (System.nanoTime() - basedOnNanoTime);
try {
keepAliveFuture = getExecutorService().schedule(
keepAliveFuture = executorService.schedule(
keepAliveRunnable,
delay,
TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -837,7 +844,7 @@ private synchronized void scheduleWriteTimeout() {
}
long delay = writeTimeout.getMilliseconds();
try {
writeTimeoutFuture = getExecutorService().scheduleWithFixedDelay(
writeTimeoutFuture = executorService.scheduleWithFixedDelay(
writeTimeoutRunnable,
delay,
delay,
Expand Down Expand Up @@ -950,16 +957,17 @@ public synchronized void start() {
}

if (errorCount == 0) {

encoder.setContext(getContext());
if (!encoder.isStarted()) {
encoder.start();
}

/*
* Increase the core size to handle the reader thread
* Start with an initial core size of 1 to handle the Reader thread
*/
int threadPoolCoreSize = getThreadPoolCoreSize() + 1;
int threadPoolCoreSize = 1;

/*
* Increase the core size to handle the keep alive thread
*/
Expand All @@ -972,7 +980,15 @@ public synchronized void start() {
if (isWriteTimeoutEnabled()) {
threadPoolCoreSize++;
}
setThreadPoolCoreSize(threadPoolCoreSize);
this.executorService = new ScheduledThreadPoolExecutor(
threadPoolCoreSize,
getThreadFactory());

/*
* This ensures that cancelled tasks do not hold up shutdown.
*/
this.executorService.setRemoveOnCancelPolicy(true);

this.shutdownLatch = new CountDownLatch(1);
super.start();
}
Expand All @@ -983,15 +999,29 @@ public synchronized void stop() {
if (!isStarted()) {
return;
}

super.stop();

/*
* Stop waiting to reconnect (if reconnect logic is currently waiting)
*/
this.shutdownLatch.countDown();
super.stop();

/*
* Stop executor service
*/
this.executorService.shutdown();
try {
if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
addWarn("Some queued events have not been logged due to requested shutdown");
}
} catch (InterruptedException e) {
addWarn("Some queued events have not been logged due to requested shutdown", e);
}
}

protected Future<?> scheduleReaderCallable(Callable<Void> readerCallable) {
return getExecutorService().submit(readerCallable);
return executorService.submit(readerCallable);
}

protected void fireEventSent(Socket socket, Event event, long durationInNanos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -203,16 +201,6 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
*/
private ThreadFactory threadFactory = new WorkerThreadFactory();

/**
* The {@link ScheduledExecutorService} used to execute the handler task.
*/
private ScheduledThreadPoolExecutor executorService;

/**
* Size of the thread pool to create.
*/
private int threadPoolCoreSize = 1;

/**
* The {@link Disruptor} containing the {@link RingBuffer} onto
* which to publish events.
Expand Down Expand Up @@ -437,21 +425,10 @@ public void start() {
getStatusManager().add(statusListener);
}

this.executorService = new ScheduledThreadPoolExecutor(
getThreadPoolCoreSize(),
this.threadFactory);

/*
* This ensures that cancelled tasks
* (such as the keepAlive task in AbstractLogstashTcpSocketAppender)
* do not hold up shutdown.
*/
this.executorService.setRemoveOnCancelPolicy(true);

this.disruptor = new Disruptor<LogEvent<Event>>(
this.eventFactory,
this.ringBufferSize,
this.executorService,
this.threadFactory,
this.producerType,
this.waitStrategy);

Expand Down Expand Up @@ -504,23 +481,6 @@ public void stop() {
if (!isRingBufferEmpty()) {
addWarn("Some queued events have not been logged due to requested shutdown");
}


/*
* Shutdown executor service
*/
this.executorService.shutdown();

try {
this.executorService.awaitTermination(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignored
}


/*
* Notify listeners
*/
fireAppenderStopped();
}

Expand Down Expand Up @@ -645,21 +605,10 @@ protected void setEventTranslator(EventTranslatorOneArg<LogEvent<Event>, Event>
this.eventTranslator = eventTranslator;
}

protected ScheduledExecutorService getExecutorService() {
return executorService;
}

protected Disruptor<LogEvent<Event>> getDisruptor() {
return disruptor;
}

protected int getThreadPoolCoreSize() {
return threadPoolCoreSize;
}
protected void setThreadPoolCoreSize(int threadPoolCoreSize) {
this.threadPoolCoreSize = threadPoolCoreSize;
}

public String getThreadNameFormat() {
return threadNameFormat;
}
Expand Down
Loading

0 comments on commit 6386d2a

Please sign in to comment.