【手写系列】手写线程池

article/2025/8/4 21:04:55

PS:本文的线程池为演示 Demo,皆在理解线程池的工作原理,并没有解决线程安全问题

最简单一版的线程池

public class MyThreadPool {// 存放线程,复用已创建的线程List<Thread> threadList = new ArrayList<>();public void execute(Runnable command){Thread thread = new Thread(command);threadList.add(thread);thread.start();}
}

现在有个问题:创建出来的线程,是可复用的吗?

  • 答案是否定的。
  • 因为线程在执行完 runnable() 就结束,被销毁了。

思考两个问题:

  1. 线程什么时候创建
  2. 线程的 runnable() 方法是什么?是参数的 command 吗?
  • 线程的 runnable() 是一个死循环,不断从 commandList 取任务执行。
  • 参数的 command ,是 commandList(任务队列) 中的任务。

只有一个线程的线程池

简化一下问题:假设线程池现在只有一个线程,该如何设计呢?

  • List 中不在存放线程,存放提交的任务。
  • Thread 的 runnable() 不断去判断 taskList 中是否有任务。
public class MyThreadPool {// 存放线程池中的任务List<Runnable> commandList = new ArrayList<>();Thread thread = new Thread(() ->{while (true){if (!commandList.isEmpty()){Runnable task = commandList.remove(0);command.run();}}});public void execute(Runnable command){commandList.add(command);}
}

看似完美的单线程线程池,还有其他问题: while(true){} 循环中,CPU 一直在空转,浪费资源。

有没有一种容器,可以在容器中没有元素的时候阻塞,有元素的时候在获取?

  • 有的。 阻塞队列。
public class MyThreadPool {BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);Thread thread = new Thread(() ->{while (true){try {// 一直阻塞,直到取出元素Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {// 线程在阻塞、休眠时 被打断,都会抛出 InterruptedException 异常throw new RuntimeException(e);}}},"唯一线程");{thread.start();}public void execute(Runnable command){boolean offered = blockingQueue.offer(command);}}

线程池

单个线程的线程池,多数情况下是满足不了我们的需求的,需要多个线程共同来完成任务。

public class MyThreadPool {// 核心线程数private int corePoolSize = 10;// 核心线程集合List<Thread> coreList = new ArrayList<>();// 存放任务BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);private final Runnable task = () ->{while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}};public void execute(Runnable command){// 小于核心线程数if (coreList.size() < corePoolSize){// 创建核心线程Thread thread = new Thread(task);coreList.add(thread);thread.start();}// 任务添加失败,说明阻塞队列满了,核心线程处理不过来if (!blockingQueue.offer(command)) {Thread thread = new Thread(task);coreList.add(thread);thread.start();return;}}
}

问题:阻塞队列满了,核心线程处理不过来了 该如何做呢?

  • 可以添加一些辅助线程,帮助核心线程处理任务。
public class MyThreadPool {private int corePoolSize = 10;private int maxSize = 16;List<Thread> coreList = new ArrayList<>();List<Thread> supportList = new ArrayList<>();BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);private final Runnable task = () ->{while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}};public void execute(Runnable command){// 小于核心线程数if (coreList.size() < corePoolSize){// 创建核心线程Thread thread = new Thread(task);coreList.add(thread);thread.start();}// 任务添加成功if (blockingQueue.offer(command)) {return;}// 已创建的线程数 < 最大线程数if (coreList.size() + supportList.size() < maxSize){// 创建辅助线程Thread thread = new Thread(task);supportList.add(thread);thread.start();}// 任务添加失败if (!blockingQueue.offer(command)) {throw new RuntimeException("阻塞队列满了");}}
}

问题:一个线程如何在空闲的时候结束呢?

  • blockingQueue.take() ,阻塞一定时间我就认为空闲了,因为 在规定时间内取不到元素,说明阻塞队列不满。就应该结束辅助线程。
  • blockingQueue.poll(timeout,timeUnit) ,阻塞 指定的时间。
public class MyThreadPool {private int corePoolSize;private int maxSize;private int timeout;private TimeUnit timeUnit;private BlockingQueue<Runnable> blockingQueue;// 线程池参数交给使用者决定public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit,BlockingQueue<Runnable> blockingQueue) {this.corePoolSize = corePoolSize;this.maxSize = maxSize;this.timeout = timeout;this.timeUnit = timeUnit;this.blockingQueue = blockingQueue;}List<Thread> coreList = new ArrayList<>();List<Thread> supportList = new ArrayList<>();// 核心线程的任务private final Runnable coreTask = () ->{while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}};// 辅助线程的任务private final Runnable supportTask = () ->{while (true){try {Runnable command = blockingQueue.poll(timeout, timeUnit);if (command == null){break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("辅助线程结束了!");};public void execute(Runnable command){if (coreList.size() < corePoolSize){Thread thread = new Thread(coreTask);coreList.add(thread);thread.start();}if (blockingQueue.offer(command)) {return;}if (coreList.size() + supportList.size() < maxSize){Thread thread = new Thread(supportTask);supportList.add(thread);thread.start();}if (!blockingQueue.offer(command)) {throw new RuntimeException("阻塞队列满了");}}
}

coreTask、supportTask 封装为 CoreThread、SupportThread

public class MyThreadPool {private int corePoolSize;private int maxSize;private int timeout;private TimeUnit timeUnit;private BlockingQueue<Runnable> blockingQueue;public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit,BlockingQueue<Runnable> blockingQueue) {this.corePoolSize = corePoolSize;this.maxSize = maxSize;this.timeout = timeout;this.timeUnit = timeUnit;this.blockingQueue = blockingQueue;}List<Thread> coreList = new ArrayList<>();List<Thread> supportList = new ArrayList<>();public void execute(Runnable command){if (coreList.size() < corePoolSize){// 核心线程Thread thread = new CoreThread();coreList.add(thread);thread.start();}if (blockingQueue.offer(command)) {return;}if (coreList.size() + supportList.size() < maxSize){// 辅助线程Thread thread = new SupportThread();supportList.add(thread);thread.start();}if (!blockingQueue.offer(command)) {throw new RuntimeException("阻塞队列满了");}}class CoreThread extends Thread{@Overridepublic void run() {while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread{@Overridepublic void run() {while (true){try {Runnable command = blockingQueue.poll(timeout, timeUnit);if (command == null){break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("辅助线程结束了!");}}
}

问题:上述 command 第二次添加到阻塞队列失败了,除了抛异常,还有什么其他处理方式?

  • 定义一个 拒绝策略,由创建线程池的开发者决定。

最终版线程池

@FunctionalInterface
public interface RejectHandle {/*** 拒绝策略* @param rejectCommand 被拒绝的任务* @param threadPool 拒绝任务的线程池*/void reject(Runnable rejectCommand, MyThreadPool threadPool);}public class MyThreadPool {private final int corePoolSize;private final int maxSize;private final int timeout;private final TimeUnit timeUnit;private final BlockingQueue<Runnable> blockingQueue;private final RejectHandle rejectHandle;public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, RejectHandle rejectHandler) {this.corePoolSize = corePoolSize;this.maxSize = maxSize;this.timeout = timeout;this.timeUnit = timeUnit;this.blockingQueue = blockingQueue;this.rejectHandle = rejectHandler;}List<Thread> coreList = new ArrayList<>();List<Thread> supportList = new ArrayList<>();public void execute(Runnable command){if (coreList.size() < corePoolSize){Thread thread = new CoreThread();coreList.add(thread);thread.start();}if (blockingQueue.offer(command)) {return;}if (coreList.size() + supportList.size() < maxSize){Thread thread = new SupportThread();supportList.add(thread);thread.start();}if (!blockingQueue.offer(command)) {// 执行拒绝策略rejectHandle.reject(command,this);}}class CoreThread extends Thread{@Overridepublic void run() {while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread{@Overridepublic void run() {while (true){try {Runnable command = blockingQueue.poll(timeout, timeUnit);if (command == null){break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("辅助线程结束了!");}}
}
public class Test {public static void main(String[] args) {MyThreadPool threadPool = new MyThreadPool(2,6,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),((rejectCommand, Pool) -> {// 执行拒绝策略: 抛异常;丢弃阻塞队列中的第一个任务;丢弃当前任务 。。。} ));Runnable task = () -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName());};for (int i = 0;i < 5;i ++){threadPool.execute(task);}System.out.println("主线程没有被阻塞!");}
}

思考


http://www.hkcw.cn/article/mDPDFryLOT.shtml

相关文章

Git企业级项目管理实战

目录 1. 准备工作 2. 添加成员 2.1 添加企业成员 2.2 添加项目成员 2.3 添加仓库开发人员 3. 开发场景 - 基于git flow模型的实践 3.1 新需求加入 3.2 修复测试环境 Bug 3.3 修改预发布环境Bug 3.4 修改正式环境 Bug 3.5 紧急修复正式环境 Bug 4. 拓展阅读 4.1 其…

go环境配置

下载对应版本的 go 版本 https://go.dev/dl/ 配置 vim ~/.zshrc export GOROOT/usr/local/go export PATH$PATH:$GOROOT/binsource ~/.zshrc >>>>>> go versiongoland 配置&#xff1a; &#x1f50d; 一、什么是GOPATH&#xff1f; GOPATH 是旧的项目结…

MySql(十二)

目录 MySql约束 1.添加主键约束 语法格式 1&#xff09;创建一个带主键的表 查看表结构 2&#xff09;创建表的时候指定主键名称 查看表结构 3&#xff09;创建一个表然后&#xff0c;然后再使用alter为列添加主键 查看表结构 4&#xff09;为表添加数据 1---正常数据 2---主键…

chrome.runtime.sendMessage 和 new FormData()

chrome.runtime.sendMessage 是Chrome扩展程序API中的一个方法&#xff0c;可用于背景脚本和内容脚本之间的消息传递。 new FormData() 提供了一种方便的方式来构建表单数据集。 在Chrome插件中&#xff0c;在 background.js 和 content.js 进行通信时使用了使用new FormData()…

数据结构-排序-排序的七种算法(2)

一&#xff0c;七种算法的介绍和比较 二&#xff0c;冒泡排序 原理&#xff1a;重复遍历列表&#xff0c;比较相邻元素&#xff0c;如果顺序错误就交换它们 时间复杂度&#xff1a; 最好&#xff1a;O(n)&#xff08;已有序时&#xff09; 平均&#xff1a;O(n) 最坏&#x…

【目标检测】backbone究竟有何关键作用?

backbone的核心在于能为检测提供若干种感受野大小和中心步长的组合&#xff0c;以满足对不同尺度和类别的目标检测。

JAVA实战开源项目:精简博客系统 (Vue+SpringBoot) 附源码

本文项目编号 T 215 &#xff0c;文末自助获取源码 \color{red}{T215&#xff0c;文末自助获取源码} T215&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

IO流1——体系介绍和字节输出流

什么是io流 io流分类 纯文本文件&#xff1a; windows自带的记事本打开能读懂的 经验证&#xff1a; word&#xff0c;excel不是&#xff0c; txt, md的是纯文本文件 &#xff01;&#xff01;&#xff01;&#xff01; 字节输出流 io流体系 抽象类不能直接创建他们的对象…

告别复杂操作!电脑极简风格计时使用

无论是工作、学习还是日常生活&#xff0c;这款小巧实用的计时工具都能成为你掌控时间的好帮手。特别适合需要频繁切换正计时、倒计时和查看当前时间的场景。界面简洁&#xff0c;操作便捷&#xff0c;助你高效管理每一刻。 这是一款免安装的工具&#xff0c;下载后可直接打开…

湖北理元理律师事务所:个人债务管理的温度与精度

湖北理元理律师事务所&#xff1a;个人债务管理的温度与精度 面对信用卡、网贷、医疗债等多重债务压力&#xff0c;普通人常陷入“拆东墙补西墙”的恶性循环。湖北理元理律师事务所通过计划集团公司服务平台&#xff0c;推出“有温度的债务优化计划”&#xff0c;其人性化设计…

启动你的RocketMQ之旅(七)-Store存储原理

前言&#xff1a; &#x1f44f;作者简介&#xff1a;我是笑霸final。 &#x1f4dd;个人主页&#xff1a; 笑霸final的主页2 &#x1f4d5;系列专栏&#xff1a;java专栏 &#x1f4e7;如果文章知识点有错误的地方&#xff0c;请指正&#xff01;和大家一起学习&#xff0c;一…

无标注数据如何提升LLM推理能力?熵最小化 提升LLM自信度

熵最小化 提升LLM自信度 ——熵最小化(Entropy Minimization,EM),如何在不使用任何标注数据的情况下,提升大语言模型(LLMs)在数学、物理和编程等复杂推理任务上的表现。 1. 什么是熵最小化? 熵在机器学习中衡量模型输出的不确定性。熵越小,模型对输出越“自信”(概率…

[yolov11改进系列]基于yolov11引入多尺度空洞注意力MSDA的python源码+训练源码

【MSDA介绍】 本文提出了一种新颖的多尺度空洞 Transformer&#xff0c;简称DilateFormer&#xff0c;以用于视觉识别任务。原有的 ViT 模型在计算复杂性和感受野大小之间的权衡上存在矛盾。众所周知&#xff0c;ViT 模型使用全局注意力机制&#xff0c;能够在任意图像块之间建…

LCA(最近公共祖先)与树上差分

LCA&#xff1a; 我们先看一道例题洛谷p3379 这道题就是LCA的模板题 LCA大抵有三种方法处理&#xff0c;我们这里只讲两种 分别是Tarjan和倍增法&#xff0c;也分别是离线和在线算法 我们这里先讲Tarjan Tarjan&#xff1a; 一提到Tarjan这个名字&#xff0c;相信大家都…

PCIe—TS1/TS2 之Polling下的应用(一)

前文 训练序列有序集用于比特对齐、符号对齐以及交换物理层参数。2.5GT/s和5GT/s速率时,训练序列有序集不会加扰,只用8b/10b 编码。但到8GT/s及以上速率时,采用128b/130b编码,符号有可能加扰有可能不加扰,具体参阅SPEC物理层章节,后续可能会写。 训练序列(TS1或…

Spring AI调用Ollama+DeepSeek

文章目录 Spring AI集成DeepSeek申请api_keySpringBoot工程 Spring AI聊天模型概述ChatClient接口角色预设流式响应 ChatModel接口实现简单的对话提示词 函数调用函数调用实现 AI调用Ollama下载并安装 Ollama拉取 DeepSeek 模型代码测试 Spring AI Spring AI是一个AI工程领域的…

maven中的maven-antrun-plugin插件详解

1. 核心功能2. 典型使用场景3. 配置示例4. 关键配置项5. 优缺点分析6. 最佳实践7. 常见问题8. 使用案例1. 基本配置2. 常用 Ant 任务示例文件操作执行系统命令条件判断 3. 绑定到不同生命周期阶段4. 传递参数到 Ant 脚本5. 跳过任务执行6. 调试与日志7. 完整示例 总结 maven-an…

1Remote远程会话管理以及一键启动虚拟机

1Remote远程会话管理以及一键启动虚拟机 前言 vmware中安装的虚拟机命令行没有右键粘贴功能&#xff0c;想用ssh但又得启动虚拟机又得连接SSH&#xff0c;本文使用开源的1Remote以及windows脚本来实现一键启动虚拟机并连接SSH。 实现过程 下载1Remote 下载地址&#xff1a…

Linux基础 文件描述符,重定向及缓冲区理解

&#x1f3d9;️正文 1、文件描述符 在使用 C语言 相关文件操作函数时&#xff0c;可以经常看到 FILE 这种类型&#xff0c;不同的 FILE* 表示不同的文件&#xff0c;实际进行读写时&#xff0c;根据 FILE* 进行操作即可。 #include<iostream> #include <cstdio>…

Vue 核心技术与实战智慧商城项目Day08-10

1.项目演示 2. 项目收获 3. 创建项目 4. 调整初始化目录 5. vant 组件库 6. 其他 Vue 组件库 7. vant 全部导入 和 按需导入 全部导入&#xff1a; 按需导入&#xff1a; 8. 项目中的 vw 适配 记得执行yarn serve module.exports {plugins: {postcss-px-to-viewport: {// vw适…