Condition
在使用 Lock 锁的过程中,我们往往会使用到另外一个对象 Condition ,用于等待/通知模式的处理。
Condition 的创建
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();复制代码
使用 Condition 的前提是获取锁
final ConditionObject newCondition() { return new ConditionObject();}复制代码
从 newCondition 方法看出 Condition 对象实际上是 AQS 的内部类 ConditionObject ()。
成员变量
/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;复制代码
从内部定义的变量 firstWaiter, lastWaiter 看出, ConditionObject 对象内部维护了一个同样以 Node 为节点的等待队列。
await()
await 操作会使当前线程释放锁并进入等待模式。
public final void await() throws InterruptedException { if (Thread.interrupted()) // 当前线程中断 抛出中断异常 throw new InterruptedException(); // 将当前线程构造节点插入等待队列尾部 Node node = addConditionWaiter(); // 当前线程释放锁,唤醒同步队列 head 的后置节点 int savedState = fullyRelease(node); int interruptMode = 0; // 节点添加到同步队列后 退出循环 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 应该是在其他线程释放锁后被唤醒 // 检查当前线程是否中断,若未中断则返回 0 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // node 进入自旋过程尝试获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}复制代码
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { // 移除等待队列中状态非 CONDITION 的节点 unlinkCancelledWaiters(); t = lastWaiter; } // 将当前线程构造节点并设置状态为 CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) // 等待队列为空的时候将 firstWaiter 指向 node firstWaiter = node; else // 等待队列非空时将 lastWaiter 尾节点的 nextWaiter 指向 node t.nextWaiter = node; // 移动尾节点 lastWaiter = node; return node;}复制代码
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 当前线程释放锁,并唤醒同步队列中 head 的后置节点 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; }}复制代码
// 判断节点是否在同步队列上final boolean isOnSyncQueue(Node node) { // 节点状态为 CONDITION 或 节点的前置为空 说明节点还在等待队列上 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果节点存在后置节点 next 则说明节点在同步队列上 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // 从 tail 尾节点开始遍历同步队列查找 node 节点;若存在返回 true,反之返回 false return findNodeFromTail(node);}复制代码
await 操作流程如下 :
- 将当前线程构造一个新的 node 节点,状态为 CONDITION 添加到等待队列尾部
- 释放锁,唤醒同步队列 head 的后置节点
- 判断当前 node 节点是否在同步队列中,若不在同步队列上则挂起当前线程,等待其他线程释放锁时被唤醒
- 节点 node 被唤醒后若在同步队列上,则进入自旋过程再次尝试获取锁
signal()
signal 操作激活等待队列中节点
public final void signal() { // 判断当前线程是否为锁的持有者 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first);}复制代码
private void doSignal(Node first) { do { // 判断 first 的后置节点是否为空,为空说明等待队列为空 if ( (firstWaiter = first.nextWaiter) == null) // 等待队列的尾节点置为空 lastWaiter = null; // 将 first 的后置节点置为空,也即是将 first 节点从等待队列中移除 first.nextWaiter = null; // 执行信号转移 } while (!transferForSignal(first) && (first = firstWaiter) != null);}复制代码
// 将节点从等待队列 (condition queue) 转移到 同步队列 (sync queue)final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ // 将节点状态设置为 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 将节点添加到同步队列(sync queue)尾部, 此时 p 应该是 node 的前置节点 ws 为 0 Node p = enq(node); // int ws = p.waitStatus; // 将 node 的前置节点状态改为 SIGNAL; 便于节点 p 释放锁的时候唤醒 node if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }复制代码
signal 操作的流程如下:
- 将等待队列中的节点从队列中移除
- 将等待队列中的节点状态由 CONDITION 改为 0
- 将等待队列中的节点添加到 AQS 的同步队列尾部
signal 的作用 只是将节点从等待队列转移到同步队列中,只有当前线程释放锁后,转移到同步队列的节点才会有机会获取到锁。
如下图所示为 Condition 操作节点的转移过程:
小结
从 Condition 的 await()、signal() 操作可以看出,其作用等效于 Object 对象的 await(), notify() 方法;