您当前的位置:首页 > 计算机 > 编程开发 > Java

进阶 - 线程池核心技术ThreadPoolExecutor(源码分析)

时间:05-13来源:作者:点击数:

前言

线程是很占用系统资源的,我们用线程池来统一管理线程就能够很好的解决这种资源管理问题。比如因为不需要创建、销毁线程,每次需要用的时候我就去拿,用完了之后再放回去,所以节省了很多资源开销,可以提高系统的运行速度。

一、为什么使用线程池?

假设我们有10万个任务需要执行,如果使用多线程形式,为了提高效率,很多人会想到创建10万个线程。但创建过多个线程是有弊端的,例如,线程过多会带来调度的开销,或者说额外的线程切换开销。

使用线程池我们能很好地解决这些弊端,线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

二、线程池运行原理

(一)线程的复用

线程池核心思想就是线程复用,它是什么意思呢?字面理解,线程再一次使用。也就是说一个线程不断地获取任务去执行,执行不同的任务,这样的方式就可以实现线程复用。

(二)核心线程、非核心线程、等待队列

核心线程(corePool):线程池维护着多个“活”线程,它能通过while(true)不断检查是否有任务需要执行,从而避免多次调用、销毁产生的时间代价太大问题,这种“活”线程就被称为核心线程。每当有新任务提交时,首先检查核心线程数是否已满,没满就创建核心线程来执行任务,否则,放进等待队列中(假设等待队列此时还没满)

等待队列 (workQueue):等待队列用于存储当核心线程都在忙时,继续新增的任务,核心线程在执行完当前任务后,也会去等待队列拉取任务继续执行,这个队列一般是一个线程安全的阻塞队列,它的容量也可以由开发者根据业务来定制。

非核心线程:当等待队列满了,并且当前线程数没有超过最大线程数(maximumPoolSize),则会新建线程执行任务,这种情况下新建的线程我们通常称为非核心线程。如下图所示:

在这里插入图片描述

但是!!!

“核心线程”和“非核心线程”本质上并没有什么区别,创建出来的线程也根本没有标识去区分它们是核心还是非核心的,它们只是一个虚拟的概念。线程池只会去判断已有的线程数(包括核心和非核心)去跟核心线程数和最大线程数比较,来决定下一步的策略。

所以有个容易混淆的地方就是,当线程池中的线程超过了和核心线程数时,会将多出来的线程销毁,很多人误以为只会把“非核心线程”销毁掉,现在我们知道了,这两者之间并没有本质上的区别,所以,销毁的时候是随机的,可能是“核心线程”也可能是“非核心线程”。

一般什么情况下会产生“非核心线程”呢?自然是任务提交量非常大,仅有的核心线程数和等待队列长度不满足任务处理,就会创建非核心线程去帮助处理任务。有人把非核心线程和核心线程看做是外包公司的非核心员工与核心员工,我觉得也是十分贴切的。

二、创建线程池的方式

在线程池中存在着这样的关系模型

在这里插入图片描述

创建线程池的方法一般分为两种

第一种:通过Executors创建线程池(尽量不使用)

Executors 可以创建以下几种常用的线程池:

 Executors.newCachedThreadPool();   //可缓存的线程池
 Executors.newFixedThreadPool(10);  //定长线程池(10)
 Executors.newScheduledThreadPool(10);  //周期性线程池(10)  
 Executors.newSingleThreadExecutor();  //单一线程池
  • newCachedThreadPool():创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,如果没有可用的现有线程,将创建一个新线程并将其添加到池中。六十秒内未使用的线程将被终止并从缓存中删除。
  • newFixedThreadPool():创建一个定长线程池,可控制线程最大并发数,超出的线程会在等待队列中等待。
  • newScheduledThreadPool():创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。
  • newSingleThreadExecutor ():创建一个单线程的线程池,只使用唯一的线程来执行任务,可以保证任务按照提交顺序来完成。

第二种:通过ThreadPoolExecutor创建线程池

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();

通过ThreadPoolExecutor的构造方法来了解它的七大参数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize核心线程数量,核心线程会一直保留,不会被销毁。
  • maximumPoolSize最大线程数,当核心线程不能满足任务需要时,系统就会创建新的线程来执行任务。
  • keepAliveTime存活时间,核心线程之外的线程空闲多长时间就会被销毁。
  • timeUnit代表线程存活的时间单位。
  • BlockingQueue阻塞队列
  • threadFactory创建线程的工厂类
  • rejectedExecutionHandler拒绝策略,当所有线程(最大线程数)都在忙,并且任务队列处于满任务的状态,则会执行拒绝策略。

此外还需要了解一下线程池四种拒绝策略:

1、AbortPolicy:直接抛出异常,默认策略;

2、CallerRunsPolicy:用调用者所在的线程来执行任务;

3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

4、DiscardPolicy:直接丢弃任务;

拓展1:为什么不推荐使用Executors创建线程池?

在阿里的规范文档中这样写道:

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,

这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:

1)newFixedThreadPool和newSingleThreadExecutor:

  主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。

2)newCachedThreadPool和newScheduledThreadPool:

  主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

(1)newFixedThreadPool和newSingleThreadExecutor产生的问题:

//定长线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
//单一线程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newFixedThreadPoolnewSingleThreadExecutor中采用的是LinkedBlockingQueue阻塞队列存储任务,它的任务队列长度是的Integer.MAX_VALUE,也就意味着并发量很大的时候,任务可能会不断堆积到阻塞队列里,从而占用很大的内存导致OOM

public LinkedBlockingQueue() {  //LinkedBlockingQueue无限长
    this(Integer.MAX_VALUE);
}

@Native public static final int   MAX_VALUE = 0x7fffffff; //long int最大值

(2)newCachedThreadPool和newScheduledThreadPool产生的问题:

//可缓存的线程池
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
//周期性线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

newCachedThreadPoolScheduledThreadPoolExecutor它们的问题是允许创建的最大线程数为Integer.MAX_VALUE,也就是无限大,这样会导致如果高并发下任务数很多的时候,就会创建很多个线程处理任务,可能导致OOM。

三、ThreadPoolExecutor源码分析

通过ThreadPoolExecutor中提交任务的流程来分析源码

线程池的执行流程

在这里插入图片描述

线程池的五种状态

在了解源码前,要先知道线程池这五种状态

  • RUNNING: 运行状态,能够接受新的任务且会处理阻塞队列中的任务。
  • SHUTDOWN:关闭状态,不接受新任务,但是会处理阻塞队列中的任务,执行线程池的 shutDown()对应的就是此状态。
  • STOP:停止状态,不接受新的任务,也不会处理等待队列中的任务并且会中断正在执行的任务。调用线程池的shutDownNow()对应的是此状态
  • TIDYING: 整理,即所有的任务都停止了,线程池中线程数量等于0,会调用 terminated()如果你自己实现线程池的话。
  • TERMINATED:结束状态,terminated()方法执行完了。

五种状态之间的转换

在这里插入图片描述

阅读源码的时候就会发现,在提交任务的时候,大部分判断都会实时地根据当前线程池状态决定下一步步骤。

线程池的基本属性

//高3位表示线程池状态,低29位表示线程池中的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 五种状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取线程池中线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

AtomicInteger是一个原子操作类,保证线程安全,采用低29位表示线程的最大数量高3位表示5种线程池状态,维护两个参数,workCount和runState。workCount表示有效的线程数量,runState表示线程池的运行状态

execute()

提交任务采用的一般是ThreadPoolExecutor中的execute()方法,让我们来先看看它的源码

    public void execute(Runnable command) {
        if (command == null)  //没有传进线程(任务),抛出异常
            throw new NullPointerException();

        int c = ctl.get(); //拿到ctl的值
        if (workerCountOf(c) < corePoolSize) { //当前线程数小于核心线程数
            if (addWorker(command, true)) //调用addWorker添加核心线程执行command任务
                return;
            c = ctl.get(); //添加任务失败,重新得到ctl状态
        }
        //判断线程池是否是运行状态,且任务队列未满则把任务放进等待队列中
        if (isRunning(c) && workQueue.offer(command)) {
        	//重新检查ctl
            int recheck = ctl.get();
            //如果此时不是运行时状态,就把刚刚添加的任务移除
            if (! isRunning(recheck) && remove(command))
                reject(command);  //执行拒绝策略
             //如果当前线程数为0,就添加非核心线程工作
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //队列已满,创建非核心线程,失败则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

拓展2:execute和submit的区别是什么?

execute和submit都属于线程池的方法,execute只能提交Runnable类型的任务,而submit既能提交Runnable类型任务也能提交Callable类型任务。

execute会直接抛出任务执行时的异常,submit会吃掉异常,可通过Future的get方法将任务执行时的异常重新抛出。

execute所属顶层接口是Executor,submit所属顶层接口是ExecutorService,实现类ThreadPoolExecutor重写了execute方法,抽象类AbstractExecutorService重写了submit方法。

addWorker()

创建线程的核心是调用了addWorker()方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry: //相当于goto,虽然不建议滥用,但这里使用又觉得没一点问题
    for (;;) { //死循环
        int c = ctl.get();  //拿到ctl值
        int rs = runStateOf(c);  //拿到线程池的状态
		
        /*
       *  如果线程池的状态到了SHUTDOWN或者之上的状态时候(非RUNNING),只有一种情况还需要继续添加线程,
       *  那就是线程池已经SHUTDOWN,但是队列中还有任务在排队,而且不接受新任务(所以firstTask必须为null)
       *  这里还继续添加线程的初衷是,加快执行等待队列中的任务,尽快让线程池关闭
       * */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
		//来到这里说明线程池状态为RUNNING
        for (;;) {
            int wc = workerCountOf(c); //获取当前线程的个数
            if (wc >= CAPACITY ||  //线程数大于容量
              //或如果core为true,就跟核心线程数进行比较,否则跟最大线程数比较
              //所以如果想添加核心线程数的时候,core参数就会设置为true
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
              //通过CAS自旋,增加线程数+1,增加成功跳出双层循环,继续往下执行
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // 得到ctl值
            //如果线程状态发生改变,则继续回到retry,重新开始循环
            if (runStateOf(c) != rs) 
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
 	//走到这里,说明我们已经成功的将线程数+1了,但是真正的线程还没有被添加
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
     //新建一个worker,worker就是线程池中执行任务的,本质上也是一个线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
        	//加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
				//检查线程的状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                  //workers是一个HashSet,添加我们新增的Worker
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();  //启动worker
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

execute方法虽然没有加锁,但是在addWorker方法内部,加锁了,这样可以保证不会创建超过我们预期的线程数,大师在设计的时候,做到了在最小的范围内加锁,尽量减少锁竞争,可以看到,core参数,只是用来判断当前线程数是否超量的时候跟corePoolSize还是maxPoolSize比较,Worker本身无核心或者非核心的概念。

Worker()

再来看看Worker方法

 Worker(Runnable firstTask) {
     setState(-1); // inhibit interrupts until runWorker
     this.firstTask = firstTask;
     this.thread = getThreadFactory().newThread(this);

 public void run() {
    runWorker(this);
   }
}

我们可以看到,在Worker构造方法中,实际上是调用了getThreadFactory().newThread工厂方法来新建线程。

runWorker()

实际上run调用的是runWorker()方法,来看看runWorker方法的实现

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;  //得到需要执行的任务
    w.firstTask = null;  //置空
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    /*
    * 我们可能从一个初始任务开始,在这种情况下我们不需要获取第一个任务。否则,只要 pool 正在运行,我们就从 getTask 获取任务。
    * */
    try {
    	//如果任务不为空或着通过getTask从队列中去任务也不为空
        while (task != null || (task = getTask()) != null) {
            w.lock();  //加锁,防止线程被中断,中断则抛出异常
            //判断线程是否需要中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();  //开始执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

getTask()

现在我们知道,除了初始第一个任务外,线程池中的线程会通过getTask()方法不断从队列中拿去任务,以此实现线程复用,具体请看getTask()方法实现

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

       //检查线程池的状态,如果已经是STOP及以上的状态,或者已经SHUTDOWN,队列也是空的时候,直接return null,并将Worker数量-1
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 注意这里的allowCoreThreadTimeOut参数,字面意思是否允许核心线程超时,即如果我们设置为false,那么只有当线程数wc大于corePoolSize的时候才会超时
       //更直接的意思就是,如果设置allowCoreThreadTimeOut为false,那么线程池在达到corePoolSize个工作线程之前,不会让闲置的工作线程退出
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
		//从队列中取任务,根据timed选择是有时间期限的等待还是无时间期限的等待
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

现在再来回顾一下几个重点知识

1、线程池的线程是如何做到复用的。

线程池中的线程在循环中尝试取任务执行,这一步会被阻塞,如果设置了allowCoreThreadTimeOut为true,则线程池中的所有线程都会在keepAliveTime时间超时后还未取到任务而退出。或者线程池已经STOP,那么所有线程都会被中断,然后退出。

2、线程池是如何做到高效并发的。

看整个线程池的工作流程,有以下几个需要特别关注的并发点.

①: 线程池状态和工作线程数量的变更。这个由一个AtomicInteger变量 ctl来解决原子性问题。

②: 向工作Worker容器workers中添加新的Worker的时候。这个线程池本身已经加锁了。

③: 工作线程Worker从等待队列中取任务的时候。这个由工作队列本身来保证线程安全,比如LinkedBlockingQueue等。

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门