Juc并发编程包

一、介绍

关于Java如何创建线程,大家都可以马上能想到有两种方法,无非不就是继承 Thread类和实现 Runnable接口嘛,顶多再加上个实现 Callable接口。而且 synchronized解决并发问题,如果学艺不精,锁住的对象是哪个都不知道,实在是不友好。

所以,我们在企业开发中基本不这样使用线程。在线程的启动上,我们常使用线程池。对于线程池的使用,可以看我另一篇博客,讲到了线程池的使用

本文将讲解,线程池所在的包 java.util.concurrent,在这个包下,还有什么值得关注的类和方法。

附上java8在线文档,边看边学

二、线程安全集合

在使用的集合中,ArrayList或者是 HashMap都是平常我们接触比较多的。但很遗憾,这两个集合类,他们在多线程的情况下,并不是安全的。如果需要使用线程安全的集合,将要有特殊的方法和类。

我们先来演示一下,在多线程情况下,此类集合发生的问题。

1)不安全集合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.banmoon.collection;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.TreeSet;

/**
* 线程不安全的集合
*/
public class Demo1 {

public static void main(String[] args) {
testArrayList();
// testHashMap();
// testSet();
}

public static void testArrayList(){
ArrayList<String> list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
list.add(Thread.currentThread().getName());
System.out.println(list);
}, "线程" + i).start();
}
}

public static void testHashMap(){
HashMap<Integer, String> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
int temp = i;
new Thread(() -> {
map.put(temp, Thread.currentThread().getName());
System.out.println(map);
}, "线程" + i).start();
}
}

public static void testSet(){
TreeSet<String> set = new TreeSet<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
set.add(Thread.currentThread().getName());
System.out.println(set);
}, "线程" + i).start();
}
}

}

分别执行 testArrayListtestHashMaptestSet,执行结果如下

image-20211218201452050

image-20211218201829175

image-20211218202646969

执行结果有时候会出现 java.util.ConcurrentModificationException异常,字面意思就是并发修改异常,也就说明了原本喜欢用的 ArrayListHashMap是线程不安全的。

为何线程不安全,主要还是关键字 synchronized,可以查看上述集合的添加方法,并没有添加这个关键字。所以,我们在多线程的时候,要避免使用以上这些不安全的集合类。

2)Collections工具类之sync方法

上述讲的集合类都是线程不安全的,但是有办法使他们转换成线程安全的集合类。只需要 Collections工具类使用对应的方法进行转换即可。

image-20211218204316219

方法的实现就是将集合作为参数构造出了另一个线程安全的集合类。转换的方法还是比较多的,简单讲前三个就好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.banmoon.collection;

import java.util.*;

/**
* 线程安全集合
*/
public class Demo2 {

public static void main(String[] args) {
testArrayList();
// testHashMap();
// testSet();
}

public static void testArrayList(){
List<String> list = Collections.synchronizedList(new ArrayList<>());
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
list.add(Thread.currentThread().getName());
System.out.println(list);
}, "线程" + i).start();
}
}

public static void testHashMap(){
Map<Integer, String> map = Collections.synchronizedMap(new HashMap<>());
for (int i = 0; i < 10; i++) {
int temp = i;
new Thread(() -> {
map.put(temp, Thread.currentThread().getName());
System.out.println(map);
}, "线程" + i).start();
}
}

public static void testSet(){
Set<String> set = Collections.synchronizedSet(new TreeSet<>());
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
set.add(Thread.currentThread().getName());
System.out.println(set);
}, "线程" + i).start();
}
}

}

就不贴执行结果了,自己可以试试,保证不出并发修改异常

3)juc下的安全集合

不对啊,上面的集合都在 java.util下面,怎么没有juc什么事呢?别急,要来了,juc包下的线程安全集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.banmoon.collection;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* juc并发包下的集合
*/
public class Demo3 {

public static void main(String[] args) {
testArrayList();
// testHashMap();
// testSet();
}

public static void testArrayList(){
CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
list.add(Thread.currentThread().getName());
System.out.println(list);
}, "线程" + i).start();
}
}

public static void testHashMap(){
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 10; i++) {
int temp = i;
new Thread(() -> {
map.put(temp, Thread.currentThread().getName());
System.out.println(map);
}, "线程" + i).start();
}
}

public static void testSet(){
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
set.add(Thread.currentThread().getName());
System.out.println(set);
}, "线程" + i).start();
}
}

}

4)Vector类

关于有人提到一个叫 Vector的类,也是线程安全的。的确是这样,线程是安全的,我们查看他的 add方法

1
2
3
4
5
6
7
8
9
10
11
12
public class Vector<E>
extends AbstractList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable
{

public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
}

比较简单,主要使用了 synchronized关键字,来保证了线程安全。

没什么问题,但更推荐使用新的 lock锁写的 CopyOnWriteArrayList等类。

至于什么是 lock,它的优势在哪里,可以继续查看下一章

三、Lock锁

由文档可知,Lock是个接口。使用得从它的几个实现类中入手

image-20211219170625977

1)简单使用

使用 ReentrantLock类,来完成线程安全的取票操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.banmoon.collection;

import java.util.concurrent.locks.ReentrantLock;

/**
* ReentrantLock
* 1、创建公共Lock锁
* 2、使用lock方法进行上锁
* 3、在finally代码块中释放锁
*/
public class Demo4 {

public static void main(String[] args) {
TicketServer ticketServer = new TicketServer();
new Thread(ticketServer, "A").start();
new Thread(ticketServer, "B").start();
new Thread(ticketServer, "C").start();
}

}

class TicketServer implements Runnable{

private int ticketNum = 10;

// 创建锁
ReentrantLock lock = new ReentrantLock();

@Override
public void run() {
String name = Thread.currentThread().getName();

while (true){
// 上锁
lock.lock();
try {
if(ticketNum<=0)
return;
System.out.println(name + ":取到了第" + ticketNum + "张票");
// 模拟网络延迟
Thread.sleep(200);
ticketNum--;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 最后始终都要释放锁
lock.unlock();
}
}
}

}

执行结果,与使用 synchronized关键字无异

image-20211219173052726

lock锁的基本操作

  • 创建公共Lock锁,注意所有的线程访问到的 lock都是同一个

  • 使用lock方法进行上锁

  • 在finally代码块中释放锁,必须要手动释放

2)生产者消费者模式

在使用 synchronized关键词时,线程之间使用 wait方法进行阻塞释放锁,以及使用 notify方法进行唤醒阻塞线程。

而在 Lock锁中,需要创建锁的状态监视器,也就是 Condition。在 Condition中,也有相对应的方法,他们则是 await方法和 signal方法。

image-20211219180257685

使用 Condition类来实现生产者消费者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.banmoon.collection;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 简单的生产者消费者模式
*/
public class Demo5 {

public static void main(String[] args) {
MyDemo5 myDemo4 = new MyDemo5();
new Thread(() -> {
for(int i = 0; i < 10; i++) myDemo4.increment();
}, "线程A").start();
new Thread(() -> {
for(int i = 0; i < 10; i++) myDemo4.decrement();
}, "线程B").start();
new Thread(() -> {
for(int i = 0; i < 10; i++) myDemo4.increment();
}, "线程C").start();
new Thread(() -> {
for(int i = 0; i < 10; i++) myDemo4.decrement();
}, "线程D").start();
}

}

class MyDemo5{

private int number = 0;

private ReentrantLock lock = new ReentrantLock();

/**
* 通过lock创建出监视状态对象
*/
Condition condition = lock.newCondition();

public void increment(){
lock.lock();
try {
while (number==1)
condition.await();
number++;
System.out.println(Thread.currentThread().getName() + ":" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void decrement(){
lock.lock();
try {
while (number==0)
condition.await();
number--;
System.out.println(Thread.currentThread().getName() + ":" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}

执行结果,和 synchronized关键字无异

image-20211219180546212

3)精准唤醒

Condition类和传统的 wait方法、notify方法不同,它可以实现精准的唤醒。

比如上面的生产消费模式,让A线程生产完成后,让B线程进行消费;B线程消费完成后,让C进行生产。

这一点在 Objectnotify方法是做不到的,notify方法唤醒的虽然只有一条线程,但这是cpu进行调度的,人为并不可控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package com.banmoon.collection;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 使用多个Condition实现精准唤醒
*/
public class Demo6 {

public static void main(String[] args) {
MyDemo6 myDemo4 = new MyDemo6();
new Thread(() -> {
for(int i = 0; i < 10; i++) myDemo4.increment();
}, "线程A").start();
new Thread(() -> {
for(int i = 0; i < 10; i++) myDemo4.decrement();
}, "线程B").start();
new Thread(() -> {
for(int i = 0; i < 10; i++) myDemo4.increment();
}, "线程C").start();
new Thread(() -> {
for(int i = 0; i < 10; i++) myDemo4.decrement();
}, "线程D").start();
}
}

class MyDemo6 {

private int number = 0;

private Lock lock = new ReentrantLock();

private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private Condition condition4 = lock.newCondition();

public void increment(){
lock.lock();
try {
while (number==1)
await();
number++;
System.out.println(Thread.currentThread().getName() + ":" + number);
signal();
} finally {
lock.unlock();
}
}

public void decrement(){
lock.lock();
try {
while (number==0)
await();
number--;
System.out.println(Thread.currentThread().getName() + ":" + number);
signal();
} finally {
lock.unlock();
}
}

/**
* 判断当前线程,并触发等待
*/
public void await(){
try {
String name = Thread.currentThread().getName();
switch (name){
case "线程A": condition1.await(); break;
case "线程B": condition2.await(); break;
case "线程C": condition3.await(); break;
case "线程D": condition4.await(); break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 判断当前线程,并触发唤醒
*/
public void signal(){
String name = Thread.currentThread().getName();
switch (name){
case "线程A": condition2.signal(); break;
case "线程B": condition3.signal(); break;
case "线程C": condition4.signal(); break;
case "线程D": condition1.signal(); break;
}
}

}

执行结果,发现了确实按照了我们的顺序来进行唤醒

image-20211219233155451

根据锁创建出监视状态对象,用来判断线程的等待和唤醒

简单来说,就是用 Condition来做一个标记,我们需要做的就是,判断当前的条件,是否使用 Condition进行等待或者唤醒。

上述代码会出现BUG,但我不说,哈哈哈哈哈。

大家可以把 17行的线程B23行的线程D位置互换,你会发现问题的。

可以试着解决一下这个BUG,可以更好理解 Condition的精准唤醒。

4)读写锁

上述使用的都是 ReentrantLock类,这次讲讲另外的两个。处于 ReentrantReadWriteLock类下的两个静态内部类,ReadLockWriteLock

为什么会有读写锁呢,它解决了同步带来的效率低下问题。

在变量的使用上,我们无非就是读和写,只有多线程写入才会造成线程安全的问题,而多线程读永远不会修改变量值,也就不会造成线程的安全问题了。

正因为如此,读写锁出现了,它可以限制只能单线程写入,但可以允许多线程读取变量,由此来保证效率的最大化使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.banmoon.collection;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 读写锁
*/
public class Demo7 {

public static void main(String[] args) {
MyDemo7 demo7 = new MyDemo7();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
demo7.add(Thread.currentThread().getName());
}, "写线程" + i).start();
new Thread(() -> {
demo7.toString(true);
}, "读线程" + i).start();
}

}

}

class MyDemo7{

private List<String> list = new ArrayList<>();

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// private ReentrantLock lock = new ReentrantLock();

public void add(String str){
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + ":正在添加");
list.add(str);
System.out.println(Thread.currentThread().getName() + ":添加成功");
} finally {
lock.writeLock().unlock();
}
}

public void toString(boolean b){
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + ":正在读取");
System.out.println(Thread.currentThread().getName() + ":" + list);
System.out.println(Thread.currentThread().getName() + ":读取成功");
} finally {
lock.readLock().unlock();
}
}

}

执行结果,主要查看与 ReentrantLock类的区别

image-20211220230240374

可以看到的是,写入线程不会被抢锁,而读线程能被其他读线程插入。

四、辅助类

在juc并发包中,还有一些辅助工具类,让我们可以更好的使用多线程。

1)CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.banmoon.utils;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {

public static void main(String[] args) throws InterruptedException {
CountDownLatch count = new CountDownLatch(5);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
System.out.println(StrUtil.format("{}:{}", Thread.currentThread().getName(), DateUtil.now()));
TimeUnit.SECONDS.sleep(2);
count.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程"+i).start();
}

// 等待上面几个线程完成,才能进行下面的逻辑
count.await();
System.out.println("主线程:" + DateUtil.now());
}

}

CountDownLatch对线程进行减法计数,计算还有多少线程没有完成任务。

直到指定的线程数,到达指定位置后,才进行下一步的操作。

image-20211223212721997

2)CyclicBarrier

如果上述 CountDownLatch是通过减法计算来达到屏障,那么 CyclicBarrier就是通过加法计算来达到屏障。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.banmoon.utils;

import cn.hutool.core.util.StrUtil;

import java.util.concurrent.*;

public class CyclicBarrierTest {

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("======= 当前阶段已完成 =======");
});
new Thread(new Demo(cyclicBarrier), "线程A").start();
new Thread(new Demo(cyclicBarrier), "线程B").start();
new Thread(new Demo(cyclicBarrier), "线程C").start();
}

}

class Demo implements Runnable{

private CyclicBarrier cyclicBarrier;

public Demo(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
System.out.println(StrUtil.format("{}:准备就绪", threadName));
TimeUnit.SECONDS.sleep(3);
// 等待所有线程都就绪
cyclicBarrier.await();

System.out.println(StrUtil.format("{}:上台发言", threadName));
TimeUnit.SECONDS.sleep(3);
// 等待所有线程都上台发言
cyclicBarrier.await();

System.out.println(StrUtil.format("{}:散会回家", threadName));
TimeUnit.SECONDS.sleep(1);

} catch (Exception e) {
e.printStackTrace();
}
}
}

CountDownLatch类似,当线程全部达到一个共同屏障时,从而再向下进行执行。

但和其不一样的是,CyclicBarrier可以执行多次,设立多点屏障。出现问题导致的计数异常后,也可以重新进行计数

image-20211223212858229

3)Semaphore

Semaphore通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

比如去餐厅吃饭,没有座位需要进行等待,直到别人吃完有座位后,才能进入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.banmoon.utils;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {

public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + ":排队完成,抓紧吃饭");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + ":吃完了离开");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, "线程" + i).start();
}

}

}

适用于有限的资源使用,对同一个资源最多允许几个线程可以访问它。直到访问结束后才释放,让其他线程进入可以访问。

synchornized包住一段代码块比较类似,但 Semaphore允许多个线程,而前者只有一个。

image-20211223221946293

五、阻塞队列

在Java线程池的讲解中,我初步的讲了阻塞队列的功能。但在此,我还是得详细讲讲,什么是阻塞队列。

阻塞队列,顾名思义就是会阻塞的队列。而队列的基本操作就只有两个,存和取。所以阻塞就此产生,有些存会发生阻塞,有些取会发生阻塞。

下面就一起来看看juc包中的阻塞队列吧

1)ArrayBlockingQueue

Array结构,没问题吧,基于数组结构实现的队列。既然叫阻塞队列,那就必然会有阻塞,有阻塞会有锁吧。简单看看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

// 内容存在这的
final Object[] items;

// 锁,保护所有的访问
final ReentrantLock lock;

// 关于取操作的 Condition
private final Condition notEmpty;

// 关于存操作的 Condition
private final Condition notFull;

// 构造方法,传入一个容量。还有两个重载,具体自己看源码吧
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

// 取数
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 上锁
lock.lockInterruptibly();
try {
// 判断个数是否为0,是则阻塞进行等待
while (count == 0)
notEmpty.await();
// 返回取数结果
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}


// 存数
public void put(E e) throws InterruptedException {
// 检查元素是否为空,为空则会抛出空指针异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 上锁
lock.lockInterruptibly();
try {
// 如果个数已经和队列容量相等了,则阻塞进行等待
while (count == items.length)
notFull.await();
// 存数
enqueue(e);
} finally {
// 解锁
lock.unlock();
}
}

}

ArrayBlockingQueue有一把 Lock锁以及它的两个 Condition监听器,分别来对队列为空或满的时候,进行阻塞操作。

上面源码只是一小部分,dequeueenqueue方法就交给你们去看啦,不要怕,挺简单的。

测试一下,取和存之间照成的阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.banmoon.queue;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueTest {

public static void main(String[] args) throws InterruptedException {
putTest();
System.out.println("========= 分割线 =========");
takeTest();
}

public static void putTest() throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);

new Thread(() -> {
try {
// 3秒后再取出
TimeUnit.SECONDS.sleep(3);
String ele = queue.take();
System.out.println(ele + "已取出:" + DateUtil.formatTime(new Date()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

queue.put("1");
System.out.println(StrUtil.format("{}已插入,{}", 1, DateUtil.formatTime(new Date())));
queue.put("2");
System.out.println(StrUtil.format("{}已插入,{}", 2, DateUtil.formatTime(new Date())));
queue.put("3");
System.out.println(StrUtil.format("{}已插入,{}", 3, DateUtil.formatTime(new Date())));
queue.put("4");
System.out.println(StrUtil.format("{}已插入,{}", 4, DateUtil.formatTime(new Date())));
}

private static void takeTest() throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);

// 其他线程插入数据
for (int i = 1; i <= 3; i++) {
String str = StrUtil.toString(i);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
queue.put(str);
System.out.println(StrUtil.format("{}已插入,{}", str, DateUtil.formatTime(new Date())));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

// 直接取数
System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date()));
System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date()));
System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date()));


}

}

执行结果,不要问我为什么分割线下面取出插入搞反了,问就是打印慢了。在插入的结束的马上,就马上唤醒被阻塞的取的线程。

就这样会出现取出在前的打印问题。

image-20211224221719403

2)LinkedBlockingQueue

在简单查看了 ArrayBlockingQueue的源码,它只有一把 Lock锁,

LinkedBlockingQueue不同,它有两把锁。简单看看源码吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

// 好的出现了,静态内部类,链表形式的指针引向。一个节点对象存一个元素,同时指向下一个节点对象
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}

// 容量
private final int capacity;

// 当前的元素个数
private final AtomicInteger count = new AtomicInteger();

// 头部节点
transient Node<E> head;

// 最后的节点
private transient Node<E> last;

// take、poll、etc等方法,取操作持有的锁
private final ReentrantLock takeLock = new ReentrantLock();

// 不为空的状态Condition
private final Condition notEmpty = takeLock.newCondition();

// put、offer、etc等方法,存操作持有的锁
private final ReentrantLock putLock = new ReentrantLock();

// 容量未满的状态Condition
private final Condition notFull = putLock.newCondition();

}

对应的取存操作方法都差不多,这边的测试代码就不贴了。和上面的 ArrayBlockingQueue差不多,把注释掉的代码打开换成 LinkedBlockingQueue就好。

3)存取的4组API

在上述的阻塞队列中,只举例了一组存取的方法。不过除了这一组,还有其他存取操作的API。

这边以 ArrayBlockingQueue为例,总结进行测试一下。

功能说明
队列空或者满的时候会抛出异常 add(E e) remove()
满了还去存则直接返回false
空了还去取就会返回null
offer(E e) poll()
满了还去存就会一直阻塞,直到被唤醒
空了还去取就会一直阻塞,直到被唤醒
put(E e) take()
满了还去存就会阻塞一段时间,超过后就返回false
空了还去取就会阻塞一段时间,超过时间后就会返回null
offer(E e, long timeout, TimeUnit unit) poll(long timeout, TimeUnit unit)

简单测试下这四组API,可以根据自己业务需求来选择对应的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.banmoon.queue;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FourApiTest {

public static void main(String[] args) throws InterruptedException {
// test1();
// test2();
// test3();
test4();
}

/**
* 会抛出异常
*/
private static void test1() {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
System.out.println(queue.add("A"));
System.out.println(queue.add("B"));
// System.out.println(queue.add("C"));// 队列已满,将报错

System.out.println("======== 分割线 ========");

System.out.println(queue.remove());
System.out.println(queue.remove());
// System.out.println(queue.remove());// 队列已空,将报错
}

/**
* 满了还去存则直接返回false,空了还去取就会返回null
*/
private static void test2() {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
System.out.println(queue.offer("A"));
System.out.println(queue.offer("B"));
System.out.println(queue.offer("C"));// 队列已满,存失败就直接返回false

System.out.println("======== 分割线 ========");

System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());// 队列已空,直接返回null
}

/**
* 满了还去存就会一直阻塞,直到被唤醒,空了还去取就会一直阻塞,直到被唤醒
* @throws InterruptedException
*/
private static void test3() throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
queue.put("A");
System.out.println("A已插入");
queue.put("B");
System.out.println("B已插入");
// queue.put("C");// 队列已满,到此将会阻塞
// System.out.println("C已插入");

System.out.println("======== 分割线 ========");

System.out.println(queue.take());
System.out.println(queue.take());
// System.out.println(queue.take());// 队列已空,将会持续阻塞
}

/**
* 满了还去存就会阻塞一段时间,超过后就返回false
* 空了还去取就会阻塞一段时间,超过时间后就会返回null
*/
private static void test4() throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
String template = "{}:{}";
System.out.println(StrUtil.format(template, queue.offer("A", 2, TimeUnit.SECONDS), DateUtil.now()));
System.out.println(StrUtil.format(template, queue.offer("B", 2, TimeUnit.SECONDS), DateUtil.now()));
System.out.println(StrUtil.format(template, queue.offer("C", 2, TimeUnit.SECONDS), DateUtil.now()));// 队列已满,等待2秒后返回false

System.out.println("======== 分割线 ========");

System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now()));
System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now()));
System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now()));// 队列已空,等待2秒后返回null
}

}

test1执行结果,会抛出异常

image-20211225114945857

test2执行结果,满了还去存则直接返回false,空了还去取就会返回null

image-20211225115737618

test3执行结果,可以看到程序一直都没有关闭。存的在等待位置,取的在等元素

image-20211225122143643

test4执行结果,注意看时间,不会死等

image-20211225124803048

4)SynchronousQueue

这是一个比较特殊的阻塞队列,存取互相阻塞。生产者存入一个元素就马上阻塞,必须被另一个线程消费者取出这个元素,生产者才解锁。

简单测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.banmoon.queue;

import cn.hutool.core.util.StrUtil;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueTest {

public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
String name = Thread.currentThread().getName();
queue.put("A");
System.out.println(name + "插入A成功");
TimeUnit.SECONDS.sleep(1);
queue.put("B");
System.out.println(name + "插入B成功");
TimeUnit.SECONDS.sleep(1);
queue.put("C");
System.out.println(name + "插入C成功");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程A").start();

new Thread(() -> {
try {
String name = Thread.currentThread().getName();
String template = "{}取出了{}";
System.out.println(StrUtil.format(template, name, queue.take()));
TimeUnit.SECONDS.sleep(1);
System.out.println(StrUtil.format(template, name, queue.take()));
TimeUnit.SECONDS.sleep(1);
System.out.println(StrUtil.format(template, name, queue.take()));
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程B").start();

}

}

执行结果,存取交替进行,没问题

image-20211225160353672

现在我们把线程B的代码注释掉,让其只有生产的线程。再次查看结果

image-20211225160559994

好的,被阻塞了。也就是在15行存入后,线程A直接阻塞了,要不然怎么连16行的插入成功信息都没有打印呢?

所以,我们需要进入 SynchronousQueue的源码,简单查看下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

// 首先出现了就是一个抽象静态内部类,还有它的两个子类
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}

// Transferer的子类,也是一个静态内部类。栈结构,后进先出
static final class TransferStack<E> extends Transferer<E> {}

// Transferer的子类,也是一个静态内部类。队列结构,先进先出
static final class TransferQueue<E> extends Transferer<E> {}

// 转让器
private transient volatile Transferer<E> transferer;

// 无参构造器
public SynchronousQueue() {
this(false);
}

// 构造器,传入是否公平,如果公平,则创建队列结构的转让器,否则创建栈结构的转让器
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

// 存方法
public void put(E e) throws InterruptedException {
// 判断是否为空
if (e == null) throw new NullPointerException();
// 直接通过转让器来存
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}

// 取方法
public E take() throws InterruptedException {
// 又是通过转让器来取?还是同样的方法?
E e = transferer.transfer(null, false, 0);
// 如果取到的元素为null,直接抛出打断异常
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}

}

看到这,我有点蒙,看来还是逃避不过要查看 TransferStack或者 TransferQueue这两个内部类啊。后续开个专章来讲述源码好了。简单说下它的流程,transferer方法

  1. 根据实参是否为null来判断出是哪种操作,并为其打上标记,这边分为 DATAREQUEST

    1. DATA说明是存,REQUEST说明是取
  2. 判断队列是否为空,为空就将此操作线程作为节点阻塞住

  3. 判断队列是否为空,不为空就判断队尾的节点模式和此操作的模式是否匹配

    1. 匹配的意思是指,一个 DATA操作就要匹配一个 REQUEST
  4. 如果模式相等,没有匹配上,就将此操作线程作为节点阻塞住

  5. 如果模式不相等,可以匹配上,则将 DATA操作的元素交给 REQUEST,然后进行消除。

TransferStackTransferQueue的区别在与

判断队尾节点的模式是否相等,TransferQueue是将头部的节点进行匹配消除,而 TransferStack全部都是队尾的节点。这也体现的队列的栈的区别,一个是先进先出,一个是后进先出。

源码内部还使用到了CAS自旋锁,计划出一章关于锁类型的文章。

5)LinkedTransferQueue

如果理解了前面的 SynchronousQueue的话,那么 LinkedTransferQueue就很好理解了。

简单来说,SynchronousQueue中的 TransferQueue直接维护在 LinkedTransferQueue里面,少了一层抽象内部类。

如果 SynchronousQueue存取操作都会阻塞,只有配对上才会唤醒。那么 LinkedTransferQueue做了一定的简化和增强,其中一项就是可以自己决定是否阻塞存取操作。

一起来测试一下吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package com.banmoon.queue;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.Date;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class LinkedTransferQueueTest {

public static void main(String[] args) throws InterruptedException {
test1();
// test2();
// test3();
}

/**
* 效果与SynchronousQueue基本一致
* transfer插入时,没有匹配的取操作则会阻塞
* @throws InterruptedException
*/
private static void test1() throws InterruptedException {
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

new Thread(() -> {
try {
for (int i = 0; i < 3; i++) {
String str = queue.take();
TimeUnit.SECONDS.sleep(1);
System.out.println(StrUtil.format("取到了{},{}", str, DateUtil.formatTime(new Date())));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

queue.transfer("A");
System.out.println("插入了A");
queue.transfer("B");
System.out.println("插入了B");
queue.transfer("C");
System.out.println("插入了C");
}

/**
* 存元素时
* 如果有取操作阻塞的话,则进行匹配,返回true
* 没有取操作的话,不阻塞,直接返回false
*/
private static void test2() throws InterruptedException {
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println(StrUtil.format("取到了{},{}", queue.take(), DateUtil.formatTime(new Date())));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

// 等待三个取的操作都执行完
TimeUnit.SECONDS.sleep(2);

System.out.println(StrUtil.format("插入A:{},{}", queue.tryTransfer("A"), DateUtil.formatTime(new Date())));
System.out.println(StrUtil.format("插入B:{},{}", queue.tryTransfer("B"), DateUtil.formatTime(new Date())));
System.out.println(StrUtil.format("插入C:{},{}", queue.tryTransfer("C"), DateUtil.formatTime(new Date())));
}

/**
* 存操作时,没有取操作匹配,将会等待一段时间再进行返回
* @throws InterruptedException
*/
private static void test3() throws InterruptedException {
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

new Thread(() -> {
for (int i = 0; i < 3; i++) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(StrUtil.format("取到了{},{}", queue.take(), DateUtil.formatTime(new Date())));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

System.out.println(StrUtil.format("插入A:{},{}", queue.tryTransfer("A", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date())));
System.out.println(StrUtil.format("插入B:{},{}", queue.tryTransfer("B", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date())));
System.out.println(StrUtil.format("插入C:{},{}", queue.tryTransfer("C", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date())));
}


}

执行 test1transfer方法插入的操作会阻塞,直到有取的操作进入相匹配,这效果和 SynchronousQueue一样

image-20211226123010389

执行 test2。简单来说,存元素时,如果已有取操作阻塞了,将返回true。否则,直接返回false

image-20211226123619477

执行 test3,存操作时,没有取操作匹配,将会等待一段时间再进行返回

image-20211226130016488

5)PriorityBlockingQueue

PriorityBlockingQueue优先级阻塞队列,存元素时不会阻塞,取元素时为空则阻塞。和其他阻塞队列相比,他的数组维护了一个二叉堆,对元素进行了优先级的排序。

测试下列代码,自定义实现比较器,这边就按照数字大小排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.banmoon.queue;

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueTest {

public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(3, new MyComparator());
queue.put(1);
queue.put(3);
queue.put(2);

System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());

}

}

class MyComparator implements Comparator<Integer> {

@Override
public int compare(Integer o1, Integer o2) {
if(o1==o2)
return 0;
return o1>o2? -1: 1;
}
}

执行结果

image-20211228220357330

6)LinkedBlockingDeque

这个和 LinkedBlockingQueue好像,我可以称其为 LinkedBlockingQueue的升级版。

LinkedBlockingDeque被称为双端队列,是因为 LinkedBlockingDeque可以往链表两头插入元素。他的存储结构是链表,同 LinkedBlockingQueue一致,又因为存取可以在双端进行,所以不能像 LinkedBlockingQueue一样给两把锁,这里保持了和 ArrayBlockingQueue一样,仅有一把锁保持线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.banmoon.queue;

import java.util.concurrent.LinkedBlockingDeque;

public class LinkedBlockingDequeTest {

public static void main(String[] args) throws InterruptedException {
LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(4);
deque.put("A");
deque.put("B");
deque.put("C");
deque.putFirst("D");

System.out.println(deque.take());
System.out.println(deque.takeLast());
}

}

执行结果,此队列中还有其他的api,都可以指定在队首或者队尾存元素,取元素同理

image-20211228222728181

六、最后

在以前,并没有熟悉去使用过并发包的东西,在这次整理后,我对并发包有一定的了解。尤其是阻塞队列这一块,在看源码的时候还是挺有意思的。

当然本文没有列出所有使用 Lock锁的api方法,提供上jkd8在线文档,学习要对着文档。

关于本文出现的代码示例,已提交至码云,只看文章不懂时,一定要敲代码进行理解。

我是半月,祝你幸福!!!