Juc并发编程包
一、介绍
关于Java如何创建线程,大家都可以马上能想到有两种方法,无非不就是继承 Thread
类和实现 Runnable
接口嘛,顶多再加上个实现 Callable
接口。而且 synchronized
解决并发问题,如果学艺不精,锁住的对象是哪个都不知道,实在是不友好。
所以,我们在企业开发中基本不这样使用线程。在线程的启动上,我们常使用线程池。对于线程池的使用,可以看我另一篇博客,讲到了线程池的使用。
本文将讲解,线程池所在的包 java.util.concurrent
,在这个包下,还有什么值得关注的类和方法。
附上java8在线文档,边看边学
二、线程安全集合
在使用的集合中,ArrayList
或者是 HashMap
都是平常我们接触比较多的。但很遗憾,这两个集合类,他们在多线程的情况下,并不是安全的。如果需要使用线程安全的集合,将要有特殊的方法和类。
我们先来演示一下,在多线程情况下,此类集合发生的问题。
1)不安全集合示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.banmoon.collection;
import java.util.ArrayList; import java.util.HashMap; import java.util.TreeSet;
public class Demo1 {
public static void main(String[] args) { testArrayList();
}
public static void testArrayList(){ ArrayList<String> list = new ArrayList<>(); for (int i = 1; i <= 10; i++) { new Thread(() -> { list.add(Thread.currentThread().getName()); System.out.println(list); }, "线程" + i).start(); } }
public static void testHashMap(){ HashMap<Integer, String> map = new HashMap<>(); for (int i = 0; i < 10; i++) { int temp = i; new Thread(() -> { map.put(temp, Thread.currentThread().getName()); System.out.println(map); }, "线程" + i).start(); } }
public static void testSet(){ TreeSet<String> set = new TreeSet<>(); for (int i = 1; i <= 10; i++) { new Thread(() -> { set.add(Thread.currentThread().getName()); System.out.println(set); }, "线程" + i).start(); } }
}
|
分别执行 testArrayList
、testHashMap
和 testSet
,执行结果如下
执行结果有时候会出现 java.util.ConcurrentModificationException
异常,字面意思就是并发修改异常,也就说明了原本喜欢用的 ArrayList
和 HashMap
是线程不安全的。
为何线程不安全,主要还是关键字 synchronized
,可以查看上述集合的添加方法,并没有添加这个关键字。所以,我们在多线程的时候,要避免使用以上这些不安全的集合类。
2)Collections
工具类之sync方法
上述讲的集合类都是线程不安全的,但是有办法使他们转换成线程安全的集合类。只需要 Collections
工具类使用对应的方法进行转换即可。
方法的实现就是将集合作为参数构造出了另一个线程安全的集合类。转换的方法还是比较多的,简单讲前三个就好。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.banmoon.collection;
import java.util.*;
public class Demo2 {
public static void main(String[] args) { testArrayList();
}
public static void testArrayList(){ List<String> list = Collections.synchronizedList(new ArrayList<>()); for (int i = 1; i <= 10; i++) { new Thread(() -> { list.add(Thread.currentThread().getName()); System.out.println(list); }, "线程" + i).start(); } }
public static void testHashMap(){ Map<Integer, String> map = Collections.synchronizedMap(new HashMap<>()); for (int i = 0; i < 10; i++) { int temp = i; new Thread(() -> { map.put(temp, Thread.currentThread().getName()); System.out.println(map); }, "线程" + i).start(); } }
public static void testSet(){ Set<String> set = Collections.synchronizedSet(new TreeSet<>()); for (int i = 1; i <= 10; i++) { new Thread(() -> { set.add(Thread.currentThread().getName()); System.out.println(set); }, "线程" + i).start(); } }
}
|
就不贴执行结果了,自己可以试试,保证不出并发修改异常
3)juc下的安全集合
不对啊,上面的集合都在 java.util
下面,怎么没有juc什么事呢?别急,要来了,juc包下的线程安全集合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.banmoon.collection;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet;
public class Demo3 {
public static void main(String[] args) { testArrayList();
}
public static void testArrayList(){ CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>(); for (int i = 1; i <= 10; i++) { new Thread(() -> { list.add(Thread.currentThread().getName()); System.out.println(list); }, "线程" + i).start(); } }
public static void testHashMap(){ ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>(); for (int i = 0; i < 10; i++) { int temp = i; new Thread(() -> { map.put(temp, Thread.currentThread().getName()); System.out.println(map); }, "线程" + i).start(); } }
public static void testSet(){ CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>(); for (int i = 1; i <= 10; i++) { new Thread(() -> { set.add(Thread.currentThread().getName()); System.out.println(set); }, "线程" + i).start(); } }
}
|
4)Vector类
关于有人提到一个叫 Vector
的类,也是线程安全的。的确是这样,线程是安全的,我们查看他的 add
方法
1 2 3 4 5 6 7 8 9 10 11 12
| public class Vector<E> extends AbstractList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; } }
|
比较简单,主要使用了 synchronized
关键字,来保证了线程安全。
没什么问题,但更推荐使用新的 lock
锁写的 CopyOnWriteArrayList
等类。
至于什么是 lock
,它的优势在哪里,可以继续查看下一章
三、Lock锁
由文档可知,Lock是个接口。使用得从它的几个实现类中入手
1)简单使用
使用 ReentrantLock
类,来完成线程安全的取票操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package com.banmoon.collection;
import java.util.concurrent.locks.ReentrantLock;
public class Demo4 {
public static void main(String[] args) { TicketServer ticketServer = new TicketServer(); new Thread(ticketServer, "A").start(); new Thread(ticketServer, "B").start(); new Thread(ticketServer, "C").start(); }
}
class TicketServer implements Runnable{
private int ticketNum = 10;
ReentrantLock lock = new ReentrantLock();
@Override public void run() { String name = Thread.currentThread().getName();
while (true){ lock.lock(); try { if(ticketNum<=0) return; System.out.println(name + ":取到了第" + ticketNum + "张票"); Thread.sleep(200); ticketNum--; } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } }
}
|
执行结果,与使用 synchronized
关键字无异
lock锁的基本操作
2)生产者消费者模式
在使用 synchronized
关键词时,线程之间使用 wait
方法进行阻塞释放锁,以及使用 notify
方法进行唤醒阻塞线程。
而在 Lock
锁中,需要创建锁的状态监视器,也就是 Condition
。在 Condition
中,也有相对应的方法,他们则是 await
方法和 signal
方法。
使用 Condition
类来实现生产者消费者模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package com.banmoon.collection;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class Demo5 {
public static void main(String[] args) { MyDemo5 myDemo4 = new MyDemo5(); new Thread(() -> { for(int i = 0; i < 10; i++) myDemo4.increment(); }, "线程A").start(); new Thread(() -> { for(int i = 0; i < 10; i++) myDemo4.decrement(); }, "线程B").start(); new Thread(() -> { for(int i = 0; i < 10; i++) myDemo4.increment(); }, "线程C").start(); new Thread(() -> { for(int i = 0; i < 10; i++) myDemo4.decrement(); }, "线程D").start(); }
}
class MyDemo5{
private int number = 0;
private ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment(){ lock.lock(); try { while (number==1) condition.await(); number++; System.out.println(Thread.currentThread().getName() + ":" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
public void decrement(){ lock.lock(); try { while (number==0) condition.await(); number--; System.out.println(Thread.currentThread().getName() + ":" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
}
|
执行结果,和 synchronized
关键字无异
3)精准唤醒
Condition
类和传统的 wait
方法、notify
方法不同,它可以实现精准的唤醒。
比如上面的生产消费模式,让A线程生产完成后,让B线程进行消费;B线程消费完成后,让C进行生产。
这一点在 Object
的 notify
方法是做不到的,notify
方法唤醒的虽然只有一条线程,但这是cpu进行调度的,人为并不可控制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| package com.banmoon.collection;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
public class Demo6 {
public static void main(String[] args) { MyDemo6 myDemo4 = new MyDemo6(); new Thread(() -> { for(int i = 0; i < 10; i++) myDemo4.increment(); }, "线程A").start(); new Thread(() -> { for(int i = 0; i < 10; i++) myDemo4.decrement(); }, "线程B").start(); new Thread(() -> { for(int i = 0; i < 10; i++) myDemo4.increment(); }, "线程C").start(); new Thread(() -> { for(int i = 0; i < 10; i++) myDemo4.decrement(); }, "线程D").start(); } }
class MyDemo6 {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private Condition condition4 = lock.newCondition();
public void increment(){ lock.lock(); try { while (number==1) await(); number++; System.out.println(Thread.currentThread().getName() + ":" + number); signal(); } finally { lock.unlock(); } }
public void decrement(){ lock.lock(); try { while (number==0) await(); number--; System.out.println(Thread.currentThread().getName() + ":" + number); signal(); } finally { lock.unlock(); } }
public void await(){ try { String name = Thread.currentThread().getName(); switch (name){ case "线程A": condition1.await(); break; case "线程B": condition2.await(); break; case "线程C": condition3.await(); break; case "线程D": condition4.await(); break; } } catch (InterruptedException e) { e.printStackTrace(); } }
public void signal(){ String name = Thread.currentThread().getName(); switch (name){ case "线程A": condition2.signal(); break; case "线程B": condition3.signal(); break; case "线程C": condition4.signal(); break; case "线程D": condition1.signal(); break; } }
}
|
执行结果,发现了确实按照了我们的顺序来进行唤醒
根据锁创建出监视状态对象,用来判断线程的等待和唤醒
简单来说,就是用 Condition
来做一个标记,我们需要做的就是,判断当前的条件,是否使用 Condition
进行等待或者唤醒。
上述代码会出现BUG,但我不说,哈哈哈哈哈。
大家可以把 17行的线程B
和 23行的线程D
位置互换,你会发现问题的。
可以试着解决一下这个BUG,可以更好理解 Condition
的精准唤醒。
4)读写锁
上述使用的都是 ReentrantLock
类,这次讲讲另外的两个。处于 ReentrantReadWriteLock
类下的两个静态内部类,ReadLock
和 WriteLock
为什么会有读写锁呢,它解决了同步带来的效率低下问题。
在变量的使用上,我们无非就是读和写,只有多线程写入才会造成线程安全的问题,而多线程读永远不会修改变量值,也就不会造成线程的安全问题了。
正因为如此,读写锁出现了,它可以限制只能单线程写入,但可以允许多线程读取变量,由此来保证效率的最大化使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| package com.banmoon.collection;
import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Demo7 {
public static void main(String[] args) { MyDemo7 demo7 = new MyDemo7(); for (int i = 0; i < 10; i++) { new Thread(() -> { demo7.add(Thread.currentThread().getName()); }, "写线程" + i).start(); new Thread(() -> { demo7.toString(true); }, "读线程" + i).start(); }
}
}
class MyDemo7{
private List<String> list = new ArrayList<>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public void add(String str){ lock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + ":正在添加"); list.add(str); System.out.println(Thread.currentThread().getName() + ":添加成功"); } finally { lock.writeLock().unlock(); } }
public void toString(boolean b){ lock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + ":正在读取"); System.out.println(Thread.currentThread().getName() + ":" + list); System.out.println(Thread.currentThread().getName() + ":读取成功"); } finally { lock.readLock().unlock(); } }
}
|
执行结果,主要查看与 ReentrantLock
类的区别
可以看到的是,写入线程不会被抢锁,而读线程能被其他读线程插入。
四、辅助类
在juc并发包中,还有一些辅助工具类,让我们可以更好的使用多线程。
1)CountDownLatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.banmoon.utils;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException { CountDownLatch count = new CountDownLatch(5); for (int i = 1; i <= 5; i++) { new Thread(() -> { try { System.out.println(StrUtil.format("{}:{}", Thread.currentThread().getName(), DateUtil.now())); TimeUnit.SECONDS.sleep(2); count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }, "线程"+i).start(); }
count.await(); System.out.println("主线程:" + DateUtil.now()); }
}
|
CountDownLatch
对线程进行减法计数,计算还有多少线程没有完成任务。
直到指定的线程数,到达指定位置后,才进行下一步的操作。
2)CyclicBarrier
如果上述 CountDownLatch
是通过减法计算来达到屏障,那么 CyclicBarrier
就是通过加法计算来达到屏障。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.banmoon.utils;
import cn.hutool.core.util.StrUtil;
import java.util.concurrent.*;
public class CyclicBarrierTest {
public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { System.out.println("======= 当前阶段已完成 ======="); }); new Thread(new Demo(cyclicBarrier), "线程A").start(); new Thread(new Demo(cyclicBarrier), "线程B").start(); new Thread(new Demo(cyclicBarrier), "线程C").start(); } }
class Demo implements Runnable{
private CyclicBarrier cyclicBarrier;
public Demo(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; }
@Override public void run() { try { String threadName = Thread.currentThread().getName(); System.out.println(StrUtil.format("{}:准备就绪", threadName)); TimeUnit.SECONDS.sleep(3); cyclicBarrier.await();
System.out.println(StrUtil.format("{}:上台发言", threadName)); TimeUnit.SECONDS.sleep(3); cyclicBarrier.await();
System.out.println(StrUtil.format("{}:散会回家", threadName)); TimeUnit.SECONDS.sleep(1);
} catch (Exception e) { e.printStackTrace(); } } }
|
与 CountDownLatch
类似,当线程全部达到一个共同屏障时,从而再向下进行执行。
但和其不一样的是,CyclicBarrier
可以执行多次,设立多点屏障。出现问题导致的计数异常后,也可以重新进行计数
3)Semaphore
Semaphore
通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
比如去餐厅吃饭,没有座位需要进行等待,直到别人吃完有座位后,才能进入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.banmoon.utils;
import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 6; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ":排队完成,抓紧吃饭"); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + ":吃完了离开"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }, "线程" + i).start(); }
}
}
|
适用于有限的资源使用,对同一个资源最多允许几个线程可以访问它。直到访问结束后才释放,让其他线程进入可以访问。
和 synchornized
包住一段代码块比较类似,但 Semaphore
允许多个线程,而前者只有一个。
五、阻塞队列
在Java线程池的讲解中,我初步的讲了阻塞队列的功能。但在此,我还是得详细讲讲,什么是阻塞队列。
阻塞队列,顾名思义就是会阻塞的队列。而队列的基本操作就只有两个,存和取。所以阻塞就此产生,有些存会发生阻塞,有些取会发生阻塞。
下面就一起来看看juc包中的阻塞队列吧
1)ArrayBlockingQueue
Array结构,没问题吧,基于数组结构实现的队列。既然叫阻塞队列,那就必然会有阻塞,有阻塞会有锁吧。简单看看源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; public ArrayBlockingQueue(int capacity) { this(capacity, false); } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } }
|
ArrayBlockingQueue
有一把 Lock
锁以及它的两个 Condition
监听器,分别来对队列为空或满的时候,进行阻塞操作。
上面源码只是一小部分,dequeue
和 enqueue
方法就交给你们去看啦,不要怕,挺简单的。
测试一下,取和存之间照成的阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| package com.banmoon.queue;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil;
import java.util.Date; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit;
public class ArrayBlockingQueueTest {
public static void main(String[] args) throws InterruptedException { putTest(); System.out.println("========= 分割线 ========="); takeTest(); }
public static void putTest() throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
new Thread(() -> { try { TimeUnit.SECONDS.sleep(3); String ele = queue.take(); System.out.println(ele + "已取出:" + DateUtil.formatTime(new Date())); } catch (InterruptedException e) { e.printStackTrace(); } }).start();
queue.put("1"); System.out.println(StrUtil.format("{}已插入,{}", 1, DateUtil.formatTime(new Date()))); queue.put("2"); System.out.println(StrUtil.format("{}已插入,{}", 2, DateUtil.formatTime(new Date()))); queue.put("3"); System.out.println(StrUtil.format("{}已插入,{}", 3, DateUtil.formatTime(new Date()))); queue.put("4"); System.out.println(StrUtil.format("{}已插入,{}", 4, DateUtil.formatTime(new Date()))); }
private static void takeTest() throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
for (int i = 1; i <= 3; i++) { String str = StrUtil.toString(i); new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); queue.put(str); System.out.println(StrUtil.format("{}已插入,{}", str, DateUtil.formatTime(new Date()))); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date())); System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date())); System.out.println(queue.take() + "已取出:" + DateUtil.formatTime(new Date()));
}
}
|
执行结果,不要问我为什么分割线下面取出插入搞反了,问就是打印慢了。在插入的结束的马上,就马上唤醒被阻塞的取的线程。
就这样会出现取出在前的打印问题。
2)LinkedBlockingQueue
在简单查看了 ArrayBlockingQueue
的源码,它只有一把 Lock
锁,
但 LinkedBlockingQueue
不同,它有两把锁。简单看看源码吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } private final int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition(); }
|
对应的取存操作方法都差不多,这边的测试代码就不贴了。和上面的 ArrayBlockingQueue
差不多,把注释掉的代码打开换成 LinkedBlockingQueue
就好。
3)存取的4组API
在上述的阻塞队列中,只举例了一组存取的方法。不过除了这一组,还有其他存取操作的API。
这边以 ArrayBlockingQueue
为例,总结进行测试一下。
功能说明 |
存 |
取 |
队列空或者满的时候会抛出异常 |
add(E e) |
remove() |
满了还去存则直接返回false 空了还去取就会返回null |
offer(E e) |
poll() |
满了还去存就会一直阻塞,直到被唤醒 空了还去取就会一直阻塞,直到被唤醒 |
put(E e) |
take() |
满了还去存就会阻塞一段时间,超过后就返回false 空了还去取就会阻塞一段时间,超过时间后就会返回null |
offer(E e, long timeout, TimeUnit unit) |
poll(long timeout, TimeUnit unit) |
简单测试下这四组API,可以根据自己业务需求来选择对应的API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| package com.banmoon.queue;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit;
public class FourApiTest {
public static void main(String[] args) throws InterruptedException {
test4(); }
private static void test1() { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); System.out.println(queue.add("A")); System.out.println(queue.add("B"));
System.out.println("======== 分割线 ========");
System.out.println(queue.remove()); System.out.println(queue.remove());
}
private static void test2() { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); System.out.println(queue.offer("A")); System.out.println(queue.offer("B")); System.out.println(queue.offer("C"));
System.out.println("======== 分割线 ========");
System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); }
private static void test3() throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); queue.put("A"); System.out.println("A已插入"); queue.put("B"); System.out.println("B已插入");
System.out.println("======== 分割线 ========");
System.out.println(queue.take()); System.out.println(queue.take());
}
private static void test4() throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); String template = "{}:{}"; System.out.println(StrUtil.format(template, queue.offer("A", 2, TimeUnit.SECONDS), DateUtil.now())); System.out.println(StrUtil.format(template, queue.offer("B", 2, TimeUnit.SECONDS), DateUtil.now())); System.out.println(StrUtil.format(template, queue.offer("C", 2, TimeUnit.SECONDS), DateUtil.now()));
System.out.println("======== 分割线 ========");
System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now())); System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now())); System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now())); }
}
|
test1
执行结果,会抛出异常
test2
执行结果,满了还去存则直接返回false,空了还去取就会返回null
test3
执行结果,可以看到程序一直都没有关闭。存的在等待位置,取的在等元素
test4
执行结果,注意看时间,不会死等
4)SynchronousQueue
这是一个比较特殊的阻塞队列,存取互相阻塞。生产者存入一个元素就马上阻塞,必须被另一个线程消费者取出这个元素,生产者才解锁。
简单测试一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.banmoon.queue;
import cn.hutool.core.util.StrUtil;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit;
public class SynchronousQueueTest {
public static void main(String[] args) { SynchronousQueue<String> queue = new SynchronousQueue<>(); new Thread(() -> { try { String name = Thread.currentThread().getName(); queue.put("A"); System.out.println(name + "插入A成功"); TimeUnit.SECONDS.sleep(1); queue.put("B"); System.out.println(name + "插入B成功"); TimeUnit.SECONDS.sleep(1); queue.put("C"); System.out.println(name + "插入C成功"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }, "线程A").start();
new Thread(() -> { try { String name = Thread.currentThread().getName(); String template = "{}取出了{}"; System.out.println(StrUtil.format(template, name, queue.take())); TimeUnit.SECONDS.sleep(1); System.out.println(StrUtil.format(template, name, queue.take())); TimeUnit.SECONDS.sleep(1); System.out.println(StrUtil.format(template, name, queue.take())); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }, "线程B").start();
}
}
|
执行结果,存取交替进行,没问题
现在我们把线程B的代码注释掉,让其只有生产的线程。再次查看结果
好的,被阻塞了。也就是在15行存入后,线程A直接阻塞了,要不然怎么连16行的插入成功信息都没有打印呢?
所以,我们需要进入 SynchronousQueue
的源码,简单查看下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); } static final class TransferStack<E> extends Transferer<E> {} static final class TransferQueue<E> extends Transferer<E> {} private transient volatile Transferer<E> transferer; public SynchronousQueue() { this(false); }
public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); } public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } }
|
看到这,我有点蒙,看来还是逃避不过要查看 TransferStack
或者 TransferQueue
这两个内部类啊。后续开个专章来讲述源码好了。简单说下它的流程,transferer
方法
-
根据实参是否为null来判断出是哪种操作,并为其打上标记,这边分为 DATA
和 REQUEST
DATA
说明是存,REQUEST
说明是取
-
判断队列是否为空,为空就将此操作线程作为节点阻塞住
-
判断队列是否为空,不为空就判断队尾的节点模式和此操作的模式是否匹配
- 匹配的意思是指,一个
DATA
操作就要匹配一个 REQUEST
-
如果模式相等,没有匹配上,就将此操作线程作为节点阻塞住
-
如果模式不相等,可以匹配上,则将 DATA
操作的元素交给 REQUEST
,然后进行消除。
TransferStack
和 TransferQueue
的区别在与
判断队尾节点的模式是否相等,TransferQueue
是将头部的节点进行匹配消除,而 TransferStack
全部都是队尾的节点。这也体现的队列的栈的区别,一个是先进先出,一个是后进先出。
源码内部还使用到了CAS自旋锁,计划出一章关于锁类型的文章。
5)LinkedTransferQueue
如果理解了前面的 SynchronousQueue
的话,那么 LinkedTransferQueue
就很好理解了。
简单来说,SynchronousQueue
中的 TransferQueue
直接维护在 LinkedTransferQueue
里面,少了一层抽象内部类。
如果 SynchronousQueue
存取操作都会阻塞,只有配对上才会唤醒。那么 LinkedTransferQueue
做了一定的简化和增强,其中一项就是可以自己决定是否阻塞存取操作。
一起来测试一下吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| package com.banmoon.queue;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil;
import java.util.Date; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit;
public class LinkedTransferQueueTest {
public static void main(String[] args) throws InterruptedException { test1();
}
private static void test1() throws InterruptedException { LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
new Thread(() -> { try { for (int i = 0; i < 3; i++) { String str = queue.take(); TimeUnit.SECONDS.sleep(1); System.out.println(StrUtil.format("取到了{},{}", str, DateUtil.formatTime(new Date()))); } } catch (InterruptedException e) { e.printStackTrace(); } }).start();
queue.transfer("A"); System.out.println("插入了A"); queue.transfer("B"); System.out.println("插入了B"); queue.transfer("C"); System.out.println("插入了C"); }
private static void test2() throws InterruptedException { LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
for (int i = 0; i < 3; i++) { new Thread(() -> { try { System.out.println(StrUtil.format("取到了{},{}", queue.take(), DateUtil.formatTime(new Date()))); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
TimeUnit.SECONDS.sleep(2);
System.out.println(StrUtil.format("插入A:{},{}", queue.tryTransfer("A"), DateUtil.formatTime(new Date()))); System.out.println(StrUtil.format("插入B:{},{}", queue.tryTransfer("B"), DateUtil.formatTime(new Date()))); System.out.println(StrUtil.format("插入C:{},{}", queue.tryTransfer("C"), DateUtil.formatTime(new Date()))); }
private static void test3() throws InterruptedException { LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
new Thread(() -> { for (int i = 0; i < 3; i++) { try { TimeUnit.SECONDS.sleep(1); System.out.println(StrUtil.format("取到了{},{}", queue.take(), DateUtil.formatTime(new Date()))); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
System.out.println(StrUtil.format("插入A:{},{}", queue.tryTransfer("A", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date()))); System.out.println(StrUtil.format("插入B:{},{}", queue.tryTransfer("B", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date()))); System.out.println(StrUtil.format("插入C:{},{}", queue.tryTransfer("C", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date()))); }
}
|
执行 test1
,transfer
方法插入的操作会阻塞,直到有取的操作进入相匹配,这效果和 SynchronousQueue
一样
执行 test2
。简单来说,存元素时,如果已有取操作阻塞了,将返回true。否则,直接返回false
执行 test3
,存操作时,没有取操作匹配,将会等待一段时间再进行返回
5)PriorityBlockingQueue
PriorityBlockingQueue
优先级阻塞队列,存元素时不会阻塞,取元素时为空则阻塞。和其他阻塞队列相比,他的数组维护了一个二叉堆,对元素进行了优先级的排序。
测试下列代码,自定义实现比较器,这边就按照数字大小排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.banmoon.queue;
import java.util.Comparator; import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueTest {
public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(3, new MyComparator()); queue.put(1); queue.put(3); queue.put(2);
System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take());
}
}
class MyComparator implements Comparator<Integer> {
@Override public int compare(Integer o1, Integer o2) { if(o1==o2) return 0; return o1>o2? -1: 1; } }
|
执行结果
6)LinkedBlockingDeque
这个和 LinkedBlockingQueue
好像,我可以称其为 LinkedBlockingQueue
的升级版。
LinkedBlockingDeque
被称为双端队列,是因为 LinkedBlockingDeque
可以往链表两头插入元素。他的存储结构是链表,同 LinkedBlockingQueue
一致,又因为存取可以在双端进行,所以不能像 LinkedBlockingQueue
一样给两把锁,这里保持了和 ArrayBlockingQueue
一样,仅有一把锁保持线程安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.banmoon.queue;
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingDequeTest {
public static void main(String[] args) throws InterruptedException { LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(4); deque.put("A"); deque.put("B"); deque.put("C"); deque.putFirst("D");
System.out.println(deque.take()); System.out.println(deque.takeLast()); }
}
|
执行结果,此队列中还有其他的api,都可以指定在队首或者队尾存元素,取元素同理
六、最后
在以前,并没有熟悉去使用过并发包的东西,在这次整理后,我对并发包有一定的了解。尤其是阻塞队列这一块,在看源码的时候还是挺有意思的。
当然本文没有列出所有使用 Lock
锁的api方法,提供上jkd8在线文档,学习要对着文档。
关于本文出现的代码示例,已提交至码云,只看文章不懂时,一定要敲代码进行理解。
我是半月,祝你幸福!!!