Java--ReentrantLock
Java–ReentrantLock
在JDK
的代码中,我们用于实现同步方法
的常用的来说,有三种
synchronize
ReentrantLock
countDownLatch
其中,我们已经在上一篇文章中讲解了synchronize,现在,我们讲解下ReentrantLock
。
在ReentrantLock
中,我们的锁既可以是公平锁
,也可以是非公平锁
。至于具体是哪一种,是根据我们在初始化ReentrantLock
时,通过构造函数的请求参数来设置的。另外,提一句,synchronize
只能是非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
通过上面的代码,我们可以发现,如果我们调用无参的构造函数ReentrantLock()
,则默认的是非公平锁
;而如果我们调用的是有参的构造函数ReentrantLock(boolean fair)
,则取决于我们传入的参数,如果是false
,则采用的是非公平锁
,而如果是true
,则采用的是公平锁
。
那么,对于公平锁
和非公平锁
,ReentrantLock
是怎么实现的呢?
我们通过上面的代码可以看出,如果我们采用ReentrantLock()
和ReentrantLock(false)
的时候,此时获取的是非公平锁
。此时,我们使用的是NonfairSync
这个静态内部类产生的对象,那么ReentrantLock
是怎么实现非公平锁的呢?
在我们真正的讲解ReentrantLock
的非公平锁,其实,我们就是在讲NonFairSync
这个类。既然,我们想要讲解这个类,那么就面临着我们要知道这个类的一个类图:
接下来,问主要看下这个类。
NonFairSync实现非公平锁(加锁)
首先,我们看下NonfairSync
的源码:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
}
在上面,我们已经罗列出了所有的主要的源码的信息。接下来,我们一点一点的进行分析。
lock方法
首先,我们查看NonFairSync
的lock
方法,我们发现,其实对于lock
方法而言,很简单。就是如果当前线程需要锁,则首先通过CAS
自旋的方式,去获取锁,如果锁不存在,那么就去执行acquire
方法。
acquire方法
然而,在acquire
方法中,我们看到,主要的业务逻辑在if
的判断中。在这里,我们发现,JDK
的库工程师们采用了模板方法
的设计模式,将整个加锁的过程,已经固化了,只是在不同的地方,需要实现者自己去实现而已。因此,tryAcquire
方法就是由NonFairSync
自己去实现的。而NonFairSync
中的tryAcquire
方法,仅仅只是调用底层的nonfairTryAcquire
方法而已。而在nonfairTryAcquire
方法中,我们发现一个神奇的事情,那就是这个方法中对于获取锁,它仍然通过了一次CAS
自旋的方式去获取锁。如果没有获取到,才会执行下面的步骤。
那在这里就有一个问题了,因为我们在之前的lock
方法中,已经通过CAS
自旋的方式去尝试获取锁而失败了,那么为什么我们还要在nonfairTryAcquire
中再执行一次呢?其实,这里面有一个效率的问题。在这里,是一个典型的通过增加一些冗余代码的方式,来提高执行效率的问题。
OK,到这里,我们开始重新的讲解一下acquire
这个方法。在这个方法中,我们发现:他的主要部分是放在了if
语句的里面。在if
语句中,采用的是短路的原则,来进行一步一步的设置。接下俩,我们讲解下:
tryAcquire方法
我们首先执行的是tryAcquire
方法。通过名字可以知道,这个代码的含义是”获取锁”。只有我们获取失败的时候,才会执行后续的流程。
tryAcquire
方法调用的是nonfairTryAcquire
方法。- 在
nonfairTryAcquire
中,我们首先会拿到当前线程,通过新创建一个Node
的方式,将当前线程信息存放到Node
信息中。此时我们会判断当前线程是否已经获取到锁。- 如果没有获取到锁,则通过一次
CAS
自旋来获取锁,如果获取成功,此时我们继续下当前线程的信息,以便后续重入锁的时候使用。 - 如果我们获取锁失败,此时我们查看当前线程是否已经获取到锁,如果发现已经获取,则直接通过重入锁来获取当前锁。并且将
state
加1
。在这里,state
表示的是锁的重入次数。 - 如果上述两种情况,我们都没有获取到锁,则直接返回
false
,表示获取锁失败。
- 如果没有获取到锁,则通过一次
addWaiter方法
这个方法的调用前提是在tryAcquire
获取锁失败的时候进行调用的。这个方法的主要目的是为了将未获取到锁的线程,通过Node
的方式来添加到队列
中。到此,我们需要介绍一下AbstractQueuedSynchronizer
也就是AQS
和他用户存储阻塞线程的队列
的数据结构。
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer
,我们俗称AQS
。这个类是实现ReentrantLock
锁的重要类。这个类采用的是设计模式中的模板方法
。他帮我们默认了提供了一套关于锁的解决方法。但是对于内部的一些实现,是需要子类去自己实现的,例如tryAcquire
方法。同时,我们的NonFairSync
也是继承了这个类。在这个类中,存在了两个成员变量head
和tail
。这两个变量,一个是指定了队列
的头节点,一个指定了队列
的尾节点。注意的是,这个队列
采用的是懒加载
模式。默认情况下,head
和tail
的值为null
。如果举例,我们可以这样表示:
对于AbstractQueuedSynchronizer
的介绍,我们会在后续的文章中进行讲解的。
队列
的存储结构
在AbstractQueuedSynchronizer
中,我们所有的未获取到锁的线程都会添加到一个队列
当中。而这个队列,采用的是一个数据结构中典型的无头的双向链表
的数据模型。对于列表
中的每一个节点Node
,主要是由
waitStatus
- 当前节点的状态,其中有SIGNAL
- 值为-1
,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行CANCELLED
- 值为1
,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化CONDITION
- 值为-2
,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁PROPAGATE
- 值为-3
,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态0
- 值为0
,代表初始化状态
prev
- 当前节点的前置节点next
- 当前节点的后置节点thread
- 当前节点对应的线程
当前,Node
节点还存在一些其他的变量,在这里,我们主要关注的就是上面的这几个。对于AQS
而言,我们创建的队列正是通过Node
节点组成的一个FIFO
的队列。他的格式如下
OK,有了上面的前提,我们再看看addWaiter
这段代码。同样的,这段代码中,JDK
的库工程师们依然采用了通过冗余代码来提高效率的方式来提升性能,因此,我们只需要看enq
这个方法即可。其中,源码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在这段代码中,我们依然是通过一个死循环的方式来执行的。首先我们会判断tail
这个指针。如果我们发现tail
指针为null
,那么此时这个队列中根本就不存在。这个也是之前我们讲解的,AQS
的队列
采用的懒加载
的模式进行初始化的(也就是说,并不是事先初始化,而是在我们使用的时候进行初始化)。因此,在for
循环中的第一个if
中,就是为了来初始化队列
的操作。在初始化完成操作之后,会将head
和tail
都指向这个新创建的Node
节点。注意,这个节点不存在任何的thread
信息。它仅仅指向的是一个阻塞队列
。
在这里,我们会想到,会不会存在并发的问题呢?其实,是不存在。因为就算是有多个线程进入了for
循环内,此时多个线程都获取到了tail
为null
的情况,此时都回去执行compareAndSetHead
方法。但是在compareAndSetHead
中,采用了CAS
自旋锁的方式进行设置,因此,只可能有一个线程成功,其他的线程都是不会成功的。这样也就解决了并发
的问题。
当执行完第一个if
语句后,或者是当前的队列
不为空时,此时会执行else
里面的语句。此时,我们会当前节点node
的prev
指向队列的前一个节点,然后通过CAS
自旋的方式,将当前节点添加到队列的后面。然后将前面一个节点next
指向当前节点。最后返回插入节点的prev
节点。
acquireQueued方法
你有可能会问,到目前为止,我们都是将线程添加到了阻塞队列中,但是并没有去获取锁啊。别急,acquireQueued
方法就是对于锁队列
的操作过程。同时,也是lock
方法的重点方法,我们会以最简便的方式,来进行讲解。
方法的源码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
首先,我们发现这里仍然是一个死循环
的方式。然后再for
循环中进行了两次判断。
对于第一个if
语句,我们首先获取当前节点prev
节点是不是一个head
节点,如果不是head
节点,则跳出当前的if
判断。那么,这里就有一个问题,我们为什么非要看当前节点是不是在head
之后的第一个呢?是因为
head
节点是不存在获取权限判断的。head
节点存在的目的,是为了形成一个队列- 我们始终认为,
head
节点后面的第一个节点是目前正在获取锁的节点,对于之后的节点,都需要前置的节点已经获取锁才可以。 - 对于当前获取锁的节点,我们需要保证我们的后续节点的
waitState
一定是SIGNAL
。
所以,在这里,我们解释了为什么判断当前的节点的prev
一定是head
的原因。接下来,就是如果当前的node
是head
的后置节点,我们就要去获取锁,如果获取锁失败,则会执行第二个if
语句。如果获取成功
- 将当前的node节点设置为
head
节点。 - 将之前的head节点弃用,从
GCRoots
下摘掉,帮助GC
进行清理 - 将
failed
改成false,表示我们已经获取到锁。防止在finally
中执行cancelAcquire
操作。 - 通过返回一个
false
你有可能会问,我们已经拿到锁了,为什么还要返回false
呢?其实,这个方法的返回值表示的并不是我们有没有拿到锁,而是我们在获取锁的过程中,是否发生了interrupt
操作。正是因为我们拿到了锁,所以,才是返回false
。
接下来,我们看下关于第二个if
操作。第二个if
操作,主要对于如果我们获取锁失败,或者当前节点不是head
的后继节点的情况,这样的情况,笼统的讲,就是将自己变成waiting
状态。具体如下:
- 程序首先执行
shouldParkAfterFailedAcquire
方法,通过这个方法,我们可以猜到,这个方法的目的就是当我们获取锁失败的时候,应该去阻塞我们的线程。在这里Park
和我们的wait
是很像的。通过方法的源码我们可以知道:- 如果当前节点的
prev
节点是SIGNAL
状态,则是允许我们将当前节点阻塞的。因为只有是SIGNAL
状态的节点,才会被head
进行唤醒,并且获取锁。 - 如果我们的
prev
节点是CANCELLED
状态,那么证明这些节点是在等待的过程中,已经取消的了,是不需要在获取锁的。我们会一直往前找,直到找到第一个节点的状态是SIGNAL
的节点为止,然后将当前节点挂在这个节点的后面。 - 如果我们的节点是一个初始化的节点,那么需要将前置节点的
waitState
设置为SIGNAL
状态。
- 如果当前节点的
- 对于上面的第
2
、3
点,是需要重新执行for
循环的。因为我们之前的队列
是存在问题。因此需要重新执行。之后当队列不存在问题,此时,我们会执行parkAndCheckInterrupt
方法。通过方法的源码可以知道,它其实是调用LockSupport.park
的方法,将线程进行阻塞。
cancelAcquire方法
如果程序在执行的过程中,发生了异常,此时会执行finally
的方法。正常来讲,finally
方法是方法结束后必须执行的方法,那么在这里,我们为什么要说是在发生异常后执行的呢?因为如果程序正常退出for
循环,failed
一定是false
。只有当程序发生异常,此时failed
才会为true
。接下来,我们看下cancelAcquire
方法的源码:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
此时在程序中,我们进行了判断。
- 如果当前节点为
null
,则不作任何处理,直接方法结束 - 将当前
node
与thread
进行解绑 - 如果我们当前节点的
prev
的waitState
大于0
,则一直往前找,直到找到waitState
为SIGNAL
或者是初始化的。 - 获取这个
waitState
为SIGNAL
或者是初始化的的节点的后置节点 - 将当前的节点设置为
CANCELLED
- 在这里,我们要分为
3
中情况进行考虑- 如果我们当前节点是
tail
节点,则我们将当前节点的tail
指针指向当前节点的prev
节点。然后将当前节点的prev
节点的next
指针设置为null
,在下面我们用图1进行表示。
- 对于第二种情况,我们判断两个条件,如果满足以下条件,则获取当前
node
节点的next
节点。如果next
节点不为null
,并且next
节点的waitState
为SIGNAL
,则我们将当前prev
的next
指针指向node
的next
节点,在下面,我们用图2进行表示。- 当前
node
的prev
节点不是head
节点 - 当前
node
的prev
节点的waitState
为SIGNAL
,或者我们可以将当前node
的prev
节点的waitState
设置为SIGNAL
- 当前
node
的prev
节点的thread
不为null
- 当前
- 对于第三种情况,我们首先获取
node
的waitState
,如果waitState
小于0
,则更新为0
。如果当前node
的next
节点为null
或者是next
节点的waitState
大于0,则从tail
节点开始往前,一致找到在node
节点之后的第一个waitState
小于0的节点,然后将当前节点执行LockSupport.unpark
。如果没有找到,则不进行任何操作,在下面,我们用图3进行表示。
- 如果我们当前节点是
unlock方法
首先,废话不多说,我们先上源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
通过当前代码,我们可以发现:首先我们会尝试释放锁,如果释放锁成功,我们会当前head
指向的node
进行判断,
- 如果不为
null
,则代表队列有值 - 如果
waitState
不是初始化
只有满足以上两点,我们才能够将锁给接下来的线程。
首先,我们看下tryRelease
方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先我们会对当前的state
减1操作,代表我们已经出了一次同步方法。如果此时我们的state
为0,代表此时线程已经不再需要锁,同时我们会把重入锁的对象设置为null
。
接下来,我们看下unparkSuccessor
方法。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
首先我们获取当前节点的waitState
,如果当前节点为SIGNAL
,则将节点更新为初始化状态。这是因为我们的入参node
正好是head
。而head
正好是我们之前认为的已经获取到锁的线程,现在这个线程已经释放了锁,因此,我们必须将该线程的waitState
的状态从小于0改掉,在这里,我们一般是改成0,然后我们会从tail
开始往回找,直到找到最后一个waitState
为SIGNAL
的,如果存在,我们直接将其唤醒即可。
ReentrantLock实现公平锁
lock方法
对于ReentrantLock
而言,它的公平锁
和非公平锁
非常的类似,在这里,我们进行不同部分的代码讲解即可:
final void lock() {
acquire(1);
}
我们发现,在这里,它并没有通过CAS
在一开始的时候去获取锁,而是走了通用的逻辑acquire
。而我们的公平锁类FairSync
通用的也实现了tryAcquire
方法。
tryAcquire方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
hasQueuedPredecessors
在这里,我们发现,唯一的不同在于方法hasQueuedPredecessors
,而hasQueuedPredecessors
的源码为:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
在这个方法中,进行了判断,对于head
的后置节点是否是当前的节点,如果不是当前的节点,则代表在当前的node节点之前,有更加重要的节点要获取锁,如果是当前节点,则代表当前节点就是要获取锁的节点。这样的好处是所有的节点都是按照FIFO
的方式来获取锁。保证了获取锁的公平性。
总结
ReentrantLock
在实际的开发过程中是十分的重要的。对于ReentrantLock
的源码的研究是十分的有必要的。
转载请注明来源,欢迎指出任何有错误或不够清晰的表达。可以邮件至 gouqiangshen@126.com
文章标题:Java--ReentrantLock
文章字数:5.4k
本文作者:BiggerShen
发布时间:2019-11-08, 21:07:14
最后更新:2024-01-16, 03:51:15
原始链接:https://shengouqiang.cn/JavaLock/JavaLockDay03/版权声明: 转载请保留原文链接及作者。