Java线程池
一、介绍
线程池,顾名思义,这是管理一堆线程而出现的对象。与数据库的连接池一致,它的出现解决了线程的频繁创建和销毁,从而浪费大量资源的问题。
所以,线程池中有提前创建好的线程,使用时直接分配获取,使用完再由线程池管理是否销毁。
优点
-
降低资源消耗,也就是不需要重复多次的创建线程
-
更好的管理线程
- 比如可以获取当前运行的线程是什么
- 还在等待执行的任务有什么
二、使用线程池
在JDK5起提供了线程池的对象,ExecutorService
和Executors
其中,ExecutorService
和它的子类ThreadPoolExecutor
是线程池的关键
而Executors
是对应的工具类,里面有些工厂方法可以快速创建线程池
查看ThreadPoolExecutor
的构造方法
1 2 3 4 5 6 7 8 9 10
| public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {} }
|
参数 |
说明 |
corePoolSize |
核心线程数。就算目前空闲,也不会回收这几个线程 |
maximumPoolSize |
最大线程数。当前线程池可以容纳的最大线程数量 |
keepAliveTime |
线程保持存活时间。如果线程空间时间到达,将会进行销毁(除了核心线程) |
unit |
与keepAliveTime 一起使用,仅是个时间单位 |
workQueue |
工作等待队列。当线程池所有的线程都繁忙运行时,新添加的执行任务会暂时保留至此队列 |
threadFactory |
创建线程的线程工厂 |
handler |
拒绝策略。当队列满了后,还有执行任务进入时的策略 |
workQueue
参数需要传入一个BlockingQueue
,这是个双缓冲队列。BlockingQueue内部使用两条队列,允许两个线程同时向队列一个存储,一个取出操作。在保证并发安全的同时,提高了队列的存取效率,不能传入空对象,可设置容量大小,也可以不设置容量大小,那么它的容量就是Integer.MAX_VALUE
。常用的几种实现类
类 |
说明 |
ArrayBlockingQueue |
规定容量大小的阻塞队列 |
LinkedBlockingQueue |
既可以规定容量大小,也可以不规定的阻塞队列 |
SynchronizedQueue |
一个特殊的队列,生产消费必须交替完成的队列 生产一个元素后,必须要有进行消费后,才能继续往队列内生产元素 |
handler
拒绝策略
当线程池指定的队列容量满了时,将执行哪种拒绝任务的策略
策略类 |
说明 |
AbortPolicy |
默认,不执行新任务,直接抛出异常,提示线程池已满 |
DiscardPolicy |
不执行新任务,也不抛出异常 |
DiscardOldestPolicy |
它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。 |
CallerRunsPolicy |
直接在外层调用者的线程中调用新任务 |
1)小试牛刀
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.banmoon.pool;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class Demo1 {
public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new MyRunnable()); executorService.execute(new MyRunnable()); executorService.execute(new MyRunnable()); executorService.execute(new MyRunnable()); executorService.execute(new MyRunnable()); executorService.execute(() -> { System.out.println(Thread.currentThread().getName()); }); executorService.shutdown(); }
}
class MyRunnable implements Runnable{
@Override public void run() { System.out.println(Thread.currentThread().getName()); } }
|
2)Executors
工具类
关于此的三个相关方法源码,其中还有一些他们的重载,这边就不细细讲了。
这些工具类方法,主要是快速创建ThreadPoolExecutor
对象的方法,只是它们的参数各有所不同
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class Executors { public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } }
|
方法 |
参数说明 |
效果 |
newCachedThreadPool |
核心线程数为0 最大线程数已调到Integer.MAX_VALUE
|
每提交一个线程任务,都将新创建一个新的线程来执行 如果需要执行的任务很多,这有可能会导致CPU100%的问题 |
newFixedThreadPool |
核心线程数和最大线程数一致 但队列长度为Integer.MAX_VALUE |
提交的任务将正常交给池子中的线程执行,执行完成也不会销毁,等待执行新的任务 如果执行的任务很多,队列会一直添加任务等待执行,可能会造成内存溢出的问题 |
newSingleThreadExecutor |
核心线程数和最大线程数都为1 但队列长度为Integer.MAX_VALUE |
和newFixedThreadPool 类似,但池子中只有一个线程 |
newScheduledThreadPool |
指定核心线程数, 使用的队列是DelayedWorkQueue |
用于处理延迟定时任务线程的线程池,可以定期执行 |
newWorkStealingPool |
并行级别 |
这个线程池主要用于线程任务的拆分和合并,有更大资源利用效率 |
根据需要来进行使用合适的线程池,测试下他们的执行方式和快慢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.banmoon.pool;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class Demo2 {
public static void main(String[] args) { ExecutorService executorService1 = Executors.newCachedThreadPool(); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) { executorService1.execute(new MyDemo2(i));
}
executorService1.shutdown(); executorService2.shutdown(); executorService3.shutdown();
}
}
class MyDemo2 implements Runnable {
private Integer i;
public MyDemo2(Integer i) { this.i = i; }
@Override public void run() { System.out.println(StrUtil.format("{}:{},时间:{}", Thread.currentThread().getName(), i, DateUtil.now())); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
newCachedThreadPool
执行结果可以看到,一共有100个线程被创建出来
newFixedThreadPool
执行结果,执行的永远都是那几个固定的线程,这里我们指定了10个线程,所以打印也是10个为一批来进行的。
newSingleThreadExecutor
执行结果,从头到尾就只有一个线程在执行
3)线程工厂
虽然有默认的线程工厂,但如果有需要进行处理的话,还是得记录一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package com.banmoon.pool;
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger;
public class Demo3 {
public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new MyThreadFactory("BANMOON-TEST")); for (int i = 0; i < 100; i++) { executor.execute(() -> { try { System.out.println(Thread.currentThread().getName()); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); }
}
class MyThreadFactory implements ThreadFactory{ private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private String poolName;
MyThreadFactory(String poolName) { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.poolName = poolName; }
@Override public Thread newThread(Runnable r) { String threadName = poolName + "-" + threadNumber.getAndIncrement(); Thread t = new Thread(group, r, threadName, 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
}
|
执行结果
4)拒绝策略
拒绝策略没什么好讲的,平常在使用时,注意下容量的大小,以及使用的策略。自己需要执行的任务数量多少,会不会照成内存溢出等,从这几个方面入手,选择最适合业务的队列容量和拒绝策略。
策略类 |
说明 |
AbortPolicy |
默认,不执行新任务,直接抛出异常,提示线程池已满 |
DiscardPolicy |
不执行新任务,也不抛出异常 |
DiscardOldestPolicy |
它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。 |
CallerRunsPolicy |
直接在外层调用者的线程中调用新任务 |
演示CallerRunsPolicy
,会在调用者的线程中,执行超出容量的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.banmoon.pool;
import java.util.concurrent.*;
public class Demo4 {
public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 1; i <= 100; i++) { executorService.execute(new MyDemo4(i)); } executorService.shutdown(); }
}
class MyDemo4 implements Runnable{
private Integer i;
public MyDemo4(Integer i) { this.i = i; }
@Override public void run() { try { System.out.println(Thread.currentThread().getName() + ":" + i); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
执行结果,上述线程池指定了最大线程数为20,队列容量为20。所以当执行第41个任务时,队列满了,将由调用者的线程来执行这个任务,此处是主线程
三、其他
1)执行任务的优先级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class ThreadPoolExecutor extends AbstractExecutorService { public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
private boolean addWorker(Runnable firstTask, boolean core) { } }
|
四、最后
线程池这东西干货还是挺多的,还有挺多没有整理完。比如说addWorker
方法,线程池的执行调度等
后续有什么新的理解继续补上,未完待续
关于本文出现的代码示例,已提交至码云,只看文章不懂时,一定要敲代码进行理解。
我是半月,祝你幸福!