目录
- Condition接口
- 一、 Condition介绍及使用
- 二、 Condition接口方法描述(基于jdk1.8)
- 三、 Condition实现分析
- 3.1 等待队列
- 3.2 await()等待
- 3.3 signal() 通知
Condition接口
一、 Condition介绍及使用
Condition接口是为了与Lock配合实现等待/通知模式, 可以将Condition等待通知和Lock的关系 与 Object的等待通知和Synchronized的关系类比;
- Synchronized是通过锁对象即Object的wait() 和 notify() 实现等待通知;
- Lock 则可以通过Condition的await() 和 signal() 实现等待通知;
Object的监视器方法与Condition接口对比如下图:
Condition对象定义了等待通知两种类型的方法, Condition对象有Lock对象(调用newCondition()方法)创建, Condition对象依赖于Lock对象;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class ConditionTest {private static Lock lock = new ReentrantLock();private static Condition condition = lock.newCondition();public static void main(String[] args) throws InterruptedException {new Thread(() -> {try {conditionWait();} catch (InterruptedException e) {e.printStackTrace();}}).start();//Thread.sleep(100);new Thread(() -> { conditionSignal(); }).start();}public static void conditionWait() throws InterruptedException {lock.lock();try{System.out.println("start wait");condition.await();System.out.println("wait end");}finally {lock.unlock();}}public static void conditionSignal() {lock.lock();try{System.out.println("signal");condition.signal();}finally {lock.unlock();}}
}
--- 输出:
start wait
signal
wait end
二、 Condition接口方法描述(基于jdk1.8)
三、 Condition实现分析
3.1 等待队列
以ConditionObject为例, ConditionObject是同步器AbstractQueuedSynchronizer内部类; 每个Condition对象都包含一个等待队列, 该队列是实现通知等待的关键; 队列节点使用的是AbstractQueuedSynchronizer.Node
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */private transient Node firstWaiter; // 队首/** Last node of condition queue. */private transient Node lastWaiter; // 队尾
}
等待队列结构:
3.2 await()等待
调用Condition的await()方法, 会使当前线程进入等待队列并释放锁, 同时线程变为等待状态;
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter(); // 将当前线程添加到等待队列队尾// 调用AbstractQueuedSynchronizer的方法, 释放掉当前线程持有的资源state, 并唤醒同步队列中下一个后继节点long savedState = fullyRelease(node); int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this); // 最终还是依靠unsafe.park()实现的if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters(); // 删除等待队列中等待状态不为CONDIITON的节点if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
// 将当前线程添加到等待队列队尾
private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters(); // 删除等待队列中等待状态不为CONDIITON的节点t = lastWaiter;}// 将当前线程封装成Node对象添加到等待队列队尾// 由于调用await()的前提是已经获取了锁, 所以下面操作不需要进行线程同步Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}// 遍历等待队列, 删除队列节点中等待状态不为CONDITION(表示在Condition队列中等待)的节点
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}
}// AbstractQueuedSynchronizer中的方法, 释放掉当前线程持有的资源state, 并唤醒同步队列中下一个后继节点
final long fullyRelease(Node node) {boolean failed = true;try {long savedState = getState(); // 获取当前线程所占有的资源if (release(savedState)) { // 将资源释放, 并唤醒同步队列中的后继节点failed = false;return savedState;} else { // 资源释放失败, 抛出异常throw new IllegalMonitorStateException();}} finally {if (failed) // 资源释放失败, 将该节点状态标记为CANCELLED(1)node.waitStatus = Node.CANCELLED;}
}
3.3 signal() 通知
- Condition的signal()方法, 将唤醒在等待队列中等待时间最长的节点(首节点), 在唤醒节点前, 会将节点移动到同步队列中;
public final void signal() {if (!isHeldExclusively()) // 判断当前线程是否获取了锁; 未获取锁则抛出异常throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first); // 唤醒头结点
}
// 将first节点添加到同步队列尾部, 如果失败则尝试唤醒下一个节点;
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
// 将node添加到同步队列尾部, 并尝试将其唤醒
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread); // 最终还是依靠unsafe.unpark()实现的return true;
}
- Condition的signalAll()方法, 相当于对等待队列中每个节点执行一次signal()方法;
/*** Moves all threads from the wait queue for this condition to* the wait queue for the owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/
public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);
}