JUC-11-AQS
AQS 理论介绍
源码
AQS
AQS(AbstractQueuedSynchronizer)是Java中的一个抽象基类,用于实现同步器(Synchronizer)。它是Java并发包中一些重要的同步工具的基础,如ReentrantLock、Semaphore、CountDownLatch等。AQS提供了一种底层机制,帮助开发者实现自定义的同步器,从而能够更灵活地管理多线程之间的竞争和协作。
同步器的概念:
同步器是一种用于控制多线程访问共享资源的机制,它通常包含两个重要的方法:acquire
(获取锁)和release
(释放锁)。这两个方法分别用于在多线程之间协调资源的访问,确保线程安全。
AQS的基本结构:
AQS的核心思想是维护一个等待队列(Wait Queue)和一个状态变量(State)。等待队列中的线程按照先进先出(FIFO)的顺序等待获取锁或资源。状态变量表示同步器的状态,不同的同步器可以定义不同的状态语义。
AQS的主要方法:
方法 | 说明 |
---|---|
acquire(int arg) |
用于获取锁或资源,如果无法获取,则将当前线程加入等待队列,然后自旋等待或阻塞,直到获取到锁或资源 |
release(int arg) |
用于释放锁或资源,同时通知等待队列中的某个线程可以继续尝试获取锁或资源 |
tryAcquire |
这独占方式获取锁。尝试获取资源,成功则返回true,失败则返回false |
tryRelease |
独占方式释放锁。尝试释放资源,成功则返回true,失败则返回false |
getState |
获取锁的标志state值 |
setState |
设置锁的标志state值 |
hasQueuedThreads |
检查是否有线程在等待队列中等待获取锁或资源 |
AQS的实现原理:
AQS通过使用CAS(Compare-And-Swap)操作来实现对状态变量的原子操作,从而保证线程安全。等待队列则是使用链表数据结构来管理等待线程。
AQS
中 维护了一个volatile int state
(代表共享资源)和一个FIFO
线程等待队列(多线程争用资源被阻塞时会进入此队列)。
整体就一个抽象的FIFO队列完成资源获取线程的排队工作,并通过int类变量,表示持有锁的状态
CLH: Craig、Landin and Hagersten队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO
AQS的应用:
- ReentrantLock:Java中可重入锁的实现就是基于AQS的,它允许同一个线程多次获取同一个锁。
- Semaphore:Semaphore也是基于AQS的,它用于控制同时访问某个资源的线程数量。
- CountDownLatch:CountDownLatch是用于线程之间的协调,它允许一个或多个线程等待一组操作完成。
- CyclicBarrier:CyclicBarrier也是用于线程之间的协调,它允许一组线程互相等待达到某个同步点后再继续执行。
锁和同步器关系
锁:面向锁使用者,定义程序员和锁交互的API,屏蔽实现细节
同步器:统一规范简化锁的实现,将其抽象出来,屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等,它是一切锁和同步组件实现的公共基础部分。
AQS解释说明
抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待,但等候线程仍然保留获取锁的可能且获取锁流程仍在继续。
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配,该机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。
AQS使用volatle
的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。
同步队列结构
CLH: Craig、Landin and Hagersten队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO
AQS源码分析
AQS内部体系架构
AQS的int变量
AQS的CLH队列
AQS内部类Node
内部结构
属性说明
以ReentrantLock说明
可以明显看出公平锁与非公平锁的lock()
唯一区别在于公平锁在获取同步状态时多一个限制条件hasQueuedPredecessors()
,该方法是公平锁加锁时判断等待队列中否存在有效节点
对比公平锁和非公平锁的tryAcquire()
方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断hasQueuedPredecessors()
,它判断是否需要排队,导致公平锁和非公平锁的差异如下:
- 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列
- 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象
lock()
acquire()
多线程场景模拟说明
非公平锁实现原理
三个线程(线程一、线程二、线程三)同时来加锁/释放锁
线程一加锁成功
如果同时有三个线程并发抢占锁,此时线程一抢占锁成功,线程二和线程三抢占锁失败,具体执行流程如下:
此时AQS
内部数据为:
线程二、线程三加锁失败:
由图可以看出,等待队列中的节点Node
是一个双向链表,这里SIGNAL
是Node
中waitStatus
属性,Node
中还有一个nextWaiter
属性,这个并未在图中画出来,这个到后面Condition
会具体讲解的。
具体看下抢占锁代码实现
java.util.concurrent.locks.ReentrantLock .NonfairSync
这里使用的ReentrantLock非公平锁,线程进来直接利用CAS
尝试抢占锁,如果抢占成功state
值回被改为1,且设置对象独占锁线程为当前线程。
线程二抢占锁失败
线程一抢占锁成功后,state
变为1,线程二通过CAS
修改state
变量必然会失败。此时AQS
中FIFO
队列中数据如图所示:
将线程二执行的逻辑一步步拆解来看:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
:
先看看tryAcquire()
的具体实现:
java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire()
:
nonfairTryAcquire()
方法中首先会获取state
的值:
- 如果不为0则说明当前对象的锁已经被其他线程所占有,接着判断占有锁的线程是否为当前线程,如果是则累加
state
值,这就是可重入锁的具体实现,累加state
值,释放锁的时候也要依次递减state
值 - 如果
state
为0,则执行CAS
操作,尝试更新state
值为1,如果更新成功则代表当前线程加锁成功。
以线程二为例,因为线程一已经将state
修改为1,所以线程二通过CAS
修改state
的值不会成功。加锁失败。
线程二执行tryAcquire()
后会返回false,接着执行addWaiter(Node.EXCLUSIVE)
逻辑,将自己加入到一个FIFO
等待队列中,代码实现如下 :
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter()
:
这段代码首先会创建一个和当前线程绑定的Node
节点,Node
为双向链表。此时等待对内中的tail
指针为空,直接调用enq(node)
方法将当前线程加入等待队列尾部 :
第一遍循环时tail
指针为空,进入if逻辑,使用CAS
操作设置head
指针,将head
指向一个新创建的Node
节点。此时AQS
中数据:
执行完成之后,head
、tail
、t
都指向第一个Node
元素。
接着执行第二遍循环,进入else
逻辑,此时已经有了head
节点,这里要操作的就是将线程二对应的Node
节点挂到head
节点后面。此时队列中就有了两个Node
节点:
addWaiter()
方法执行完后,会返回当前线程创建的节点信息。继续往后执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
逻辑,此时传入的参数为线程二对应的Node
节点信息:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued()
:
acquireQueued()
这个方法会先判断当前传入的Node
对应的前置节点是否为head
,如果是则尝试加锁。加锁成功过则将当前节点设置为head
节点,然后空置之前的head
节点,方便后续被垃圾回收掉。
如果加锁失败或者Node
的前置节点不是head
节点,就会通过shouldParkAfterFailedAcquire
方法 将head
节点的waitStatus
变为了SIGNAL=-1
,最后执行parkAndChecknIterrupt
方法,调用LockSupport.park()
挂起当前线程。
此时AQS
中的数据如下图:
此时线程二就静静的待在AQS
的等待队列里面了,等着其他线程释放锁来唤醒它。
线程三抢占锁失败
看完了线程二抢占锁失败的分析,那么再来分析线程三抢占锁失败就很简单了,先看看addWaiter(Node mode)
方法:
此时等待队列的tail
节点指向线程二,进入if
逻辑后,通过CAS
指令将tail
节点重新指向线程三。接着线程三调用enq()
方法执行入队操作,和上面线程二执行方式是一致的,入队后会修改线程二对应的Node
中的waitStatus=SIGNAL
。最后线程三也会被挂起。此时等待队列的数据如图:
线程一释放锁
现在来分析下释放锁的过程,首先是线程一释放锁,释放锁后会唤醒head
节点的后置节点,也就是我们现在的线程二,具体操作流程如下:
执行完后等待队列数据如下:
此时线程二已经被唤醒,继续尝试获取锁,如果获取锁失败,则会继续被挂起。如果获取锁成功,则AQS
中数据如图:
接着还是一步步拆解来看,先看看线程一释放锁的代码:
java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
这里首先会执行tryRelease()
方法,这个方法具体实现在ReentrantLock
中,如果tryRelease
执行成功,则继续判断head
节点的waitStatus
是否为0,前面我们已经看到过,head
的waitStatue
为SIGNAL(-1)
,这里就会执行unparkSuccessor()
方法来唤醒head
的后置节点,也就是我们上面图中线程二对应的Node
节点。
此时看ReentrantLock.tryRelease()
中的具体实现:
执行完ReentrantLock.tryRelease()
后,state
被设置成0,Lock对象的独占锁被设置为null。此时看下AQS
中的数据:
接着执行java.util.concurrent.locks.AbstractQueuedSynchronizer.unparkSuccessor()
方法,唤醒head
的后置节点:
这里主要是将head
节点的waitStatus
设置为0,然后解除head
节点next
的指向,使head
节点空置,等待着被垃圾回收。
此时重新将head
指针指向线程二对应的Node
节点,且使用LockSupport.unpark
方法来唤醒线程二。
被唤醒的线程二会接着尝试获取锁,用CAS
指令修改state
数据。
执行完成后可以查看AQS
中数据:
此时线程二被唤醒,线程二接着之前被park
的地方继续执行,继续执行acquireQueued()
方法。
线程二唤醒继续加锁
此时线程二被唤醒,继续执行for
循环,判断线程二的前置节点是否为head
,如果是则继续使用tryAcquire()
方法来尝试获取锁,其实就是使用CAS
操作来修改state
值,如果修改成功则代表获取锁成功。接着将线程二设置为head
节点,然后空置之前的head
节点数据,被空置的节点数据等着被垃圾回收。
此时线程三获取锁成功,AQS
中队列数据如下:
等待队列中的数据都等待着被垃圾回收。
线程二释放锁/线程三加锁
当线程二释放锁时,会唤醒被挂起的线程三,流程和上面大致相同,被唤醒的线程三会再次尝试加锁,具体代码可以参考上面内容。具体流程图如下:
此时AQS
中队列数据如图:
公平锁实现原理
上面所有的加锁场景都是基于非公平锁来实现的,非公平锁是ReentrantLock
的默认实现,那我们接着来看一下公平锁的实现原理,这里先用一张图来解释公平锁和非公平锁的区别:
非公平锁执行流程:
这里我们还是用之前的线程模型来举例子,当线程二释放锁的时候,唤醒被挂起的线程三,线程三执行tryAcquire()
方法使用CAS
操作来尝试修改state
值,如果此时又来了一个线程四也来执行加锁操作,同样会执行tryAcquire()
方法。
这种情况就会出现竞争,线程四如果获取锁成功,线程三仍然需要待在等待队列中被挂起。这就是所谓的非公平锁,线程三辛辛苦苦排队等到自己获取锁,却眼巴巴的看到线程四插队获取到了锁。
公平锁执行流程:
公平锁在加锁的时候,会先判断AQS
等待队列中是存在节点,如果存在节点则会直接入队等待,具体代码如下.
公平锁在获取锁是也是首先会执行acquire()
方法,只不过公平锁单独实现了tryAcquire()
方法:
#java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
:
这里会执行ReentrantLock
中公平锁的tryAcquire()
方法
#java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire()
这里会先判断state
值,如果不为0且获取锁的线程不是当前线程,直接返回false代表获取锁失败,被加入等待队列。如果是当前线程则可重入获取锁。
如果state=0
则代表此时没有线程持有锁,执行hasQueuedPredecessors()
判断AQS
等待队列中是否有元素存在,如果存在其他等待线程,那么自己也会加入到等待队列尾部,做到真正的先来后到,有序加锁。具体代码如下:
#java.util.concurrent.locks.AbstractQueuedSynchronizer.hasQueuedPredecessors()
这段代码返回false
代表队列中没有节点或者仅有一个节点是当前线程创建的节点。返回true
则代表队列中存在等待节点,当前线程需要入队等待。
先判断head
是否等于tail
,如果队列中只有一个Node
节点,那么head
会等于tail
,接着判断head
的后置节点,这里肯定会是null
,如果此Node
节点对应的线程和当前的线程是同一个线程,那么则会返回false
,代表没有等待节点或者等待节点就是当前线程创建的Node
节点。此时当前线程会尝试获取锁。
如果head
和tail
不相等,说明队列中有等待线程创建的节点,此时直接返回true
,如果只有一个节点,而此节点的线程和当前线程不一致,也会返回true
非公平锁和公平锁的区别:
非公平锁性能高于公平锁性能。非公平锁可以减少
CPU
唤醒线程的开销,整体的吞吐效率会高点,CPU
也不必取唤醒所有线程,会减少唤起线程的数量非公平锁性能虽然优于公平锁,但是会存在导致线程饥饿的情况。在最坏的情况下,可能存在某个线程一直获取不到锁。不过相比性能而言,饥饿问题可以暂时忽略,这可能就是
ReentrantLock
默认创建非公平锁的原因之一了。
Condition实现原理
Condition简介
Condition
是在java 1.5中才出现的,它用来替代传统的Object
的wait()
、notify()
实现线程间的协作,相比使用Object
的wait()
、notify()
,使用Condition
中的await()
、signal()
这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition
。
其中AbstractQueueSynchronizer
中实现了Condition
中的方法,主要对外提供awaite(Object.wait())
和signal(Object.notify())
调用。
1 | public class ReentrantLockDemo { |
这里线程一先获取锁,然后使用await()
方法挂起当前线程并释放锁,线程二获取锁后使用signal
唤醒线程一。
Condition实现原理
我们还是用上面的demo
作为实例,执行的流程如下:
线程一执行await()
方法:
先看下具体的代码实现,#java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject.await()
:
await()
方法中首先调用addConditionWaiter()
将当前线程加入到Condition
队列中。
执行完后我们可以看下Condition
队列中的数据
具体实现代码为:
这里会用当前线程创建一个Node
节点,waitStatus
为CONDITION
。接着会释放该节点的锁,调用之前解析过的release()
方法,释放锁后此时会唤醒被挂起的线程二,线程二会继续尝试获取锁。
接着调用isOnSyncQueue()
方法判断当前节点是否为Condition
队列中的头部节点,如果是则调用LockSupport.park(this)
挂起Condition
中当前线程。此时线程一被挂起,线程二获取锁成功。
具体流程如下图:
线程二执行signal()
方法:
首先我们考虑下线程二已经获取到锁,此时AQS
等待队列中已经没有了数据。
接着就来看看线程二唤醒线程一的具体执行流程:
先判断当前线程是否为获取锁的线程,如果不是则直接抛出异常。
接着调用doSignal()
方法来唤醒线程。
这里先从transferForSignal()
方法来看,通过上面的分析我们知道Condition
队列中只有线程一创建的一个Node
节点,且waitStatue
为CONDITION
,先通过CAS
修改当前节点waitStatus
为0,然后执行enq()
方法将当前线程加入到等待队列中,并返回当前线程的前置节点。
加入等待队列的代码在上面也已经分析过,此时等待队列中数据如下图:
接着开始通过CAS
修改当前节点的前置节点waitStatus
为SIGNAL
,并且唤醒当前线程。此时AQS
中等待队列数据为:
线程一被唤醒后,继续执行await()
方法中的while循环
因为此时线程一的waitStatus
已经被修改为0,所以执行isOnSyncQueue()
方法会返回false
。跳出while
循环。
接着执行acquireQueued()
方法,这里之前也有讲过,尝试重新获取锁,如果获取锁失败继续会被挂起。直到另外线程释放锁才被唤醒。
此时线程一的流程都已经分析完了,等线程二释放锁后,线程一会继续重试获取锁,流程到此终结。
Condition总结
- Condition可以精准的对多个不同条件进行控制,wait/notify只能和synchronized关键字一起使用,并且只能唤醒一个或者全部的等待队列;
- Condition需要使用Lock进行控制,使用的时候要注意lock()后及时的unlock(),Condition有类似于await的机制,因此不会产生加锁方式而产生的死锁出现,同时底层实现的是park/unpark的机制,因此也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,但是wait/notify会产生先唤醒再挂起的死锁。