Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Disruptor with a ThreadFactory instead of ExecutorService #570

Merged
merged 5 commits into from
Jul 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.net.SocketFactory;
Expand Down Expand Up @@ -273,6 +275,11 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
*/
private volatile CountDownLatch shutdownLatch;

/**
* The {@link ScheduledExecutorService} used to execute house keeping tasks.
*/
private ScheduledThreadPoolExecutor executorService;

/**
* Event handler responsible for performing the TCP transmission.
*/
Expand Down Expand Up @@ -443,7 +450,7 @@ public Void call() throws Exception {
}
} finally {
if (!Thread.currentThread().isInterrupted()) {
getExecutorService().submit(() -> {
executorService.submit(() -> {
/*
* https://github.com/logstash/logstash-logback-encoder/issues/341
*
Expand Down Expand Up @@ -805,7 +812,7 @@ private synchronized void scheduleKeepAlive(long basedOnNanoTime) {
}
long delay = TimeUnit.MILLISECONDS.toNanos(keepAliveDuration.getMilliseconds()) - (System.nanoTime() - basedOnNanoTime);
try {
keepAliveFuture = getExecutorService().schedule(
keepAliveFuture = executorService.schedule(
keepAliveRunnable,
delay,
TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -837,7 +844,7 @@ private synchronized void scheduleWriteTimeout() {
}
long delay = writeTimeout.getMilliseconds();
try {
writeTimeoutFuture = getExecutorService().scheduleWithFixedDelay(
writeTimeoutFuture = executorService.scheduleWithFixedDelay(
writeTimeoutRunnable,
delay,
delay,
Expand Down Expand Up @@ -950,16 +957,17 @@ public synchronized void start() {
}

if (errorCount == 0) {

encoder.setContext(getContext());
if (!encoder.isStarted()) {
encoder.start();
}

/*
* Increase the core size to handle the reader thread
* Start with an initial core size of 1 to handle the Reader thread
*/
int threadPoolCoreSize = getThreadPoolCoreSize() + 1;
int threadPoolCoreSize = 1;

/*
philsttr marked this conversation as resolved.
Show resolved Hide resolved
* Increase the core size to handle the keep alive thread
*/
Expand All @@ -972,7 +980,15 @@ public synchronized void start() {
if (isWriteTimeoutEnabled()) {
threadPoolCoreSize++;
}
setThreadPoolCoreSize(threadPoolCoreSize);
this.executorService = new ScheduledThreadPoolExecutor(
threadPoolCoreSize,
getThreadFactory());

/*
* This ensures that cancelled tasks do not hold up shutdown.
*/
this.executorService.setRemoveOnCancelPolicy(true);

this.shutdownLatch = new CountDownLatch(1);
super.start();
}
Expand All @@ -983,15 +999,29 @@ public synchronized void stop() {
if (!isStarted()) {
return;
}

super.stop();

/*
* Stop waiting to reconnect (if reconnect logic is currently waiting)
*/
this.shutdownLatch.countDown();
super.stop();

/*
* Stop executor service
*/
this.executorService.shutdown();
try {
if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
addWarn("Some queued events have not been logged due to requested shutdown");
}
} catch (InterruptedException e) {
addWarn("Some queued events have not been logged due to requested shutdown", e);
}
}

protected Future<?> scheduleReaderCallable(Callable<Void> readerCallable) {
return getExecutorService().submit(readerCallable);
return executorService.submit(readerCallable);
}

protected void fireEventSent(Socket socket, Event event, long durationInNanos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -203,16 +201,6 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
*/
private ThreadFactory threadFactory = new WorkerThreadFactory();

/**
* The {@link ScheduledExecutorService} used to execute the handler task.
*/
private ScheduledThreadPoolExecutor executorService;

/**
* Size of the thread pool to create.
*/
private int threadPoolCoreSize = 1;

/**
* The {@link Disruptor} containing the {@link RingBuffer} onto
* which to publish events.
Expand Down Expand Up @@ -437,21 +425,10 @@ public void start() {
getStatusManager().add(statusListener);
}

this.executorService = new ScheduledThreadPoolExecutor(
getThreadPoolCoreSize(),
this.threadFactory);

/*
* This ensures that cancelled tasks
* (such as the keepAlive task in AbstractLogstashTcpSocketAppender)
* do not hold up shutdown.
*/
this.executorService.setRemoveOnCancelPolicy(true);

this.disruptor = new Disruptor<LogEvent<Event>>(
this.eventFactory,
this.ringBufferSize,
this.executorService,
this.threadFactory,
this.producerType,
this.waitStrategy);

Expand Down Expand Up @@ -504,23 +481,6 @@ public void stop() {
if (!isRingBufferEmpty()) {
addWarn("Some queued events have not been logged due to requested shutdown");
}


/*
* Shutdown executor service
*/
this.executorService.shutdown();

try {
this.executorService.awaitTermination(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignored
}


/*
* Notify listeners
*/
fireAppenderStopped();
}

Expand Down Expand Up @@ -645,21 +605,10 @@ protected void setEventTranslator(EventTranslatorOneArg<LogEvent<Event>, Event>
this.eventTranslator = eventTranslator;
}

protected ScheduledExecutorService getExecutorService() {
return executorService;
}

philsttr marked this conversation as resolved.
Show resolved Hide resolved
protected Disruptor<LogEvent<Event>> getDisruptor() {
return disruptor;
}

protected int getThreadPoolCoreSize() {
return threadPoolCoreSize;
}
protected void setThreadPoolCoreSize(int threadPoolCoreSize) {
this.threadPoolCoreSize = threadPoolCoreSize;
}

public String getThreadNameFormat() {
return threadNameFormat;
}
Expand Down
Loading