JUC(6) 线程池
2023-12-29 15:15:20 # Language # Java

1. 概述

线程池(thread pool)一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度

线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超过数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

线程池的特点:

  • 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  • 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 ExecutorExecutorsExecutorServiceThreadPoolExecutor 这几个类

image-20231229144024317

2. 使用方式

创建线程池

  • Executors.newFixedThreadPool(int):一池N线程

    • 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程
    • 如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务。在某个线程被显式地关闭之前,池中的线程将一直存在。
    • 线程池中的线程处于一定的量,可以很好的控制线程的并发量
    • 线程可以重复被使用,在显式关闭之前,都将一直存在
    • 超出一定量的线程被提交时候需在队列中等待
  • Executors.newSingleThreadExecutor():一池一线程

    • 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
    • 如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务
  • Executors.newCachedThreadPool():一池可扩容根据需求创建线程

    • 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
    • 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
    • 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
    • 当线程池中没有可用线程,会重新创建一个线程
  • Executors.newScheduledThreadPool(int corePoolSize)(了解)
    • 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参数,最大线程数为整形的最大数的线程池
    • 线程池中具有指定数量的线程,即便是空线程也将保留
    • 可定时或者延迟执行线程活动
    • 适用于需要多个后台线程执行周期任务的场景
  • Executors.newWorkStealingPool(int parallelism)
    • parallelism:并行级别,通常默认为 JVM 可用的处理器个数
    • 底层使用的是 ForkJoinPool 实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执行任务
    • 适用于大耗时,可并行执行的场景

执行线程:

  • void execute(Runnable command) 参数为Runnable接口类,可以设置lambda

关闭线程:

  • shutdown()
public static void main(String[] args) {
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
ExecutorService threadPool3 = Executors.newCachedThreadPool();

// 是个顾客请求
try{
for (int i = 1; i <= 10; i++) {
// 到此时执行execute()方法才创建线程
threadPool1.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}finally {
// 关闭线程
threadPool1.shutdown();
}
}
threadPool1 执行结果 threadPool2 执行结果 threadPool3 执行结果
image-20231229145713692 image-20231229145743251 image-20231229145809781

3. 线程池参数

3.1 参数解释

通过查看上面三种方式创建对象的类源代码,都有 new ThreadPoolExecutor

具体查看该类的源代码,涉及七个参数

public ThreadPoolExecutor(int corePoolSize, // 常驻(核心)线程数量
int maximumPoolSize, // 最大线程数量
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列(存放提交但未执行任务的队列)
ThreadFactory threadFactory, // 线程工厂,用于创建线程
RejectedExecutionHandler handler // 拒绝策略(线程满了)
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

3.2 工作流程

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

当提交的任务数大于 workQueue.size() + maximumPoolSize,就会触发线程池的拒绝策略。

示例:阻塞队列为3,常驻线程数2,最大线程数5,现在来了9个任务

img

第1-2个线程进入线程池创建

第3-5个线程进入阻塞队列

第6-8个线程会为他们创建新线程执行(直接运行线程6而非线程3)

第9个线程会被拒绝

3.3 拒绝策略

AbortPolicy(默认): 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。

CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大

DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入,尝试再次提交

DiscardPolicy: 直接丢弃

4. 自定义线程池

实际在开发中不允许使用 Executors 创建,而是通过 ThreadPoolExecutor 的方式,规避资源耗尽风险

image-20231229151351014

public static void main(String[] args) {
// 定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
// 常驻线程数量(核心)2个
2,
// 最大线程数量5个
5,
// 线程存活时间:2秒
2L,
TimeUnit.SECONDS,
// 阻塞队列
new ArrayBlockingQueue<>(3),
// 默认线程工厂
Executors.defaultThreadFactory(),
// 拒绝策略。抛出异常
new ThreadPoolExecutor.AbortPolicy()
);

try {
for (int i = 1; i <= 8; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程池
threadPool.shutdown();
}
}