博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程池
阅读量:3920 次
发布时间:2019-05-23

本文共 18015 字,大约阅读时间需要 60 分钟。

线程池:

           创建有限的线程资源为更多的任务提供服务。享元模式

1).创建一个固定大小的线程池:

                  ExecutorService threadPool = Executors.newFixedThreadPool(2);

package ogr.westos.Demo;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class Demo10 {    public static void main(String[] args) throws ExecutionException, InterruptedException {        ExecutorService threadPool = Executors.newFixedThreadPool(2);    for (int i = 0; i < 10; i++) {        threadPool.submit(()->{             System.out.println(Thread.currentThread().getName()+"任务执行");        });    }// 执行带有返回结果的任务        Future future = threadPool.submit(() -> {            System.out.println(Thread.currentThread().getName()+"执行计算...");            Thread.sleep(1000);            return 10;        });        System.out.println(future.get());        threadPool.shutdown(); // 不接收新任务,当所有任务运行结束,整个线程池关闭    }}

2)一个核心的ExecutorService的实现类:ThreadPoolExecutor

           ThreadPoolExecutor(int corePoolSize,

                                             int maximumPoolSize,
                                             long keepAliveTime,
                                             TimeUnit unit,
                                             BlockingQueue<Runnable> workQueue)

corePoolSize 核心线程数目 (最多保留的线程数)

maximumPoolSize(最大线程数)

keepAliveTime 生存时间- 针对救急线程

unit 时间单位 秒

workQueue 阻塞队列 如果任务超过了核心线程数,进入队列进行排队,直到有空闲的线程

如果任务过多,阻塞队列都放不下了,还会创建新的线程来救急

3).创建固定大小的线程池

             Executors.newFixedThreadPool(2);
             核心线程数=最大线程数(没有救急线程被创建)
             阻塞队列  无界,可以放任意数量的任务,
            适合执行数量有限,长时间运行的任务

4).创建缓冲线程池

            Executors.newCachedThreadPool()
            核心线程数是0, 最大线程数是Integer的最大值(救急线程可以无限创建)
            生存时间是60s
            适合任务数比较密集,但每个任务执行时间较短的情况

5)创建单线程线程池

            Executors.newSingleThreadExecutor()
            使用场景:希望多个任务排队执行

区别:

              Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
              Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改

package day19;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;public class TestSingleThreadPool {    public static void main(String[] args) {        // 创建单线程线程池        /*ThreadPoolExecutor service2 = (ThreadPoolExecutor)                Executors.newFixedThreadPool(1);        service2.setCorePoolSize(2);*/        ExecutorService service = Executors.newSingleThreadExecutor();        for (int i = 0; i < 3; i++) {            int x = i;            service.submit(() -> {                System.out.println("任务.." + x);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }    }}

5).带有日程安排功能的线程池

ScheduledExecutorService service = Executors.newScheduledThreadPool(5);

          (1) 让任务推迟一段时间执行

                  参数1.任务对象, 参数2,3 推迟的时间
                        service.schedule(()->{
                                System.out.println("执行任务...");
                        }, 10L, TimeUnit.SECONDS);*/

          (2) 以一定的频率反复执行任务(任务不会重叠)

                  参数1,任务对象, 参数2,初始推迟时间, 参数3,4 时间间隔和单位

                       service.scheduleAtFixedRate(()->{
                       try {
                            Thread.sleep(1200);
                            } catch (InterruptedException e) {
                                  e.printStackTrace();
                            }
                            System.out.println("hello");
                            }, 0, 1, TimeUnit.SECONDS);*/

           (3).delay表示从上一个任务结束,到下一个任务开始之间的时间

                      service.scheduleWithFixedDelay(()->{
                               System.out.println("hello");
                      }, 0, 1, TimeUnit.SECONDS);

package day19;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class TestScheduleThreadPool {    public static void main(String[] args) {        // 带有日程安排功能的线程池        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);        // 让任务推迟一段时间执行        // 参数1.任务对象, 参数2,3 推迟的时间        /*service.schedule(()->{            System.out.println("执行任务...");        }, 10L, TimeUnit.SECONDS);*/        // 以一定的频率反复执行任务(任务不会重叠)        // 参数1,任务对象, 参数2,初始推迟时间, 参数3,4 时间间隔和单位        /*service.scheduleAtFixedRate(()->{            try {                Thread.sleep(1200);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("hello");        }, 0, 1, TimeUnit.SECONDS);*/        // delay表示从上一个任务结束,到下一个任务开始之间的时间        service.scheduleWithFixedDelay(()->{            System.out.println("hello");        }, 0, 1, TimeUnit.SECONDS);//        service.shutdown();    }}package day19;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class TestScheduleThreadPool {    public static void main(String[] args) {        // 带有日程安排功能的线程池        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);        // 让任务推迟一段时间执行        // 参数1.任务对象, 参数2,3 推迟的时间        /*service.schedule(()->{            System.out.println("执行任务...");        }, 10L, TimeUnit.SECONDS);*/        // 以一定的频率反复执行任务(任务不会重叠)        // 参数1,任务对象, 参数2,初始推迟时间, 参数3,4 时间间隔和单位        /*service.scheduleAtFixedRate(()->{            try {                Thread.sleep(1200);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("hello");        }, 0, 1, TimeUnit.SECONDS);*/        // delay表示从上一个任务结束,到下一个任务开始之间的时间        service.scheduleWithFixedDelay(()->{            System.out.println("hello");        }, 0, 1, TimeUnit.SECONDS);//        service.shutdown();    }}

6).原子操作类

             AtomicInteger

             AtomicBoolean

package day19;import java.util.concurrent.atomic.AtomicInteger;public class TestAtomicInteger {    // 创建原子整数类    private static AtomicInteger i = new AtomicInteger(0);    public static void main(String[] args) throws InterruptedException {        Thread t1 = new Thread(() -> {            for (int j = 0; j < 5000; j++) {                i.getAndIncrement();  // 获取并且自增  i++//                i.incrementAndGet();  // 自增并且获取  ++i            }        });        Thread t2 = new Thread(() -> {            for (int j = 0; j < 5000; j++) {                i.getAndDecrement(); // 获取并且自减  i--            }        });        t1.start();        t2.start();        t1.join();        t2.join();        System.out.println(i);    }}

7).线程安全集合类

           StringBuffer 线程安全

           String       不可变类 , 都是线程安全的
           Random   线程安全
           Vector      实现了List,并且线程安全
           Hashtable 实现了Map,并且线程安全

           5.0新增的线程安全集合类

           ConcurrentHashMap 实现了Map,并且线程安全
           ConcurrentSkipListMap 实现了Map(可排序),并且线程安全
           CopyOnWriteArrayList 实现了List,并且线程安全

8).阻塞队列

         BlockingQueue  阻塞队列

         队列 FIFO , first in first out

         Queue -->  LinkedList

import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingDeque;public class TestQueue {    public static void main(String[] args) throws InterruptedException {        // 创建队列        Queue
queue = new LinkedList<>(); queue.offer("1"); queue.offer("2"); queue.offer("3"); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); // 线程安全的队列, 其中capacity 表示队列的元素上限 BlockingQueue
queue1 = new ArrayBlockingQueue<>(3); // 或 new LinkedBlockingDeque<>(3); queue1.put("a"); queue1.put("b"); queue1.put("c"); System.out.println(queue1); /* queue1.put("d");// 如果放入多于上限的元素时,put方法会被阻塞*/ System.out.println(queue1.take()); System.out.println(queue1.take()); System.out.println(queue1.take()); System.out.println(queue1); System.out.println(queue1.take()); /*如果队列中没有元素了,take方法会被阻塞*/ }}

阻塞队列消费者生产者例子:

   1) 没有使用阻塞队列

package day19;import java.util.ArrayList;import java.util.List;import java.util.Random;public class TestProduct {    // 解耦生产和消费    private static List
products = new ArrayList<>(); private static Random r = new Random(); public static void main(String[] args) { // 生产者线程 new Thread(()->{ for (int i = 0; i < 10; i++) { synchronized (products) { while (products.size() == 5) { try { products.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } Product p = new Product(i); System.out.println(Thread.currentThread().getName() + "生产了商品:" + p); products.add(p); products.notifyAll();// 唤醒消费者线程:可以消费了 } } }).start(); // 消费者线程 new Thread(()->{ for (int i = 0; i < 5; i++) { synchronized (products) { while(products.size() == 0) { try { products.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(r.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } Product p = products.remove(0); System.out.println(Thread.currentThread().getName() +"消费了产品:"+p); products.notifyAll(); // 告诉生产者继续生产 } } }).start(); new Thread(()->{ for (int i = 0; i < 5; i++) { synchronized (products) { while(products.size() == 0) { try { products.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(r.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } Product p = products.remove(0); System.out.println(Thread.currentThread().getName() +"消费了产品:"+p); products.notifyAll(); // 告诉生产者继续生产 } } }).start(); }}// 产品类class Product { private int i; public Product(int i) { this.i = i; } @Override public String toString() { return "Product{" + "i=" + i + '}'; }}

      2)使用阻塞队列

import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class TestProduct2 {    private static BlockingQueue
queue = new ArrayBlockingQueue<>(5); public static void main(String[] args) { // 生产者线程 new Thread(()->{ for (int i = 0; i < 10; i++) { Product p = new Product(i); System.out.println(Thread.currentThread().getName()+"生产了:"+p); try { queue.put(p); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消费者线程 for (int j = 0; j < 5; j++) { new Thread(()->{ for (int i = 0; i < 2; i++) { try { Product p = queue.take(); System.out.println(Thread.currentThread().getName()+"消费了:"+p); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }}

 9).线程局部变量

import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;public class TestThreadLocal {    //             线程局部变量    private static ThreadLocal
local = new ThreadLocal() { @Override // 初始值 protected Object initialValue() { return new SimpleDateFormat("yyyy-MM-dd"); // 存入当前线程 } }; public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ try { SimpleDateFormat sdf = local.get(); // 获取本线程自己的局部变量 Date date = sdf.parse("1951-10-09"); // 每个线程使用的是自己的SimpleDateFormat因此没有争用 System.out.println(Thread.currentThread().getName() + " " + date); } catch (ParseException e) { e.printStackTrace(); } }).start(); } /*for (int i = 0; i < 10; i++) { new Thread(()->{ try { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); Date date = sdf.parse("1951-10-09"); System.out.println(Thread.currentThread().getName() + " " + date); } catch (ParseException e) { e.printStackTrace(); } }).start(); }*/ }}

10).

           juc 中的大部分类是通过无锁并发实现的(没有用synchronized)

           CAS 机制  compare And swap 比较并交换

           synchronized 可以称之为悲观锁
           cas 体现的是乐观锁
           首先不会给共享资源加锁,而是做一个尝试
           先拿到旧值,查看旧值是否跟共享区域的值相等
           如果不等,那么说明别的线程改动了共享区域的值,我的修改失败
           如果相等,那么就让我的修改成功
           如果修改失败,没关系,重新尝试  

int var5;       // 修改失败,没关系,重新尝试 自旋        do {           // 获取共享区域的最新值            var5 = this.getIntVolatile(var1, var2); // 10                    // 比较并交换                                      最新值   最新值+1        } while(! this.compareAndSwapInt(var1, var2, var5, var5 + var4));        return var5;

11).重入锁  ReentrantLock

       .lock() 加锁

       .unlock() 解锁

        synchronized 性能上比较 ReentrantLock 在高并发下低,ReentrantLock的内存占用会高一些

package thread;import java.util.concurrent.locks.ReentrantLock;public class TestReentrantLock {    static int i = 0;    public static void main(String[] args) throws InterruptedException {        ReentrantLock rl = new ReentrantLock();        Thread t1 = new Thread(() -> {            for (int j = 0; j < 5000; j++) {                try {                    rl.lock(); // 加锁                    i++;                } finally {                    rl.unlock(); // 保证解锁一定被执行                }            }        });        Thread t2 = new Thread(() -> {            for (int j = 0; j < 5000; j++) {                try {                    rl.lock(); // 加锁                    i--;                } finally {                    rl.unlock(); // 保证解锁一定被执行                }            }        });        t1.start();        t2.start();        t1.join();        t2.join();        System.out.println(i);    }}

12).CountDownLatch

       countdown 倒计时:当希望多个线程执行完毕后,再接着做下一步操作时,

package thread;import java.util.Date;import java.util.concurrent.CountDownLatch;public class TestCountDownLatch {    public static void main(String[] args) throws InterruptedException {        CountDownLatch cdl = new CountDownLatch(3);// 构造方法需要指定倒计时的数字        new Thread(()->{            System.out.println("线程1开始运行"+new Date());            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("线程1准备完成"+new Date());            cdl.countDown(); // 倒计时减1        }).start();        new Thread(()->{            System.out.println("线程2开始运行"+new Date());            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("线程2准备完成"+new Date());            cdl.countDown(); // 倒计时减1        }).start();        new Thread(()->{            System.out.println("线程3开始运行"+new Date());            try {                Thread.sleep(1500);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("线程3准备完成"+new Date());            cdl.countDown(); // 倒计时减1        }).start();        // 主线程等待,直到倒计时为0        System.out.println("主线程等待");        cdl.await();        System.out.println("ready go....");    }}

13).循环栅栏

          CyclicBarrier   可循环的 屏障(栅栏)

          当满足CyclicBarrier设置的线程个数时,继续执行,没有满足则等待

          与倒计时锁的区别:倒计时锁只能使用一次,倒计时结束这个对象就没用了。而循环栅栏可以重复利用。

package thread;import java.util.Date;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class TestCyclicBarrier {    public static void main(String[] args) {        // CyclicBarrier   可循环的 屏障(栅栏)        // 当满足CyclicBarrier设置的线程个数时,继续执行,没有满足则等待        CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行        new Thread(()->{            System.out.println("线程1开始.."+new Date());            try {                cb.await(); // 当个数不足时,等待            } catch (InterruptedException e) {                e.printStackTrace();            } catch (BrokenBarrierException e) {                e.printStackTrace();            }            System.out.println("线程1继续向下运行..."+new Date());        }).start();        new Thread(()->{            System.out.println("线程2开始.."+new Date());            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            try {                cb.await(); // 2 秒后,线程个数够2,继续运行            } catch (InterruptedException e) {                e.printStackTrace();            } catch (BrokenBarrierException e) {                e.printStackTrace();            }            System.out.println("线程2继续向下运行..."+new Date());        }).start();    }}

14).信号量

       Semaphore s = new Semaphore(3); // 限制了能同时运行的线程上限

package thread;import java.util.concurrent.Semaphore;public class TestSemaphore {    public static void main(String[] args) {        Semaphore s = new Semaphore(3); // 限制了能同时运行的线程上限        for (int i = 0; i < 10; i++) {            new Thread(() -> {                try {                    s.acquire(); // 获得此信号量                    System.out.println("我是线程" + Thread.currentThread().getName());                    try {                        Thread.sleep(2000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    s.release(); // 释放信号量                }            }).start();        }    }}

 

转载地址:http://gxarn.baihongyu.com/

你可能感兴趣的文章
Linux用户权限
查看>>
Linux文档目录
查看>>
三层交换机 VS 二层交换机
查看>>
GUN,Linux,GUN/Linux
查看>>
Centos设定PATH
查看>>
Linux操作系统4.5.6.7代差别
查看>>
文件系统EXT3,EXT4和XFS的区别
查看>>
Centos7 udev
查看>>
Nmcli 网络管理命令行工具
查看>>
Linux IP地址配置
查看>>
firewalld和iptables
查看>>
SELinux
查看>>
nmcli双网卡绑定
查看>>
nmcli 网卡链路绑定team
查看>>
Linux下profile和bashrc四种的区别
查看>>
Linux文件查看指令整理
查看>>
Linux的三个时间参数
查看>>
Linux 用户ID和组ID
查看>>
Linux /etc/passwd文件
查看>>
Linux 档案搜寻指令整理
查看>>