多线程---并发容器的使用
1. 容器概览
2. 容器的使用
1. Map
多线程环境下,几种Map的处理效率和线程安全问题
测试方案:准备10万条数据, 10个线程, 每个线程处理1万条数据, 并发执行。
数据准备
publicclassConstants{publicstaticfinalint COUNT =1000000;publicstaticfinalint THREAD_COUNT =100;}
1. HashTable
HashTable是线程安全的, 因为里面加了很多synchronized, 每次put操作, 都会锁整个Map
publicclassT01_TestHashtable{//HashMap是jdk1.0产生的, 最早出现, 里面的很多方法都加了synchronized锁, 所以效率比较低staticHashtable<UUID, UUID> m =newHashtable<>();staticint count =Constants.COUNT;static UUID[] keys =new UUID[count];static UUID[] values =new UUID[count];staticfinalint THREAD_COUNT =Constants.THREAD_COUNT;//先准备好测试用例, 减少干扰因素static{for(int i =0; i < count; i++){
keys[i]= UUID.randomUUID();
values[i]= UUID.randomUUID();}}/**
* 每个线程处理1万条数据,
*/staticclassMyThreadextendsThread{int start;int gap = count / THREAD_COUNT;publicMyThread(int start){this.start = start;}@Overridepublicvoidrun(){//从start开始, 到start+gap个结束for(int i = start; i < start + gap; i++){
m.put(keys[i], values[i]);}}}publicstaticvoidmain(String[] args){long start =System.currentTimeMillis();Thread[] threads =newThread[THREAD_COUNT];for(int i =0; i < threads.length; i++){//设置每个线程的起始值不一样
threads[i]=newMyThread(i *(count / THREAD_COUNT));}for(Thread t : threads){
t.start();}for(Thread t : threads){try{//主线程等待每个线程都执行结束
t.join();}catch(InterruptedException e){
e.printStackTrace();}}long end =System.currentTimeMillis();//插入执行的时间System.out.println(end - start);System.out.println(m.size());//-----------------------------------
start =System.currentTimeMillis();for(int i =0; i < threads.length; i++){
threads[i]=newThread(()->{for(int j =0; j <10000000; j++){
m.get(keys[10]);}});}for(Thread t : threads){
t.start();}for(Thread t : threads){try{//主线程需要等待所有子线程执行完
t.join();}catch(InterruptedException e){
e.printStackTrace();}}
end =System.currentTimeMillis();System.out.println(end - start);}}
- 执行结果
写耗时: 290
1000000
读耗时: 29602
可以看到执行时间在300毫秒左右
2. HashMap
sun在HashTablede的基础上推出了无锁的HashMap
publicclassT02_TestHashMap{//sun在HashTablede的基础上推出了无锁的HashMapstaticHashMap<UUID, UUID> m =newHashMap<>();staticint count =Constants.COUNT;static UUID[] keys =new UUID[count];static UUID[] values =new UUID[count];staticfinalint THREAD_COUNT =Constants.THREAD_COUNT;static{for(int i =0; i < count; i++){
keys[i]= UUID.randomUUID();
values[i]= UUID.randomUUID();}}staticclassMyThreadextendsThread{int start;int gap = count / THREAD_COUNT;publicMyThread(int start){this.start = start;}@Overridepublicvoidrun(){for(int i = start; i < start + gap; i++){//HashMap在resize(扩容)时, 可能会出现多个线程同时进行, 导致数据丢失甚至死锁情况
m.put(keys[i], values[i]);}}}publicstaticvoidmain(String[] args){long start =System.currentTimeMillis();Thread[] threads =newThread[THREAD_COUNT];for(int i =0; i < threads.length; i++){
threads[i]=newMyThread(i *(count / THREAD_COUNT));}for(Thread t : threads){
t.start();}for(Thread t : threads){try{
t.join();}catch(InterruptedException e){
e.printStackTrace();}}long end =System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());}}
- 执行结果
写耗时: 171
814683
读耗时: 2005
可以看到即使执行成功, 总数也不是10万条, 因为设计到Map的resize扩容, 多线程环境下, 同时进入扩容操作, 会出现问题, 具体参考: https://www.jianshu.com/p/e2f75c8cce01
3. SynchronizedHashMap
相当于有锁版的HashMap, 锁的粒度比HashTable小了一些
publicclassT03_TestSynchronizedHashMap{//相当于有锁版的HashMap, 锁的粒度比HashTable小了一些staticMap<UUID, UUID> m =Collections.synchronizedMap(newHashMap<UUID, UUID>());staticint count =Constants.COUNT;static UUID[] keys =new UUID[count];static UUID[] values =new UUID[count];staticfinalint THREAD_COUNT =Constants.THREAD_COUNT;static{for(int i =0; i < count; i++){
keys[i]= UUID.randomUUID();
values[i]= UUID.randomUUID();}}staticclassMyThreadextendsThread{int start;int gap = count / THREAD_COUNT;publicMyThread(int start){this.start = start;}@Overridepublicvoidrun(){for(int i = start; i < start + gap; i++){
m.put(keys[i], values[i]);}}}publicstaticvoidmain(String[] args){long start =System.currentTimeMillis();Thread[] threads =newThread[THREAD_COUNT];for(int i =0; i < threads.length; i++){
threads[i]=newMyThread(i *(count / THREAD_COUNT));}for(Thread t : threads){
t.start();}for(Thread t : threads){try{
t.join();}catch(InterruptedException e){
e.printStackTrace();}}long end =System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------
start =System.currentTimeMillis();for(int i =0; i < threads.length; i++){
threads[i]=newThread(()->{for(int j =0; j <10000000; j++){
m.get(keys[10]);}});}for(Thread t : threads){
t.start();}for(Thread t : threads){try{
t.join();}catch(InterruptedException e){
e.printStackTrace();}}
end =System.currentTimeMillis();System.out.println(end - start);}}
- 执行结果
写耗时: 334
1000000
读耗时: 29575
4. ConcurrentHashMap
ConcurrentHashMap并发插入的时候, 效率未必比其他Map高, 但是并发读取的时候效率很
/**
* ConcurrentHashMap并发插入的时候, 效率未必比其他Map高, 但是并发读取的时候效率很高
*/publicclassT04_TestConcurrentHashMap{//结合了JUC包里的一些锁staticMap<UUID, UUID> m =newConcurrentHashMap<>();staticint count =Constants.COUNT;static UUID[] keys =new UUID[count];static UUID[] values =new UUID[count];staticfinalint THREAD_COUNT =Constants.THREAD_COUNT;static{for(int i =0; i < count; i++){
keys[i]= UUID.randomUUID();
values[i]= UUID.randomUUID();}}staticclassMyThreadextendsThread{int start;int gap = count/THREAD_COUNT;publicMyThread(int start){this.start = start;}@Overridepublicvoidrun(){for(int i=start; i<start+gap; i++){
m.put(keys[i], values[i]);}}}publicstaticvoidmain(String[] args){long start =System.currentTimeMillis();Thread[] threads =newThread[THREAD_COUNT];for(int i=0; i<threads.length; i++){
threads[i]=newMyThread(i *(count/THREAD_COUNT));}for(Thread t : threads){
t.start();}for(Thread t : threads){try{
t.join();}catch(InterruptedException e){
e.printStackTrace();}}long end =System.currentTimeMillis();System.out.println("写耗时: "+(end - start));System.out.println(m.size());//-----------------------------------
start =System.currentTimeMillis();for(int i =0; i < threads.length; i++){
threads[i]=newThread(()->{for(int j =0; j <10000000; j++){
m.get(keys[10]);}});}for(Thread t : threads){
t.start();}for(Thread t : threads){try{
t.join();}catch(InterruptedException e){
e.printStackTrace();}}
end =System.currentTimeMillis();System.out.println("读耗时: "+(end - start));}}
写耗时: 160
1000000
读耗时: 1108
2. Collection
多线程环境下集合的使用, 由一个测试题引入
测试题:
有N张火车票,每张票都有一个编号
同时有10个窗口对外售票
请写一个模拟程序
1. ArrayList
/**
* 有N张火车票,每张票都有一个编号
* 同时有10个窗口对外售票
* 请写一个模拟程序
*
* 分析下面的程序可能会产生哪些问题?
* 重复销售?超量销售?
*
*
* @author cyc
*/packagecom.cyc.juc.c_024_FromVectorToQueue;importjava.util.ArrayList;importjava.util.List;publicclassTicketSeller1ArrayList{staticList<String> tickets =newArrayList<>();static{for(int i =0; i <10000; i++){//提前将票添加到集合中
tickets.add("票编号:"+ i);}}publicstaticvoidmain(String[] args){for(int i =0; i <10; i++){newThread(()->{while(tickets.size()>0){//有可能只剩了一张票, 但是多个线程都通过了这个判断, 然后多线程直接开始remove, 所以会导致超卖的情况System.out.println("销售了--"+ tickets.remove(0));}}).start();}}}
执行结果
Exception in thread “Thread-7” java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.remove(ArrayList.java:505)
at com.cyc.juc.c_024_FromVectorToQueue.TicketSeller1ArrayList.lambda$main$0(TicketSeller1ArrayList.java:32)
at java.lang.Thread.run(Thread.java:748)
最后多个线程都进入了判断, 并开始卖最后一张票, 导致数组越界异常。
Queue和List对比
- Queue中提供了一些对线程友好的API offer, peek, poll
BlockingQueue
中的put方法和take方法可实现线程堵塞
2. Vector
/**
* 有N张火车票,每张票都有一个编号
* 同时有10个窗口对外售票
* 请写一个模拟程序
*
* 分析下面的程序可能会产生哪些问题?
*
* 使用Vector或者Collections.synchronizedXXX
* 分析一下,这样能解决问题吗?
*
* @author cyc
*/packagecom.cyc.juc.c_024_FromVectorToQueue;importjava.util.Vector;importjava.util.concurrent.TimeUnit;publicclassTicketSeller2Vector{staticVector<String> tickets =newVector<>();static{for(int i =0; i <1000; i++){
tickets.add("票 编号:"+ i);}}publicstaticvoidmain(String[] args){for(int i =0; i <10; i++){newThread(()->{while(tickets.size()>0){System.out.println("销售了--"+ tickets.remove(0));}}).start();}}}
- 执行结果可以正常输出
但是如果在
while(tickets.size()>0){System.out.println("销售了--"+ tickets.remove(0));}
之中还有其他业务逻辑需要执行, 假设执行10毫秒, 即代码如下
while(tickets.size()>0){//所谓的线程安全 , 是调用size()和remove()方法时加锁了, 但是在执行完size()之前和remove()之后, 并没有加锁,//这个过程就不是原子的try{TimeUnit.MILLISECONDS.sleep(10);}catch(InterruptedException e){
e.printStackTrace();}System.out.println("销售了--"+ tickets.remove(0));}
- 输出结果
Exception in thread "Thread-9"java.lang.ArrayIndexOutOfBoundsException:Array index out of range:0
at java.util.Vector.remove(Vector.java:834)
at com.cyc.juc.c_024_FromVectorToQueue.TicketSeller2Vector.lambda$main$0(TicketSeller2Vector.java:42)
at java.lang.Thread.run(Thread.java:748)
同样导致数据越界异常, 因为Vector的size和remove操作执行过程中不是原子的
- 优化
给tickets加锁, 并在代码中判断tickets.size()是否等于0
packagecom.cyc.juc.c_024_FromVectorToQueue;importjava.util.Vector;importjava.util.concurrent.TimeUnit;publicclassTicketSeller2Vector{staticVector<String> tickets =newVector<>();static{for(int i =0; i <1000; i++){
tickets.add("票 编号:"+ i);}}publicstaticvoidmain(String[] args){for(int i =0; i <10; i++){newThread(()->{while(tickets.size()>0){synchronized(tickets){//所谓的线程安全 , 是调用size()和remove()方法时加锁了, 但是在执行完size()之前和remove()之后, 并没有加锁,//这个过程就不是原子的if(tickets.size()<=0){break;}try{TimeUnit.MILLISECONDS.sleep(10);}catch(InterruptedException e){
e.printStackTrace();}System.out.println("销售了--"+ tickets.remove(0));}}}).start();}}}
3. LinkedList
/**
* 有N张火车票,每张票都有一个编号
* 同时有10个窗口对外售票
* 请写一个模拟程序
*
* 分析下面的程序可能会产生哪些问题?
* 重复销售?超量销售?
*
* 使用Vector或者Collections.synchronizedXXX
* 分析一下,这样能解决问题吗?
*
* 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
* 就像这个程序,判断size和进行remove必须是一整个的原子操作
*
* @author cyc
*/packagecom.cyc.juc.c_024_FromVectorToQueue;importjava.util.LinkedList;importjava.util.List;importjava.util.concurrent.TimeUnit;publicclassTicketSeller3VoctorWithSynchronize{staticList<String> tickets =newLinkedList<>();static{for(int i =0; i <1000; i++){
tickets.add("票 编号:"+ i);}}publicstaticvoidmain(String[] args){for(int i =0; i <10; i++){newThread(()->{while(true){//给tickets对象加锁, 保证原子性, 实现线程安全synchronized(tickets){if(tickets.size()<=0){break;}try{TimeUnit.MILLISECONDS.sleep(10);}catch(InterruptedException e){
e.printStackTrace();}System.out.println("销售了--"+ tickets.remove(0));}}}).start();}}}
4. ConcurrentLinkedQueue
/**
* 有N张火车票,每张票都有一个编号
* 同时有10个窗口对外售票
* 请写一个模拟程序
*
* 分析下面的程序可能会产生哪些问题?
* 重复销售?超量销售?
*
* 使用Vector或者Collections.synchronizedXXX
* 分析一下,这样能解决问题吗?
*
* 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
* 就像这个程序,判断size和进行remove必须是一整个的原子操作
*
* 使用ConcurrentQueue提高并发性
*
* @author cyc
*/publicclassTicketSeller4ConcurrentLinkedQueue{//多线程的程序应多考虑使用队列staticQueue<String> tickets =newConcurrentLinkedQueue<>();static{for(int i =0; i <1000; i++){
tickets.add("票 编号:"+ i);}}publicstaticvoidmain(String[] args){for(int i =0; i <10; i++){newThread(()->{while(true){String s = tickets.poll();if(s ==null){break;}else{System.out.println("销售了--"+ s);}}}).start();}}}
执行结果正常, 并且速度也很快
3. 并发容器的使用
1. ConcurrentHashMap
/**
* 几种并发容器Map的使用
* ConcurrentSkipListMap参考 https://blog.csdn.net/sunxianghuang/article/details/52221913
*/publicclassT01_ConcurrentMap{publicstaticvoidmain(String[] args){Map<String,String> map =newConcurrentHashMap<>();//高并发无序// Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序//Map<String, String> map = new Hashtable<>();//Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX//TreeMapRandom r =newRandom();Thread[] threads =newThread[100];//倒数门栓CountDownLatch latch =newCountDownLatch(threads.length);long start =System.currentTimeMillis();//100个线程并发往map里各写入10000条数据for(int i =0; i < threads.length; i++){
threads[i]=newThread(()->{for(int j =0; j <10000; j++){
map.put("a"+ r.nextInt(100000),"a"+ r.nextInt(100000));// map.put("a" + m.getAndIncrement(), "a" + m.getAndIncrement());}
latch.countDown();});}Arrays.asList(threads).forEach(t -> t.start());try{
latch.await();}catch(InterruptedException e){
e.printStackTrace();}long end =System.currentTimeMillis();System.out.println(end - start);System.out.println(map.size());}}
2. CopyOnWriteArrayList
/**
* 写时复制容器 copy on write
* 多线程环境下,写时效率低,读时效率高
* 适合写少读多的环境
* 因为每次写的时候都要复制一份,
*
* @author cyc
*/packagecom.cyc.juc.c_025;importjava.util.Arrays;importjava.util.List;importjava.util.Random;importjava.util.Vector;importjava.util.concurrent.CopyOnWriteArrayList;publicclassT02_CopyOnWriteList{publicstaticvoidmain(String[] args){List<String> lists =// new ArrayList<>(); //这个会出并发问题!// new Vector();newCopyOnWriteArrayList<>();//写的时候加锁, 读的时候不加锁Random r =newRandom();Thread[] threads =newThread[100];for(int i=0; i<threads.length; i++){Runnable task =newRunnable(){@Overridepublicvoidrun(){for(int i=0; i<1000; i++){
lists.add("a"+ r.nextInt(10000));}}};
threads[i]=newThread(task);}//写操作runAndWriteComputeTime(threads);for(int i=0; i<threads.length; i++){Runnable task =()->{for(String list : lists){String s = list;}};
threads[i]=newThread(task);}//读操作runAndReadComputeTime(threads);System.out.println("list大小: "+lists.size());}privatestaticvoidrunAndReadComputeTime(Thread[] threads){long s1 =System.currentTimeMillis();Arrays.asList(threads).forEach(t->t.start());Arrays.asList(threads).forEach(t->{try{//主线程等待所有线程执行完
t.join();}catch(InterruptedException e){
e.printStackTrace();}});long s2 =System.currentTimeMillis();System.out.println("读耗时: "+(s2 - s1));}staticvoidrunAndWriteComputeTime(Thread[] threads){long s1 =System.currentTimeMillis();Arrays.asList(threads).forEach(t->t.start());Arrays.asList(threads).forEach(t->{try{//主线程等待所有线程执行完
t.join();}catch(InterruptedException e){
e.printStackTrace();}});long s2 =System.currentTimeMillis();System.out.println("写耗时: "+(s2 - s1));}}
- 先来测试同样为线程安全的collection容器Vector
输出结果
写耗时: 86
读耗时: 401
list大小: 100000
- 测试CopyOnWriteArrayList
输出结果
写耗时: 2769
读耗时: 41
list大小: 100000
可以看到CopyOnWriteArrayList写操作比较慢, 这正是由于每次写入都要进行复制操作 , 但是读数据非常快。
3. ConcurrentLinkedQueue
packagecom.cyc.juc.c_025;importjava.util.Queue;importjava.util.concurrent.ConcurrentLinkedQueue;/**
* ConcurrentLinkedQueue并发队列, 先进先出
*/publicclassT04_ConcurrentQueue{publicstaticvoidmain(String[] args){Queue<String> strs =newConcurrentLinkedQueue<>();for(int i=0; i<10; i++){
strs.offer("a"+ i);//add}System.out.println(strs);System.out.println(strs.size());//poll取出, 并删除一个System.out.println(strs.poll());System.out.println(strs.size());//peak 取出 , 不删除System.out.println(strs.peek());System.out.println(strs.size());}}
- 输出结果
[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
10
a0
9
a1
9
4. LinkedBlockingQueue
- LinkedBlockingQueue中的锁是分离的,生产者的锁PutLock,消费者的锁takeLock
- LinkedBlockingQueue内部维护的是一个链表结构
- 在生产和消费的时候,需要创建Node对象进行插入或移除,大批量数据的系统中,其对于GC的压力会比较大
- LinkedBlockingQueue有默认的容量大小为:Integer.MAX_VALUE,当然也可以传入指定的容量大小
代码演示
/**
* BlockingQueue天生的阻塞队列
*/publicclassT05_LinkedBlockingQueue{//staticBlockingQueue<String> strs =newLinkedBlockingQueue<>();staticRandom r =newRandom();publicstaticvoidmain(String[] args){newThread(()->{for(int i =0; i <100; i++){try{
strs.put("a"+ i);//如果满了,就会等待TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));}catch(InterruptedException e){
e.printStackTrace();}}},"p1").start();for(int i =0; i <5; i++){newThread(()->{for(;;){try{System.out.println(Thread.currentThread().getName()+" take -"+ strs.take());//如果空了,就会等待}catch(InterruptedException e){
e.printStackTrace();}}},"c"+ i).start();}}}
5. ArrayBlockingQueue
- ArrayBlockingQueue生产者和消费者使用的是同一把锁
- ArrayBlockingQueue内部维护了一个数组在生产和消费的时候,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例插入操作
- ArrayBlockingQueue在初始化的时候,必须传入一个容量大小的值
6. DelayQueue
packagecom.cyc.juc.c_025;importjava.util.Random;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.DelayQueue;importjava.util.concurrent.Delayed;importjava.util.concurrent.TimeUnit;/**
* 按照等待时间长短排序执行 , 可以自定义执行优先级
* 用户: 按照时间进行调度,
*/publicclassT07_DelayQueue{staticBlockingQueue<MyTask> tasks =newDelayQueue<>();staticRandom r =newRandom();staticclassMyTaskimplementsDelayed{String name;long runningTime;MyTask(String name,long rt){this.name = name;this.runningTime = rt;}@OverridepublicintcompareTo(Delayed o){if(this.getDelay(TimeUnit.MILLISECONDS)< o.getDelay(TimeUnit.MILLISECONDS)){return-1;}elseif(this.getDelay(TimeUnit.MILLISECONDS)> o.getDelay(TimeUnit.MILLISECONDS)){return1;}else{return0;}}@OverridepubliclonggetDelay(TimeUnit unit){return unit.convert(runningTime -System.currentTimeMillis(),TimeUnit.MILLISECONDS);}@OverridepublicStringtoString(){return name +" "+ runningTime;}}publicstaticvoidmain(String[] args)throwsInterruptedException{long now =System.currentTimeMillis();MyTask t1 =newMyTask("t1", now +1000);MyTask t2 =newMyTask("t2", now +2000);MyTask t3 =newMyTask("t3", now +1500);MyTask t4 =newMyTask("t4", now +2500);MyTask t5 =newMyTask("t5", now +500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);System.out.println(tasks);for(int i =0; i <5; i++){System.out.println(tasks.take());}}}
7. SynchronousQueue
/**
* 容量为0
* SynchronousQueue 同步Queue
* 常用作一个线程给另一个容器下单任务
* @author fei
*/publicclassT08_SynchronusQueue{//容量为0publicstaticvoidmain(String[] args)throwsInterruptedException{BlockingQueue<String> strs =newSynchronousQueue<>();newThread(()->{try{System.out.println(strs.take());}catch(InterruptedException e){
e.printStackTrace();}}).start();
strs.put("aaa");//阻塞等待消费者消费//必须有消费者进行消费才可以继续put,如果没有消费者, 则这里就会堵塞在这里// strs.put("bbb");//strs.add("aaa");System.out.println(strs.size());}}
8. LinkedTransferQueue
packagecom.cyc.juc.c_025;importjava.util.concurrent.LinkedTransferQueue;/**
* 给线程传递任务
* 用于需要得到执行结果, 才能继续往下执行, 类似MQ的一个返回处理结果机制
* @author cyc
*/publicclassT09_TransferQueue{publicstaticvoidmain(String[] args)throwsInterruptedException{LinkedTransferQueue<String> strs =newLinkedTransferQueue<>();newThread(()->{try{System.out.println(strs.take());}catch(InterruptedException e){
e.printStackTrace();}}).start();//和put的区别在于, 线程put完之后, 执行put的线程就可以走了,// 而transfer, 执行transfer的线程必须原地等着, 阻塞住, 直到有线程取走transfer的值才行//put是满了才等着, transfer是来了就得等着
strs.transfer("aaa");//strs.put("aaa");/*new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();*/}}
3. 练习题
要求用线程顺序打印A1B2C3…Z26
1. 使用LockSupport
/**
* Locksupport park 当前线程阻塞(停止)
* unpark(Thread t)
*/publicclassT02_00_LockSupport{staticThread t1 =null, t2 =null;publicstaticvoidmain(String[] args)throwsException{char[] aC ="ABCDEFG".toCharArray();char[] aI ="1234567".toCharArray();
t1 =newThread(()->{for(char c : aC){System.out.print(c);LockSupport.unpark(t2);//叫醒t2LockSupport.park();//t1阻塞}},"t1");
t2 =newThread(()->{for(char c : aI){//保证t1先执行LockSupport.park();//T2阻塞System.out.print(c);LockSupport.unpark(t1);//叫醒T1}},"t2");
t1.start();
t2.start();}}
输出结果
A1B2C3D4E5F6G7
2. 使用synchronized, notify,wait
publicclassT06_00_sync_wait_notify{publicstaticvoidmain(String[] args){//新建一个需要被锁的对象finalObject o =newObject();char[] aI ="1234567".toCharArray();char[] aC ="ABCDEFG".toCharArray();newThread(()->{synchronized(o){for(char c : aI){System.out.print(c);try{
o.notify();//叫醒第二个线程//这里为什么使用wait而不是sleep, wait可以释放锁, sleep不会释放锁
o.wait();//让出锁}catch(InterruptedException e){
e.printStackTrace();}}//必须,否则无法停止程序, 因为两个线程最后会有一个是wait状态,// 如果不进行o.notify(),则无法叫醒那个wait()的线程
o.notify();}},"t1").start();newThread(()->{synchronized(o){for(char c : aC){System.out.print(c);try{
o.notify();
o.wait();}catch(InterruptedException e){
e.printStackTrace();}}
o.notify();}},"t2").start();}}
保证执行顺序 , 先打印A, 再打印1, 然后依次B,2,C,3…
使用CountDownLatch保证顺序
packagecom.cyc.juc.c_026_00_interview.A1B2C3;importjava.util.concurrent.CountDownLatch;/**
* 多线程保证执行顺序, 使A先输出, 再输出1
*/publicclassT07_00_sync_wait_notify_sort{privatestaticCountDownLatch latch =newCountDownLatch(1);publicstaticvoidmain(String[] args){finalObject o =newObject();char[] aI ="1234567".toCharArray();char[] aC ="ABCDEFG".toCharArray();newThread(()->{try{
latch.await();}catch(InterruptedException e){
e.printStackTrace();}synchronized(o){for(char c : aI){System.out.print(c);try{//唤醒第二个线程
o.notify();//让出锁
o.wait();}catch(InterruptedException e){
e.printStackTrace();}}
o.notify();}},"t1").start();newThread(()->{synchronized(o){for(char c : aC){System.out.print(c);//输出之后, 进行减1
latch.countDown();try{//唤醒第一个线程
o.notify();//让出锁
o.wait();}catch(InterruptedException e){
e.printStackTrace();}}
o.notify();}},"t2").start();}}
使用volatile+标识字段
packagecom.cyc.juc.c_026_00_interview.A1B2C3;importjava.util.concurrent.CountDownLatch;/**
* 多线程保证执行顺序, 使A先输出, 再输出1
*/publicclassT07_00_sync_wait_notify_sort{privatestaticvolatileboolean t2Started =false;publicstaticvoidmain(String[] args){finalObject o =newObject();char[] aI ="1234567".toCharArray();char[] aC ="ABCDEFG".toCharArray();newThread(()->{synchronized(o){while(!t2Started){try{
o.wait();}catch(InterruptedException e){
e.printStackTrace();}}for(char c : aI){System.out.print(c);try{//唤醒第二个线程
o.notify();//让出锁
o.wait();}catch(InterruptedException e){
e.printStackTrace();}}
o.notify();}},"t1").start();newThread(()->{synchronized(o){for(char c : aC){System.out.print(c);
t2Started =true;try{//唤醒第一个线程
o.notify();//让出锁
o.wait();}catch(InterruptedException e){
e.printStackTrace();}}
o.notify();}},"t2").start();}}
3. 使用Lock.newCondition()
/**
* 使用lock.newCondition()创建等待队列
* @author cyc
*/publicclassT08_00_lock_condition{publicstaticvoidmain(String[] args){char[] aI ="1234567".toCharArray();char[] aC ="ABCDEFG".toCharArray();Lock lock =newReentrantLock();Condition condition = lock.newCondition();newThread(()->{try{
lock.lock();for(char c : aI){System.out.print(c);//唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中
condition.signal();//当前线程进入等待状态
condition.await();}
condition.signal();}catch(Exception e){
e.printStackTrace();}finally{
lock.unlock();}},"t1").start();newThread(()->{try{
lock.lock();for(char c : aC){System.out.print(c);
condition.signal();
condition.await();}
condition.signal();}catch(Exception e){
e.printStackTrace();}finally{
lock.unlock();}},"t2").start();}}
优化
/**
* Condition本质是锁资源上不同的等待队列
*/publicclassT09_00_lock_condition_two{publicstaticvoidmain(String[] args){char[] aI ="1234567".toCharArray();char[] aC ="ABCDEFG".toCharArray();Lock lock =newReentrantLock();//condition实际上是一个等待队列Condition conditionT1 = lock.newCondition();Condition conditionT2 = lock.newCondition();newThread(()->{try{
lock.lock();for(char c : aI){System.out.print(c);
conditionT2.signal();
conditionT1.await();}
conditionT2.signal();}catch(Exception e){
e.printStackTrace();}finally{
lock.unlock();}},"t1").start();newThread(()->{try{
lock.lock();for(char c : aC){System.out.print(c);
conditionT1.signal();
conditionT2.await();}
conditionT1.signal();}catch(Exception e){
e.printStackTrace();}finally{
lock.unlock();}},"t2").start();}}
4. 使用Exchanger
packagecom.cyc.juc.c_026_00_interview.A1B2C3;importjava.util.concurrent.Exchanger;publicclassT12_00_Exchanger_Not_Work{privatestaticExchanger<String> exchanger =newExchanger<>();publicstaticvoidmain(String[] args){char[] aI ="1234567".toCharArray();char[] aC ="ABCDEFG".toCharArray();newThread(()->{for(int i=0; i<aI.length; i++){System.out.print(aI[i]);try{//Exchanger类中的exchange(String x) 方法具有阻塞的特点,也就是说此方法被调用后等待其他线程来获取数据,// 如果没有其他线程取得数据,则就会一直阻塞等待下去。
exchanger.exchange("T1");}catch(InterruptedException e){
e.printStackTrace();}}}).start();newThread(()->{for(int i=0; i<aC.length; i++){try{
exchanger.exchange("T2");}catch(InterruptedException e){
e.printStackTrace();}System.out.print(aC[i]);}}).start();}}
5. 使用TransferQueue队列
packagecom.cyc.juc.c_026_00_interview.A1B2C3;importjava.util.concurrent.LinkedTransferQueue;importjava.util.concurrent.TransferQueue;/**
* TransferQueue
*/publicclassT13_TransferQueue{publicstaticvoidmain(String[] args){char[] aI ="1234567".toCharArray();char[] aC ="ABCDEFG".toCharArray();TransferQueue<Character> queue =newLinkedTransferQueue<>();newThread(()->{try{for(char c : aI){//第一个线程作为消费者,去take , 如果没有, 则等待System.out.print(queue.take());
queue.transfer(c);}}catch(InterruptedException e){
e.printStackTrace();}},"t1").start();newThread(()->{try{for(char c : aC){//将c交给对方去打印
queue.transfer(c);System.out.print(queue.take());}}catch(InterruptedException e){
e.printStackTrace();}},"t2").start();}}
6. 使用CAS-1
packagecom.cyc.juc.c_026_00_interview.A1B2C3;publicclassT03_00_cas{enumReadyToRun{T1, T2}//保证线程之间的可见性, 当r有改动时, 可以立即通知其他线程进行更新缓存staticvolatileReadyToRun r =ReadyToRun.T1;publicstaticvoidmain(String[] args){char[] aC ="ABCDEFG".toCharArray();char[] aI ="1234567".toCharArray();newThread(()->{for(char c : aC){while(r !=ReadyToRun.T1){}System.out.print(c);
r =ReadyToRun.T2;}},"t1").start();newThread(()->{for(char c : aI){while(r !=ReadyToRun.T2){}System.out.print(c);
r =ReadyToRun.T1;}},"t2").start();}}
7. 使用CAS-2
package com.cyc.juc.c_026_00_interview.A1B2C3;
import java.util.concurrent.atomic.AtomicInteger;
public class T05_00_AtomicInteger {
static AtomicInteger threadNo = new AtomicInteger(1);
public static void main(String[] args) {
char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();
new Thread(() -> {
for (char c : aI) {
while (threadNo.get() != 1) {}
System.out.print(c);
threadNo.set(2);
}
}, "t1").start();
new Thread(() -> {
for (char c : aC) {
while (threadNo.get() != 2) {}
System.out.print(c);
threadNo.set(1);
}
}, "t2").start();
}
}
版权归原作者 意田天 所有, 如有侵权,请联系我们删除。