本文共 19473 字,大约阅读时间需要 64 分钟。
阻塞队列(BlockingQueue)是一个支持两个特殊操作的队列:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。 下表中总结了这些方法:
操作 | 抛异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
删除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不适用 | 不适用 |
BlockingQueue 特性:
Java中的阻塞队列:
主要属性
public class ArrayBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; }
创建一个公平的阻塞队列
ArrayBlockingQueuearrayBlockingQueue = new ArrayBlockingQueue<>(12, true);/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}
put 方法:插入元素到队尾,如果队列满了,队列会一直阻塞生产者线程,直到队列可用或者中断。
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public void put(E e) throws InterruptedException { //校验不为空 checkNotNull(e); final ReentrantLock lock = this.lock; //获取锁 lock.lockInterruptibly(); try { while (count == items.length) //队列满了,就一直阻塞 notFull.await(); //将元素插入队列 enqueue(e); } finally { //释放锁 lock.unlock(); }}/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; //将元素放入数组 items[putIndex] = x; //判断是否需要重设下次插入数组的下标 if (++putIndex == items.length) putIndex = 0; //队列中存在的元素总数加一 count++; //有元素进入,需要唤醒阻塞在 notEmpty 条件的消费者线程 notEmpty.signal();}
take 方法:获取并移除队列头部的元素,如果没有元素将会阻塞
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element becomes available. * * @return the head of this queue * @throws InterruptedException if interrupted while waiting */public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //队列为空,阻塞,等待被生产者线程唤醒 notEmpty.await(); //出队列 return dequeue(); } finally { lock.unlock(); }}/** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //获取之后,将数组对应的元素删除 items[takeIndex] = null; //判断是否需要重设下次读取数组中元素的下标 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //唤醒因队列满而阻塞的生产者 notFull.signal(); return x;}
主要属性
/** The capacity bound, or Integer.MAX_VALUE if none *///容量private final int capacity;/** Current number of elements *///队列中已存在的节点数量private final AtomicInteger count = new AtomicInteger();/** * Head of linked list. * Invariant: head.item == null *///头结点transient Nodehead;/** * Tail of linked list. * Invariant: last.next == null *///尾结点private transient Node last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes *///如果调用take方法队列为空,则阻塞在此条件private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts *///如果调用put方法队列已满,则阻塞在此条件private final Condition notFull = putLock.newCondition();
构造方法
LinkedBlockingQueueblockingQueue = new LinkedBlockingQueue<>(12);/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */public LinkedBlockingQueue() { //创建时如果不指定容量,则默认为 0x7fffffff,即变为无边界阻塞队列 this(Integer.MAX_VALUE);}/** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */public LinkedBlockingQueue(int capacity) { //指定一个非0的容量 if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //构造一个空的节点作为首尾 last = head = new Node (null);}
put 方法:
/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取写锁 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //队列如果已满,需要阻塞 while (count.get() == capacity) { notFull.await(); } //入队尾 enqueue(node); //c为加一之前的值 c = count.getAndIncrement(); //如果加入一个元素之后,还有空位,唤醒等待中的线程 if (c + 1 < capacity) //注意put可不是循环操作,可能有多个生产者线程阻塞,但只有一个能获取到 putLock 锁 //然后由此线程唤醒后续的生产者线程 notFull.signal(); } finally { //释放锁之后才会唤醒后续节点 putLock.unlock(); } if (c == 0) //进入到这里,表明原队列中无元素,可能是首次调用put,也可能是消费者把元素消费完了 //加入元素,那么在此之前可能存在消费者线程阻塞,唤醒消费者 signalNotEmpty();}/** * Links node at end of queue. * * @param node the node */private void enqueue(Node node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node;}
take 方法:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获取读锁 takeLock.lockInterruptibly(); try { //队列如果为空,需要阻塞 while (count.get() == 0) { notEmpty.await(); } //出队列 x = dequeue(); c = count.getAndDecrement(); //如果出队列后还有一个元素,则唤醒其他的消费者线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //消费之前,队列是满的话会有生产者线程阻塞,则唤醒生产者线程 if (c == capacity) signalNotFull(); return x;}/** * Removes a node from head of queue. * * @return the node */private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; //头结点是空的,头结点之后的第一个节点才是队列的第一个元素 Nodeh = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; //取出元素后,将 first 设置为头结点 first.item = null; return x;}
一种阻塞队列,同步队列中没有任何内部容量,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。
使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。这里以公平模式做分析。
SynchronousQueueblockingQueue = new SynchronousQueue<>(true);/** * Creates a {@code SynchronousQueue} with the specified fairness policy. * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue () : new TransferStack ();}/** Dual Queue */static final class TransferQueue extends Transferer { /* * This extends Scherer-Scott dual queue algorithm, differing, * among other ways, by using modes within nodes rather than * marked pointers. The algorithm is a little simpler than * that for stacks because fulfillers do not need explicit * nodes, and matching is done by CAS'ing QNode.item field * from non-null to null (for put) or vice versa (for take). */ /** Node class for TransferQueue. */ static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; } /** Head of queue */ transient volatile QNode head; /** Tail of queue */ transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ transient volatile QNode cleanMe;}/** * Shared internal API for dual stacks and queues. */abstract static class Transferer { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout * @param nanos the timeout, in nanoseconds * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ //e不为空代表是生产者要传递给消费者的元素 //e为空代表消费者等待生产者提供一个元素 abstract E transfer(E e, boolean timed, long nanos);}
生产者放入一个元素,然后自旋转等待消费者
blockingQueue.put("item");/** * Adds the specified element to this queue, waiting if necessary for * another thread to receive it. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} *///加入一个元素到队列中,等待其他的消费者来接收public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { //上面的返回值为空,代表被中断 Thread.interrupted(); throw new InterruptedException(); }}/** * Puts or takes an item. */@SuppressWarnings("unchecked")E transfer(E e, boolean timed, long nanos) { /* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be * fulfilled (or cancelled) and return matching item. * * 2. If queue apparently contains waiting items, and this * call is of complementary mode, try to fulfill by CAS'ing * item field of waiting node and dequeuing it, and then * returning matching item. * * In each case, along the way, check for and try to help * advance head and tail on behalf of other stalled/slow * threads. * * The loop starts off with a null check guarding against * seeing uninitialized head or tail values. This never * happens in current SynchronousQueue, but could if * callers held non-volatile/final ref to the * transferer. The check is here anyway because it places * null checks at top of loop, which is usually faster * than having them implicitly interspersed. */ //看上面的注释,基本逻辑为: //1.如果队列为空,或者是相同的节点(队列中只会有一种类型的节点阻塞,要么全部是消费者节点,要么全部是生产者节点) // 将此线程加入到队列中,等待被唤醒处理 //2.如果队列已经有节点阻塞,并且此线程是互补性的节点(消费者阻塞在队列中,那么生产者进来就是互补性的节点), // 尝试将 QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; //可能头尾还没有初始化成功 if (t == null || h == null) // saw uninitialized value continue; // spin //头尾为空代表队列为空,进来的线程需要阻塞,和尾结点相同操作的节点进来也一样需要阻塞 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail //帮其他线程设置为尾结点 advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; //初始化当前线程的节点 if (s == null) s = new QNode(e, isData); //加入到尾结点后面去 if (!t.casNext(null, s)) // failed to link in continue; //将自己设置为尾结点 advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else { // complementary-mode //互补的节点进来 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled //第一个阻塞的节点没有被其他线程唤醒,且没有被中断 //修改第一个阻塞节点的item为相反的值,item变化就在这里 !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } //casItem 成功后,设置第一个阻塞的节点为头结点,即出队列 advanceHead(h, m); // successfully fulfilled //唤醒阻塞的第一个节点 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } }}/** * Spins/blocks until node s is fulfilled. * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //自旋转的次数 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) //被中断,设置节点的内容 item 为节点自己 s.tryCancel(e); Object x = s.item; if (x != e) //item变化了,才退出 return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; //自旋转达到最大值,代表这段期间没有其他互补的线程进来,阻塞自己 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); }}
SynchronousQueue 只是一个传球手,它把生产者的数据直接传递给消费者,吞吐量比 ArrayBlockingQueue 和 LinkedBlockingQueue 高。
一个支持延迟获取元素的阻塞队列,队列使用 PriorityQueue。队列中的元素必须实现 Delayed 接口,定义多久才能从队列中获取元素。
直接上 take() 方法,只有延迟期满才能提取元素。
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //必须先获取锁 lock.lockInterruptibly(); try { for (;;) { //先获取到头元素,头元素并不出队列 E first = q.peek(); if (first == null) //头元素为空,说明队列中没有元素,等待生产者线程生产元素。 available.await(); else { //获取需要延迟的纳秒数 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) //延迟期满,出队列 return q.poll(); //延迟期满未满 //等待期间释放 first 引用 first = null; // don't retain ref while waiting //leader 是一个等待获取头元素的线程 if (leader != null) //若等待队列有其他线程等待了,自己也等待 available.await(); else { //无其他线程等待 Thread thisThread = Thread.currentThread(); //当前线程设置成 leader leader = thisThread; try { //等待延迟期满后,自动唤醒 available.awaitNanos(delay); } finally { //若自己是 leader,则清空 leader if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) //没有等待获取头元素的线程,且头元素不为空,唤醒等待队列中可能存在的节点 available.signal(); lock.unlock(); }}
生产一个元素
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */public void put(E e) { offer(e);}public boolean offer(E e) { final ReentrantLock lock = this.lock; //也是要先获取锁 lock.lock(); try { //向优先级队列中插入元素 q.offer(e); if (q.peek() == e) { //此次插入的元素是头元素 leader = null; //唤醒等待队列中的头节点,让他重新获取头元素 available.signal(); } return true; } finally { lock.unlock(); }}
相比其他阻塞队列,多了 taskFirst、 taskLast 、putFirst 、putLast 等方法,较简单就不分析了
推荐阅读:
转载地址:http://zqrai.baihongyu.com/