JUC

JUC是java.util.concurrent工具包的简称,用来处理线程

进程与线程

进程

  • 进程是系统进行资源分配和调度的基本单位。
  • 进程指在系统中正在运行的一个应用程序

线程

  • 线程是操作系统能够进行运算调度的最小单位。
  • 它被包含在进程之中,是进程中的实际运作单位。
  • 一条线程指的是进程中一个单一顺序的控制流个进程中可以并发多个线程,每条线程并行执行不同的任务。

线程的状态

线程状态枚举类

状态 说明
NEW 创建状态
RUNNABLE 运行状态
BLOCKED 阻塞状态
WAITING 等待状态
TIMED_WAITING 超时等待
TERMINATED 终止状态

wait/sleep 的区别

  • sleep 是 Thread 的静态方法,wait 是 Object 的方法,任何对象实例都能调用。
  • sleep不会释放锁,它也不需要占用锁。wait 会释放锁,但调用它的前提是当前线程占有锁(即代码要在 synchronized 中)
  • 它们都可以被 interrupted 方法中断

并发与并行

串行模式

串行表示所有任务都一一按先后顺序进行

并行模式

并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。

并发

并发指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。

用户线程和守护线程

用户线程 :平时用到的普通线程,自定义线程

守护线程运行在后台,是一种特殊的线程,比如垃圾回收

  • 当主线程结束后,用户线程还在运行,JVM 存活

  • 如果没有用户线程,都是守护线程,JVM 结束

Lock 接口

Synchronized

synchronized

synchronized 是 Java 中的关键字,是一种同步锁

  • 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{} 括起来的代码,作用的对象是调用这个代码块的对象
  • 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象
    • 虽然可以使用 synchronized 来定义方法,但 synchronized 并不属于方法定义的一部分,因此,synchronized 关键字不能被继承
    • 如果在父类中的某个方法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以
  • 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象
  • 修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用的对象是这个类的所有对象

如果一个代码块被 synchronized 修饰,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁

  1. 获取锁的线程执行完了该代码块,然后线程释放对锁的占有
  2. 线程执行发生异常,此时 JVM 会让线程自动释放锁

synchronized实现同步的基础︰Java中的每一个对象都可以作为锁具体表现为以下3种形式。

  • 对于普通同步方法,锁是当前实例对象。
  • 对于静态同步方法,锁是当前类的class对象。
  • 对于同步方法块,锁是synchonized括号里配置的对象

Lock

Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象

Lock 与的 Synchronized 区别

  • Lock 不是 Java 语言内置的,synchronized 是 Java 语言的关键字,因此是内置特性。Lock 是一个接口,通过这个类可以实现同步访问;
  • 采用 synchronized 不需要用户去手动释放锁,当 synchronized 方法或者 synchronized 代码块执行完之后, 系统会自动让线程释放对锁的占用
  • Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象
  • Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用 synchronized 时,等待的线程会一直等待下去,不能够响应中断;
  • 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
  • Lock 可以提高多个线程进行读操作的效率。在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源 非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于 synchronized
1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
概念 说明
lock 用来获取锁。如果锁已被其他线程获取,则进行等待。采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。
newCondition Lock 锁的 newContition()方法返回 Condition 对象,Condition 类 也可以实现等待/通知模式
ReentrantLock 可重入锁
ReadWriteLock ReadWriteLock 也是一个接口

lock

1
2
3
4
5
6
7
8
Lock lock = ...;
lock.lock();
try{
// 处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}

newCondition

方法 说明
await() 会使当前线程等待,同时会释放锁,当其他线程调用 signal()时,线程会重新获得锁并继续执行
signal() 用于唤醒一个等待的线程。

=在调用 Condition 的 await()/signal()方法前,也需要线程持有相关 的 Lock 锁,调用 await()后线程会释放这个锁,在 singal()调用后会从当前 Condition 对象的等待队列中,唤醒 一个线程,唤醒的线程尝试获得锁, 一旦获得锁成功就继续执行。

ReentrantLock

ReentrantLock 是唯一实现了 Lock 接口的类,并且 ReentrantLock 提供了更多的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void insert(Thread thread) {
Lock lock = new ReentrantLock();
lock.lock();
try {
System.out.println(thread.getName()+"得到了锁");
for(int i=0;i<5;i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
}finally {
System.out.println(thread.getName()+"释放了锁");
lock.unlock();
}
}

ReadWriteLock

一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成 2 个锁来分配给线程,从而使得多个线程可以同时进行读操作。

1
2
3
4
5
// Returns the lock used for reading.
Lock readLock();

// Returns the lock used for writing.
Lock writeLock();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Test {
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis();

while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行读操作");
}
System.out.println(thread.getName() + "读操作完毕");
} finally {
rwl.readLock().unlock();
}
}

public static void main(String[] args) {
final Test test = new Test();

new Thread(){
public void run() {
test.get(Thread.currentThread());
};
}.start();

new Thread(){
public void run() {
test.get(Thread.currentThread());
};
}.start();
}
}

thread1 和 thread2 在同时进行读操作。这样就大大提升了读操作的效率

  • 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁
  • 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

线程定制化通信

线程间通信的模型有两种:共享内存和消息传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class ShareResource {
// 标志位 A 1 B 2 C 3
private int flag = 1;
// lock
private Lock lock = new ReentrantLock();

// condition
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();

// print 5 times
public void print5Times(int loop) {
lock.lock();
try {
// 等待
while (flag != 1) {
c1.await();
}
// 操作
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " : " + i + "\tloop: " + loop);
}
// 通知
flag = 2; // 修改标志位
c2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

// print 10 times
public void print10Times(int loop) {
lock.lock();
try {
// 等待
while (flag != 2) {
c2.await();
}
// 操作
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " : " + i + "\tloop: " + loop);
}
// 通知
flag = 3; // 修改标志位
c3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

// print 15 times
public void print15Times(int loop) {
lock.lock();
try {
// 等待
while (flag != 3) {
c3.await();
}
// 操作
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + " : " + i + "\tloop: " + loop);
}
// 通知
flag = 1; // 修改标志位
c1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadDemo {

public static void main(String[] args) {
ShareResource resource = new ShareResource();

new Thread(()->{
for (int i = 0; i < 10; i++) {
resource.print5Times(i);
}
}, "ThreadA").start();

new Thread(()->{
for (int i = 0; i < 10; i++) {
resource.print10Times(i);
}
}, "ThreadB").start();

new Thread(()->{
for (int i = 0; i < 10; i++) {
resource.print15Times(i);
}
}, "ThreadC").start();
}
}

集合的线程安全

存在问题

ArrayList线程不安全实例

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadDemo {

public static void main(String[] args) {
List<String> list = new ArrayList<>();

for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}

此时会出现 java.util.ConcurrentModificationException

出现原因 add方法没加锁

1
2
3
4
5
6
7
8
9
10
11
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}

解决方案

Vector

1
2
3
4
5
6
7
8
9
List<String> list = new Vector<>();

// 多个线程同时对集合进行修改
for (int i = 0; i < 100; i++) {
new Thread(() ->{
list.add(UUID.randomUUID().toString())
System.out.println(list);
}, "线程" + i).start();
}

add 方法被 synchronized 同步修辞,线程安全!因此没有并发异常

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Appends the specified element to the end of this Vector.
*
* @param e element to be appended to this Vector
* @return {@code true} (as specified by {@link Collection#add})
* @since 1.2
*/
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}

Collections

1
2
3
4
5
6
7
8
9
List<String> list = Collections.synchronizedList(new ArrayList<>());

// 多个线程同时对集合进行修改
for (int i = 0; i < 100; i++) {
new Thread(() ->{
list.add(UUID.randomUUID().toString())
System.out.println(list);
}, "线程" + i).start();
}
1
2
3
4
5
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}

CopyOnWriteArrayList

  • 独占锁效率低:采用读写分离思想解决
  • 写线程获取到锁,其他写线程阻塞
  • 思想:当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器
    • 会造成数据不一致的问题。如果写线程还没来得及写回内存,其他的线程就会读到了脏数据

1
2
3
4
5
6
7
8
9
List list = new CopyOnWriteArrayList();

// 多个线程同时对集合进行修改
for (int i = 0; i < 100; i++) {
new Thread(() ->{
list.add(UUID.randomUUID().toString())
System.out.println(list);
}, "线程" + i).start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

动态数组机制

  • 内部有个“volatile 数组”来保持数据。在“添加/修改/删除”数据 时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该 数组赋值给“volatile 数组”
  • 它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话, 效率比较高

线程安全机制

  • 通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看到其它线程对该 volatile 变量最后的写入;就这样,读取到的数据总是最新的
  • 通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”, 再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥锁”

补充

并发修改异常 解决方案
HashSet线程不安全 CopyOnWriteArraySet
HashMap线程不安全 ConcurrentHashMap

多线程锁

Synchonized 锁

方法 锁对象
普通同步方法 锁当前实例对象
静态同步方法 锁当前类的Class对象
同步方法块 锁Synchonized括号里面配置的对象

公平锁和非公平锁

公平锁采用先到先得的策略,每次获取锁之前都会检查队列里面有没有排队等待的线程。没有才会尝试获取锁,如果有就将当前线程追加到队列中

非公平锁采用”有机会插队”的策略,一个线程获取锁之前,要尝试获取锁,而不是在队列中等待,如果真的获取锁成功,说明线程虽然后启动的,但是先获得了锁,如果获取锁没有成功,那么将自身追加到队列中进行等待

可重入锁

可重入锁指的是以线程为单位,当一个线程获取对象锁之后,这个线程可以再次获取本对象上的锁,而其他的线程是不可以的。

Java中两种可重入锁: ReentrantLock 和 synchronized

死锁

概念

死锁:多个进程在运行过程中因争夺资源而造成的一种僵局,当进程处于这种僵持状态时,若无外力作用,它们都将无法再向前推进

原因

  • 竞争资源
    • 系统中的资源可以分为两类:
      可剥夺资源:指某进程在获得这类资源后,该资源可以再被其他进程或系统剥夺,CPU和主存均属于可剥夺性资源;
      不可剥夺资源:当系统把这类资源分配给某进程后,再不能强行收回,只能在进程用完后自行释放,如磁带机、打印机等。
    • 产生死锁中的竞争资源之一指的是竞争不可剥夺资源(系统中只有一台打印机,可供进程P1使用,假定P1已占用了打印机,若P2继续要求打印机打印将阻塞)
    • 产生死锁中的竞争资源另外一种资源指的是竞争临时资源(临时资源包括硬件中断、信号、消息、缓冲区内的消息等),通常消息通信顺序进行不当,则会产生死锁
  • 进程间推进顺序非法

产生必要条件

  1. 互斥条件:在一段时间内某资源仅为一进程所占用。
  2. 请求和保持条件:当进程因请求资源而阻塞时,对已获得的资源保持不放。
  3. 不剥夺条件:进程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。
  4. 环路等待条件:在发生死锁时,必然存在一个进程—资源的环形链。

解决方案

预防死锁

  • 资源一次性分配:一次性分配所有资源,这样就不会再有请求了:(破坏请求条件)
  • 只要有一个资源得不到分配,也不给这个进程分配其他的资源:(破坏请保持条件)
  • 可剥夺资源:即当某进程获得了部分资源,但得不到其它资源,则释放已占有的资源(破坏不可剥夺条件)
  • 资源有序分配法:系统给每类资源赋予一个编号,每一个进程按编号递增的顺序请求资源,释放则相反(破坏环路等待条件)

避免死锁

  • 银行家算法
    • 首先需要定义状态和安全状态的概念。系统的状态是当前给进程分配的资源情况。因此,状态包含两个向量Resource(系统中每种资源的总量)和Available(未分配给进程的每种资源的总量)及两个矩阵Claim(表示进程对资源的需求)和Allocation(表示当前分配给进程的资源)。安全状态是指至少有一个资源分配序列不会导致死锁。当进程请求一组资源时,假设同意该请求,从而改变了系统的状态,然后确定其结果是否还处于安全状态。如果是,同意这个请求;如果不是,阻塞该进程知道同意该请求后系统状态仍然是安全的。

检测死锁

  1. 首先为每个进程和每个资源指定一个唯一的号码;
  2. 然后建立资源分配表和进程等待表。

解除死锁

  • 剥夺资源:从其它进程剥夺足够数量的资源给死锁进程,以解除死锁状态;
  • 撤销进程:可以直接撤消死锁进程或撤消代价最小的进程,直至有足够的资源可用,死锁状态消除为止

Callable & Future 接口

Callable 接口

目前介绍的创建线程方法可以是创建Thread类和使用Runnable创建线程,但是Runnable方法线程终止时无法返回结果,于是乎Callable接口诞生

  • Callable接口需要实现在完成时返回结果的call()方法, 该方法可以引发异常,而 run()则不能
  • 为实现 Callable 而必须重写 call 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
// 实现 runnable 接口
class MyThread implements Runnable{
@Override
public void run() {
}
}
// 实现 callable 接口
class MyThread implements Callable<Integer>{
@Override
public Integer call() throws Exception {
return 200;
}
}

Future 接口

call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用 Future 对象。

将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦 Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式

重写方法 说明
public boolean cancel(boolean mayInterrupt) 用于停止任务,如果尚未启动,它将停止任务。如果已启动,则仅在 mayInterrupt 为 true 时才会中断任务
public Object get() 用于获取任务的结果,如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果
public boolean isDone() 如果任务完成,则返回 true,否则返回 false

要创建线程,需要 Runnable。为了获得结果,需要 future

FutureTask

Java 库具有具体的 FutureTask 类型,该类型实现 Runnable 和 Future,并方便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建 FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建 Thread 对象。

在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成

  • 当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执行状态
  • 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果
  • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法
  • 一旦计算完成,就不能再重新开始或取消计算
  • get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常,get 只计算一次,因此 get 方法放到最后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class CallableDemo {

// 实现 runnable 接口
static class MyThread1 implements Runnable {

@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "线程进入了 run方法");
} catch (Exception e){
e.printStackTrace();
}
}
}

// 实现 callable 接口
static class MyThread2 implements Callable{
@Override
public Long call() throws Exception {
try {
System.out.println(Thread.currentThread().getName() + "线程进入了 call方法,开始准备睡觉");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "睡醒了");
}catch (Exception e){
e.printStackTrace();
}
return System.currentTimeMillis();
}
}

public static void main(String[] args) throws Exception{
// 声明 runable
Runnable runable = new MyThread1();
// 声明 callable
Callable callable = new MyThread2();
// future-callable
FutureTask<Long> futureTask = new FutureTask(callable);

new Thread(futureTask, "线程二").start();
for (int i = 0; i < 10; i++) {
Long result1 = futureTask2.get();
System.out.println(result1);
}
new Thread(runable,"线程一").start();
}
}

CountDownLatch

CountDownLatch是一个同步工具类,它通过一个计数器来实现的,初始值为线程的数量。每当一个线程完成了自己的任务,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已执行完毕,然后在等待的线程就可以恢复执行任务。

CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法 之后的语句。

方法 说明
CountDownLatch(int count) count为计数器的初始值(使用线程数)
countDown() 每调用一次计数器值-1,直到count被减为0,代表所有线程全部执行完毕
getCount() 获取当前计数器的值
await() 等待计数器变为0,即等待所有异步线程执行完毕
boolean await(long timeout, TimeUnit unit) 此方法至多会等待指定的时间,超时后会自动唤醒,若 timeout 小于等于零,则不会等待;boolean 类型返回值:若计数器变为零了,则返回 true;若指定的等待时间过去了,则返回 false

应用场景

  • 某个线程需要在其他n个线程执行完毕后再向下执行
  • 多个线程并行执行同一个任务,提高响应速度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CountDownLatchDemo {

// 6 个同学陆续离开教室后值班同学才可以关门
public static void main(String[] args) throws Exception{
// 定义一个数值为 6 的计数器
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {

new Thread(() ->{
try {
if (Thread.currentThread().getName().equals("同学 6")){
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName() + "离开");
// 计数器减一,不会阻塞
countDownLatch.countDown();
} catch (Exception e){
e.printStackTrace();
}

}, "同学" + i).start();
}
// 主线程 await 休息
System.out.println("主线程睡觉");
countDownLatch.await();
// 全部离开后自动唤醒主线程
System.out.println("全部离开了,现在的计数器为" + countDownLatch.getCount());
}
}

CyclicBarrier

它和CountDownLatch很相似,都可以使线程先等待然后再执行。不过CountDownLatch是使一批线程等待另一批线程执行完后再执行;

而CyclicBarrier只是使等待的线程达到一定数目后再让它们继续执行。故而CyclicBarrier内部也有一个计数器,计数器的初始值在创建对象时通过构造参数指定

CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一 次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后 的语句。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class CyclicBarrierDemo {

// 定义神龙召唤需要的龙珠总数
private final static int NUMBER = 7;

// 集齐 7 颗龙珠就可以召唤神龙
public static void main(String[] args) {

// 定义循环栅栏
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
System.out.println("集齐" + NUMBER + "颗龙珠,现在召唤神龙!!!!!!!!!");
});
// 定义 7 个线程分别去收集龙珠
for (int i = 1; i <= 7; i++) {

new Thread(()->{
try {
if(Thread.currentThread().getName().equals("龙珠 3 号")){
System.out.println("龙珠 3 号抢夺战开始,孙悟空开启超级赛亚人模式!");
Thread.sleep(5000);
System.out.println("龙珠 3 号抢夺战结束,孙悟空打赢了,拿到了龙珠 3号!");
} else{

System.out.println(Thread.currentThread().getName() + "收集到
了!!!!");
}
cyclicBarrier.await();
} catch (Exception e){
e.printStackTrace();
}
},"龙珠" + i + "号").start();
}
}
}

Semaphore

  • Semaphore也叫信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
  • Semaphore可以看作是synchronized关键字的升级版本,能够控制线程并发的数量,这一点是synchronized关键字做不到的
  • 内部维护了一个计数器,可加可减,acquire()方法是做减法,release()方法是做加法
  • 用于限制线程并发的数量,如果不限制线程的并发数量,CPU资源会很快被耗尽

构造方法:

new Semaphore(),构造函数的permits参数是许可的意思,代表同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码

acquire()方法:

  • 无参数的acquire()作用是使用1个permits(许可),是减法操作,即减去对应的permits(许可);
  • 有参数的acquire(),可以指定减去多少个permits(许可)数量。

release()方法:

  • 无参数的release()方法会加上1个permits(许可),是加法操作,即加上对应的permits(许可);
  • 有参数的release()方法,可以动态添加permits(许可),比如new Semaphore(2),之后我们可以通过release(2),把permits(许可)数量变成4,这个可以说明构造方法中中的2并不是最终的permits(许可)数量,而只是初始数量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SemaphoreDemo {

// 抢车位, 10 部汽车 1 个停车位
public static void main(String[] args) throws Exception{
// 定义 3 个停车位
Semaphore semaphore = new Semaphore(1);
// 模拟 6 辆汽车停车
for (int i = 1; i <= 10; i++) {
Thread.sleep(100);
// 停车
new Thread(() ->{
try {
System.out.println(Thread.currentThread().getName() + "找车位 ing");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "汽车停车成功!");
Thread.sleep(10000);

} catch (Exception e){
e.printStackTrace();
}finally {
System.out.println(Thread.currentThread().getName() + "溜了溜了");

semaphore.release();
}
}, "汽车" + i).start();
}
}
}

读写锁

概念

JAVA 的并发包提供了读写锁 ReentrantReadWriteLock, 它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称 为排他锁

线程进入读锁的前提条件

  • 没有其他线程的写锁

  • 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。·

线程进入写锁的前提条件:

  • 没有其他线程的读锁
  • 没有其他线程的写锁

读写锁重要的特性

  • 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平

  • 重进入:读锁和写锁都支持线程重进入

  • 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁

ReentrantReadWriteLock

ReentrantReadWriteLock 类的整体结构

ReentrantReadWriteLock 实现了 ReadWriteLock 接口, ReadWriteLock 接口定义了获取读锁和写锁的规范,具体需要实现类去实现; 同时其还实现了 Serializable 接口,表示可以进行序列化,可以看 到 ReentrantReadWriteLock 实现了自己的序列化逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ReentrantReadWriteLock implements ReadWriteLock,
java.io.Serializable {
/** 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;

final Sync sync;

/** 使用默认(非公平)的排序属性创建一个新的ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}

/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this)
}

/** 返回用于写入操作的锁 */
public ReentrantReadWriteLock.WriteLock writeLock() {
return writerLock;
}

/** 返回用于读取操作的锁 */
public ReentrantReadWriteLock.ReadLock readLock() {
return readerLock;
}

abstract static class Sync extends AbstractQueuedSynchronizer {}

static final class NonfairSync extends Sync {}

static final class FairSync extends Sync {}

public static class ReadLock implements Lock, java.io.Serializable {}

public static class WriteLock implements Lock, java.io.Serializable {}
}

案例说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 使用 ReentrantReadWriteLock 对一个 hashmap 进行读和写操作
class MyCache {
// 创建 map 集合
private volatile Map<String,Object> map = new HashMap<>();
// 创建读写锁对象
private ReadWriteLock rwLock = new ReentrantReadWriteLock();

// 放数据
public void put(String key,Object value) {
// 添加写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " " + key);
// 暂停一会
TimeUnit.MICROSECONDS.sleep(300);
// 放数据
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " " + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放写锁
rwLock.writeLock().unlock();
}
}

// 取数据
public Object get(String key) {
// 添加读锁
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName() + " " + key);
// 暂停一会
TimeUnit.MICROSECONDS.sleep(300);
result = map.get(key);
System.out.println(Thread.currentThread().getName() + " " + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放读锁
rwLock.readLock().unlock();
}
return result;
}
}

小结

  • 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)

  • 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)

    原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;

    而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

阻塞队列

BlockingQueue

  • 当队列是空的,从队列中获取元素的操作将会被阻塞
  • 当队列是满的,从队列中添加元素的操作将会被阻塞

  • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素

  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多 个元素或者完全清空,使队列变得空闲起来并后续新增

常用的队列主要有以下两种:

  • 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。 从某种程度上来说这种队列也体现了一种公平性
  • 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)

方法说明

方法 说明
offer(anObject) 如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit) 如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败
put(anObject) 如果 BlockQueue 没有 空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续
poll(time) 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null
poll(long timeout, TimeUnit unit): 从 BlockingQueue 取出一个队首的对象, 如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知 道时间超时还没有数据可取,返回失败。
take() 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入;
drainTo() 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定 获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加 锁或释放锁。

常见的 BlockingQueue

ArrayBlockingQueue : 由数组结构组成的有界阻塞队列

  • 基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,内部还保存着两个整形变量,分别标识着队列的 头部和尾部在数组中的位置。

  • 生产者放入数据和消费者获取数据,都是共用同一个 锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue;

  • ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除 元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。

LinkedBlockingQueue :由链表结构组成的有界阻塞队列

  • 基于链表的阻塞队列,其内部也维持着一 个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;
  • 只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

  • 对于生 产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发 的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能

DelayQueue : 使用优先级队列实现的延迟无界阻塞队列

  • 元素只有当其指定的延迟时间到了,才能够从队列中获取到 该元素
  • 没有大小限制,因此往队列中插入数据的 操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞

PriorityBlockingQueue: 支持优先级排序的无界阻塞队列

  • 基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来 决定)
  • 需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。 特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间
  • 在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁

SynchronousQueue : 不存储元素的阻塞队列,也即单个元素的队列

  • 一种无缓冲的等待队列

  • 公平模式:会采用公平锁,并配合一个 FIFO 队列来阻塞 多余的生产者和消费者,从而体系整体的公平策略

  • 非公平模式(默认):采用非平 锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者, 如果生产者和消费者的处理速度有差距,则可能有某些生产者或者是消费者的数据永远都得不到处理

LinkedTransferQueue : 由链表组成的无界阻塞队列

  • 一个由链表结构组成的无界阻塞 TransferQueue 队 列。相对于其他阻塞队列,它多了 tryTransfer 和 transfer 方法

  • 采用一种预占模式。消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素 为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到 该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。

LinkedBlockingDeque:由链表组成的双向阻塞队列

  • 是一个由链表结构组成的双向阻塞队列,即可以从队 列的两端插入和移除元素。
  • 对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作 可能会阻塞住该线程直到队列状态变更为允许操作

  • 插入元素:如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时将该元素插入

  • 读取元素:如果当前队列为空会阻塞住直到队列不为空然后返回元素

ThreadPool 线程池

线程池的相关概念

线程过多会带来调度开销, 进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销耗
  • 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会销耗系统资 源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

  • Java 中的线程池是通过 Executor 框架实现的,涉及 Executor,Executors, ExecutorService,ThreadPoolExecutor

参数说明

参数 说明
corePoolSize 线程池的核心线程数
maximumPoolSize 能容纳的最大线程数
keepAliveTime 空闲线程存活时间
unit 存活的时间单位
workQueue 存放提交但未执行任务的队列
threadFactory 创建线程的工厂类
handler 等待队列满后的拒绝策略

拒绝策略

当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

拒绝策略 说明
CallerRunsPolicy 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常 信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执 行流程,影响后续的任务执行。
DiscardPolicy 直接丢弃
DiscardOldestPolicy 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞 队列 workQueue 中最老的一个任务,并将新任务加入

线程池的种类与创建

newCachedThreadPool

适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较 短,任务多的场景

方法 说明
newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空 闲线程,若无可回收,则新建线程.
  • 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
  • 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
  • 当线程池中,没有可用线程,会重新创建一个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 可缓存线程池
public static ExecutorService newCachedThreadPool(){

/**
* corePoolSize 线程池的核心线程数
* maximumPoolSize 能容纳的最大线程数
* keepAliveTime 空闲线程存活时间
* unit 存活的时间单位
* workQueue 存放提交但未执行任务的队列
* threadFactory 创建线程的工厂类:可以省略
* handler 等待队列满后的拒绝策略:可以省略
*/
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}

newFixedThreadPool

适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严 格限制的场景

方法 说明
newFixedThreadPool 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
  • 线程池中的线程处于一定的量,可以很好的控制线程的并发量
  • 线程可以重复被使用,在显示关闭之前,都将一直存在
  • 超出一定量的线程被提交时候需在队列中等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 固定长度线程池
public static ExecutorService newFixedThreadPool(){

/**
* corePoolSize 线程池的核心线程数
* maximumPoolSize 能容纳的最大线程数
* keepAliveTime 空闲线程存活时间
* unit 存活的时间单位
* workQueue 存放提交但未执行任务的队列
* threadFactory 创建线程的工厂类:可以省略
* handler 等待队列满后的拒绝策略:可以省略
*/
return new ThreadPoolExecutor(10,
10,
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

}

newSingleThreadExecutor

适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个 线程的场景

方法 说明
newSingleThreadExecutor 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该 线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程, 那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各 个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即 可使用其他的线程。
  • 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 单一线程池
public static ExecutorService newSingleThreadExecutor(){

/**
* corePoolSize 线程池的核心线程数
* maximumPoolSize 能容纳的最大线程数
* keepAliveTime 空闲线程存活时间
* unit 存活的时间单位
* workQueue 存放提交但未执行任务的队列
* threadFactory 创建线程的工厂类:可以省略
* handler 等待队列满后的拒绝策略:可以省略
*/
return new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}

newScheduleThreadPool

适用于需要多个后台线程执行周期任务的场景

方法 说明
newScheduleThreadPool 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参 数,最大线程数为整形的最大数的线程池
  • 线程池中具有指定数量的线程,即便是空线程也将保留
  • 可定时或者 延迟执行线程活动
1
2
3
4
public static ScheduledExecutorService newScheduledThreadPool(int
corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

newWorkStealingPool

适用于大耗时,可并行执行的场景

方法 说明
newWorkStealingPool jdk1.8 提供的线程池,底层使用的是 ForkJoinPool 实现,创建一个拥有多个 任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执 行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
public static ExecutorService newWorkStealingPool(int parallelism) {

/**
* parallelism:并行级别,通常默认为 JVM 可用的处理器个数
* factory:用于创建 ForkJoinPool 中使用的线程。
* handler:用于处理工作线程未处理的异常,默认为 null
* asyncMode:用于控制 WorkQueue 的工作模式:队列---反队列
*/
return new ForkJoinPool(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true);
}

案例说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 火车站 3 个售票口, 10 个用户买票
public class ThreadPoolDemo {

public static void main(String[] args) {
ExecutorService threadService = new ThreadPoolExecutor(3,
3,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());

try {
// 10 个人买票
for (int i = 1; i <= 10; i++) {
threadService.execute(()->{
try {
System.out.println(Thread.currentThread().getName() + " 窗口,开始卖票");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " 窗口买票结束");
}catch (Exception e){
e.printStackTrace();
}});
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 完成后结束
threadService.shutdown();
}
}
}

线程池底层工作原理

  1. 在创建了线程池后,线程池中的线程数为零
  2. 当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入 队列;
    • 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务
    • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程 池会启动饱和拒绝策略来执行
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
    • 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉
    • 以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小
JDK内置拒绝策略 说明
AbortPolicy(默认) 直接抛出RejetedExecutionException异常阳止系统正常运行
CallerRunsPolicy “调用者运行“一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
DiscardoldestPolicy 抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务
DiscardPolicy 该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常如果允许任务丢失,这是最好的一种策略

注意事项

  • 项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都 有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用 LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE, 容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参 数,自定义线程池

  • 创建线程池推荐适用 ThreadPoolExecutor 及其 7 个参数手动创建

    | 参数 | 说明 |
    | ———————- | ————————————— |
    | corePoolSize | 线程池的核心线程数 |
    | maximumPoolSize | 能容纳的最大线程数 |
    | keepAliveTime | 空闲线程存活时间 |
    | unit | 存活的时间单位 |
    | workQueue | 存放提交但未执行任务的队列 |
    | threadFactory | 创建线程的工厂类 |
    | handler | 等待队列满后的拒绝策略 |

Fork/Join

框架简介

Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子 任务结果合并成最后的计算结果,并进行输出。

  • Fork:把一个复杂任务进行分拆
  • Join:把分拆任务的结果进行合并

任务分割:首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割

执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里, 启动一个线程从队列里取数据,然后合并这些数据。

在 Java 的 Fork/Join 框架中

  • ForkJoinTask:要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。 该类提供了在任务中执行 fork 和 join 的机制。通常情况下不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类

    • RecursiveAction:用于没有返回结果的任务
    • RecursiveTask:用于有返回结果的任务
  • ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行

  • RecursiveTask:继承后可以实现递归调用的任务

框架的实现原理

ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成, ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而 ForkJoinWorkerThread 负责执行这些任务。

Fork 方法

当我们调用 ForkJoinTask 的 fork 方法时,程序会把 任务放在 ForkJoinWorkerThread 的 pushTask 的 workQueue 中,异步地执行这个任务,然后立即返回结果

1
2
3
4
5
6
7
8
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}

pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用 ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);//执行
}
else if (n >= m)
growArray();
}
}

join 方法

Join 方法的主要作用是阻塞当前线程并等待获取结果

1
2
3
4
5
6
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}

它首先调用 doJoin 方法,通过 doJoin()方法得到当前任务的状态来判断返回 什么结果,任务状态有 4 种:

  • 已完成(NORMAL):直接返回任务结果。
  • 被取消(CANCELLED): 直接抛出 CancellationException
  • 信号(SIGNAL)
  • 出现异常(EXCEPTIONAL) : 直接抛出对应的异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}

final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}

在 doJoin()方法流程如下:

  1. 首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接 返回任务状态
  2. 如果没有执行完,则从任务数组里取出任务并执行
  3. 如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记 录异常,并将任务状态设置为 EXCEPTIONAL。

框架的异常处理

方法 说明
isCompletedAbnormally() 来检查 任务是否已经抛出异常或已经被取消
getException 返回 Throwable 对象,如果任务被取消则返回
CancellationException。 如果任务没有完成或者没有抛出异常则返回 null

CompletableFuture

简介

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞, 可以使任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。

CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future 接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程 的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的 CompletableFuture 类

Future 与 CompletableFuture

Futrue 在 Java 里面,通常用来表示一个异步任务的引用,将任务提交到线程池里面,然后会得到一个 Futrue,在 Future 里面有 isDone 方法来判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Future 的主要缺点

  • 不支持手动完成

  • 不支持进一步的非阻塞调用

  • 不支持链式调用

  • 不支持多个 Future 合并

  • 不支持异常处理