本文共 18015 字,大约阅读时间需要 60 分钟。
线程池:
创建有限的线程资源为更多的任务提供服务。享元模式
1).创建一个固定大小的线程池:
ExecutorService threadPool = Executors.newFixedThreadPool(2);
package ogr.westos.Demo;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class Demo10 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; i++) { threadPool.submit(()->{ System.out.println(Thread.currentThread().getName()+"任务执行"); }); }// 执行带有返回结果的任务 Future future = threadPool.submit(() -> { System.out.println(Thread.currentThread().getName()+"执行计算..."); Thread.sleep(1000); return 10; }); System.out.println(future.get()); threadPool.shutdown(); // 不接收新任务,当所有任务运行结束,整个线程池关闭 }}
2)一个核心的ExecutorService的实现类:ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)corePoolSize 核心线程数目 (最多保留的线程数)
maximumPoolSize(最大线程数)
keepAliveTime 生存时间- 针对救急线程
unit 时间单位 秒
workQueue 阻塞队列 如果任务超过了核心线程数,进入队列进行排队,直到有空闲的线程
如果任务过多,阻塞队列都放不下了,还会创建新的线程来救急
3).创建固定大小的线程池
Executors.newFixedThreadPool(2); 核心线程数=最大线程数(没有救急线程被创建) 阻塞队列 无界,可以放任意数量的任务, 适合执行数量有限,长时间运行的任务4).创建缓冲线程池
Executors.newCachedThreadPool() 核心线程数是0, 最大线程数是Integer的最大值(救急线程可以无限创建) 生存时间是60s 适合任务数比较密集,但每个任务执行时间较短的情况5)创建单线程线程池
Executors.newSingleThreadExecutor() 使用场景:希望多个任务排队执行区别:
Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改 Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改package day19;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;public class TestSingleThreadPool { public static void main(String[] args) { // 创建单线程线程池 /*ThreadPoolExecutor service2 = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); service2.setCorePoolSize(2);*/ ExecutorService service = Executors.newSingleThreadExecutor(); for (int i = 0; i < 3; i++) { int x = i; service.submit(() -> { System.out.println("任务.." + x); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } }}
5).带有日程安排功能的线程池
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
(1) 让任务推迟一段时间执行
参数1.任务对象, 参数2,3 推迟的时间 service.schedule(()->{ System.out.println("执行任务..."); }, 10L, TimeUnit.SECONDS);*/(2) 以一定的频率反复执行任务(任务不会重叠)
参数1,任务对象, 参数2,初始推迟时间, 参数3,4 时间间隔和单位
service.scheduleAtFixedRate(()->{ try { Thread.sleep(1200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("hello"); }, 0, 1, TimeUnit.SECONDS);*/(3).delay表示从上一个任务结束,到下一个任务开始之间的时间
service.scheduleWithFixedDelay(()->{ System.out.println("hello"); }, 0, 1, TimeUnit.SECONDS);package day19;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class TestScheduleThreadPool { public static void main(String[] args) { // 带有日程安排功能的线程池 ScheduledExecutorService service = Executors.newScheduledThreadPool(5); // 让任务推迟一段时间执行 // 参数1.任务对象, 参数2,3 推迟的时间 /*service.schedule(()->{ System.out.println("执行任务..."); }, 10L, TimeUnit.SECONDS);*/ // 以一定的频率反复执行任务(任务不会重叠) // 参数1,任务对象, 参数2,初始推迟时间, 参数3,4 时间间隔和单位 /*service.scheduleAtFixedRate(()->{ try { Thread.sleep(1200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("hello"); }, 0, 1, TimeUnit.SECONDS);*/ // delay表示从上一个任务结束,到下一个任务开始之间的时间 service.scheduleWithFixedDelay(()->{ System.out.println("hello"); }, 0, 1, TimeUnit.SECONDS);// service.shutdown(); }}package day19;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class TestScheduleThreadPool { public static void main(String[] args) { // 带有日程安排功能的线程池 ScheduledExecutorService service = Executors.newScheduledThreadPool(5); // 让任务推迟一段时间执行 // 参数1.任务对象, 参数2,3 推迟的时间 /*service.schedule(()->{ System.out.println("执行任务..."); }, 10L, TimeUnit.SECONDS);*/ // 以一定的频率反复执行任务(任务不会重叠) // 参数1,任务对象, 参数2,初始推迟时间, 参数3,4 时间间隔和单位 /*service.scheduleAtFixedRate(()->{ try { Thread.sleep(1200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("hello"); }, 0, 1, TimeUnit.SECONDS);*/ // delay表示从上一个任务结束,到下一个任务开始之间的时间 service.scheduleWithFixedDelay(()->{ System.out.println("hello"); }, 0, 1, TimeUnit.SECONDS);// service.shutdown(); }}
6).原子操作类
AtomicInteger
AtomicBooleanpackage day19;import java.util.concurrent.atomic.AtomicInteger;public class TestAtomicInteger { // 创建原子整数类 private static AtomicInteger i = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int j = 0; j < 5000; j++) { i.getAndIncrement(); // 获取并且自增 i++// i.incrementAndGet(); // 自增并且获取 ++i } }); Thread t2 = new Thread(() -> { for (int j = 0; j < 5000; j++) { i.getAndDecrement(); // 获取并且自减 i-- } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); }}
7).线程安全集合类
StringBuffer 线程安全
String 不可变类 , 都是线程安全的 Random 线程安全 Vector 实现了List,并且线程安全 Hashtable 实现了Map,并且线程安全5.0新增的线程安全集合类
ConcurrentHashMap 实现了Map,并且线程安全 ConcurrentSkipListMap 实现了Map(可排序),并且线程安全 CopyOnWriteArrayList 实现了List,并且线程安全8).阻塞队列
BlockingQueue 阻塞队列
队列 FIFO , first in first outQueue --> LinkedList
import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingDeque;public class TestQueue { public static void main(String[] args) throws InterruptedException { // 创建队列 Queuequeue = new LinkedList<>(); queue.offer("1"); queue.offer("2"); queue.offer("3"); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); // 线程安全的队列, 其中capacity 表示队列的元素上限 BlockingQueue queue1 = new ArrayBlockingQueue<>(3); // 或 new LinkedBlockingDeque<>(3); queue1.put("a"); queue1.put("b"); queue1.put("c"); System.out.println(queue1); /* queue1.put("d");// 如果放入多于上限的元素时,put方法会被阻塞*/ System.out.println(queue1.take()); System.out.println(queue1.take()); System.out.println(queue1.take()); System.out.println(queue1); System.out.println(queue1.take()); /*如果队列中没有元素了,take方法会被阻塞*/ }}
阻塞队列消费者生产者例子:
1) 没有使用阻塞队列
package day19;import java.util.ArrayList;import java.util.List;import java.util.Random;public class TestProduct { // 解耦生产和消费 private static Listproducts = new ArrayList<>(); private static Random r = new Random(); public static void main(String[] args) { // 生产者线程 new Thread(()->{ for (int i = 0; i < 10; i++) { synchronized (products) { while (products.size() == 5) { try { products.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } Product p = new Product(i); System.out.println(Thread.currentThread().getName() + "生产了商品:" + p); products.add(p); products.notifyAll();// 唤醒消费者线程:可以消费了 } } }).start(); // 消费者线程 new Thread(()->{ for (int i = 0; i < 5; i++) { synchronized (products) { while(products.size() == 0) { try { products.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(r.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } Product p = products.remove(0); System.out.println(Thread.currentThread().getName() +"消费了产品:"+p); products.notifyAll(); // 告诉生产者继续生产 } } }).start(); new Thread(()->{ for (int i = 0; i < 5; i++) { synchronized (products) { while(products.size() == 0) { try { products.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(r.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } Product p = products.remove(0); System.out.println(Thread.currentThread().getName() +"消费了产品:"+p); products.notifyAll(); // 告诉生产者继续生产 } } }).start(); }}// 产品类class Product { private int i; public Product(int i) { this.i = i; } @Override public String toString() { return "Product{" + "i=" + i + '}'; }}
2)使用阻塞队列
import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class TestProduct2 { private static BlockingQueuequeue = new ArrayBlockingQueue<>(5); public static void main(String[] args) { // 生产者线程 new Thread(()->{ for (int i = 0; i < 10; i++) { Product p = new Product(i); System.out.println(Thread.currentThread().getName()+"生产了:"+p); try { queue.put(p); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消费者线程 for (int j = 0; j < 5; j++) { new Thread(()->{ for (int i = 0; i < 2; i++) { try { Product p = queue.take(); System.out.println(Thread.currentThread().getName()+"消费了:"+p); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }}
9).线程局部变量
import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;public class TestThreadLocal { // 线程局部变量 private static ThreadLocallocal = new ThreadLocal() { @Override // 初始值 protected Object initialValue() { return new SimpleDateFormat("yyyy-MM-dd"); // 存入当前线程 } }; public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ try { SimpleDateFormat sdf = local.get(); // 获取本线程自己的局部变量 Date date = sdf.parse("1951-10-09"); // 每个线程使用的是自己的SimpleDateFormat因此没有争用 System.out.println(Thread.currentThread().getName() + " " + date); } catch (ParseException e) { e.printStackTrace(); } }).start(); } /*for (int i = 0; i < 10; i++) { new Thread(()->{ try { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); Date date = sdf.parse("1951-10-09"); System.out.println(Thread.currentThread().getName() + " " + date); } catch (ParseException e) { e.printStackTrace(); } }).start(); }*/ }}
10).
juc 中的大部分类是通过无锁并发实现的(没有用synchronized)
CAS 机制 compare And swap 比较并交换
synchronized 可以称之为悲观锁 cas 体现的是乐观锁 首先不会给共享资源加锁,而是做一个尝试 先拿到旧值,查看旧值是否跟共享区域的值相等 如果不等,那么说明别的线程改动了共享区域的值,我的修改失败 如果相等,那么就让我的修改成功 如果修改失败,没关系,重新尝试int var5; // 修改失败,没关系,重新尝试 自旋 do { // 获取共享区域的最新值 var5 = this.getIntVolatile(var1, var2); // 10 // 比较并交换 最新值 最新值+1 } while(! this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5;
11).重入锁 ReentrantLock
.lock() 加锁
.unlock() 解锁synchronized 性能上比较 ReentrantLock 在高并发下低,ReentrantLock的内存占用会高一些
package thread;import java.util.concurrent.locks.ReentrantLock;public class TestReentrantLock { static int i = 0; public static void main(String[] args) throws InterruptedException { ReentrantLock rl = new ReentrantLock(); Thread t1 = new Thread(() -> { for (int j = 0; j < 5000; j++) { try { rl.lock(); // 加锁 i++; } finally { rl.unlock(); // 保证解锁一定被执行 } } }); Thread t2 = new Thread(() -> { for (int j = 0; j < 5000; j++) { try { rl.lock(); // 加锁 i--; } finally { rl.unlock(); // 保证解锁一定被执行 } } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); }}
12).CountDownLatch
countdown 倒计时:当希望多个线程执行完毕后,再接着做下一步操作时,
package thread;import java.util.Date;import java.util.concurrent.CountDownLatch;public class TestCountDownLatch { public static void main(String[] args) throws InterruptedException { CountDownLatch cdl = new CountDownLatch(3);// 构造方法需要指定倒计时的数字 new Thread(()->{ System.out.println("线程1开始运行"+new Date()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程1准备完成"+new Date()); cdl.countDown(); // 倒计时减1 }).start(); new Thread(()->{ System.out.println("线程2开始运行"+new Date()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程2准备完成"+new Date()); cdl.countDown(); // 倒计时减1 }).start(); new Thread(()->{ System.out.println("线程3开始运行"+new Date()); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程3准备完成"+new Date()); cdl.countDown(); // 倒计时减1 }).start(); // 主线程等待,直到倒计时为0 System.out.println("主线程等待"); cdl.await(); System.out.println("ready go...."); }}
13).循环栅栏
CyclicBarrier 可循环的 屏障(栅栏)
当满足CyclicBarrier设置的线程个数时,继续执行,没有满足则等待
与倒计时锁的区别:倒计时锁只能使用一次,倒计时结束这个对象就没用了。而循环栅栏可以重复利用。
package thread;import java.util.Date;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class TestCyclicBarrier { public static void main(String[] args) { // CyclicBarrier 可循环的 屏障(栅栏) // 当满足CyclicBarrier设置的线程个数时,继续执行,没有满足则等待 CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行 new Thread(()->{ System.out.println("线程1开始.."+new Date()); try { cb.await(); // 当个数不足时,等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..."+new Date()); }).start(); new Thread(()->{ System.out.println("线程2开始.."+new Date()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } try { cb.await(); // 2 秒后,线程个数够2,继续运行 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..."+new Date()); }).start(); }}
14).信号量
Semaphore s = new Semaphore(3); // 限制了能同时运行的线程上限
package thread;import java.util.concurrent.Semaphore;public class TestSemaphore { public static void main(String[] args) { Semaphore s = new Semaphore(3); // 限制了能同时运行的线程上限 for (int i = 0; i < 10; i++) { new Thread(() -> { try { s.acquire(); // 获得此信号量 System.out.println("我是线程" + Thread.currentThread().getName()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { s.release(); // 释放信号量 } }).start(); } }}
转载地址:http://gxarn.baihongyu.com/