Java--ReentrantLock

Java–ReentrantLock

JDK的代码中,我们用于实现同步方法的常用的来说,有三种

  1. synchronize
  2. ReentrantLock
  3. countDownLatch

其中,我们已经在上一篇文章中讲解了synchronize,现在,我们讲解下ReentrantLock

ReentrantLock中,我们的锁既可以是公平锁,也可以是非公平锁。至于具体是哪一种,是根据我们在初始化ReentrantLock时,通过构造函数的请求参数来设置的。另外,提一句,synchronize只能是非公平锁。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

通过上面的代码,我们可以发现,如果我们调用无参的构造函数ReentrantLock(),则默认的是非公平锁;而如果我们调用的是有参的构造函数ReentrantLock(boolean fair),则取决于我们传入的参数,如果是false,则采用的是非公平锁,而如果是true,则采用的是公平锁

那么,对于公平锁非公平锁ReentrantLock是怎么实现的呢?

我们通过上面的代码可以看出,如果我们采用ReentrantLock()ReentrantLock(false)的时候,此时获取的是非公平锁。此时,我们使用的是NonfairSync这个静态内部类产生的对象,那么ReentrantLock是怎么实现非公平锁的呢?

在我们真正的讲解ReentrantLock的非公平锁,其实,我们就是在讲NonFairSync这个类。既然,我们想要讲解这个类,那么就面临着我们要知道这个类的一个类图:

ReentrantLockNonFairLock

接下来,问主要看下这个类。

NonFairSync实现非公平锁(加锁)

首先,我们看下NonfairSync的源码:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
        * Performs lock.  Try immediate barge, backing up to normal
        * acquire on failure.
        */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

     public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
}

在上面,我们已经罗列出了所有的主要的源码的信息。接下来,我们一点一点的进行分析。

lock方法

首先,我们查看NonFairSynclock方法,我们发现,其实对于lock方法而言,很简单。就是如果当前线程需要锁,则首先通过CAS自旋的方式,去获取锁,如果锁不存在,那么就去执行acquire方法。

acquire方法

然而,在acquire方法中,我们看到,主要的业务逻辑在if的判断中。在这里,我们发现,JDK的库工程师们采用了模板方法的设计模式,将整个加锁的过程,已经固化了,只是在不同的地方,需要实现者自己去实现而已。因此,tryAcquire方法就是由NonFairSync自己去实现的。而NonFairSync中的tryAcquire方法,仅仅只是调用底层的nonfairTryAcquire方法而已。而在nonfairTryAcquire方法中,我们发现一个神奇的事情,那就是这个方法中对于获取锁,它仍然通过了一次CAS自旋的方式去获取锁。如果没有获取到,才会执行下面的步骤。

那在这里就有一个问题了,因为我们在之前的lock方法中,已经通过CAS自旋的方式去尝试获取锁而失败了,那么为什么我们还要在nonfairTryAcquire中再执行一次呢?其实,这里面有一个效率的问题。在这里,是一个典型的通过增加一些冗余代码的方式,来提高执行效率的问题。

OK,到这里,我们开始重新的讲解一下acquire这个方法。在这个方法中,我们发现:他的主要部分是放在了if语句的里面。在if语句中,采用的是短路的原则,来进行一步一步的设置。接下俩,我们讲解下:

tryAcquire方法

我们首先执行的是tryAcquire方法。通过名字可以知道,这个代码的含义是”获取锁”。只有我们获取失败的时候,才会执行后续的流程。

  1. tryAcquire方法调用的是nonfairTryAcquire方法。
  2. nonfairTryAcquire中,我们首先会拿到当前线程,通过新创建一个Node的方式,将当前线程信息存放到Node信息中。此时我们会判断当前线程是否已经获取到锁。
    • 如果没有获取到锁,则通过一次CAS自旋来获取锁,如果获取成功,此时我们继续下当前线程的信息,以便后续重入锁的时候使用。
    • 如果我们获取锁失败,此时我们查看当前线程是否已经获取到锁,如果发现已经获取,则直接通过重入锁来获取当前锁。并且将state1。在这里,state表示的是锁的重入次数。
    • 如果上述两种情况,我们都没有获取到锁,则直接返回false,表示获取锁失败。

addWaiter方法

这个方法的调用前提是在tryAcquire获取锁失败的时候进行调用的。这个方法的主要目的是为了将未获取到锁的线程,通过Node的方式来添加到队列中。到此,我们需要介绍一下AbstractQueuedSynchronizer也就是AQS和他用户存储阻塞线程的队列的数据结构。

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer,我们俗称AQS。这个类是实现ReentrantLock锁的重要类。这个类采用的是设计模式中的模板方法。他帮我们默认了提供了一套关于锁的解决方法。但是对于内部的一些实现,是需要子类去自己实现的,例如tryAcquire方法。同时,我们的NonFairSync也是继承了这个类。在这个类中,存在了两个成员变量headtail。这两个变量,一个是指定了队列的头节点,一个指定了队列的尾节点。注意的是,这个队列采用的是懒加载模式。默认情况下,headtail的值为null。如果举例,我们可以这样表示:

ReentrantLockNonFairLock

对于AbstractQueuedSynchronizer的介绍,我们会在后续的文章中进行讲解的。

队列的存储结构

AbstractQueuedSynchronizer中,我们所有的未获取到锁的线程都会添加到一个队列当中。而这个队列,采用的是一个数据结构中典型的无头的双向链表的数据模型。对于列表中的每一个节点Node,主要是由

  • waitStatus - 当前节点的状态,其中有
    • SIGNAL - 值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行
    • CANCELLED - 值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化
    • CONDITION - 值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁
    • PROPAGATE - 值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态
    • 0 - 值为0,代表初始化状态
  • prev - 当前节点的前置节点
  • next - 当前节点的后置节点
  • thread - 当前节点对应的线程

当前,Node节点还存在一些其他的变量,在这里,我们主要关注的就是上面的这几个。对于AQS而言,我们创建的队列正是通过Node节点组成的一个FIFO的队列。他的格式如下

AQS的FIFO队列


OK,有了上面的前提,我们再看看addWaiter这段代码。同样的,这段代码中,JDK的库工程师们依然采用了通过冗余代码来提高效率的方式来提升性能,因此,我们只需要看enq这个方法即可。其中,源码如下:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在这段代码中,我们依然是通过一个死循环的方式来执行的。首先我们会判断tail这个指针。如果我们发现tail指针为null,那么此时这个队列中根本就不存在。这个也是之前我们讲解的,AQS队列采用的懒加载的模式进行初始化的(也就是说,并不是事先初始化,而是在我们使用的时候进行初始化)。因此,在for循环中的第一个if中,就是为了来初始化队列的操作。在初始化完成操作之后,会将headtail都指向这个新创建的Node节点。注意,这个节点不存在任何的thread信息。它仅仅指向的是一个阻塞队列

在这里,我们会想到,会不会存在并发的问题呢?其实,是不存在。因为就算是有多个线程进入了for循环内,此时多个线程都获取到了tailnull的情况,此时都回去执行compareAndSetHead方法。但是在compareAndSetHead中,采用了CAS自旋锁的方式进行设置,因此,只可能有一个线程成功,其他的线程都是不会成功的。这样也就解决了并发的问题。

当执行完第一个if语句后,或者是当前的队列不为空时,此时会执行else里面的语句。此时,我们会当前节点nodeprev指向队列的前一个节点,然后通过CAS自旋的方式,将当前节点添加到队列的后面。然后将前面一个节点next指向当前节点。最后返回插入节点的prev节点。

acquireQueued方法

你有可能会问,到目前为止,我们都是将线程添加到了阻塞队列中,但是并没有去获取锁啊。别急,acquireQueued方法就是对于锁队列的操作过程。同时,也是lock方法的重点方法,我们会以最简便的方式,来进行讲解。

方法的源码如下:

   final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    } 

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }       

首先,我们发现这里仍然是一个死循环的方式。然后再for循环中进行了两次判断。

对于第一个if语句,我们首先获取当前节点prev节点是不是一个head节点,如果不是head节点,则跳出当前的if判断。那么,这里就有一个问题,我们为什么非要看当前节点是不是在head之后的第一个呢?是因为

  1. head节点是不存在获取权限判断的。head节点存在的目的,是为了形成一个队列
  2. 我们始终认为,head节点后面的第一个节点是目前正在获取锁的节点,对于之后的节点,都需要前置的节点已经获取锁才可以。
  3. 对于当前获取锁的节点,我们需要保证我们的后续节点的waitState一定是SIGNAL

所以,在这里,我们解释了为什么判断当前的节点的prev一定是head的原因。接下来,就是如果当前的nodehead的后置节点,我们就要去获取锁,如果获取锁失败,则会执行第二个if语句。如果获取成功

  1. 将当前的node节点设置为head节点。
  2. 将之前的head节点弃用,从GCRoots下摘掉,帮助GC进行清理
  3. failed改成false,表示我们已经获取到锁。防止在finally中执行cancelAcquire操作。
  4. 通过返回一个false

你有可能会问,我们已经拿到锁了,为什么还要返回false呢?其实,这个方法的返回值表示的并不是我们有没有拿到锁,而是我们在获取锁的过程中,是否发生了interrupt操作。正是因为我们拿到了锁,所以,才是返回false

接下来,我们看下关于第二个if操作。第二个if操作,主要对于如果我们获取锁失败,或者当前节点不是head的后继节点的情况,这样的情况,笼统的讲,就是将自己变成waiting状态。具体如下:

  1. 程序首先执行shouldParkAfterFailedAcquire方法,通过这个方法,我们可以猜到,这个方法的目的就是当我们获取锁失败的时候,应该去阻塞我们的线程。在这里Park和我们的wait是很像的。通过方法的源码我们可以知道:
    • 如果当前节点的prev节点是SIGNAL状态,则是允许我们将当前节点阻塞的。因为只有是SIGNAL状态的节点,才会被head进行唤醒,并且获取锁。
    • 如果我们的prev节点是CANCELLED状态,那么证明这些节点是在等待的过程中,已经取消的了,是不需要在获取锁的。我们会一直往前找,直到找到第一个节点的状态是SIGNAL的节点为止,然后将当前节点挂在这个节点的后面。
    • 如果我们的节点是一个初始化的节点,那么需要将前置节点的waitState设置为SIGNAL状态。
  2. 对于上面的第23点,是需要重新执行for循环的。因为我们之前的队列是存在问题。因此需要重新执行。之后当队列不存在问题,此时,我们会执行parkAndCheckInterrupt方法。通过方法的源码可以知道,它其实是调用LockSupport.park的方法,将线程进行阻塞。

cancelAcquire方法

如果程序在执行的过程中,发生了异常,此时会执行finally的方法。正常来讲,finally方法是方法结束后必须执行的方法,那么在这里,我们为什么要说是在发生异常后执行的呢?因为如果程序正常退出for循环,failed一定是false。只有当程序发生异常,此时failed才会为true。接下来,我们看下cancelAcquire方法的源码:

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

此时在程序中,我们进行了判断。

  1. 如果当前节点为null,则不作任何处理,直接方法结束
  2. 将当前nodethread进行解绑
  3. 如果我们当前节点的prevwaitState大于0,则一直往前找,直到找到waitStateSIGNAL或者是初始化的。
  4. 获取这个waitStateSIGNAL或者是初始化的的节点的后置节点
  5. 将当前的节点设置为CANCELLED
  6. 在这里,我们要分为3中情况进行考虑
    • 如果我们当前节点是tail节点,则我们将当前节点的tail指针指向当前节点的prev节点。然后将当前节点的prev节点的next指针设置为null,在下面我们用图1进行表示。
      图1
    • 对于第二种情况,我们判断两个条件,如果满足以下条件,则获取当前node节点的next节点。如果next节点不为null,并且next节点的waitStateSIGNAL,则我们将当前prevnext指针指向nodenext节点,在下面,我们用图2进行表示。
      • 当前nodeprev节点不是head节点
      • 当前nodeprev节点的waitStateSIGNAL,或者我们可以将当前nodeprev节点的waitState设置为SIGNAL
      • 当前nodeprev节点的thread不为null
        图2
    • 对于第三种情况,我们首先获取nodewaitState,如果waitState小于0,则更新为0。如果当前nodenext节点为null或者是next节点的waitState大于0,则从tail节点开始往前,一致找到在node节点之后的第一个waitState小于0的节点,然后将当前节点执行LockSupport.unpark。如果没有找到,则不进行任何操作,在下面,我们用图3进行表示。
      图3

unlock方法

首先,废话不多说,我们先上源码:

 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

通过当前代码,我们可以发现:首先我们会尝试释放锁,如果释放锁成功,我们会当前head指向的node进行判断,

  1. 如果不为null,则代表队列有值
  2. 如果waitState不是初始化

只有满足以上两点,我们才能够将锁给接下来的线程。

首先,我们看下tryRelease方法

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

首先我们会对当前的state减1操作,代表我们已经出了一次同步方法。如果此时我们的state为0,代表此时线程已经不再需要锁,同时我们会把重入锁的对象设置为null

接下来,我们看下unparkSuccessor方法。

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

首先我们获取当前节点的waitState,如果当前节点为SIGNAL,则将节点更新为初始化状态。这是因为我们的入参node正好是head。而head正好是我们之前认为的已经获取到锁的线程,现在这个线程已经释放了锁,因此,我们必须将该线程的waitState的状态从小于0改掉,在这里,我们一般是改成0,然后我们会从tail开始往回找,直到找到最后一个waitStateSIGNAL的,如果存在,我们直接将其唤醒即可。

ReentrantLock实现公平锁

lock方法

对于ReentrantLock而言,它的公平锁非公平锁非常的类似,在这里,我们进行不同部分的代码讲解即可:

    final void lock() {
    acquire(1);
    }

我们发现,在这里,它并没有通过CAS在一开始的时候去获取锁,而是走了通用的逻辑acquire。而我们的公平锁类FairSync通用的也实现了tryAcquire方法。

tryAcquire方法

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

hasQueuedPredecessors

在这里,我们发现,唯一的不同在于方法hasQueuedPredecessors,而hasQueuedPredecessors的源码为:

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

在这个方法中,进行了判断,对于head的后置节点是否是当前的节点,如果不是当前的节点,则代表在当前的node节点之前,有更加重要的节点要获取锁,如果是当前节点,则代表当前节点就是要获取锁的节点。这样的好处是所有的节点都是按照FIFO的方式来获取锁。保证了获取锁的公平性。

总结

ReentrantLock在实际的开发过程中是十分的重要的。对于ReentrantLock的源码的研究是十分的有必要的。


转载请注明来源,欢迎指出任何有错误或不够清晰的表达。可以邮件至 gouqiangshen@126.com

文章标题:Java--ReentrantLock

文章字数:5.4k

本文作者:BiggerShen

发布时间:2019-11-08, 21:07:14

最后更新:2024-01-16, 03:51:15

原始链接:https://shengouqiang.cn/JavaLock/JavaLockDay03/

版权声明: 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏