Skip to content

Commit

Permalink
fix:The program cannot stop after shutdown()
Browse files Browse the repository at this point in the history
  • Loading branch information
JonZhang3 committed Nov 25, 2021
1 parent 4cdd282 commit 5cc1963
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 22 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ TaskEngine engine = new TaskEngine.Builder()
.queueCapacity(20)
// 设置线程池的任务拒绝策略
.rejectedExecutionHandler(RejectedExecutionHandler)
.completedTaskHandler(handler)// 设置对已完成任务的处理回调
.build();
// 可通过 getRunningTasks() 方法获取当前正在执行的任务
```
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.jonzhang3</groupId>
<artifactId>aTask</artifactId>
<version>1.1.0</version>
<version>1.1.1</version>

<name>ATask</name>
<description>A simple multi-purpose asynchronous task execution framework.</description>
Expand Down
28 changes: 14 additions & 14 deletions src/main/java/com/atask/DefaultThreadPoolExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ final class DefaultThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicLong completedTaskNumber = new AtomicLong(0);
private final Deque<Task> runningQueue = new ConcurrentLinkedDeque<>();
private final Map<String, TaskGroup> runningTaskGrous = new ConcurrentHashMap<>();
private final LinkedBlockingDeque<Task> completedQueue = new LinkedBlockingDeque<>();
// private final LinkedBlockingDeque<Task> completedQueue = new LinkedBlockingDeque<>();

private final CompletedTaskHandler completedTaskHandler;

Expand All @@ -39,7 +39,7 @@ public DefaultThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long kee
} else {
this.completedTaskHandler = completedTaskHandler;
}
startHandleCompletedTask();
// startHandleCompletedTask();
}

public void submit(Task task) {
Expand Down Expand Up @@ -91,22 +91,22 @@ protected void afterExecute(Runnable r, Throwable t) {
CustomFutureTask<?> futureTask = (CustomFutureTask<?>) r;
Task task = futureTask.getTask();
runningQueue.remove(task);
completedQueue.offer(task);
// completedQueue.offer(task);
completedTaskNumber.incrementAndGet();
}
}

private void startHandleCompletedTask() {
new Thread(() -> {
while (true) {
try {
Task take = completedQueue.take();
DefaultThreadPoolExecutor.this.completedTaskHandler.handle(take);
} catch (Throwable ignore) {
}
}
}).start();
}
// private void startHandleCompletedTask() {
// new Thread(() -> {
// while (true) {
// try {
// Task take = completedQueue.take();
// DefaultThreadPoolExecutor.this.completedTaskHandler.handle(take);
// } catch (Throwable ignore) {
// }
// }
// }).start();
// }

// 获取正在运行的任务,包含任务组中的任务
protected final List<Task> getRunningTasks() {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/atask/TaskEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,10 @@ public Builder rejectedExecutionHandler(RejectedExecutionHandler handler) {
return this;
}

public Builder completedTaskHandler(CompletedTaskHandler handler) {
this.completedTaskHandler = handler;
return this;
}
// public Builder completedTaskHandler(CompletedTaskHandler handler) {
// this.completedTaskHandler = handler;
// return this;
// }

public TaskEngine build() {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/com/atasktest/ATaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ public class ATaskTest {
@Before
public void before() {
engine = new TaskEngine.Builder()
.completedTaskHandler(System.out::println)
.build();
}

@After
public void after() {
engine.shutdown();
engine = null;
// engine = null;
}

@Test
Expand Down

0 comments on commit 5cc1963

Please sign in to comment.