JUC(5) 阻塞队列
2023-12-29 14:34:24 # Language # Java

1. 阻塞队列

阻塞队列是共享队列(多线程操作),一端输入,一端输出,不能无限放队列,满了之后就会进入阻塞,取出也同理

  • 当队列是空的,从队列中获取元素的操作将会被阻塞
  • 当队列是满的,从队列中添加元素的操作将会被阻塞
  • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增

1.1 架构

BlockingQueue 是一个接口

  • 父接口有:CollectionIterableQueue

  • 子接口有:BlockingDequeTransferQueue

  • 实现类有:ArrayBlockingQueueDelayQueueLinkedBlockingDequeLinkedBlockingQueueLinkedTransferQueuePriorityBlockingQueueSynchronousQueue

1.2 分类

ArrayBlockingQueue (常用)

  • 基于数组的阻塞队列实现, 除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的 头部和尾部在数组中的位置。
  • ArrayBlockingQueue 在生产者放入数据和消费者获取数据,共用同一个锁对象,两者无法真正并行运行
  • 由数组结构组成的有界阻塞队列

LinkedBlockingQueue (常用)

  • 基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)
  • 对于生产者端和消费者端分别采用了独立的锁来控制数据同步
  • 由链表结构组成的有界(大小默认值为Integer.MAX_VALUE)阻塞队列

DelayQueue

  • DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素
  • DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

  • 使用优先级队列实现的延迟无界阻塞队列

PriorityBlockingQueue

  • 基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Comparator 对象来决定)
  • 不会阻塞数据生产者,只会在没有可消费的数据时,阻塞数据的消费者
  • 注意:生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间
  • 内部控制线程同步的锁采用的是公平锁
  • 支持优先级排序的无界阻塞队列

SynchronousQueue

  • 一种无缓冲的等待队列
  • 相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区)
  • 不存储元素的阻塞队列,也即单个元素的队列
  • 声明一个 SynchronousQueue 有两种不同的方式
    • 公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略
    • 非公平模式(默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者
      • 如果生产者和消费者的处理速度有差距,很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理

LinkedTransferQueue

  • 由链表结构组成的无界阻塞 TransferQueue 队列
  • 相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfertransfer 方法
  • 采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素 为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时 发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到 该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。

LinkedBlockingDeque

  • 由链表结构组成的双向阻塞队列
  • 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再将该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异常
  • 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数

2. 核心方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer() put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() - -

image-20231229142616707

第一种方式:抛出异常

public class BlockingQueueTest {
public static void main(String[] args) {
// 创建阻塞队列
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
// 当队列中加元素
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// 检查
System.out.println(blockingQueue.element());
/** 此时输出的结果为
* true
* true
* true
* a
* */
System.out.println(blockingQueue.add("d"));
// 此时再添加元素,会抛出异常:IllegalStateException: Queue full
// 取出元素
System.out.println(blockingQueue.remove());
// 检查
System.out.println(blockingQueue.element());
/** 此时输出的结果为
* true
* true
* true
* a
* a (取出数据)
* b (队首数据)
* */
}
}

第二种方式:布尔值

System.out.println(blockingQueue.offer("a"));
// 检查元素
System.out.println(blockingQueue.peek());
// 取出元素,先进先出
System.out.println(blockingQueue.poll());

第三种方式:阻塞

System.out.println(blockingQueue.put("a"));
// 检查元素
System.out.println(blockingQueue.element());
// 取出元素,先进先出
System.out.println(blockingQueue.take());

该方法加入元素或者取出元素,如果满了或者空了,还进行下一步加入或者取出操作,会出现阻塞的状态

第四种方式:超时

System.out.println(blockingQueue.offer("w", 3L, TimeUnit.SECONDS));

该方法满了或者空了在进行会有阻塞,但可以加入参数,超时退出,返回false