博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
AbstractQueuedSynchronizer 源码分析
阅读量:6829 次
发布时间:2019-06-26

本文共 26534 字,大约阅读时间需要 88 分钟。

hot3.png

AQS是通过CHL队列来实现锁请求阻塞列表的。可以通过acquire(int arg)来分析,当前线程竞争锁时的流程,然后再通过release(int arg)方法来分析,当前线程释放一个锁时的流程。这两个方法都是独占锁的流程。相对应的acquireShared(int arg) ,releaseShared(int arg)是共享锁的获取释放流程。

/** * Provides a framework for implementing blocking locks and related * synchronizers (semaphores, events, etc) that rely on * first-in-first-out (FIFO) wait queues.  This class is designed to * be a useful basis for most kinds of synchronizers that rely on a * single atomic int value to represent state. Subclasses * must define the protected methods that change this state, and which * define what that state means in terms of this object being acquired * or released.  Given these, the other methods in this class carry * out all queuing and blocking mechanics. Subclasses can maintain * other state fields, but only the atomically updated int * value manipulated using methods {@link #getState}, {@link * #setState} and {@link #compareAndSetState} is tracked with respect * to synchronization. * 这个类根据fifo策略,实现一个阻塞锁同步器。他设计的基础,是依靠一个int 型的原子变量代表同步器的状态。 * 

Subclasses should be defined as non-public internal helper * classes that are used to implement the synchronization properties * of their enclosing class. Class * AbstractQueuedSynchronizer does not implement any * synchronization interface. Instead it defines methods such as * {@link #acquireInterruptibly} that can be invoked as * appropriate by concrete locks and related synchronizers to * implement their public methods. * *

This class supports either or both a default exclusive * mode and a shared mode. When acquired in exclusive mode, * attempted acquires by other threads cannot succeed. Shared mode * acquires by multiple threads may (but need not) succeed. This class * does not "understand" these differences except in the * mechanical sense that when a shared mode acquire succeeds, the next * waiting thread (if one exists) must also determine whether it can * acquire as well. Threads waiting in the different modes share the * same FIFO queue. Usually, implementation subclasses support only * one of these modes, but both can come into play for example in a * {@link ReadWriteLock}. Subclasses that support only exclusive or * only shared modes need not define the methods supporting the unused mode. * 可以根据这个类,构造共享或者独占模式的锁。 *

This class defines a nested {@link ConditionObject} class that * can be used as a {@link Condition} implementation by subclasses * supporting exclusive mode for which method {@link * #isHeldExclusively} reports whether synchronization is exclusively * held with respect to the current thread, method {@link #release} * invoked with the current {@link #getState} value fully releases * this object, and {@link #acquire}, given this saved state value, * eventually restores this object to its previous acquired state. No * AbstractQueuedSynchronizer method otherwise creates such a * condition, so if this constraint cannot be met, do not use it. The * behavior of {@link ConditionObject} depends of course on the * semantics of its synchronizer implementation. * 类中有个内部类ConditionObject,用于实现条件队列。 *

This class provides inspection, instrumentation, and monitoring * methods for the internal queue, as well as similar methods for * condition objects. These can be exported as desired into classes * using an AbstractQueuedSynchronizer for their * synchronization mechanics. * *

Serialization of this class stores only the underlying atomic * integer maintaining state, so deserialized objects have empty * thread queues. Typical subclasses requiring serializability will * define a readObject method that restores this to a known * initial state upon deserialization. * 。。。。。。。省略。。 * * @since 1.5 * @author Doug Lea */public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; /** * Creates a new AbstractQueuedSynchronizer instance * with initial synchronization state of zero. */ protected AbstractQueuedSynchronizer() { } /** * 等待队列的Node类: * Wait queue node class. * 等待队列用的是CLH队列。CLH 队列一般用于自旋锁队列。 *

The wait queue is a variant of a "CLH" (Craig, Landin, and * Hagersten) lock queue. CLH locks are normally used for * spinlocks. We instead use them for blocking synchronizers, but * use the same basic tactic of holding some of the control * information about a thread in the predecessor of its node. * 这里,我们使用相同的控制前置节点策略,把它用在阻塞同步器上。 * A "status" field in each node keeps track of whether a thread * should block. * 每个节点的status 字段,表明这个节点是否应该阻塞。 * A node is signalled when its predecessor * releases. * 当一个节点的前置节点释放锁,它会得到通知。 * Each node of the queue otherwise serves as a * specific-notification-style monitor holding a single waiting * thread. * 每个节点,以明确通知交互方式,保存着一个等待线程。 * The status field does NOT control whether threads are * granted locks etc though. * status字段,不代表一个线程是否已经获得锁。 * A thread may try to acquire if it is * first in the queue. But being first does not guarantee success; * it only gives the right to contend. So the currently released * contender thread may need to rewait. * 一个状态只表明,某个线程是否有权利参与锁竞争了。 *

To enqueue into a CLH lock, you atomically splice it in as new * tail. To dequeue, you just set the head field. * 一个线程入队,就是把线程放入队列尾部。数据结构如下图:CLH队列其实是一个个Node类组成的链表,Node 中的waitStatus字段值表示节点的阻塞策略 *

     *      +------+  prev +-----+       +-----+     * head |      | <---- |     | <---- |     |  tail     *      +------+       +-----+       +-----+     * 
* *

Insertion into a CLH queue requires only a single atomic * operation on "tail", so there is a simple atomic point of * demarcation from unqueued to queued. * 入队在tail节点,原子操作 * Similarly, dequeing involves only updating the "head". However, it takes a bit * more work for nodes to determine who their successors are, * in part to deal with possible cancellation due to timeouts * and interrupts. * 类似地,出队在head节点上操作,取消,还要考虑当前节点的下一个节点线程。 *

The "prev" links (not used in original CLH locks), are mainly * needed to handle cancellation. If a node is cancelled, its * successor is (normally) relinked to a non-cancelled * predecessor. * 节点的prev字段(在原始的 CLH locks是没用到的)主要用于,一个节点被取消了,它的下一个节点,重新指向,它的前一个节点。 * For explanation of similar mechanics in the case * of spin locks, see the papers by Scott and Scherer at * http://www.cs.rochester.edu/u/scott/synchronization/ * 这里的一篇论文,解释在自旋锁中相似的机制。 *

We also use "next" links to implement blocking mechanics. * The thread id for each node is kept in its own node, so a * predecessor signals the next node to wake up by traversing * next link to determine which thread it is. * 用next字段实现阻塞机制,每个节点保存有它自己的的线程id * 前置节点可以通过next 字段找到它的下一个节点线程去唤醒。 * Determination of successor must avoid races with newly queued nodes to set * the "next" fields of their predecessors. This is solved * when necessary by checking backwards from the atomically * updated "tail" when a node's successor appears to be null. * (Or, said differently, the next-links are an optimization * so that we don't usually need a backward scan.) * 以队列下一个节点的方式,可以很方便的解决,在决定谁是下一个节点时,会产生的竞争问题。 *

Cancellation introduces some conservatism to the basic * algorithms. Since we must poll for cancellation of other * nodes, we can miss noticing whether a cancelled node is * ahead or behind us. This is dealt with by always unparking * successors upon cancellation, allowing them to stabilize on * a new predecessor, unless we can identify an uncancelled * predecessor who will carry this responsibility. * 取消一个节点,就是它的下一个节点挂到一个新的前置节点。 *

CLH queues need a dummy header node to get started. But * we don't create them on construction, because it would be wasted * effort if there is never contention. Instead, the node * is constructed and head and tail pointers are set upon first * contention. * CLH队列需要一个岗哨节点。为了节约性能,这个节点只有在有竞争者节点时才创建, * 此时,头结点,尾节点都同时指向这个节点。 *

Threads waiting on Conditions use the same nodes, but * use an additional link. Conditions only need to link nodes * in simple (non-concurrent) linked queues because they are * only accessed when exclusively held. Upon await, a node is * inserted into a condition queue. Upon signal, the node is * transferred to the main queue. A special value of status * field is used to mark which queue a node is on. * 条件队列,使用相同的node节点。但是使用另外的链接(nextWaiter),条件队列是非并发的队列,因为,只用获取排他锁,后才能操作它。 * 当await的时候,加入条件队列,当signal时候,把一个节点放到主队列(锁队列)。 * 一个status字段值(CONDITION = -2),可以指定当前节点时在哪个队列。 *

Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill * Scherer and Michael Scott, along with members of JSR-166 * expert group, for helpful ideas, discussions, and critiques * on the design of this class. */ static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node();//说明节点是共享模式 /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;//说明节点是独占模式 /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1;//说明节点线程被取消 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1;//说明当前线程的下一个节点是需要被调度唤醒的节点 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2;//说明节点线程在条件队列上等待 /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate//说明,下一个共享获取,可以无条件传播(被唤醒)。 */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * 当前线程的,下一个节点(即将)已经被阻塞。所以当前线程,在释放锁时(取消时),要唤醒它的继任者(下一个节点) * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** *这个节点链接是条件队列用的。 * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a volatile read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a volatile write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a volatile read * and write. * * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // Queuing utilities /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L; /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor * 把一个节点插入到等待队列。如果有需要,初始化。 */ private Node enq(final Node node) { for (;;) {//不停的尝试。 Node t = tail; if (t == null) { // Must initialize 尾节点为空,就创建一个空节点,作为头节点 if (compareAndSetHead(new Node())) tail = head;//并且尾节点指向头节点 } else { node.prev = t;//头节点已经创建好,把当前节点的前置节点,指向头节点。 if (compareAndSetTail(t, node)) {//原子排他方式,把当前节点设置为尾节点。 t.next = node;//设置成功把当,原来的尾节点的next指向当前节点。 return t; } } } } /** * Creates and enqueues node for current thread and given mode. * 把当前线程,以指定的模式(独占或者分享)的方式,放入队列 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {//当前队列不空 node.prev = pred;//当前队列的前置队列,指向尾节点。 if (compareAndSetTail(pred, node)) {//原子排他方式,把当前节点设置为尾节点。 pred.next = node;//设置成功把当,原来的尾节点的next指向当前节点。 return node;//返回最新节点。 } } enq(node);//如果当前登台队列为空,把一个节点插入到等待队列。 return node;//返回当前节点。 } /** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * Wakes up node's successor, if one exists. * 唤醒当前节点的继任者节点线程。 * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. * 如果,继任者已被取消或者为null,就从尾部遍历,找到最近的一个非取消节点。 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//native方法唤醒继任者节点线程。 } // Utilities for various versions of acquire /** * Cancels an ongoing attempt to acquire. * 取消一个节点 * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) {//如果是尾节点 compareAndSetNext(pred, predNext, null);//前置节点下一个节点为null.删除当前节点。 } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. // 设置 当前节点的node.next为下一个节点。 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node);//唤醒继任者节点 } node.next = node; // help GC } } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * 把node节点前置(没取消)节点,waitStatus设置为-1 * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park.前置节点已经被设置为可唤醒状态。可以放心阻塞当前节点。 */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. 如果前置节点被取消,直接抛弃,直到找到没取消的前置节点。 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. * 给前置节点信号为-1。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /* * Various flavors of acquire, varying in exclusive/shared and * control modes. Each is mostly the same, but annoyingly * different. Only a little bit of factoring is possible due to * interactions of exception mechanics (including ensuring that we * cancel if tryAcquire throws exception) and other control, at * least not without hurting performance too much. */ /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * 一个已经在队列中的线程,尝试获取一个排他锁。 * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//获取当前节点的前置节点 if (p == head && tryAcquire(arg)) {//如果前置节点是头节点,不停的尝试,知道成功。说明没有线程在等待。调用tryAcquire(arg)尝试改变状态变量值。 setHead(node);//如果成功了,就把当前节点设置为头节点,然后出列。 p.next = null; // help GC failed = false; return interrupted; } //不是头节点,表示前面还有等待的节点。调用shouldParkAfterFailedAcquire(p, node)设置当前节点前置节点waitStatus=-1, //如果前置节点waitStatus值已被设置为-1(返回true),接着调用parkAndCheckInterrupt()阻塞当前节点,安全放心的阻塞。 //如果前置节点waitStatus不为-1(>0),设置前置节点的waitStatus为-1 返回false,不阻塞当前节点。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed)//这个正常执行不到,异常时,取消进程。 cancelAcquire(node); } } /** * 先从获取一个排他锁,说起, * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //tryAcquire(arg)尝试修改state状态值,失败返回flase, //执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。把当前请求线程放入队列 selfInterrupt();//既不能修改state值,也不能入队列的,就自我中断。 } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * 释放一个独占锁。 * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) {//如果子类实现的tryRelase返回为true Node h = head;//把头节点赋给h if (h != null && h.waitStatus != 0)//头节点不为null,并且waitStatus不为0,为0时下一个节点已经被唤醒 unparkSuccessor(h);//唤醒h的下个节点(即当前队列头第一个等待节点)如果waitStatus为负数,就把waitStatus设置为0 return true; } return false; } }

 

转载于:https://my.oschina.net/u/146130/blog/881848

你可能感兴趣的文章
公众号python
查看>>
基于OWIN的WEB API承载
查看>>
Linux-常见服务配置(软件、服务、配置文件)
查看>>
XML和JSON的区别
查看>>
javascript中的this指向问题总结
查看>>
git强制放弃本地更改
查看>>
mapreduce中一个map多个输入路径
查看>>
HBase 多级索引
查看>>
虚拟机Ubuntu16.04安装lrzsz
查看>>
树状数组总结
查看>>
STL vector练习
查看>>
【转载】复数基础知识
查看>>
kali网络环境配置
查看>>
VC++6.0 IDE的工程用Code::Blocks来打开、编译、调试终极配置方案
查看>>
spring mvc 重定向,转发和传参
查看>>
HDU 5724 - Chess
查看>>
计算机网络五:局域网、城域网、广域网和互联网
查看>>
angularjs使用directive实现分页组件
查看>>
mysql的时间函数整理
查看>>
Oracle-DDL,DML理解以及应用
查看>>