PS:本文的线程池为演示 Demo,皆在理解线程池的工作原理,并没有解决线程安全问题。
最简单一版的线程池
public class MyThreadPool {// 存放线程,复用已创建的线程List<Thread> threadList = new ArrayList<>();public void execute(Runnable command){Thread thread = new Thread(command);threadList.add(thread);thread.start();}
}
现在有个问题:创建出来的线程,是可复用的吗?
- 答案是否定的。
- 因为线程在执行完
runnable()
就结束,被销毁了。
思考两个问题:
- 线程什么时候创建
- 线程的 runnable() 方法是什么?是参数的 command 吗?
- 线程的
runnable()
是一个死循环,不断从 commandList 取任务执行。 - 参数的 command ,是 commandList(任务队列) 中的任务。
只有一个线程的线程池
简化一下问题:假设线程池现在只有一个线程,该如何设计呢?
- List 中不在存放线程,存放提交的任务。
- Thread 的
runnable()
不断去判断 taskList 中是否有任务。
public class MyThreadPool {// 存放线程池中的任务List<Runnable> commandList = new ArrayList<>();Thread thread = new Thread(() ->{while (true){if (!commandList.isEmpty()){Runnable task = commandList.remove(0);command.run();}}});public void execute(Runnable command){commandList.add(command);}
}
看似完美的单线程线程池,还有其他问题: while(true){}
循环中,CPU 一直在空转,浪费资源。
有没有一种容器,可以在容器中没有元素的时候阻塞,有元素的时候在获取?
- 有的。 阻塞队列。
public class MyThreadPool {BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);Thread thread = new Thread(() ->{while (true){try {// 一直阻塞,直到取出元素Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {// 线程在阻塞、休眠时 被打断,都会抛出 InterruptedException 异常throw new RuntimeException(e);}}},"唯一线程");{thread.start();}public void execute(Runnable command){boolean offered = blockingQueue.offer(command);}}
线程池
单个线程的线程池,多数情况下是满足不了我们的需求的,需要多个线程共同来完成任务。
public class MyThreadPool {// 核心线程数private int corePoolSize = 10;// 核心线程集合List<Thread> coreList = new ArrayList<>();// 存放任务BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);private final Runnable task = () ->{while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}};public void execute(Runnable command){// 小于核心线程数if (coreList.size() < corePoolSize){// 创建核心线程Thread thread = new Thread(task);coreList.add(thread);thread.start();}// 任务添加失败,说明阻塞队列满了,核心线程处理不过来if (!blockingQueue.offer(command)) {Thread thread = new Thread(task);coreList.add(thread);thread.start();return;}}
}
问题:阻塞队列满了,核心线程处理不过来了 该如何做呢?
- 可以添加一些辅助线程,帮助核心线程处理任务。
public class MyThreadPool {private int corePoolSize = 10;private int maxSize = 16;List<Thread> coreList = new ArrayList<>();List<Thread> supportList = new ArrayList<>();BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(1024);private final Runnable task = () ->{while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}};public void execute(Runnable command){// 小于核心线程数if (coreList.size() < corePoolSize){// 创建核心线程Thread thread = new Thread(task);coreList.add(thread);thread.start();}// 任务添加成功if (blockingQueue.offer(command)) {return;}// 已创建的线程数 < 最大线程数if (coreList.size() + supportList.size() < maxSize){// 创建辅助线程Thread thread = new Thread(task);supportList.add(thread);thread.start();}// 任务添加失败if (!blockingQueue.offer(command)) {throw new RuntimeException("阻塞队列满了");}}
}
问题:一个线程如何在空闲的时候结束呢?
blockingQueue.take()
,阻塞一定时间我就认为空闲了,因为 在规定时间内取不到元素,说明阻塞队列不满。就应该结束辅助线程。blockingQueue.poll(timeout,timeUnit)
,阻塞 指定的时间。
public class MyThreadPool {private int corePoolSize;private int maxSize;private int timeout;private TimeUnit timeUnit;private BlockingQueue<Runnable> blockingQueue;// 线程池参数交给使用者决定public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit,BlockingQueue<Runnable> blockingQueue) {this.corePoolSize = corePoolSize;this.maxSize = maxSize;this.timeout = timeout;this.timeUnit = timeUnit;this.blockingQueue = blockingQueue;}List<Thread> coreList = new ArrayList<>();List<Thread> supportList = new ArrayList<>();// 核心线程的任务private final Runnable coreTask = () ->{while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}};// 辅助线程的任务private final Runnable supportTask = () ->{while (true){try {Runnable command = blockingQueue.poll(timeout, timeUnit);if (command == null){break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("辅助线程结束了!");};public void execute(Runnable command){if (coreList.size() < corePoolSize){Thread thread = new Thread(coreTask);coreList.add(thread);thread.start();}if (blockingQueue.offer(command)) {return;}if (coreList.size() + supportList.size() < maxSize){Thread thread = new Thread(supportTask);supportList.add(thread);thread.start();}if (!blockingQueue.offer(command)) {throw new RuntimeException("阻塞队列满了");}}
}
coreTask、supportTask
封装为 CoreThread、SupportThread
public class MyThreadPool {private int corePoolSize;private int maxSize;private int timeout;private TimeUnit timeUnit;private BlockingQueue<Runnable> blockingQueue;public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit,BlockingQueue<Runnable> blockingQueue) {this.corePoolSize = corePoolSize;this.maxSize = maxSize;this.timeout = timeout;this.timeUnit = timeUnit;this.blockingQueue = blockingQueue;}List<Thread> coreList = new ArrayList<>();List<Thread> supportList = new ArrayList<>();public void execute(Runnable command){if (coreList.size() < corePoolSize){// 核心线程Thread thread = new CoreThread();coreList.add(thread);thread.start();}if (blockingQueue.offer(command)) {return;}if (coreList.size() + supportList.size() < maxSize){// 辅助线程Thread thread = new SupportThread();supportList.add(thread);thread.start();}if (!blockingQueue.offer(command)) {throw new RuntimeException("阻塞队列满了");}}class CoreThread extends Thread{@Overridepublic void run() {while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread{@Overridepublic void run() {while (true){try {Runnable command = blockingQueue.poll(timeout, timeUnit);if (command == null){break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("辅助线程结束了!");}}
}
问题:上述 command 第二次添加到阻塞队列失败了,除了抛异常,还有什么其他处理方式?
- 定义一个 拒绝策略,由创建线程池的开发者决定。
最终版线程池
@FunctionalInterface
public interface RejectHandle {/*** 拒绝策略* @param rejectCommand 被拒绝的任务* @param threadPool 拒绝任务的线程池*/void reject(Runnable rejectCommand, MyThreadPool threadPool);}public class MyThreadPool {private final int corePoolSize;private final int maxSize;private final int timeout;private final TimeUnit timeUnit;private final BlockingQueue<Runnable> blockingQueue;private final RejectHandle rejectHandle;public MyThreadPool(int corePoolSize, int maxSize, int timeout, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, RejectHandle rejectHandler) {this.corePoolSize = corePoolSize;this.maxSize = maxSize;this.timeout = timeout;this.timeUnit = timeUnit;this.blockingQueue = blockingQueue;this.rejectHandle = rejectHandler;}List<Thread> coreList = new ArrayList<>();List<Thread> supportList = new ArrayList<>();public void execute(Runnable command){if (coreList.size() < corePoolSize){Thread thread = new CoreThread();coreList.add(thread);thread.start();}if (blockingQueue.offer(command)) {return;}if (coreList.size() + supportList.size() < maxSize){Thread thread = new SupportThread();supportList.add(thread);thread.start();}if (!blockingQueue.offer(command)) {// 执行拒绝策略rejectHandle.reject(command,this);}}class CoreThread extends Thread{@Overridepublic void run() {while (true){try {Runnable command = blockingQueue.take();command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread{@Overridepublic void run() {while (true){try {Runnable command = blockingQueue.poll(timeout, timeUnit);if (command == null){break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("辅助线程结束了!");}}
}
public class Test {public static void main(String[] args) {MyThreadPool threadPool = new MyThreadPool(2,6,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),((rejectCommand, Pool) -> {// 执行拒绝策略: 抛异常;丢弃阻塞队列中的第一个任务;丢弃当前任务 。。。} ));Runnable task = () -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName());};for (int i = 0;i < 5;i ++){threadPool.execute(task);}System.out.println("主线程没有被阻塞!");}
}