java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理的使用线程池能够带来三个好处。
1. 降低资源消耗:频繁的创建线程和销毁线程会产生不少的资源上的消耗,通过重复利用已创建的线程可以降低资源消耗。
2.提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
3.提高线程的可管理性:通过线程池来对所有创建的线程进行资源的管理,对其可以进行统一的资源分配,调优和监控等功能,从而提高了稳定性
实现原理:
当任务向线程池提交一个任务之后,线程池是如何处理这个任务的呢?
下面我们看一个示意图:
上图我们可以分析出
1.当提交任务的时候,线程池会先判断当前核心线程是否已满(占用完),如果未占满则直接使用核心线程
2.当核心线程占满的时候就会将其放入队列中进行等待,放入之前也会判断队列是否已满,未满则放入队列
3.如果已经满了,线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的线程(这里的新线程是一个临时线程如果线程使用完毕之后则会回收这个线程)。如果都满了则会交给饱和策略来处理这个任务
下面我们看看线程池的几个关键参数:
核心线程数 (corePoolSize
)
-
本质:线程池的常驻工作兵力
-
行为特点:
-
即使线程空闲也不会销毁(除非设置
allowCoreThreadTimeOut=true
) -
新任务优先使用核心线程执行
-
最大线程数 (maximumPoolSize
)
-
本质:线程池的最大兵力上限
-
行为特点:
-
当核心线程满且队列满时,临时创建新线程(直到达到此值)
-
空闲超时后销毁(由
keepAliveTime
控制)
-
线程存活时间 (keepAliveTime
)
-
本质:非核心线程的空闲存活时间
-
行为特点:
-
线程空闲超过此时间后自动销毁
-
仅作用于超过
corePoolSize
的线程
-
-
单位:
TimeUnit
(秒/毫秒/纳秒)
任务队列 (workQueue
)
-
本质:任务缓冲区的容器
-
常见实现及适用场景:
队列类型 | 特性 | 适用场景 |
---|---|---|
SynchronousQueue | 零容量队列,直接传递任务 | 高响应要求,拒绝任务转移 |
ArrayBlockingQueue | 有界固定容量队列 | 流量可控的系统 |
LinkedBlockingQueue | 可选有界/无界队列(默认无界) | 吞吐量优先,允许任务堆积 |
PriorityBlockingQueue | 带优先级的队列 | 任务有优先级区分时 |
-
危险点:
⚠️ 无界队列可能导致 OOM(堆积过多任务)
⚠️ 有界队列需配合合理的拒绝策略
拒绝策略 (RejectedExecutionHandler
)
-
触发条件:线程数达最大且队列满时
-
四大内置策略:
策略名 | 行为 | 适用场景 |
---|---|---|
AbortPolicy (默认) | 抛出 RejectedExecutionException | 需要快速失败感知的系统 |
CallerRunsPolicy | 由提交任务的线程直接执行 | 保证任务不丢失的慢速降级 |
DiscardPolicy | 静默丢弃想要加入的任务 | 可容忍丢失的场景(如日志) |
DiscardOldestPolicy | 丢弃队列头部的任务并重试新任务 | 优先处理新任务的场景 |
源码分析:
接下来我们选几个线程池比较核心的方法进行一些源码分析
内部属性:
下面我们来看看没有介绍过的线程池内部的属性
private final AtomicInteger ctl;private static final int COUNT_BITS = 29;private static final int CAPACITY = 536870911;private static final int RUNNING = -536870912;private static final int SHUTDOWN = 0;private static final int STOP = 536870912;private static final int TIDYING = 1073741824;private static final int TERMINATED = 1610612736;private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock;private final HashSet<Worker> workers;private final Condition termination;
ctl表示两个状态值--线程池状态以及工作线程最大数量(线程池总线程数)
COUNT_BITS 表示在Integer的几位表示线程数
而后续的几个int变量值都是表示的是线程池状态值
workQueue则是线程池内部的任务队列用来存放任务
mainLock线程池内部的锁用来保证同步
termination目的是当线程池进行关闭的时候会将调用关闭方法的线程放入到Condition队列中进行等待,当线程池完全关闭之后唤醒该线程
核心方法execute
下面我们看看核心方法execute,揭秘线程池是如何运行任务的:
public void execute(Runnable var1) {if (var1 == null) {throw new NullPointerException();} else {int var2 = this.ctl.get();if (workerCountOf(var2) < this.corePoolSize) {if (this.addWorker(var1, true)) {return;}var2 = this.ctl.get();}if (isRunning(var2) && this.workQueue.offer(var1)) {int var3 = this.ctl.get();if (!isRunning(var3) && this.remove(var1)) {this.reject(var1);} else if (workerCountOf(var3) == 0) {this.addWorker((Runnable)null, false);}} else if (!this.addWorker(var1, false)) {this.reject(var1);}}}
首先对任务进行判空处理,随后获取线程池的状态变量进行判断当前线程数是否小于核心线程数,如果小于核心线程数则直接调用addWorker创建核心线程来执行当前任务
下面我们看看addWorker方法是如何创建核心线程来执行任务的
private boolean addWorker(Runnable var1, boolean var2) {while(true) {int var3 = this.ctl.get();int var4 = runStateOf(var3);if (var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) {return false;}while(true) {int var5 = workerCountOf(var3);if (var5 >= 536870911 || var5 >= (var2 ? this.corePoolSize : this.maximumPoolSize)) {return false;}if (this.compareAndIncrementWorkerCount(var3)) {boolean var18 = false;boolean var19 = false;Worker var20 = null;try {var20 = new Worker(var1);Thread var6 = var20.thread;if (var6 != null) {ReentrantLock var7 = this.mainLock;var7.lock();try {int var8 = runStateOf(this.ctl.get());if (var8 < 0 || var8 == 0 && var1 == null) {if (var6.isAlive()) {throw new IllegalThreadStateException();}this.workers.add(var20);int var9 = this.workers.size();if (var9 > this.largestPoolSize) {this.largestPoolSize = var9;}var19 = true;}} finally {var7.unlock();}if (var19) {var6.start();var18 = true;}}} finally {if (!var18) {this.addWorkerFailed(var20);}}return var18;}var3 = this.ctl.get();if (runStateOf(var3) != var4) {break;}}}}
首先说明一下参数:第一个参数表示任务,第二个参数Boolean类型则是表示启用何种线程进行执行true表示核心线程,false表示临时线程。
首先进入方法中就是两个while嵌套循环,第一个循环则是判断当前线程池的状态是否可以继续创建线程执行任务否则返回false,第二个则是首先判断当前线程数是否可以继续创建线程否则返回false,随后开始使用CAS来修改状态值增加1个线程数。随后则创建任务线程,在线程池中任务线程则是一个Worker对象,创建完成之后将其加入到线程池的线程集合中,然后更改线程集合的大小,这里的操作都是采用lock全局锁的机制进行创建的保证了线程同步,上述都完成之后则开始调用线程的start方法正式运行线程并且修改标志位,如果出现了异常则会调用addWorkerFailed进行回滚的操作,这里的回滚操作则是线程数减一,状态位更改等操作。
随后我们接着看execute之后的代码逻辑
if (isRunning(var2) && this.workQueue.offer(var1)) {int var3 = this.ctl.get();if (!isRunning(var3) && this.remove(var1)) {this.reject(var1);} else if (workerCountOf(var3) == 0) {this.addWorker((Runnable)null, false);}} else if (!this.addWorker(var1, false)) {this.reject(var1);}
这里如果当前核心线程数已经占满那么就会将其丢入阻塞队列中也就是任务队列,如果成功之后则会再次检查当前线程池的状态标志位是否可以执行任务,如果不可以则将任务从中移除,并且执行拒绝策略,如果当前线程数为0则创建一个临时线程去执行(自定义线程池如果核心线程为0则会出现这些情况)。如果队列已经满了则会创建临时线程去执行,如果也不行就会执行拒绝策略。
Worker类:
我们来看一看Worker对象的内部构造
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {private static final long serialVersionUID = 6138294804551838833L;final Thread thread;Runnable firstTask;volatile long completedTasks;Worker(Runnable var2) {this.setState(-1);this.firstTask = var2;this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);}public void run() {ThreadPoolExecutor.this.runWorker(this);}protected boolean isHeldExclusively() {return this.getState() != 0;}protected boolean tryAcquire(int var1) {if (this.compareAndSetState(0, 1)) {this.setExclusiveOwnerThread(Thread.currentThread());return true;} else {return false;}}。。。。。}
可以看出它是存在线程池内部的内部类,封装了线程池中的工作线程,并通过继承 AbstractQueuedSynchronizer
(AQS)实现了独特的锁机制,封装工作线程和任务我们可以理解但为什么内部还要实现AQS的锁机制呢?
首先我们可以看到当Worker进行初始化的时候将state设置为-1
Worker(Runnable var2) {this.setState(-1);this.firstTask = var2;this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);}
这里的-1就是防止线程在初始化过程中被中断(state=-1
)。通过AQS的 state
值(-1
, 0
, 1
)区分线程的不同阶段(未启动、空闲、忙碌)。线程池的shutdown方法会遍历所有的线程根据AQS的state不同进行对应的中断操作。换句话来说worker内部采用AQS的更多目的是通过AQS的state来表示当前Worker的一个状态,随后根据这些状态来执行不同的业务逻辑处理,而不是真正的去使用锁。在后续文章中将会源码分析shutdown方法来看看是如何中断线程的。
因此Worker
集成锁机制的核心目的是:
- 启动保护:防止线程在初始化过程中被中断(
state=-1
)。 - 状态区分:通过
state
值(-1
,0
,1
)区分线程的不同阶段(未启动、空闲、忙碌)。 - 精确中断:配合
shutdown()
和shutdownNow()
实现不同的中断策略,既保证任务完整性,又能快速响应关闭请求。
总结execute方法:
到此execute方法已经解析完毕,可以看上述流程图对整个方法的业务逻辑有一个清晰的认知。接下来我们将会讲一讲线程池的其他方法。
线程池实际上使用了两种锁:一个是全局锁lock还有一个是每个Worker线程内部自带的AQS实现锁,全局锁的目的更主要是为了线程同步,而AQS锁的目的则更主要的是为了实现Worker 的状态控制。