Skip to content

Latest commit

 

History

History
386 lines (295 loc) · 15 KB

File metadata and controls

386 lines (295 loc) · 15 KB

安全取消任务

简单的任务取消方法

最简单的任务取消方法:自己设置一个取消标记:private volatile boolean cancelled;,然后子啊运行任务过程中不断的循环判断这个标记,然后用另一个线程改变这个标记,一旦当前线程检测到这个取消标记发生变化,就退出停止执行。

示例:

public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {            // 这里循环判断这个标记
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        cancelled = true;        // 用另一个线程设置这个取消标记
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }
}

不过这种方法是有缺陷的,一旦正在执行的任务发生了阻塞,并且该阻塞状态一直没有解除,那么它将不再有机会判断取消标记,这样即使令 cancelled = true 了,需要被取消的线程也检测不到。就像下面这段代码这样:

class BrokenPrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!cancelled)
                // 如果队列已经满了,而且也没有消费者从队列中take元素了
                // 这个线程将一直卡在这里,没有机会去判断cancelled的状态
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {}
    }

    public void cancel() {
        cancelled = true;
    }
}

中断!!!

通过上一节的讲解我们发现,自己设置一个标记用来判断线程是否要被取消实在不太好用,所以我们就不自己定义一个 boolean 表示线程的状态了,而是直接用 Java 给我们提供的中断机制,其实,每个线程都有一个 boolean 类型的中断状态。当中断线程时,这个线程的中断状态将被设置为 true,也就是说: 中断并不会真正地停止一个正在运行的线程,只是将被中断线程的中断标记设置为 true,然后由线程自己在一个合适的时刻检查自己的中断标记中断自己(这些时刻也被称为取消点),这样可以防止线程在不应该被中断的地方强制停止执行。

中断方法

方法 说明
public void interrupt() 中断目标线程(将当前线程的中断标记设置为 true)
public boolean isInterrupted() 返回目标线程的中断状态,执行后中断标记还保持它原来的值
public static boolean interrupted() 返回目标线程的中断状态,执行后将中断标记设置为为 false

注意:

  • interrupted() 方法是能清除中断状态的唯一方法;
  • 在调用 interrupted() 返回值为 true 时,除非我们想要屏蔽这个中断,否则必须对它进行处理。有以下两种处理方式:(这两种方法也是正确的中断处理方法!)
    • 抛出 InterruptedException
    • 再次调用 interrupt() 方法,将中断标记恢复为 true

中断是如何解决简单的任务取消方法中的阻塞问题的?

阻塞方法一般都会 throws InterruptedException ,它们在中断标记被修改并且被它们检测到后会:

  • 清除中断标记,即把中断标记设置为 false;
  • 抛出 InterruptedException 异常,停止线程执行。

注意: JVM 并不保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。

ThreadPoolExecutor 拥有的线程检测到中断时的操作

检查线程池是否正在关闭:

  • 如果是,在结束之前执行一些线程池清理工作;
  • 如果不是,创建一个新线程将线程池恢复到合理的规模。

Future 实现计时运行

需求: 给一个 Runnable r 和时间 long timeout,解决“最多花 timeout 分钟运行 Runnable,没运行完就取消”这种要求。

private static final ExecutorService cancelExec = Executors.newCachedThreadPool();
public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
    Future<?> task = cancelExec.submit(r);
    try {
        task.get(timeout, unit);
    } catch (TimeoutException e) {
        // 如果超时,抛出超时异常
    } catch (ExecutionException e) {
        // 如果任务运行出现了异常,抛出任务的异常
        throw launderThrowable(e.getCause());
    } finally {
        // 如果任务已经结束,这句没影响
        // 如果任务还在运行,这句会中断任务
        task.cancel(true);
    }
}

安全停止基于线程的服务

线程的所有权

线程的所有者: 创建这个线程的类(线程池是其工作者线程的所有者,如果要中断这些线程,要通过线程池)

线程的所有权是不可传递的:

  • 应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程,而是要通过服务来停止。
  • 对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。 服务可以通过生命周期方法关闭它所拥有的线程。

一个有问题的日志服务

public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    private final PrintWriter writer;
    private boolean isShutdown;  // 新加一个关闭判断
    private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    public void start() {
        logger.start();
    }

    public void log(String msg) throws InterruptedException {
        if (!isShutdown) {  // 如果关了,就抛异常
            queue.put(msg); // 但是这一行和上一行不是一个原子操作,有可能会丢失日志
            			   // 我们又不能给这个方法加synchronized,因为给一个阻塞方法加锁是很危险的!
        } else {
            throw new IllegalStateException("logger is shut down");
        }
    }

    private class LoggerThread extends Thread {
        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true); // autoflush
        }

        public void run() {
            try {
                while (true)
                    writer.println(queue.take());
            } catch (InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}

方法一:BlockingQueue + isShutdown + count

public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    private boolean isShutdown;
    // 这个计数器的作用是:如果queue满了,有线程阻塞着,
    // 它可以保证所有在关日志前添加的日志都记录了再真正关闭日志服务
    private int reservations;

    public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThread.start();
    }

    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown)
                throw new IllegalStateException(/*...*/);
            ++reservations; // 记录待处理的日志数量
        }
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            if (isShutdown && reservations == 0)
                                break; // 只有当isShutdown == true并且没有待处理的日志时才能关闭日志服务
                        }
                        String msg = queue.take();
                        synchronized (LogService.this) {
                            --reservations; // 处理完一个,待处理的日志数就-1
                        }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
}

方法二:线程池 ExecutorService

ExecutorService 的关闭方法:

  • shutdown
    • 会把当前执行的和尚未启动的任务清单中的程序执行完再关闭
    • 关闭速度慢但安全
  • shutdownNow
    • 首先关闭当前正在执行的任务,然后任何返回任务清单中尚未执行的任务

使用 SingleThreadExecutor 线程池构建日志服务:

public class LogService {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private final PrintWriter writer;
    public void start() {}
    public void stop() {
        try {
            // 下两行经常一起用!
            exec.shutdown();
            exec.awaitTermination(TIMEOUT, UNIT);
        } finally {
            write.close();
        }
    }
    public void log(String msg) {
        try {
            exec.execute(new WriteTask(msg));
        } catch (RejectedExecutionException e) {}
    }
}

毒丸对象

另一种关闭生产者一消费者服务的方式就是使用“毒丸(Poison Pill)”对象。 “毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止。 在 FIFO(先进先出)队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作。

注意:

  • 只有在无界队列中,“毒丸”对象才能可靠地工作,否则可能无法将毒丸对象 put 到队列中去。
  • 只有在生产者和消费者的数量都已知的情况下,才可以使用“毒丸”对象,否则无法判断应该使用几个毒丸对象。
  • 扩展到多个生产者:每个生产者都向队列中放入一个“毒丸”对象,并且消费者仅当在接收到生产者数量个“毒丸”对象时才停止。
  • 扩展到多个消费者的情况:生产者将消费者数量个“毒丸”对象放入队列。

处理 RuntimeException

RuntimeException 通常不会被捕捉,是导致线程死亡的最主要原因。

处理方法

方法一:在线程池内部构建一个工作者线程。如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知框架该线程已经终结。

public void run() {
    Throwable thrown = null;
    try {
        while (!isInterrupted()) {
            runTask(getTaskFromWorkQueue());
        }
    } catch (Throwable e) {
        thrown = e;
    } finally {
        threadExited(this, thrown);
    }
}

方法二: implements Thread.UncaughtException 接口

public interface UncaughtExceptionHandler {
    void uncaughtException(Thread t, Throwable e);
}

令会抛出 RuntimeException 异常的的类实现 UncaughtException 接口,这样当该类抛出未捕获的异常时,会执行 uncaughtException(Thread t, Throwable e) 方法,我们可以在这个方法中将错误信息写入日志,或者尝试重新启动线程,关闭应用程序等。

注意: 只有通过 execute 提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过 submit 提交的任务,无论是抛出的未检查异常还是已检查异常,都将被认为是任务返回状态的一部分。如果一个由 submit 提交的任务由于抛出了异常而结束,那么这个异常将被 Future.get 封装在 ExecutionException 中重新抛出。

JVM 关闭

关闭钩子

JVM 关闭流程:

  • 在正常关闭中,JVM 首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过 Runtime.addShutdownHook(Thread) 注册的但尚未开始的线程。JVM并不能保证关闭钩子的调用顺序。
  • 当被强行关闭时,只是关闭JVM,而不会运行关闭钩子。

应用:通过注册一个关闭钩子来停止日志服务

public void start(){
    Runtime.getRuntime().addshutdownHook(new Thread() {
        public void run() {
            try { Logservice. this. stop(); }
            catch (InterruptedException ignored) {}
        }
    });
}

注意:Runtime 是个单例类

守护线程 Daemon

定义: 一种特殊的线程,当进程中不存在非守护线程了,守护线程自动销毁。

应用: 垃圾回收线程,当进程中没有非守护线程了,垃圾回收线程就没有存在的必要了,会自动销毁。

设置方法: thread.setDaemon(true);