From 5cc1963d37a046efb1b77964cf8767a742a78ceb Mon Sep 17 00:00:00 2001 From: JonZhang Date: Thu, 25 Nov 2021 15:55:21 +0800 Subject: [PATCH] fix:The program cannot stop after shutdown() --- README.md | 1 - pom.xml | 2 +- .../com/atask/DefaultThreadPoolExecutor.java | 28 +++++++++---------- src/main/java/com/atask/TaskEngine.java | 8 +++--- src/test/java/com/atasktest/ATaskTest.java | 3 +- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index f9ab027..7b03ab3 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,6 @@ TaskEngine engine = new TaskEngine.Builder() .queueCapacity(20) // 设置线程池的任务拒绝策略 .rejectedExecutionHandler(RejectedExecutionHandler) - .completedTaskHandler(handler)// 设置对已完成任务的处理回调 .build(); // 可通过 getRunningTasks() 方法获取当前正在执行的任务 ``` diff --git a/pom.xml b/pom.xml index 635da1b..2aa3357 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.github.jonzhang3 aTask - 1.1.0 + 1.1.1 ATask A simple multi-purpose asynchronous task execution framework. diff --git a/src/main/java/com/atask/DefaultThreadPoolExecutor.java b/src/main/java/com/atask/DefaultThreadPoolExecutor.java index e92bf5b..767ca53 100644 --- a/src/main/java/com/atask/DefaultThreadPoolExecutor.java +++ b/src/main/java/com/atask/DefaultThreadPoolExecutor.java @@ -26,7 +26,7 @@ final class DefaultThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicLong completedTaskNumber = new AtomicLong(0); private final Deque runningQueue = new ConcurrentLinkedDeque<>(); private final Map runningTaskGrous = new ConcurrentHashMap<>(); - private final LinkedBlockingDeque completedQueue = new LinkedBlockingDeque<>(); +// private final LinkedBlockingDeque completedQueue = new LinkedBlockingDeque<>(); private final CompletedTaskHandler completedTaskHandler; @@ -39,7 +39,7 @@ public DefaultThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long kee } else { this.completedTaskHandler = completedTaskHandler; } - startHandleCompletedTask(); +// startHandleCompletedTask(); } public void submit(Task task) { @@ -91,22 +91,22 @@ protected void afterExecute(Runnable r, Throwable t) { CustomFutureTask futureTask = (CustomFutureTask) r; Task task = futureTask.getTask(); runningQueue.remove(task); - completedQueue.offer(task); +// completedQueue.offer(task); completedTaskNumber.incrementAndGet(); } } - private void startHandleCompletedTask() { - new Thread(() -> { - while (true) { - try { - Task take = completedQueue.take(); - DefaultThreadPoolExecutor.this.completedTaskHandler.handle(take); - } catch (Throwable ignore) { - } - } - }).start(); - } +// private void startHandleCompletedTask() { +// new Thread(() -> { +// while (true) { +// try { +// Task take = completedQueue.take(); +// DefaultThreadPoolExecutor.this.completedTaskHandler.handle(take); +// } catch (Throwable ignore) { +// } +// } +// }).start(); +// } // 获取正在运行的任务,包含任务组中的任务 protected final List getRunningTasks() { diff --git a/src/main/java/com/atask/TaskEngine.java b/src/main/java/com/atask/TaskEngine.java index 041ce7f..b746a3a 100644 --- a/src/main/java/com/atask/TaskEngine.java +++ b/src/main/java/com/atask/TaskEngine.java @@ -183,10 +183,10 @@ public Builder rejectedExecutionHandler(RejectedExecutionHandler handler) { return this; } - public Builder completedTaskHandler(CompletedTaskHandler handler) { - this.completedTaskHandler = handler; - return this; - } +// public Builder completedTaskHandler(CompletedTaskHandler handler) { +// this.completedTaskHandler = handler; +// return this; +// } public TaskEngine build() { BlockingQueue queue = createQueue(this.queueCapacity); diff --git a/src/test/java/com/atasktest/ATaskTest.java b/src/test/java/com/atasktest/ATaskTest.java index 9ac0947..6beb8b7 100644 --- a/src/test/java/com/atasktest/ATaskTest.java +++ b/src/test/java/com/atasktest/ATaskTest.java @@ -22,14 +22,13 @@ public class ATaskTest { @Before public void before() { engine = new TaskEngine.Builder() - .completedTaskHandler(System.out::println) .build(); } @After public void after() { engine.shutdown(); - engine = null; +// engine = null; } @Test