Java线程池

一、介绍

线程池,顾名思义,这是管理一堆线程而出现的对象。与数据库的连接池一致,它的出现解决了线程的频繁创建和销毁,从而浪费大量资源的问题。

所以,线程池中有提前创建好的线程,使用时直接分配获取,使用完再由线程池管理是否销毁。

优点

  • 降低资源消耗,也就是不需要重复多次的创建线程

  • 更好的管理线程

    • 比如可以获取当前运行的线程是什么
    • 还在等待执行的任务有什么

二、使用线程池

在JDK5起提供了线程池的对象,ExecutorServiceExecutors

其中,ExecutorService和它的子类ThreadPoolExecutor是线程池的关键

Executors是对应的工具类,里面有些工厂方法可以快速创建线程池

查看ThreadPoolExecutor的构造方法

1
2
3
4
5
6
7
8
9
10
public class ThreadPoolExecutor extends AbstractExecutorService {

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
}
参数 说明
corePoolSize 核心线程数。就算目前空闲,也不会回收这几个线程
maximumPoolSize 最大线程数。当前线程池可以容纳的最大线程数量
keepAliveTime 线程保持存活时间。如果线程空间时间到达,将会进行销毁(除了核心线程)
unit keepAliveTime一起使用,仅是个时间单位
workQueue 工作等待队列。当线程池所有的线程都繁忙运行时,新添加的执行任务会暂时保留至此队列
threadFactory 创建线程的线程工厂
handler 拒绝策略。当队列满了后,还有执行任务进入时的策略

workQueue参数需要传入一个BlockingQueue,这是个双缓冲队列。BlockingQueue内部使用两条队列,允许两个线程同时向队列一个存储,一个取出操作。在保证并发安全的同时,提高了队列的存取效率,不能传入空对象,可设置容量大小,也可以不设置容量大小,那么它的容量就是Integer.MAX_VALUE。常用的几种实现类

说明
ArrayBlockingQueue 规定容量大小的阻塞队列
LinkedBlockingQueue 既可以规定容量大小,也可以不规定的阻塞队列
SynchronizedQueue 一个特殊的队列,生产消费必须交替完成的队列
生产一个元素后,必须要有进行消费后,才能继续往队列内生产元素

handler拒绝策略

当线程池指定的队列容量满了时,将执行哪种拒绝任务的策略

策略类 说明
AbortPolicy 默认,不执行新任务,直接抛出异常,提示线程池已满
DiscardPolicy 不执行新任务,也不抛出异常
DiscardOldestPolicy 它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。
CallerRunsPolicy 直接在外层调用者的线程中调用新任务

1)小试牛刀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.banmoon.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo1 {

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);

executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
// lambda表达式
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
// 关闭线程池,如果不关闭,线程池将一直存在,池子内保留着核心线程,等待着调用
executorService.shutdown();
}

}

class MyRunnable implements Runnable{

@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}

image-20211211195656264

2)Executors工具类

关于此的三个相关方法源码,其中还有一些他们的重载,这边就不细细讲了。

这些工具类方法,主要是快速创建ThreadPoolExecutor对象的方法,只是它们的参数各有所不同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Executors {

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
}
方法 参数说明 效果
newCachedThreadPool 核心线程数为0
最大线程数已调到Integer.MAX_VALUE
每提交一个线程任务,都将新创建一个新的线程来执行
如果需要执行的任务很多,这有可能会导致CPU100%的问题
newFixedThreadPool 核心线程数和最大线程数一致
但队列长度为Integer.MAX_VALUE
提交的任务将正常交给池子中的线程执行,执行完成也不会销毁,等待执行新的任务
如果执行的任务很多,队列会一直添加任务等待执行,可能会造成内存溢出的问题
newSingleThreadExecutor 核心线程数和最大线程数都为1
但队列长度为Integer.MAX_VALUE
newFixedThreadPool类似,但池子中只有一个线程
newScheduledThreadPool 指定核心线程数,
使用的队列是DelayedWorkQueue
用于处理延迟定时任务线程的线程池,可以定期执行
newWorkStealingPool 并行级别 这个线程池主要用于线程任务的拆分和合并,有更大资源利用效率

根据需要来进行使用合适的线程池,测试下他们的执行方式和快慢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.banmoon.pool;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo2 {

public static void main(String[] args) {
ExecutorService executorService1 = Executors.newCachedThreadPool();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newSingleThreadExecutor();

for (int i = 0; i < 100; i++) {
executorService1.execute(new MyDemo2(i));
// executorService2.execute(new MyDemo2(i));
// executorService3.execute(new MyDemo2(i));
}

executorService1.shutdown();
executorService2.shutdown();
executorService3.shutdown();

}

}

class MyDemo2 implements Runnable {

private Integer i;

public MyDemo2(Integer i) {
this.i = i;
}

@Override
public void run() {
System.out.println(StrUtil.format("{}:{},时间:{}", Thread.currentThread().getName(), i, DateUtil.now()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

newCachedThreadPool执行结果可以看到,一共有100个线程被创建出来

image-20211212150430276

newFixedThreadPool执行结果,执行的永远都是那几个固定的线程,这里我们指定了10个线程,所以打印也是10个为一批来进行的。

image-20211212150922546

newSingleThreadExecutor执行结果,从头到尾就只有一个线程在执行

image-20211212154044131

3)线程工厂

虽然有默认的线程工厂,但如果有需要进行处理的话,还是得记录一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.banmoon.pool;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo3 {

public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), new MyThreadFactory("BANMOON-TEST"));
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}

}


class MyThreadFactory implements ThreadFactory{
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private String poolName;

MyThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
this.poolName = poolName;
}

@Override
public Thread newThread(Runnable r) {
String threadName = poolName + "-" + threadNumber.getAndIncrement();
Thread t = new Thread(group, r, threadName, 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}

}

执行结果

image-20211212163007164

4)拒绝策略

拒绝策略没什么好讲的,平常在使用时,注意下容量的大小,以及使用的策略。自己需要执行的任务数量多少,会不会照成内存溢出等,从这几个方面入手,选择最适合业务的队列容量和拒绝策略。

策略类 说明
AbortPolicy 默认,不执行新任务,直接抛出异常,提示线程池已满
DiscardPolicy 不执行新任务,也不抛出异常
DiscardOldestPolicy 它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。
CallerRunsPolicy 直接在外层调用者的线程中调用新任务

演示CallerRunsPolicy,会在调用者的线程中,执行超出容量的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.banmoon.pool;

import java.util.concurrent.*;

public class Demo4 {

public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 100; i++) {
executorService.execute(new MyDemo4(i));
}
executorService.shutdown();
}

}

class MyDemo4 implements Runnable{

private Integer i;

public MyDemo4(Integer i) {
this.i = i;
}

@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":" + i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

执行结果,上述线程池指定了最大线程数为20,队列容量为20。所以当执行第41个任务时,队列满了,将由调用者的线程来执行这个任务,此处是主线程

image-20211212164523094

三、其他

1)执行任务的优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ThreadPoolExecutor extends AbstractExecutorService {

public void execute(Runnable command) {
// 判断是否为空
if (command == null)
throw new NullPointerException();

// 判断当前正在运行的线程数是否小于核心线程数
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 添加任务至线程执行,成功添加则结束
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果核心线程都有在运行,将任务放至队列中
if (isRunning(c) && workQueue.offer(command)) {
// 如果成功推入队列,将再次检查线程状态,有线程死亡则将当前任务添加至线程执行
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列推入任务失败了,那将直接添加至线程执行
else if (!addWorker(command, false))
// 如果任务添加至线程失败,则将进行拒绝策略
reject(command);
}

/**
* 会从线程工厂获取线程,并添加执行任务
* @param firstTask 执行的任务
* @param core 是否可以添加至核心线程
* @return true:成功添加至线程执行
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// ...
}
}

image-20211212183817010

四、最后

线程池这东西干货还是挺多的,还有挺多没有整理完。比如说addWorker方法,线程池的执行调度等

后续有什么新的理解继续补上,未完待续

关于本文出现的代码示例,已提交至码云,只看文章不懂时,一定要敲代码进行理解。

我是半月,祝你幸福!