一:功能介绍
基于数组的有界阻塞队列,基于FIFO的存储模式,支持公平非公平锁。
二:源码分析
//数组 final Object[] items; //出队索引 int takeIndex; //入队索引 int putIndex; //队列大小 int count; //可重入锁 final ReentrantLock lock; //等待通知条件 private final Condition notEmpty; //等待通知条件 private final Condition notFull;
构造函数
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //初始化数组容量 this.items = new Object[capacity]; //内容采用可重入锁ReentrantLock实现,支持公平非公平选择 lock = new ReentrantLock(fair); //阻塞队列,等待条件 notEmpty = lock.newCondition(); //阻塞队列,等待条件 notFull = lock.newCondition(); }
入队操作
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //可中断获取锁,如果出现了interrupted,不用一直阻塞 lock.lockInterruptibly(); try { //如果队列已满 while (count == items.length) //入队线程阻塞 notFull.await(); //插入数据 insert(e); } finally { lock.unlock(); } } private void insert(E x) { //将新的数据赋值在数组的某一个索引处 items[putIndex] = x; //重新赋值putIndex,设置下一个被取出元素的索引 putIndex = inc(putIndex); //队列大小+1 ++count; //唤醒take线程 notEmpty.signal(); } final int inc(int i) { //如果队列满了,重新初始化为0 return (++i == items.length) ? 0 : i; }
出队操作
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //同上,获取中断锁 lock.lockInterruptibly(); try { //队列没有值,阻塞 while (count == 0) notEmpty.await(); //返回被取走的数据 return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; //获取takeIndex处的元素 E x = this.<E>cast(items[takeIndex]); //置空takeIndex处的元素,引用不存在,便于GC,释放内存 items[takeIndex] = null; //重新赋值takeIndex,设置下一个被取出的元素 takeIndex = inc(takeIndex); //队列大小-1 --count; //唤醒put线程 notFull.signal(); return x; }
移除数据
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //从takeIndex处开始计算,每次i加1,最大为队列最大容量count for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { //如果移除元素在数组某个下标找到 if (o.equals(items[i])) { removeAt(i); return true; } } return false; } finally { lock.unlock(); } } void removeAt(int i) { final Object[] items = this.items; //如果准备移除的索引和下一个被取出的元素索引一样,直接移除 if (i == takeIndex) { //赋值null,便于GC items[takeIndex] = null; //重新设置下一个被取出元素的索引 takeIndex = inc(takeIndex); //如果需要删除的元素索引不是当前被取出的索引 } else { //一直循环,直到删除为止 for (;;) { //假设队列容量是4,目前存了3个元素,即takeIndex=0,putIndex=3,目前我打算删除数组下标为1的元素 // nexti第一次为2 int nexti = inc(i); if (nexti != putIndex) { //相当于将队列往前移 items[i] = items[nexti]; //相当于i+1 i = nexti; //待删除的索引与待put的索引相等,比如putIndex=2,i=1,inc(i) = 2 } else { //索引i处置null,偏于GC items[i] = null; //重新赋值下一个即将放入元素的索引 putIndex = i; break; } } } //队列大小-1 --count; //唤醒put线程,公平的话按FIFO顺序,非公平的话可以抢占 notFull.signal(); }
遍历队列
public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { //队列里面还剩的元素个数 private int remaining; //下一次调用next()返回的索引 private int nextIndex; //下一次调用next()返回的元素 private E nextItem; //上一次调用next()返回的元素 private E lastItem; //上一次调用next()返回的索引 private int lastRet; Itr() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { lastRet = -1; //只有队列里面还有元素 if ((remaining = count) > 0) //获取takeIndex处的元素 nextItem = itemAt(nextIndex = takeIndex); } finally { lock.unlock(); } } public boolean hasNext() { return remaining > 0; } public E next() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { //如果队列没有值 if (remaining <= 0) throw new NoSuchElementException(); lastRet = nextIndex; //获取下一次获取索引处的元素 E x = itemAt(nextIndex); // check for fresher value if (x == null) { x = nextItem; // we are forced to report old value lastItem = null; // but ensure remove fails } else //将刚获取的元素当做上一次获取的元素 lastItem = x; //当下一次获取的元素不存在的时候 while (--remaining > 0 && // skip over nulls (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) ; return x; } finally { lock.unlock(); } } public void remove() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; E x = lastItem; lastItem = null; // only remove if item still at index if (x != null && x == items[i]) { boolean removingHead = (i == takeIndex); removeAt(i); if (!removingHead) nextIndex = dec(nextIndex); } } finally { lock.unlock(); } } }
相关推荐
ArrayBlockingQueue源码分析.docx
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue常用方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue介绍,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue功能简介,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
ArrayBlockingQueue是由数组支持的有界阻塞队列,次队列按照FIFO(先进先出)原则,当队列已经填满,在去增加则会导致阻塞,这种阻塞类似线程阻塞。 ArrayBlockingQueue提供的增加和取出方法总结 使用...
java中,常用的阻塞式队列Demo。包含:ArrayBlockingQueue、LinkedQueue、PriorityBlockingQueue
数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...
ArrayBlockingQueue源码解析__动力节点共23页.pdf.zip
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. 阻塞双端队列 BlockingDeque 9...
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...
数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链...
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...
ArrayBlockingQueue :由切片支持的有界阻塞队列 LinkedBlockingQueue :由容器/列表支持的有界阻塞队列 ConcurrentRingBuffer :由片支持的有界无锁队列 安装 go get - u github . com / theodesp / blockingQueues...
ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素
ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。下面通过本文给大家介绍Java concurrency集合之ArrayBlockingQueue的相关知识,感兴趣的朋友一起看看吧
BlockingQueue接口 – 阻塞队列2.1 ArrayBlockingQueue类(有界阻塞队列)2.2 LinkedBlockingQueue类(无界阻塞队列)3. 源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public ...
阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...