博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java并发编程实战】——BlockingQueue阻塞队列
阅读量:4181 次
发布时间:2019-05-26

本文共 19473 字,大约阅读时间需要 64 分钟。

阻塞队列(BlockingQueue)是一个支持两个特殊操作的队列:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。 下表中总结了这些方法:

操作 抛异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
删除 remove() poll() take() poll(time, unit)
检查 element() peek() 不适用 不适用

BlockingQueue 特性:

  • 不接受 null 元素;
  • 可以限定容量;
  • 实现主要用于生产者-使用者队列,另外还支持 Collection 接口;
  • 线程安全的;
  • 不支持某种 close 或者 shutdown 来指示不再插入元素。

Java中的阻塞队列:

  • ArrayBlockingQueue:由 final 的数组组成的有界阻塞队列,数组的大小就决定了队列的边界。
  • LinkedBlockingQueue:基于链表的有界阻塞队列,不指定队列长度默认设置为 Integer.MAX_VALUE。
  • SynchronousQueue:不存储元素的阻塞队列,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。支持公平访问队列,默认非公平。
  • PriorityBlockingQueue:无边界的优先队列,默认采用自然顺序升序排列。
  • DelayedQueue:基于 PriorityQueue 的支持延时获取元素的无边界阻塞队列。
  • LinkedTransferQueue:基于链表的无边界阻塞队列。transfer 方法可以把生产者出入的元素立即传输给消费者。
  • LinkedBlockingDeque:基于链表的双向阻塞队列。可以从队列的两端插入和删除元素。

ArrayBlockingQueue 源码分析

主要属性

public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable {
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; }

创建一个公平的阻塞队列

ArrayBlockingQueue
arrayBlockingQueue = new ArrayBlockingQueue<>(12, true);/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}

put 方法:插入元素到队尾,如果队列满了,队列会一直阻塞生产者线程,直到队列可用或者中断。

/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public void put(E e) throws InterruptedException {
//校验不为空 checkNotNull(e); final ReentrantLock lock = this.lock; //获取锁 lock.lockInterruptibly(); try {
while (count == items.length) //队列满了,就一直阻塞 notFull.await(); //将元素插入队列 enqueue(e); } finally {
//释放锁 lock.unlock(); }}/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */private void enqueue(E x) {
// assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; //将元素放入数组 items[putIndex] = x; //判断是否需要重设下次插入数组的下标 if (++putIndex == items.length) putIndex = 0; //队列中存在的元素总数加一 count++; //有元素进入,需要唤醒阻塞在 notEmpty 条件的消费者线程 notEmpty.signal();}

take 方法:获取并移除队列头部的元素,如果没有元素将会阻塞

/** * Retrieves and removes the head of this queue, waiting if necessary * until an element becomes available. * * @return the head of this queue * @throws InterruptedException if interrupted while waiting */public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try {
while (count == 0) //队列为空,阻塞,等待被生产者线程唤醒 notEmpty.await(); //出队列 return dequeue(); } finally {
lock.unlock(); }}/** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */private E dequeue() {
// assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //获取之后,将数组对应的元素删除 items[takeIndex] = null; //判断是否需要重设下次读取数组中元素的下标 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //唤醒因队列满而阻塞的生产者 notFull.signal(); return x;}

LinkedBlockingQueue 源码分析

主要属性

/** The capacity bound, or Integer.MAX_VALUE if none *///容量private final int capacity;/** Current number of elements *///队列中已存在的节点数量private final AtomicInteger count = new AtomicInteger();/** * Head of linked list. * Invariant: head.item == null *///头结点transient Node
head;/** * Tail of linked list. * Invariant: last.next == null *///尾结点private transient Node
last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes *///如果调用take方法队列为空,则阻塞在此条件private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts *///如果调用put方法队列已满,则阻塞在此条件private final Condition notFull = putLock.newCondition();

构造方法

LinkedBlockingQueue
blockingQueue = new LinkedBlockingQueue<>(12);/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */public LinkedBlockingQueue() {
//创建时如果不指定容量,则默认为 0x7fffffff,即变为无边界阻塞队列 this(Integer.MAX_VALUE);}/** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */public LinkedBlockingQueue(int capacity) {
//指定一个非0的容量 if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //构造一个空的节点作为首尾 last = head = new Node
(null);}

put 方法:

/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node
node = new Node
(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取写锁 putLock.lockInterruptibly(); try {
/* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //队列如果已满,需要阻塞 while (count.get() == capacity) {
notFull.await(); } //入队尾 enqueue(node); //c为加一之前的值 c = count.getAndIncrement(); //如果加入一个元素之后,还有空位,唤醒等待中的线程 if (c + 1 < capacity) //注意put可不是循环操作,可能有多个生产者线程阻塞,但只有一个能获取到 putLock 锁 //然后由此线程唤醒后续的生产者线程 notFull.signal(); } finally {
//释放锁之后才会唤醒后续节点 putLock.unlock(); } if (c == 0) //进入到这里,表明原队列中无元素,可能是首次调用put,也可能是消费者把元素消费完了 //加入元素,那么在此之前可能存在消费者线程阻塞,唤醒消费者 signalNotEmpty();}/** * Links node at end of queue. * * @param node the node */private void enqueue(Node
node) {
// assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node;}

take 方法:

public E take() throws InterruptedException {
E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获取读锁 takeLock.lockInterruptibly(); try {
//队列如果为空,需要阻塞 while (count.get() == 0) {
notEmpty.await(); } //出队列 x = dequeue(); c = count.getAndDecrement(); //如果出队列后还有一个元素,则唤醒其他的消费者线程 if (c > 1) notEmpty.signal(); } finally {
takeLock.unlock(); } //消费之前,队列是满的话会有生产者线程阻塞,则唤醒生产者线程 if (c == capacity) signalNotFull(); return x;}/** * Removes a node from head of queue. * * @return the node */private E dequeue() {
// assert takeLock.isHeldByCurrentThread(); // assert head.item == null; //头结点是空的,头结点之后的第一个节点才是队列的第一个元素 Node
h = head; Node
first = h.next; h.next = h; // help GC head = first; E x = first.item; //取出元素后,将 first 设置为头结点 first.item = null; return x;}

SynchronousQueue 源码分析

一种阻塞队列,同步队列中没有任何内部容量,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。

使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。这里以公平模式做分析。

SynchronousQueue
blockingQueue = new SynchronousQueue<>(true);/** * Creates a {@code SynchronousQueue} with the specified fairness policy. * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue
() : new TransferStack
();}/** Dual Queue */static final class TransferQueue
extends Transferer
{
/* * This extends Scherer-Scott dual queue algorithm, differing, * among other ways, by using modes within nodes rather than * marked pointers. The algorithm is a little simpler than * that for stacks because fulfillers do not need explicit * nodes, and matching is done by CAS'ing QNode.item field * from non-null to null (for put) or vice versa (for take). */ /** Node class for TransferQueue. */ static final class QNode {
volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; } /** Head of queue */ transient volatile QNode head; /** Tail of queue */ transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ transient volatile QNode cleanMe;}/** * Shared internal API for dual stacks and queues. */abstract static class Transferer
{ /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout * @param nanos the timeout, in nanoseconds * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ //e不为空代表是生产者要传递给消费者的元素 //e为空代表消费者等待生产者提供一个元素 abstract E transfer(E e, boolean timed, long nanos);}

生产者放入一个元素,然后自旋转等待消费者

blockingQueue.put("item");/** * Adds the specified element to this queue, waiting if necessary for * another thread to receive it. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} *///加入一个元素到队列中,等待其他的消费者来接收public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) {
//上面的返回值为空,代表被中断 Thread.interrupted(); throw new InterruptedException(); }}/** * Puts or takes an item. */@SuppressWarnings("unchecked")E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be * fulfilled (or cancelled) and return matching item. * * 2. If queue apparently contains waiting items, and this * call is of complementary mode, try to fulfill by CAS'ing * item field of waiting node and dequeuing it, and then * returning matching item. * * In each case, along the way, check for and try to help * advance head and tail on behalf of other stalled/slow * threads. * * The loop starts off with a null check guarding against * seeing uninitialized head or tail values. This never * happens in current SynchronousQueue, but could if * callers held non-volatile/final ref to the * transferer. The check is here anyway because it places * null checks at top of loop, which is usually faster * than having them implicitly interspersed. */ //看上面的注释,基本逻辑为: //1.如果队列为空,或者是相同的节点(队列中只会有一种类型的节点阻塞,要么全部是消费者节点,要么全部是生产者节点) // 将此线程加入到队列中,等待被唤醒处理 //2.如果队列已经有节点阻塞,并且此线程是互补性的节点(消费者阻塞在队列中,那么生产者进来就是互补性的节点), // 尝试将 QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) {
QNode t = tail; QNode h = head; //可能头尾还没有初始化成功 if (t == null || h == null) // saw uninitialized value continue; // spin //头尾为空代表队列为空,进来的线程需要阻塞,和尾结点相同操作的节点进来也一样需要阻塞 if (h == t || t.isData == isData) {
// empty or same-mode QNode tn = t.next; if (t != tail) // inconsistent read continue; if (tn != null) {
// lagging tail //帮其他线程设置为尾结点 advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; //初始化当前线程的节点 if (s == null) s = new QNode(e, isData); //加入到尾结点后面去 if (!t.casNext(null, s)) // failed to link in continue; //将自己设置为尾结点 advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) {
// wait was cancelled clean(t, s); return null; } if (!s.isOffList()) {
// not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else {
// complementary-mode //互补的节点进来 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled //第一个阻塞的节点没有被其他线程唤醒,且没有被中断 //修改第一个阻塞节点的item为相反的值,item变化就在这里 !m.casItem(x, e)) {
// lost CAS advanceHead(h, m); // dequeue and retry continue; } //casItem 成功后,设置第一个阻塞的节点为头结点,即出队列 advanceHead(h, m); // successfully fulfilled //唤醒阻塞的第一个节点 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } }}/** * Spins/blocks until node s is fulfilled. * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //自旋转的次数 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) {
if (w.isInterrupted()) //被中断,设置节点的内容 item 为节点自己 s.tryCancel(e); Object x = s.item; if (x != e) //item变化了,才退出 return x; if (timed) {
nanos = deadline - System.nanoTime(); if (nanos <= 0L) {
s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; //自旋转达到最大值,代表这段期间没有其他互补的线程进来,阻塞自己 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); }}

SynchronousQueue 只是一个传球手,它把生产者的数据直接传递给消费者,吞吐量比 ArrayBlockingQueue 和 LinkedBlockingQueue 高。

DelayQueue 源码分析

一个支持延迟获取元素的阻塞队列,队列使用 PriorityQueue。队列中的元素必须实现 Delayed 接口,定义多久才能从队列中获取元素。

直接上 take() 方法,只有延迟期满才能提取元素。

/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //必须先获取锁 lock.lockInterruptibly(); try {
for (;;) {
//先获取到头元素,头元素并不出队列 E first = q.peek(); if (first == null) //头元素为空,说明队列中没有元素,等待生产者线程生产元素。 available.await(); else {
//获取需要延迟的纳秒数 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) //延迟期满,出队列 return q.poll(); //延迟期满未满 //等待期间释放 first 引用 first = null; // don't retain ref while waiting //leader 是一个等待获取头元素的线程 if (leader != null) //若等待队列有其他线程等待了,自己也等待 available.await(); else {
//无其他线程等待 Thread thisThread = Thread.currentThread(); //当前线程设置成 leader leader = thisThread; try {
//等待延迟期满后,自动唤醒 available.awaitNanos(delay); } finally {
//若自己是 leader,则清空 leader if (leader == thisThread) leader = null; } } } } } finally {
if (leader == null && q.peek() != null) //没有等待获取头元素的线程,且头元素不为空,唤醒等待队列中可能存在的节点 available.signal(); lock.unlock(); }}

生产一个元素

/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */public void put(E e) {
offer(e);}public boolean offer(E e) {
final ReentrantLock lock = this.lock; //也是要先获取锁 lock.lock(); try {
//向优先级队列中插入元素 q.offer(e); if (q.peek() == e) {
//此次插入的元素是头元素 leader = null; //唤醒等待队列中的头节点,让他重新获取头元素 available.signal(); } return true; } finally {
lock.unlock(); }}

LinkedBlockingDeque 双向阻塞度列

相比其他阻塞队列,多了 taskFirst、 taskLast 、putFirst 、putLast 等方法,较简单就不分析了

推荐阅读:

转载地址:http://zqrai.baihongyu.com/

你可能感兴趣的文章
关于vue-router2.0的学习笔记
查看>>
vue1.0与2.0区别之生命周期
查看>>
vue2.0之非父子组件通信
查看>>
如何建立svn版本库并运行它
查看>>
如何合并svn分支到主干上
查看>>
libusb源码学习:list_entry
查看>>
libusb源码学习:几个函数加载的宏(windows)
查看>>
MCU_如何通过硬件VID 查找生产厂家
查看>>
NCNN部署例程 mxnet-gluoncv之simple_pose
查看>>
Ubuntu18.04查看显卡信息并安装NVDIA显卡驱动driver + Cuda + Cudnn
查看>>
电子元件二极管封装SMA,SMB,SMC的区别
查看>>
利用FFmpeg玩转Android视频录制与压缩(二)
查看>>
eclipse下生成Java类图和时序图,生成UML图
查看>>
M文件程序设计(matlab)
查看>>
matlab基础知识
查看>>
程序员的职业素养
查看>>
一道面试题深入了解java底层
查看>>
java下载附件
查看>>
cron表达式每个月最后一天
查看>>
Oracle中Like与Instr模糊查询性能大比拼
查看>>