Skip to content

Latest commit

 

History

History
1174 lines (731 loc) · 49.6 KB

3.Java并发.md

File metadata and controls

1174 lines (731 loc) · 49.6 KB

现今 Java 中线程的本质,其实就是操作系统中的线程,其线程库和线程模型很大程度上依赖于操作系统(宿主系统)的具体实现,比如在 Windows 中 Java 就是基于 Win32 线程库来管理线程,且 Windows 采用的是一对一的线程模型。

Java内部采用抢占式调度,而非协同式调度。

Java 内存模型(JMM)

JMM 规定了所有的变量都存储在主内存(Main Memory)中,每条线程还有自己的工作内存(Working Memory)。

线程的工作内存中保存了被该线程使用的变量的主内存副本,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的数据。

这里的变量包括实例字段、静态字段和构成数组对象的元素,但是不包括局部变量与方法参数,因为后面这俩是线程私有的,不会被共享,自然就不会存在竞争问题。

原子性

img

JMM 中定义了以下 8 种操作规范来完成一个变量从主内存拷贝到工作内存、以及从工作内存同步回主内存这一类的实现细节。Java 虚拟机实现时必须保证下面提及的每一种操作都是原子的、不可再分的。

  • lock(锁定):作用于主内存的变量,它把一个变量标识为一条线程独占的状态。
  • unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
  • read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用。
  • load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
  • use(使用):作用于工作内存的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
  • assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
  • store(存储):作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后的write操作使用。
  • write(写入):作用于主内存的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量

无法保证一段代码执行结果的一定性(正确性),就把这段代码称为线程不安全的:

一段代码在被多个线程访问后,它仍然能够进行正确的行为,那这段代码就是线程安全的

一段代码线程不安全的原因,就涉及到非原子操作

对 int 类型读写操作满足原子性只是说明 load、assign、store 这些单个操作具备原子性。

如何保证原子性

在处理器和 Java 编程语言层面,它们都提供了一些有效的措施,比如处理器提供了总线锁和缓存锁,Java 提供了锁和循环 CAS 的方式

  • JMM原子操作:由 Java 内存模型来直接保证的原子性变量操作包括 readloadassignusestorewrite 这 6 个,我们大致可以认为,基本数据类型的访问、读写都是具备原子性的(例外就是 long 和 double 的非原子性协定)。

  • synchronized 关键字:如果应用场景需要一个更大范围的原子性保证,Java 内存模型还提供了 lockunlock 操作来满足这种需求。尽管 JVM 并没有把 lockunlock 操作直接开放给用户使用,但是却提供了更高层次的字节码指令 monitorentermonitorexit 来隐式地使用这两个操作。这两个字节码指令反映到 Java 代码中就是同步块 — synchronized 关键字,因此在 synchronized 块之间的操作也具备原子性。

  • Lock 接口:除了 synchronized 关键字这种 Java 语言层面的锁,juc 并发包中的 java.util.concurrent.locks.Lock 接口也提供了一些类库层面的锁,比如 ReentrantLock

  • CAS 操作:随着硬件指令集的发展,在 JDK 5 之后,Java 类库中开始使用基于 cmpxchg 指令的 CAS 操作,该操作由 sun.misc.Unsafe 类里面的 compareAndSwapInt()compareAndSwapLong() 等几个方法包装提供。不过在 JDK 9 之前 Unsafe 类是不开放给用户使用的,只有 Java 类库可以使用,譬如 juc 包里面的整数原子类,其中的 compareAndSet()getAndIncrement() 等方法都使用了 Unsafe 类的 CAS 操作来实现。使用这种 CAS 措施的代码也常被称为无锁编程(Lock-Free)

可见性

一个线程修改了共享变量的值时,其他线程能够立即得知这个修改

如何保证可见性

使用volatilesunchronizedfinal关键字

JMM 规定了在执行 8 种基本原子操作时必须满足的一系列规则,这其中有一条规则正是 sychronized 能够保证原子性的理论支撑:

对一个变量执行 unlock 操作之前,必须先把此变量同步回主内存中(执行 store、write 操作)

也就是说 synchronized在修改了工作内存中的变量后,解锁前会将工作内存修改的内容刷新到主内存中,确保了共享变量的值是最新的,也就保证了可见性。

final 修饰的字段在构造器中一旦被初始化完成,并且构造器没有把 this 的引用传递出去,那么在其他线程中就能看见 final 字段的值。

有序性

为了使 CPU 内部的运算单元能尽量被充分利用,CPU 可能会对输入代码进行乱序执行优化,CPU 会在计算之后将乱序执行的结果重组,保证该结果与顺序执行的结果是一致的,但并不保证程序中各个语句计算的先后顺序与输入代码中的顺序一致,因此如果存在一个计算任务依赖另外一个计算任务的中间结果,那么其顺序性并不能靠代码的先后顺序来保证。

与之类似的,Java 的编译器也有这样的一种优化手段:指令重排序(Instruction Reorder)

在重排序的时候,CPU 和编译器都需要遵守一个规矩,这个规矩就是 as-if-serial 语义

不管怎么重排序,单线程环境下程序的执行结果不能被改变。

为了遵守 as-if-serial 语义,单线程环境下 CPU 和编译器不会对存在数据依赖关系的操作做重排序

数据依赖性:如果两个操作访问同一个变量,且这两个操作中有一个为写操作,此时这两个操作之间就存在数据依赖性。

但多线程环境下,由于重排序的存在,就可能导致程序运行结果出现错误

如何保证有序性

使用volatilesynchronized 两个关键字来保证线程之间操作的有序性。

volatile 本身除了保证可见性的语义外,还包含了禁止指令重排序的语义,所以天生就具有保证有序性的功能。

synchronized 保证有序性的理论支撑,仍然是 JMM 规定在执行 8 种基本原子操作时必须满足的一系列规则中的某一个提供的:

一个变量在同一个时刻只允许一条线程对其进行 lock 操作

这个规则决定了持有同一个锁的两个 synchronized 同步块只能串行地进入。

遗憾的是,如果仅仅依靠这俩个关键字来保证有序性的话,编码将会变得非常繁琐,为此,Happens-before 原则应运而生。

Happens-before 先行发生原则

从 JMM 设计者的角度来看,可见性和有序性其实是互相矛盾的两点:

  • 一方面,对于程序员来说,我们希望内存模型易于理解、易于编程,为此 JMM 的设计者要为程序员提供足够强的内存可见性保证,专业术语称之为 “强内存模型”。
  • 而另一方面,编译器和处理器则希望内存模型对它们的束缚越少越好,这样它们就可以做尽可能多的优化(比如重排序)来提高性能,因此 JMM 的设计者对编译器和处理器的限制要尽可能地放松,专业术语称之为 “弱内存模型”。

从 JDK 5 开始,也就是在 JSR-133 内存模型中,终于给出了一套完美的解决方案,那就是 先行发生(Happens-before) 原则:

  1. 如果一个操作 Happens-before 另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。

  2. 两个操作之间存在 Happens-before 关系,并不意味着 Java 平台的具体实现必须要按照 Happens-before 关系指定的顺序来执行。如果重排序之后的执行结果,与按 Happens-before 关系来执行的结果一致,那么这种重排序并不非法(也就是说,JMM 允许这种重排序)

第 1 条定义是 JMM 对程序员强内存模型的承诺。从程序员的角度来说,可以这样理解 Happens-before 关系:如果 A Happens-before B,那么 JMM 将向程序员保证 — A 操作的结果将对 B 可见,且 A 的执行顺序排在 B 之前。

不同于 as-if-serial 语义只能作用在单线程,这里提到的两个操作 A 和 B 既可以是在一个线程之内,也可以是在不同线程之间。也就是说,Happens-before 提供跨线程的内存可见性保证

Happens-before 是 JMM 的灵魂,它是判断数据是否存在竞争,线程是否安全的非常有用的手段。

一个 Happens-before 规则就对应于一个或多个编译器和处理器的重排序规则

8 条 Happens-before 规则

《JSR-133:Java Memory Model and Thread Specification》定义了如下 Happens-before 规则, 这些就是 JMM 中“天然的” Happens-before 关系,这些 Happens-before 关系无须任何同步器协助就已经存在,可以在编码中直接使用。如果两个操作之间的关系不在此列,并且无法从下列规则推导出来,则它们就没有顺序性保障,JVM 可以对它们随意地进行重排序。

  1. 程序次序规则(Program Order Rule)

    在一个线程内,按照控制流顺序,书写在前面的操作 Happens-before 于书写在后面的操作。注意,这里说的是控制流顺序而不是程序代码顺序,因为要考虑分支、循环等结构。

    这个很好理解,符合我们的逻辑思维。比如我们上面举的例子:

    int a = 1; 		// A
    int b = 2;		// B
    int c = a + b;	// C

    根据程序次序规则,上述代码存在 3 个 Happens-before 关系:

    • A Happens-before B
    • B Happens-before C
    • A Happens-before C
  2. 管程锁定规则(Monitor Lock Rule)

    一个 unlock 操作 Happens-before 于后面对同一个锁的 lock 操作。这里必须强调的是 “同一个锁”,而 “后面” 是指时间上的先后。

  3. volatile 变量规则(Volatile Variable Rule)

    对一个 volatile 变量的写操作 Happens-before 于后面对这个变量的读操作,这里的 “后面” 同样是指时间上的先后。

  4. 线程启动规则(Thread Start Rule)

    Thread 对象的 start() 方法 Happens-before 于此线程的每一个动作。

    比如说主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的所有操作。

  5. 线程终止规则(Thread Termination Rule)

    线程中的所有操作都 Happens-before 于对此线程的终止检测,我们可以通过 Thread 对象的 join() 方法是否结束、Thread 对象的 isAlive() 的返回值等手段检测线程是否已经终止执行。

  6. 线程中断规则(Thread Interruption Rule)

    对线程 interrupt() 方法的调用 Happens-before 于被中断线程的代码检测到中断事件的发生,可以通过 Thread 对象的 interrupted() 方法检测到是否有中断发生。

  7. 对象终结规则(Finalizer Rule)

    一个对象的初始化完成(构造函数执行结束) Happens-before 于它的 finalize() 方法的开始。

  8. 传递性(Transitivity)

    如果操作 A Happens-before 于操作 B,操作 B Happens-before 于操作 C,那就可以得出操作 A Happens-before 于操作 C 的结论。

eg.:

private int value = 0;

// 线程 A 调用
pubilc void setValue(int value){    
    this.value = value;
}

// 线程 B 调用
public int getValue(){
    return value;
}

假设存在线程 A 和 B,线程 A 先(时间上的先后)调用了 setValue(1),然后线程 B 调用了同一个对象的 getValue() ,那么线程 B 收到的返回值是什么?

我们根据上述 Happens-before 的 8 大规则依次分析一下:

由于两个方法分别由线程 A 和 B 调用,不在同一个线程中,所以程序次序规则在这里不适用;

由于没有 synchronized 同步块,自然就不会发生 lock 和 unlock 操作,所以管程锁定规则在这里不适用;

同样的,volatile 变量规则,线程启动、终止、中断规则和对象终结规则也和这里完全没有关系。

因为没有一个适用的 Happens-before 规则,所以第 8 条规则传递性也无从谈起。

因此我们可以判定,尽管线程 A 在操作时间上来看是先于线程 B 的,但是并不能说 A Happens-before B,也就是 A 线程操作的结果 B 不一定能看到。所以,这段代码是线程不安全的。

想要修复这个问题也很简单?既然不满足 Happens-before 原则,那我修改下让它满足不就行了。比如说把 Getter/Setter 方法都用 synchronized 修饰,这样就可以套用管程锁定规则;再比如把 value 定义为 volatile 变量,这样就可以套用 volatile 变量规则等。

Happens-before 原则与时间先后顺序之间基本没有因果关系,所以我们在衡量并发安全问题的时候,尽量不要受时间顺序的干扰,一切必须以 Happens-before 原则为准。

本质上来说 Happens-before 关系和 as-if-serial 语义是一回事,都是为了在不改变程序执行结果的前提下,尽可能地提高程序执行的并行度。只不过后者只能作用在单线程,而前者可以作用在正确同步的多线程环境下。as-if-serial 语义给编写单线程程序的程序员创造了一个幻境:单线程程序是按程序的顺序来执行的。Happens-before 关系给编写正确同步的多线程程序的程序员创造了一个幻境:正确同步的多线程程序是按 Happens-before 指定的顺序来执行的。

创建线程

  1. 线程与任务合并 — 直接继承 Thread 类

    class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("start new thread!");
        }
    }
    // Thread 类提供了一个构造函数,可以为某个线程指定名字
    MyThread t = new MyThread();
    MyThread t1 = new MyThread("t1");
    // 匿名内部类简化版本
    Thread t1 = new Thread("t1") {
    	@Override
    	// run 方法内实现了要执行的任务
    	public void run() {
    		// 线程需要执行的任务
        	......
     	}
    };
  2. 线程与任务分离 — Thread + 实现 Runnable 接口

    假如有多个线程,这些线程执行的任务都是一样的,那按照上述方法一的话就需要写很多重复代码。所以,我们考虑把线程执行的任务与线程本身分离开来。

    除了避免了重复代码,使用实现 Runnable 接口的方式也比方法一的单继承 Thread 类更具灵活性,毕竟一个类只能继承一个父类,如果这个类本身已经继承了其它类,就不能使用第一种方法了。另外,用这种方式,也更容易与线程池等高级 API 相结合。

    线程(Thread)里有任务(Runnable)对象

    //1.直接实现接口
    class MyRunnable implements Runnable {
        @Override
        public void run() {
            // 线程需要执行的任务
        	......
        }
    }
    // 创建任务类对象
    MyRunnable runnable = new MyRunnable();
    // 创建线程对象
    Thread t2 = new Thread(runnable);
    
    // 2.匿名创建任务类对象
    Runnable runnable = new Runnable() {
        public void run(){
            // 要执行的任务
            ......
        }
    };
    // 创建线程对象
    Thread t2 = new Thread(runnable);
  3. 线程与任务分离 — Thread + 实现 Callable 接口

    虽然 Runnable 挺不错的,但是仍然有个缺点,那就是没办法获取任务的执行结果,因为它的 run 方法返回值是 void。

    这样,对于需要获取任务执行结果的线程来说,Callable 就成为了一个完美的选择。

    和 Runnbale 比起来,Callable 不过就是把 run 改成了 call。当然,最重要的是!和 void run 不同,这个 call 方法是拥有返回值的,而且能够抛出异常。这样,一个很自然的想法,就是把 Callable 作为任务对象传给 Thread,然后 Thread 重写 call 方法。但遗憾的是,Thread 类的构造函数里并不接收 Callable 类型的参数。所以,我们需要把 Callable 包装一下,包装成 Runnable 类型,这样就能传给 Thread 构造函数了。为此,FutureTask 成为了最好的选择。

    另外,Callable 和 FutureTask 的泛型填的就是 Callable 任务返回的结果类型(就是 call 方法的返回类型)。

    class MyCallable implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            // 要执行的任务
            ......
            return 100;
        }
    }
    // 将 Callable 包装成 FutureTask,FutureTask也是一种Runnable
    MyCallable callable = new MyCallable();
    FutureTask<Integer> task = new FutureTask<>(callable);
    // 创建线程对象
    Thread t3 = new Thread(task);

    当线程运行起来后,可以通过 FutureTask 的 get 方法获取任务运行结果:

    Integer result = task.get();

    不过,需要注意的是,get 方法会阻塞住当前调用这个方法的线程。比如说我们在主线程中调用了 get 方法去获取 t3 线程的任务运行结果,那么只有这个 call 方法成功返回了,主线程才能够继续往下执行。

    换句话说,如果 call 方法一直得不到结果,那么主线程也就一直无法向下运行。

启动线程

t1.start();
// 线程的优先级
Thread.setPriority(int n) // 1~10, 默认值5

【为什么使用 start 启动线程,而不使用 run 方法启动线程?】:使用 run 方法启动线程任务确实能够被正确执行,但是并不是以多线程的方式,当我们使用 t1.run() 的时候,程序仍然是在创建 t1 线程的 main 线程下运行的,并没有创建出一个新的 t1 线程。

线程的状态

img

在Java程序中,一个线程对象只能调用一次start()方法启动新线程,并在新线程中执行run()方法。一旦run()方法执行完毕,线程就结束了。因此,Java线程的状态有以下几种:

  • New:新建。初始化。
  • Runnable:就绪。JVM完成了方法调用栈和程序计数器的创建。
  • Running:运行。正在执行run()方法的Java代码。
  • Blocked:阻塞。因为某些操作被阻塞而挂起。分为等待阻塞(wait),同步阻塞(锁),其它阻塞(sleep,join,I/O)。
  • Dead:死亡。run()方法执行完毕,异常退出,手动结束。

线程的基本方法

  1. wait:等待。释放锁(如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。)

  2. sleep:睡眠。不释放锁

    sleep() 可能会抛出 InterruptedException。**因为异常不能跨线程传播回 main() 中,因此必须在本地进行处理。**线程中抛出的其它异常也同样需要在本地进行处理。

  3. notify:唤醒。唤醒等待或超时等待的线程

  4. yield:让步。让出CPU

  5. join:加入。调用其他线程的join方法,则当前线程阻塞,等待其终止。

  6. interrupt:中断。并不直接中断一个正在运行的线程,只改变内部标志位。

    • 如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞。

    • 如果一个线程的 run() 方法执行一个无限循环,并且没有执行 sleep() 等会抛出 InterruptedException 的操作,那么调用线程的 interrupt() 方法就无法使线程提前结束。但是调用 interrupt() 方法会设置线程的中断标记,此时调用 isInterrupted() 方法会返回 true。因此可以在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提前结束线程。

    class SafeInterruptThread extends Thread{
        @Override
        public void run() {
            if(!Thread.currentThread().isInterrupted()){
                try{
                    // 业务逻辑
                } catch (InterruptedException e){
                    Thread.currentThread().interrupt(); // 重新设置中断标志
                }
            }
            else{
                // 处理结束前清理工作,如释放锁,持久化,异常通知
            }
    
        }
    }
    SafeInterruptThread thread = new SafeInterruptThread();
    thread.interrupt();
  7. setDaemon:守护。

    守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。

    当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。

    main() 属于非守护线程。

    在线程启动之前使用 setDaemon() 方法可以将一个线程设置为守护线程。

等待另一个线程直到运行结束

Thread t = new Thread(() -> {
    System.out.println("hello");
});
System.out.println("start");
t.start();
t.join();
System.out.println("end");

线程池 Executor框架

用于管理线程组及其运行状态,以便更好的利用CPU资源。

JVM先根据参数创建一定数量可运行的线程任务,放入队列,启动这些任务。如果超出了最大线程数,则排队等候有任务执行完毕。

线程池核心组件:

  • 线程池管理器:用于创建并管理线程池
  • 工作线程
  • 任务接口:用于定义工作线程的调度和执行策略
  • 任务队列:存放待处理的任务

Executor框架

Java中的线程池通过Executor框架实现:

img

  • Executor:执行器接口,该接口定义执行Runnable任务的方式。

  • ExecutorService:该接口定义提供对Executor的服务。

    shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。

    如果只想中断 Executor 中的一个线程,可以通过使用 submit() 方法来提交一个线程,它会返回一个 Future<?> 对象,通过调用该对象的 cancel(true) 方法就可以中断线程。

    Future<?> future = executorService.submit(() -> {
        // ..
    });
    future.cancel(true);
  • ScheduledExecutorService:定时调度接口。

  • AbstractExecutorService:执行框架抽象类。

  • ThreadPoolExecutor:JDK中线程池的具体实现。

    public ThreadPoolExecutor(int corePoolSize, // 线程池中核心线程数量
                              int maximumPoolSize, // 线程池中最大线程数量
                              long keepAliveTime, // 当线程数量超过corePoolSize时,线程存活时间
                              TimeUnit unit, // keepAliveTime的单位。
                              BlockingQueue<Runnable> workQueue, // 缓冲任务队列,被提交但尚未被执行的任务存放的地方
                              ThreadFactory threadFactory, // 线程工厂,用于创建线程,可使用默认的
                              RejectedExecutionHandler handler)  // 任务过多等原因导致线程池无法处理的任务拒绝策略。

    参数详解:

    TimeUnit unit

    java.util.concurrent.TimeUnit类的静态属性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS

    BlockingQueue<Runnable> workQueue

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,按FIFO原则进行排序
    • LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,吞吐量高于ArrayBlockingQueue。静态工厂方法Excutors.newFixedThreadPool()使用了这个队列
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量高于LinkedBlockingQueue,静态工厂方法Excutors.newCachedThreadPool()使用了这个队列
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

    使用无界队列(newFixedThreadPool),如果并发任务量巨大,任务逻辑比较耗时,未来得及处理的任务会大量堆积在队列里,导致内存急速飙高,可能导致程序挂掉。解决办法是让线程使用有界阻塞队列,队列满了就暂停放入线程任务;但是这样拒绝策略要设置好,因为默认的是超过队列长度就丢弃的策略AbortPolicy,这样不行,应该使用CallerRunsPolicy策略,直接让调用者执行任务。

    RejectedExecutionHandler handler

    • AbortPolicy:丢弃任务并抛出异常。

    • DiscardPolicy:丢弃任务,但是不抛出异常。

    • DiscardOldestPolicy:如果执行程序尚未关闭,丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。

    • CallerRunsPolicy:由调用线程处理该任务,即直接在 execute 方法的调用线程中运行被拒绝的任务

    ThreadPoolExecutor的使用需要判断:

    • 线程数量 < corePoolSize:即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    • 线程数量 == corePoolSize && workQueue 未满:任务被放入缓冲队列。

    • 线程数量 > corePoolSize && workQueue 满 && 线程数量 < maximumPoolSize:建新的线程来处理被添加的任务。

    • 线程数量 > corePoolSize && workQueue 满 && 线程数量 == maximumPoolSize:通过handler所指定的策略来处理此任务。

    • 线程数量 > corePoolSize && 线程空闲时间 > keepAliveTime:线程将被终止。

  • Executors:线程池工厂类。内部都是由ThreadPoolExecutor实现

    JDK内部提供了五种最常见的线程池。由Executors类的五个静态工厂方法创建。

    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    • newCachedThreadPool:可缓存的。如果有可重用线程则重用。对于执行时间很短的任务能极大提高性能。
    • newFixedThreadPool:固定大小的。
    • newScheduledThreadPool:定长定时线程池,支持定时及周期性任务执行。
    • newSingleThreadExecutor:单个线程,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    • newSingleThreadScheduledExecutor:单个线程定时线程池。
    • newWorkStealingPool:足够大小的线程池。
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Thread run");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    executorService.shutdownNow();
    System.out.println("Main run");
}

锁概念

可重入锁(递归锁):支持一个线程对同一个资源执行多次加锁操作。

公平锁和非公平锁:实现竞争的机制是否公平。公平锁先到先得,非公平锁就近或随机但效率高。

乐观锁:读取数据使认为别人不会修改,不会上锁,但在更新时会判断在此期间有没有别人修改。通常比较当前与上次版本号。大部分通过 CAS 操作实现。 悲观锁:每次读写都上锁。大部分基于 AQS 架构实现。

自旋锁:认为如果持有锁的线程能在很短时间内释放资源,那么等待的线程就不需要在内核态和用户态之间切换进入阻塞挂起状态,上cpu的时候等待(自旋)锁释放。为了不永久等待锁释放或造成cpu资源浪费过多,需设定一个自旋等待最大时间。适用于占用锁时间非常短或者竞争不激烈的代码块。在 JDK 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数不再固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。

独占锁(互斥锁):每次只允许一个线程持有该锁。 共享锁:允许多个线程同时并发访问共享资源。

偏向锁:在某个线程获取锁之后,消除这个线程锁重入的开销。只需要在切换 ThreadID 时执行一次 CAS 原子操作,提高某个线程并发时的性能 轻量级锁:没有多线程竞争(无并行),提高总体并发互斥操作性能。获取和释放需要多次 CAS 原子操作。 重量级锁:基于操作系统的互斥量(Mutex Lock),导致进程在用户态和内核态切换,开销大。

分段锁:将数据分段,单独加锁。把锁细粒度化,提高并发效率

锁实现

volatile

修饰变量

特性:

  • 保证该变量对所有线程可见:在一个线程修改了变量的值后,新的值对于其他线程可以立即获取。

  • 禁止指令重排:不会被缓存在寄存器或对其它处理器不可见的地方(跳过CPU Cache)。

保证并发环境线程安全的使用条件

  • 对变量的写操作不依赖于当前值。
  • 该变量没有被包含在具有其它变量的不变式中,真正独立于程序内的其他内容。

synchronized

属于独占锁,悲观锁,可重入锁,非公平锁,重量级锁(基于监视器锁 Monitor)。

  • 作用于成员变量和非静态方法时,锁住对象实例
  • 作用于静态方法时,锁住class实例
  • 作用于代码块时,锁住代码块中配置的对象
public synchronized void generalMethod1(){}

public static synchronized void generalMethod2(){}

String lockA = "lockA";
synchronized(lockA){}

synchronized (SynchronizedExample.class) {} // 同步一个类

synchronized内部包括ContentionList、EntryList、WaitSet、OnDeck、Owner、!Owner六个区域。在收到锁请求后首先自旋,如果不能获得资源就放入ContentionList中。

synchronized & ReentrantLock 比较

  1. 锁的实现:synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。

  2. 性能:新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。

  3. 等待可中断:当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。ReentrantLock 可中断,而 synchronized 不行。

  4. 公平锁:synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。

  5. 锁绑定多个条件:一个 ReentrantLock 可以同时绑定多个 Condition 对象。

除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

Lock 工具类

J.U.C.locks

public interface Lock {
    void lock(); // 获取锁
    void lockInterruptibly() throws InterruptedException; // 以可中断方式获取锁
    boolean tryLock(); // 尝试获取锁,立即返回成功或失败
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock(); // 释放锁
    Condition newCondition(); // 获取锁的控制
}

ReentrantLock 可重入锁

通过 AQS 实现,继承Lock,可重入锁,独占锁

默认非公平锁,可在构造函数中ReentrantLock(boolean fair)传参修改

维护的state初始为0,线程重入持有时+1

public class ReentrantLockDemo implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;
    @Override
    public void run() {
        for(int j = 0;j < 10;j++){
            lock.lock();
            // lock.lock(); 可重入锁
            try {
                i++;
            }finally {
                if(lock.isHeldByCurrentThread()){ // 检查线程是否持有该锁
                    lock.unlock();
                	// lock.unlock(); 可重入锁
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
        Thread t = new Thread(reentrantLockDemo);
        t.start();
        t.join();
        System.out.println(i);
    }
}

通过以下方法避免死锁:

  • 响应中断:使用reentrantLock.lockInterruptibly()获取锁(若当前线程未中断),使用reentrantLock.unlock()释放锁,使用thread.interrupt()中断线程

  • 可轮询锁:通过tryLock()获取锁。如果可用则获取并返回true

  • 定时锁: 通过tryLock(time, unit)获取锁。如果在给定时间内获取到了锁,且未被中断,则获取;否则休眠

❓ ReentrantLock 是如何实现可重入性的

ReentrantLock内部持有了一个sync对象,这个对象实现了AQS,并且加锁的时候使用CAS算法,在所对象申请的时候,在锁等待node链表中查看当前申请的锁的对象是否是同一个对象,如果是的话,进行重入。

ReadWriteLock 读写锁

private final Map<String, Object> cache = new HashMap<>();
private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock readLock = rwlock.readLock();
private final Lock writeLock = rwlock.writeLock();
public Object get(String key){
    readLock.lock();
    try{ return cache.get(key); }
    finally { readLock.unlock(); }
}

public Object put(String key, Object value){
    writeLock.lock();
    try{ return cache.put(key, value); }
    finally { writeLock.unlock(); }
}

Condition 线程协调器

await() signal() signalAll()

java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。

相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。

使用 Lock 来获取一个 Condition 对象。

public class AwaitSignalExample {

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

CountDownLatch 倒计时锁

通过 AQS 实现,共享锁

同步工具,可以让一个或多个线程一直等待其它线程执行完后再执行。

final CountDownLatch latch = new CountDownLatch(2);
new Thread(){ public void run(){ latch.countDown(); } }.start(); // 子线程1
new Thread(){ public void run(){ latch.countDown(); } }.start(); // 子线程2
latch.await() // 等待两个子程序都执行完毕才唤醒

CyclicBarrier 循环屏障

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。

CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。

// 等待所有三个线程都执行完await后才继续
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () ->{
    System.out.println("\n123");
});
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 3; i++) {
    executorService.execute(() -> {
        System.out.print("before..");
        cyclicBarrier.await();
        System.out.print("after..");
    });
}
// 输出
before..before..before..
123
after..after..after..

Semaphore 信号量

通过 AQS 实现,共享锁

// 计数
Semaphore semp = new Semaphore(5);
// 申请
semp.acquire(1);
// 释放
semp.release(1);									

以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。

public class SemaphoreExample {

    public static void main(String[] args) {
        final int clientCount = 3;
        final int totalRequestCount = 10;
        Semaphore semaphore = new Semaphore(clientCount);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalRequestCount; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    System.out.print(semaphore.availablePermits() + " ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}
// 输出
2 1 2 2 2 2 2 1 2 2

BlockingQueue 阻塞队列

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

提供了阻塞的 take() 和 put() 方法:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,直到队列有空闲位置。

使用阻塞队列实现生产者消费者问题

  • ArrayBlockingQueue(FIFO,定长)
  • LinkedBlockingQueue(FIFO,变长)
  • PriorityBlockingQueue(优先级)
  • DelayQueue
  • SynchronousQueue
  • LinkedTransferQueue
  • LinkedBlockingDeque
public class ProducerConsumer {

    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

    private static class Producer extends Thread {
        @Override
        public void run() {
            try {
                queue.put("product");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("produce..");
        }
    }

    private static class Consumer extends Thread {

        @Override
        public void run() {
            try {
                String product = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("consume..");
        }
    }
}

Atomic 原子类

J.U.C.atomic

轻量级锁,基于CAS

// 定义原子操作数
static AtomicInteger safeCounter = new AtomicInteger(0);
// 自增
safeCounter.getAndIncrement();

ConcurrentHashMap 并发映射

包含一个 Segment 数组,每个 Segment 里包含一个 HashEntry 数组。即内部分为若干个小的HashMap(Segment),对每个数据段单独加锁。

1.8之后,ConcurrentHashMap取消了Segment分段锁, 采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构类似,数组+链表/红黑树。jdk1.8在链表长度超过一定阀值(8)时,将链表(寻址时间复杂度为O(N))转换为红黑树(寻址时间复杂度为O(log(N)) )。synchronized只锁定当前链表或红黑树的首节点, 这样只要hash不冲突, 就不会产生并发, 效率又提升N倍。

FutureTask 延后返回

FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future<V> 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,主线程在完成自己的任务之后再去获取结果。

FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        int result = 0;
        for (int i = 0; i < 100; i++) {
            Thread.sleep(10);
            result += i;
        }
        return result;
    }
});

Thread computeThread = new Thread(futureTask);
computeThread.start();

Thread.sleep(2000);

System.out.println(futureTask.get());

ForkJoin

主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。

public class ForkJoinExample extends RecursiveTask<Integer> {

    private final int threshold = 5;
    private int first;
    private int last;

    public ForkJoinExample(int first, int last) {
        this.first = first;
        this.last = last;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (last - first <= threshold) {
            // 任务足够小则直接计算
            for (int i = first; i <= last; i++) {
                result += i;
            }
        } else {
            // 拆分成小任务
            int middle = first + (last - first) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(first, middle);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
            leftTask.fork();
            rightTask.fork();
            result = leftTask.join() + rightTask.join();
        }
        return result;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ForkJoinExample example = new ForkJoinExample(1, 10000);
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Future result = forkJoinPool.submit(example);
    System.out.println(result.get());
}

ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。每个线程都维护了一个双端队列,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。

img

锁优化

这里的锁优化主要是指 JVM 对 synchronized 的优化。

  • 自旋锁:避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景
  • 减少持有时间:只在有线程安全要求的程序上加锁
  • 减少锁粒度:减少锁持有时间,增加锁并行度,减少同一个锁上的竞争。如 ConcurrentHashMap
  • 锁分离:如读写锁。可进一步延伸为只要操作不影响,就可进一步拆分,如 LinkedBlockingQueue 的队列存取数据
  • 锁粗化:锁分的太细会导致频繁获取和释放锁。将关联性强的锁集中处理,以提高整体效率。
  • 锁消除:将不需要用到锁的地方消除。通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除。

共享数据

将数据抽象成类,并将数据的操作方法作为类的方法,加上synchronized

public class MyData{
    private  int j=0;
    public synchronized void add(){ j++; }
    public synchronized void dec(){ j--; }
    public int getData(){ return j;}
}
public class TestThread{
    public static void main(String[] args) {
        final MyData data = new MyData();
        new Thread(new Runnable() {
            public void run() { data.add(); }
        }).start();
        new Thread(new Runnable() {
            public void run() { data.dec(); }
        }).start();
    }
}

CAS (Compare And Swap)

乐观锁,非阻塞算法

CAS(V,E,N):当要更新的变量V的值等于预期值E时,才会将V的值更新为N;否则重试直到成功为止

java.util.concurrent.atomic里的原子类内部基于CAS算法实现。

通过版本号来解决ABA问题(一个线程将值从A修改到B再修改到A,另一个线程没有感知到)

AQS (Abstract Queued Synchronizer)

java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心。

抽象的队列同步器,通过维护一个共享的资源状态(volatile int state)和一个线程等待队列来实现一个多线程访问共享资源的同步框架。

每个共享资源都有锁,线程访问前需先获取锁,如果获取不到就进入线程等待队列。

AQS维护了一个变量 volatile int state,保证其可见性,并提供了原子的读写更新操作。

资源的共享方式有独占式和共享式。

线程安全概念

  1. 不可变

    • final
    • String
    • 枚举
    • Number部分子类,如 Long 和 Double 等数值包装类型,BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的原子类 AtomicInteger 和 AtomicLong 则是可变的。
    • 对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。
  2. 阻塞同步(互斥同步)

    悲观策略,最主要的问题就是线程阻塞和唤醒所带来的性能问题

    加锁:synchronized 和 ReentrantLock。

  3. 非阻塞同步

    • CAS
    • AtomicInteger
  4. 无同步方案

ThreadLocal

线程变量,通常被private static修饰

ThreadLocal中填充的变量属于当前线程,该变量对其他线程而言是隔离的,也就是说该变量是当前线程独有的变量。ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。

ThreadLocal 适用于如下两种场景:

  • 每个线程需要有自己单独的实例(数据)
  • 实例(数据)需要在多个方法中共享,但不希望被多线程共享

ThreadLocal与synchronized有本质的区别:

  1. Synchronized用于线程间的数据共享,而ThreadLocal则用于线程间的数据隔离。

  2. Synchronized是利用锁的机制,使变量或代码块在某一时该只能被一个线程访问。而ThreadLocal为每一个线程都提供了变量的副本,使得每个线程在某一时间访问到的并不是同一个对象,这样就隔离了多个线程对数据的数据共享。

img

ThreadLocal 只跟其归属的线程有关,线程死亡了,那么它对应的 ThreadLocal 中存储的信息也就被清除了(线程死亡前一定要释放掉绑定的用户数据,不然会出现 OOM 问题),也就是说,ThreadLocal 只用于在本次请求中持有数据。

比如把用户数据存入 ThreadLocal 里,这样,只要本次请求未处理完,这个线程就一直还在,当前用户数据就一直被持有,当服务器对本次请求做出响应后,这个线程就会被销毁。

❓ 如何解决同一个用户发出的两次请求可能被不同的两个线程进行处理

定义一个过滤器,在每次请求前都对用户进行判断(为了避免每次请求都经过过滤器,可以将登录成功的用户信息暂时存储到 Redis 中),然后将已经登录成功的用户信息存到 ThreadLocal 里,从而使得该线程在本次请求中持有该用户信息。

多线程开发良好的实践

  • 给线程起个有意义的名字,这样可以方便找 Bug。
  • 缩小同步范围,从而减少锁争用。例如对于 synchronized,应该尽量使用同步块而不是同步方法。
  • 多用同步工具少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 这些同步类简化了编码操作,而用 wait() 和 notify() 很难实现复杂控制流;其次,这些同步类是由最好的企业编写和维护,在后续的 JDK 中还会不断优化和完善。
  • 使用 BlockingQueue 实现生产者消费者问题。
  • 多用并发集合少用同步集合,例如应该使用 ConcurrentHashMap 而不是 Hashtable。
  • 使用本地变量和不可变类来保证线程安全。
  • 使用线程池而不是直接创建线程,这是因为创建线程代价很高,线程池可以有效地利用有限的线程来启动任务。