0%

线程池分析

Java线程池技术

线程池概述

  • 池化技术应该是最常用的提高程序性能的手段,包括线程池与数据库连接池,常量池等等
  • 创建与销毁线程是比较耗费时间的,不利于处理Java程序的高并发,因此引入线程池,也就是维护一组可用的线程,如果有任务,就立即将线程池的空闲线程分配给任务,提升性能,如果线程池内所有的线程都是忙状态的话,可以将任务放到任务队列,或者创建一个新的线程并放入线程池,用于处理新的任务
  • 使用线程池的好处

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

    • 在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。

      为什么呢?

      使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题

    • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。

    • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控线程过少,无法充分利用硬件资源,线程过多,又会导致过度切换,过多的内存消耗,降低系统稳定性

      • 线程池监控的方法

        • SpringBoot 中的 Actuator 组件

        • 通过ThreadPoolExecutor的自有接口获取线程池信息

          image-20210415154301230

线程池使用的注意事项

  • 使用ThreadPoolExecutor构造函数构建线程池而不是Executors工厂类,下文有具体的解释

  • 显式的定义线程池名字,以业务名字作区分,便于定位问题

    • 可以使用自定义的ThreadFactory

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      import java.util.concurrent.Executors;
      import java.util.concurrent.ThreadFactory;
      import java.util.concurrent.atomic.AtomicInteger;
      /**
      * 线程工厂,它设置线程名称,有利于我们定位问题。
      */
      public final class NamingThreadFactory implements ThreadFactory {

      private final AtomicInteger threadNum = new AtomicInteger();
      private final ThreadFactory delegate;
      private final String name;

      /**
      * 创建一个带名字的线程池生产工厂
      */
      public NamingThreadFactory(ThreadFactory delegate, String name) {
      this.delegate = delegate;
      this.name = name; // TODO consider uniquifying this
      }

      @Override
      public Thread newThread(Runnable r) {
      Thread t = delegate.newThread(r);
      t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
      return t;
      }

      }
    • 使用guava的ThreadFactoryBuilder

      1
      2
      3
      4
      ThreadFactory threadFactory = new ThreadFactoryBuilder()
      .setNameFormat(threadNamePrefix + "-%d")
      .setDaemon(true).build();
      ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
  • 不同的业务使用不同的线程池

    • 一般建议是不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务
    • 有依赖关系的任务在使用同一个线程池在稍高的并发状况下可能会出现一种逻辑上的死锁,大概来说就是父任务A中调用了子任务B,父任务与子任务共用一个线程池,当父任务占据了全部的核心线程资源,并且子任务仍未执行时,无法退出对核心线程的占用,而与此同时子任务只能堆积在任务队列中,无法获得线程资源,如果又使用了无界队列的话,则会一直堆积直到OOM,具体的参考线程池运用不当的一次线上事故

线程池类的继承、实现关系–Executor框架

Executor框架

  • Executor 框架是 Java5 之后引进的,Executor框架目的是将任务提交和任务如何运行分离开来的机制。用户不再需要从代码层考虑设计任务的提交运行,只需要调用Executor框架实现类的execute方法就可以提交任务执行,然后就不用管了(如果需要返回值则调用submit方法做后续处理)

    • 可以联想到构建线程实例的两个方法,推荐使用传入Runnable参数而不是复写Thread类,其目的就是为了能够将任务与线程解耦
    • 应该说在线程池中,任务的提交与任务执行的解耦是通过任务阻塞队列完成的,任务阻塞队列成为了实质上的消息队列
  • Executor框架有助于避免this逃逸问题

    补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用。调用尚未构造完全的对象的方法可能引发令人疑惑的错误,如果用volatile或者final修饰的话应该就能解决这个问题了,不知道Executor框架的出现是如何有助于解决此问题的呢?—不是很清楚

    实际上可查看ThreadPoolExecutor类的Worker类,发现thread实例确实使用final修饰(实现安全发布),不知道说的是不是这里

  • 除了说Executor框架,还有一种说法就是JUC框架,也就是java.util.concurrent这个包下的所有的多线程相关类的总称,下面是Executor框架的继承关系图

    • 对Executor框架中每个类的大致描述(继承关系描述)
      • Executor:一个接口,其定义了一个接收Runnable对象的方法execute,该方法接收一个Runable实例。
      • ExecutorService:Executor的子类接口,其定义了可以接收Callable对象的方法submit,返回 Future 对象,同时提供execute方法。
      • ScheduledExecutorService:ExecutorService的子类接口,支持定期执行任务,定义了schedulescheduleAtFixedRateschduleWithFixedDelay方法。
      • AbstractExecutorService:抽象类,提供 ExecutorService 中定义的方法的默认实现,比如对submit方法做了实现。
      • ThreadPoolExecutor:继承AbstractExecutorService,用于创建线程池。
      • ForkJoinPool: 继承AbstractExecutorService,Fork 将大任务分叉为多个小任务,然后将小任务分配给线程池去执行,Join 是获得小任务的结果,类似于map reduce、并行计算。
      • ScheduledThreadPoolExecutor:继承ThreadPoolExecutor,实现ScheduledExecutorService,用于创建带定时任务的线程池。
      • Executors:实现ExecutorService接口的静态工厂类,提供了一系列工厂方法用于创建线程池。
        • Executors内部有实现ExecutorService的内部类,用来创建一些特殊的线程池,其实就是创建一些包装类用来为线程池子提供一些特定的特征

Executor的框架结构

任务的提交

向普通线程池提交任务有两种方法:

  1. execute方法

    1. 只接受Runnable的任务,也不会对Runnable类型的任务做包装,直接提交给线程执行或者放进任务阻塞队列,不提供返回值,源码分析见下文
  2. submit方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
    }

    /**
    * @throws RejectedExecutionException {@inheritDoc}
    * @throws NullPointerException {@inheritDoc}
    */
    public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
    }

    /**
    * @throws RejectedExecutionException {@inheritDoc}
    * @throws NullPointerException {@inheritDoc}
    */
    public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
    }

    // Runnable与Callable的任务统一包装为FutureTask类型
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
    }


    // FutureTask对于Runnable的构造
    public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; // ensure visibility of callable
    }

    public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
    throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
    }

    static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
    }
    public T call() {
    task.run();
    return result;
    }
    }
    • ThreadPoolExecutor没有实现自己的submit方法,而是沿用的父类AbstractExecutorService的实现
    • 传入Runnable的任务时可以指定一个返回值,也可以不指定,如果指定的话,Runnable转为RunnableAdapter后的结果返回值就是传入的返回值,如果没有传入就是返回null,对应的使用FutureTask获得的返回值也就是对应的返回值,这个传入的结果值的作用类似于是一个flag
      • 因此可以说,虽然传入Runnable参数也可以获得Future的返回,可以用来判断异步任务的执行程度,以及处理任务运行的异常,但是获得有效返回值是不可能的,如果真想获得有效返回值应该去用Callable构建任务咯
submit方法中任务的封装
  • 使用submit提交任务在线程池中使用Callable-Future凭借的是FutureTask
    • submit方法内部会将提交的任务(无论是Runnable还是Callable)都通过newTaskFor方法转化为FutureTask再提交给execute方法
      • 当提交的为Runnable时,Runnble任务会在FutureTask内部进一步被封装为RunnableAdapter(父类是Callable)类型
    • submit方法的返回对象的类型也是FutureTask
  • FutureTask类实现的是RunnableFuture接口,而后者继承了RunnableFuture两个接口
    • 从线程池的角度来看此接口将任务与返回结合,可以直接将引用提交给execute方法,然后再将引用返回即可使用到两个接口的功能
    • Future只是一个接口,而它里面的cancel,get,isDone等方法要自己实现起来都是非常复杂的。所以JDK提供了一个FutureTask类(接口的实现)来供我们使用
    • FutureTask能够在高并发环境下确保任务只执行一次(执行前有状态判断,并且内部状态的维护是单向不可逆的)
    • 通过实现Runable接口,将自己改造成任务,以监控run方法中的任务执行情况,来更新Future的状态
补充
  • 补充CallableRunnable的差异:

    • 前者的执行方法内部可以有返回值,并且如果无法得到有效返回值还可以抛出异常,后者的执行方法中没有返回值也不能抛出异常
从源码角度分析FutureTask
  • Callable接口提供了线程执行任务并提供任务执行结果的功能,但是线程池执行的是异步任务,如何有效的获取异步任务的返回?靠的就是Future接口提供的功能,Executor架构的submit方法将二者结合,而FutureTask对Future接口做了具体的实现,即如何异步获取Callable任务的返回结果,如何做到这一点的?实现Runnable接口,监控任务的执行,以得到有效的返回结果

  • FutureTask在jdk1.7时依赖AQS构建,在JDK1.8后改为使用CAS+state变量的维护+WaitNode类型的链表来维护等待获取异步结果的线程(线程的阻塞使用的又是LockSupport)

    • 分析的过程中要注意,可能有多个线程尝试执行任务,但是要保证只执行一次
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    // 1. 从构造函数看起,可以接受Callable以及Runnable类型的任务,并将Ruunable任务转为RunnableAdapter类型的任务

    private volatile int state; // 维护任务的执行状态
    private Callable<V> callable; // 维护任务本身

    public FutureTask(Callable<V> callable) {
    if (callable == null)
    throw new NullPointerException();
    this.callable = callable;
    this.state = NEW; // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; // ensure visibility of callable
    }

    // 注意此类是Executors工厂类提供
    static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
    }
    public T call() {
    task.run();
    return result;
    }
    }

    // 2. 入口函数run方法,此方法就是线程池内工作线程执行的方法

    // 维护正在执行当前任务的线程
    private volatile Thread runner;

    public void run() {

    // 避免重复执行
    // 执行前需要设置runner为当前线程
    if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset,
    null, Thread.currentThread()))
    return;
    try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
    V result;
    boolean ran;
    try {
    // 执行异步任务
    result = c.call();
    ran = true;
    } catch (Throwable ex) {
    // 异步任务执行出错
    result = null;
    ran = false;
    // 执行异常状态更新
    setException(ex);
    }
    if (ran)
    // 执行任务执行完毕后的状态更新
    set(result);
    }
    } finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    // 判断是否有线程中断发生,并及时响应
    if (s >= INTERRUPTING)
    handlePossibleCancellationInterrupt(s);
    }
    }


    // 3. set方法执行任务完成后的状态更新

    // 维护认知执行结果
    private Object outcome;

    protected void set(V v) {
    // 将状态更新为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    // 更新结果值
    outcome = v;
    // lazy 设置为NORMAL状态
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
    }
    }

    // 任务执行出现异常,返回异常信息,任务执行结束
    protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = t;
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    finishCompletion();
    }
    }



    // 4. Future接口的常用 API
    // 判断任务是否取消
    public boolean isCancelled() {
    return state >= CANCELLED;
    }

    // 判断任务执行是否已经结束
    public boolean isDone() {
    return state != NEW;
    }

    // 取消任务
    // true----即便任务已经开始执行了也要中断
    // false----如果任务已经执行了,就先执行吧
    public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    return false;
    try { // in case call to interrupt throws exception
    if (mayInterruptIfRunning) {
    try {
    Thread t = runner;
    if (t != null)
    t.interrupt();
    } finally { // final state
    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
    }
    }
    } finally {
    finishCompletion();
    }
    return true;
    }

    // 阻塞获取异步结果
    public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
    s = awaitDone(false, 0L);
    return report(s);
    }

    // 超时阻塞
    public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
    throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
    (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    throw new TimeoutException();
    return report(s);
    }

    // 阻塞或超时阻塞函数 使用LockSupport

    // 等待线程队列的头节点
    private volatile WaitNode waiters;


    private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;

    // 自旋中判断任务执行的状态
    for (;;) {
    // 判断线程是否中断
    if (Thread.interrupted()) {
    removeWaiter(q);
    throw new InterruptedException();
    }

    int s = state;

    // 任务结束了,尝试获取返回值吧
    if (s > COMPLETING) {
    if (q != null)
    q.thread = null;
    return s;
    }
    // 还要再等等,等下一个时间片到的时候再看看
    else if (s == COMPLETING) // cannot time out yet
    Thread.yield();

    // 任务还没有开始执行
    else if (q == null)
    q = new WaitNode();

    // 排队
    else if (!queued)
    // 本次循环排好队,下次循环就得去阻塞了
    // 维护头节点
    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
    q.next = waiters, q);
    // 设置了超时
    else if (timed) {
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L) {
    removeWaiter(q);
    return state;
    }
    LockSupport.parkNanos(this, nanos);
    }
    else
    // 排hao队的线程执行阻塞
    LockSupport.park(this);
    }
    }

    // 5. 任务执行完毕(或者异常退出)后执行的方法
    // 唤醒等待的线程,可以尝试获取结果了
    private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
    if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
    // 在循环中不断唤醒等待链表中的线程
    for (;;) {
    Thread t = q.thread;
    if (t != null) {
    q.thread = null;
    LockSupport.unpark(t);
    }
    WaitNode next = q.next;
    if (next == null)
    break;
    q.next = null; // unlink to help gc
    q = next;
    }
    break;
    }
    }
    // 钩子函数等待被子类实现
    done();
    // 帮助GC
    callable = null; // to reduce footprint
    }
    • 无论是AQS还是FutureTask中,LockSupport以及其他的一些线程挂起方法总是在for死循环中使用的,这是因为,一方面需要线程自旋的去判断能否跳出自旋获得锁资源或者其他的什么资源,另一方面如果符合挂起条件后,就不再自旋,避免浪费CPU,等到被唤醒后,重新进入循环,判断条件是否能获得对应资源,而不是立即跳出循环
    • 注意有一个done函数作为钩子函数,在任务执行完毕后调用

向定时任务线程池中提交任务有以下三种方法(ScheduledExecutorService中定义)

  • schedule 一次性的延迟任务执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public ScheduledFuture<?> schedule(Runnable command,
    long delay,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
    new ScheduledFutureTask<Void>(command, null,
    triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
    }


    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
    long delay,
    TimeUnit unit) {
    if (callable == null || unit == null)
    throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
    new ScheduledFutureTask<V>(callable,
    triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
    }

    public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
    }

    public Future<?> submit(Runnable task) {
    return schedule(task, 0, NANOSECONDS);
    }

    public <T> Future<T> submit(Runnable task, T result) {
    return schedule(Executors.callable(task, result), 0, NANOSECONDS);
    }

    public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
    }
    • 这里值得补充的是,ScheduledThreadPoolExecutor重写了executesubmit方法,两个方法内部实际上都是简单地调用schedule方法来实现的
    • schedule支持Runnable或者是Callable的任务,另外两个方法只支持Runnable
  • scheduleAtFixedRate 从上一轮任务开始时刻算起的定时循环任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    long initialDelay,
    long period,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    if (period <= 0)
    throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
    new ScheduledFutureTask<Void>(command,
    null,
    triggerTime(initialDelay, unit),
    unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
    }
  • scheduleWithFixedDelay 从上一次任务结束开始算起的定时循环任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    long initialDelay,
    long delay,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    if (delay <= 0)
    throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
    new ScheduledFutureTask<Void>(command,
    null,
    triggerTime(initialDelay, unit),
    unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
    }
    • 后两者的区别见图

      • 引出两个问题:

        • 如果在fixed-rate模式下,任务的执行时间大于间隔时间,那么任务是怎样安排执行的呢? 答案是:后续的任务会在上一个任务执行完毕后再开始执行,而不管执行间隔了,也就是延迟执行,而不是并发执行
        • 如果做定时间隔任务时,前边的任务出现异常,后续的任务会继续执行吗? 答:一旦出现异常,当前的任务与后续的任务都不会再执行,而是卡住,可以能通过自定义afterExecute钩子方法来处理异常,保证抛出异常的任务取消,而其他任务继续执行
  • 总结

    • 显然定时任务线程池的入口方法就不是execute了,而是delayedExecute,源码分析在后边
    • 与submit方法一样,定时任务线程池提交任务的几个方法都对提交的任务做了包装,具体的源码分析在后面,其之所以与submit包装性质不一样是因为,submit的包装是为了使用Future的特性,而定时任务线程池中的包装是为了实现定时任务的基本功能,源码分析在后面

销毁(关闭)线程池

  1. shutdown方法 关闭线程池,执行此方法可以使线程池的状态由RUNNING变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕

    1. 执行shutdown方法后,可以执行awaitTermination方法,则会等待指定的时间让线程池关闭,若在指定时间内关闭则返回true,否则false

    2. shutdown源码分析

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      public void shutdown() {
      final ReentrantLock mainLock = this.mainLock;
      // 上锁
      mainLock.lock();
      try {
      // 判断调用者是否有权限shutdown线程池
      checkShutdownAccess();
      // CAS 设置线程池状态为SHUTDOWN
      advanceRunState(SHUTDOWN);
      // 中断所有空闲线程
      interruptIdleWorkers();
      // 钩子函数
      onShutdown(); // hook for ScheduledThreadPoolExecutor
      } finally {
      // 解锁
      mainLock.unlock();
      }
      // 尝试终止线程池
      tryTerminate();
      }
      • 对于interruptIdleWorkers函数的解析与tryTerminate的解析放在了后边
  2. shutdownNow方法 闭线程池,执行此方法可以是线程的状态从RUNNING变为 STOP。线程池不再接受新任务,也不再处理队列中的任务,终止当前正在运行的任务,并返回正在等待执行的任务列表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    // 上锁
    mainLock.lock();
    try {
    // 判断调用者是否有权限shutdown线程池
    checkShutdownAccess();
    // CAS 设置线程池状态为STOP
    advanceRunState(STOP);
    // 中断所有线程
    interruptWorkers();
    // 从队列中获取剩余的未执行的工作列表
    tasks = drainQueue();
    } finally {
    mainLock.unlock();
    }
    // 尝试终止线程池
    tryTerminate();
    // 返回未执行的任务列表
    return tasks;
    }
    • interruptWorkers的解析放到了后文中
  3. terminate方法,彻底终止线程池,此方法只在ThreadPoolExecutor中提供,默认实现为空函数,真正执行线程池terminate的方法是tryTerminate方法,terminated作为钩子函数在tryTerminate函数中被调用,线程池的状态由TYDING状态转为TERMINATED状态

  4. 使用如下两个方法来判断线程池是否完全关闭

    1. isTerminated() 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true,或者是执行shutdownNow后,线程池内的线程全部被中断,工作线程数量为0后返回true(看起来是在描述TIDYING状态,实际上在TYDING状态下执行tryTerminate才会转为TERMINATE状态)

      1
      // ThreadPoolExecutor中的实现public boolean isTerminated() {  return runStateAtLeast(ctl.get(), TERMINATED);}
    2. isShutdown() 当调用 shutdown() 方法后返回为 true。

      1
      // ThreadPoolExecutor中的实现public boolean isShutdown() {  return ! isRunning(ctl.get());}

线程池的创建

使用Executors工厂类创建线程池

  • 创建线程池的最方便的做法是使用Executors工厂类,可以创建普通的线程池与可以执行定时任务的线程池,但是简单的创建方法意味着封装的程度高,就会导致自由度低,甚至有一些风险

普通的线程池

  • 固定线程数量的线程池

    1
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {  return new ThreadPoolExecutor(nThreads, nThreads,                                0L, TimeUnit.MILLISECONDS,                                new LinkedBlockingQueue<Runnable>(),                                threadFactory);}// 默认任务队列的长度是Integer.MAX_VALUEpublic LinkedBlockingQueue() {  this(Integer.MAX_VALUE);}
    • 核心线程数量与最大线程数量一致,也就是核心线程就是全部可用的线程了;keepAlive设置为0,因为没有非核心线程
    • 使用的是LinkedBlockingQueue作为任务队列,默认的长度是Integer.MaxValue,因此带来隐患
      • 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;所以一旦corePoolSize设置不对的话,将会有大量任务干等着,并且性能也没有完全发挥
      • 允许创建的线程个数虽然有限制,但是允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的任务,从而导致 OOM
        • 注意这里说出现OOM的意思不是说队列中的任务数量超过 Integer.MAX_VALUE 造成溢出,这是不可能的情况,如果真的达到这个数量,向队列中添加新的任务就会失败,出现OOM是因为队列中存储太多的任务实例了,因此可能导致占用内存过大导致可分配内存不够进入导致OOM
    • 当任务数量大于Integer.MAX_VALUE后,继续提交新的任务就直接会执行拒绝策略了
    • 使用场景在于维持固定的并发度
  • 仅有一个线程的线程池

    • 可以视为是固定线程数量线程池的特值情况,即nThreads为1的情况

      1
      public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {  // 使用包装类包装过的  return new FinalizableDelegatedExecutorService    (new ThreadPoolExecutor(1, 1,                            0L, TimeUnit.MILLISECONDS,                            new LinkedBlockingQueue<Runnable>(),                            threadFactory));}
      • Executors中提供了不少包装类,用来实现特定功能

        1
        static class FinalizableDelegatedExecutorService  extends DelegatedExecutorService {  FinalizableDelegatedExecutorService(ExecutorService executor) {    super(executor);  }  // 线程池实例回收器可以执行shutdown方法  protected void finalize() {    super.shutdown();  }}// 只暴露被包装的ExecutorService实例中的所有接口方法,而不暴露其余实现类自定义的方法static class DelegatedExecutorService extends AbstractExecutorService {  // ...}
      • 在创建单线程线程池时使用包装类的目的在于:保证线程池实例维持单线程运行,不允许通过ThreadPoolExecutor的一些set方法来添加线程数量,比如setCorePoolSize

        • 使用了代理的设计模式(静态代理)
      • 因为可以看做是newFixedThreadPool的特例,所以隐患之处也一样,线程为1的目的在于保证所有任务按照执行顺序执行

  • 动态分配线程数量的线程池

    • 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

      1
      public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>(),                                      threadFactory);}
      • 核心线程数量设置为0,没有核心线程,全员非核心线程,都会在60秒(注意是秒不是毫秒)内,接不到活时被回收
      • 动态分配与按需创建的功能的实现应归功于SynchronousQueue类型的任务队列,这个队列不会缓存任务,而是如果有空闲线程就一定会交给空闲线程执行,没有空闲线程就直接创建新线程
        • 在execute方法中首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前线程池中有线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),则任务成功添加到队列,并且随即被空闲线程消费,execute()方法执行完成;如果没有空闲线程则创建非核心线程去执行任务
      • 虽然队列使用的是有界队列,但是最大线程数量是Integer.MAX_VALUE,这意味着线程池可以不受控的一直接受任务,直到栈空间OOM
      • 使用场景或者优势在于灵活回收灵活分配

执行定时任务的线程池

1
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  return new ScheduledThreadPoolExecutor(corePoolSize);}public static ScheduledExecutorService newScheduledThreadPool(  int corePoolSize, ThreadFactory threadFactory) {  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}public ScheduledThreadPoolExecutor(int corePoolSize,                                   ThreadFactory threadFactory,                                   RejectedExecutionHandler handler) {  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,        new DelayedWorkQueue(), threadFactory, handler);}
  • 虽然队列使用的是有界队列,但是最大线程数量是Integer.MAX_VALUE,这意味着线程池可以不受控的一直接受任务,直到栈空间OOM

    • 需要注意的是,ScheduledTHreadPoolExecutor是ThreadPoolExecutord的子类,每一个构造函数中定义的最大线程数量都是Integer.MaxValue,所以不用工厂类自己手动创建也一样,无非就是能多自定义一个拒绝策略而已
      • 定时任务的实现依赖于延迟队列DelayedWorkQueue
      • 超时参数设置全部默认为0
  • 实现定时任务有其他的方案,比如springboot的@Scheduled注解与quartz等等

    备注: Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等

ThreadPoolExecutor线程池原理

  • 直接使用ThreadPoolExecutor构造线程池实例,使用有界队列和有限数量的线程数会保证安全

线程池的状态

  • 在关闭线程池章节中,查看源码实际上会发现线程池有许多状态:

    1
    /**     * The main pool control state, ctl, is an atomic integer packing     * two conceptual fields     *   workerCount, indicating the effective number of threads     *   runState,    indicating whether running, shutting down etc     *     * In order to pack them into one int, we limit workerCount to   * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2     * billion) otherwise representable. If this is ever an issue in     * the future, the variable can be changed to be an AtomicLong,     * and the shift/mask constants below adjusted. But until the need     * arises, this code is a bit faster and simpler using an int.     *     * The workerCount is the number of workers that have been     * permitted to start and not permitted to stop.  The value may be     * transiently different from the actual number of live threads,     * for example when a ThreadFactory fails to create a thread when     * asked, and when exiting threads are still performing     * bookkeeping before terminating. The user-visible pool size is     * reported as the current size of the workers set.     *     * The runState provides the main lifecycle control, taking on values:     *     *   RUNNING:  Accept new tasks and process queued tasks     *   SHUTDOWN: Don't accept new tasks, but process queued tasks     *   STOP:     Don't accept new tasks, don't process queued tasks,     *             and interrupt in-progress tasks     *   TIDYING:  All tasks have terminated, workerCount is zero,     *             the thread transitioning to state TIDYING     *             will run the terminated() hook method     *   TERMINATED: terminated() has completed     *     * The numerical order among these values matters, to allow     * ordered comparisons. The runState monotonically increases over     * time, but need not hit each state. The transitions are:     *     * RUNNING -> SHUTDOWN     *    On invocation of shutdown(), perhaps implicitly in finalize()     * (RUNNING or SHUTDOWN) -> STOP     *    On invocation of shutdownNow()     * SHUTDOWN -> TIDYING     *    When both queue and pool are empty     * STOP -> TIDYING     *    When pool is empty     * TIDYING -> TERMINATED     *    When the terminated() hook method has completed     *     * Threads waiting in awaitTermination() will return when the     * state reaches TERMINATED.     *     * Detecting the transition from SHUTDOWN to TIDYING is less     * straightforward than you'd like because the queue may become     * empty after non-empty and vice versa during SHUTDOWN state, but     * we can only terminate if, after seeing that it is empty, we see     * that workerCount is 0 (which sometimes entails a recheck -- see     * below).     */// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;
    • 将注释翻译下来就是对着几个线程池状态的具体描述:

      • 其中五个状态

        RUNNING:接收新的任务,处理队列中的任务;

        SHUTDOWN:不接收新的任务,但处理队列中的任务

        STOP:不接收新的任务,不处理队列中的任务,中断正在执行的任务

        TIDYING:所有任务都终止,有效线程数为0线程过度到TIDYING时会调用terminated钩子方法

        TERMINATED:terminated()(实际上是tryTerminate方法)方法执行完毕后进入该状态;

      • 状态之间的转换

        RUNNING -> SHUTDOWN:调用shutdown方法;

        (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow方法;

        SHUTDOWN -> TIDYING:当线程池和任务队列都为空(队列中没有未执行的任务了,并且所有线程都完成了工作处于赋闲状态);

        STOP -> TIDYING:当线程池中工作线程数量为0(其实就是变为stop状态,对所有正在执行任务的线程执行中断,也不再处理队列中未处理的任务,一旦中断全部完成,所有工作线程数量就为0了,直接进入tidying状态,也不管队列中的任务了);

        TIDYING -> TERMINATED:当terminated(实际上是tryTerminate方法)方法执行完毕

      • 状态转换示意图

        image-20210419143355740
  • 线程池状态是由命名为ctl的AtomicIntegr的成员变量持有的(共32位),包含以下两个信息:

    • 线程池状态-最高3位

    • 线程池中线程数量-低29位

      1
      // 初始化线程池状态-RUNNING 0工作线程private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 设置位数 高3位与低29位分别表示线程池状态与线程池工作线程数量private static final int COUNT_BITS = Integer.SIZE - 3;// 线程的最大数量大概是5亿多private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;  // 111private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 000private static final int STOP       =  1 << COUNT_BITS;  // 001private static final int TIDYING    =  2 << COUNT_BITS;  // 010private static final int TERMINATED =  3 << COUNT_BITS;  // 011// Packing and unpacking ctl// 根据ctl获取线程池状态private static int runStateOf(int c)     { return c & ~CAPACITY; }// 根据ctl获取线程池中工作线程数量private static int workerCountOf(int c)  { return c & CAPACITY; }// 使用runstate与workercount组装ctl,初始状态下rs 为RUNNING wc为0private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {  return c < s;}private static boolean runStateAtLeast(int c, int s) {  return c >= s;}// 注意大小比较关系private static boolean isRunning(int c) {  return c < SHUTDOWN;}
      • 事实上Integer.MAX_VALUE大于CAPACITY,所以可以说实际上下边的Executors中的构造函数中的最大线程池数量是根本无法达到的
      • 为什么要把线程池数量和线程池状态维护在一个变量中?
        • 因为事实上是需要维护线程池内有效线程数量和线程池状态的一致性的(源码中实际上许多地方是同时判断线程池状态与线程池内有效线程的数量的),如果二者分开维护会因为维护二者的一致性而浪费锁资源;
        • 并且可以通过位运算去分别计算得到线程池状态与工作线程数量,效率也是很高的

ThreadPoolExecutor的整体架构与运行流程

image-20210419133922120

  • 线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收

Executor框架内的核心类–ThreadPoolExecutor

  • 学习ThreadPoolExecutor构造方法(针对参数最多的学习)
1
public ThreadPoolExecutor(int corePoolSize,​                              int maximumPoolSize,​                              long keepAliveTime,​                              TimeUnit unit,​                              BlockingQueue<Runnable> workQueue,​                              ThreadFactory threadFactory,​                              RejectedExecutionHandler handler) {​        if (corePoolSize < 0 ||​            maximumPoolSize <= 0 ||​            maximumPoolSize < corePoolSize ||​            keepAliveTime < 0)​            throw new IllegalArgumentException();​        if (workQueue == null || threadFactory == null || handler == null)​            throw new NullPointerException();​        this.acc = System.getSecurityManager() == null ?​                null :​                AccessController.getContext();​        this.corePoolSize = corePoolSize;​        this.maximumPoolSize = maximumPoolSize;​        this.workQueue = workQueue;​        this.keepAliveTime = unit.toNanos(keepAliveTime);​        this.threadFactory = threadFactory;​        this.handler = handler;​    }
  • ThreadPoolExecutor 3 个最重要的参数

    • corePoolSize : 核心线程数定义了最小可以同时运行的线程数量,所谓最小的含义就是,这些线程创建后即便没有任务执行,是空闲的,也要维持运行,除非设置了allowCoreThreadTimeOut,如果设置为true,则在超过keepAliveTime之后,空闲的核心线程也会被回收
      • 注意这里会有一个误解需要澄清:并不是线程池启动起来后就立即维护起corePoolSize个线程,也是按需求来的,最先分配任务的corePoolSize自动成为核心线程
    • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。(注意是可以同时运行的最大的线程数量,不是当前正在运行的线程数量
    • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。拿队列当缓存,直到队列放满无法继续投放或者有空闲线程消费队列中的人物
      • 需要注意的是这个队列存储的仅仅是execute方法提交的Runnable任务,而不是其他的什么复杂的结构
  • ThreadPoolExecutor其他常见参数:

    • keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁(注意这里针对的是核心线程外的线程,核心线程也就是最小维持数量的线程会一直维持运行),而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
      • 其起作用的机制在于getTask函数中调用的阻塞队列的poll函数的超时设置
    • unit : keepAliveTime 参数的时间单位。
    • threadFactory : 线程工厂。
    • handler :饱和策略。关于饱和策略下面单独介绍一下。当提交的任务过多而不能及时处理(这里的及时处理指的是任务队列已经满了,并且线程池已经达到了允许的最大线程量,)时,我们可以定制策略来处理任务
  • 饱和策略(对应的是任务拒绝模块)

    • 定义:饱和就是当任务队列满了,并且线程池当前同时运行的线程数量已经达到设定的最大值时的状态,更准确地定义应该是任务拒绝策略,而不仅仅是饱和策略,不仅仅线程池饱和的时候会执行拒绝,当线程池状态double-check线程池状态不是running状态时,也要对新提交的任务执行拒绝策略

    • 任务的拒绝是通过reject函数完成的, 默认提供4个拒绝策略,当然也可以实现自己的拒绝策略

      1
      final void reject(Runnable command) {  handler.rejectedExecution(command, this);}
    • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理,此policy也是使用Executors工具类创建线程池以及我们不指定饱和策略使用ThreadPoolExecutor构造函数时的默认的饱和策略

      • 如果是比较关键的业务,推荐使用此拒绝策略,这样在系统不能承载更多任务时,及时通过异常发现
      1
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  throw new RejectedExecutionException("Task " + r.toString() +                                       " rejected from " +                                       e.toString());}
    • ThreadPoolExecutor.CallerRunsPolicy:让提交任务的线程去运行任务,也就是直接在调用execute方法的线程(一般是主线程)中运行(run)被拒绝的任务,如果执行程序(线程池)已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能(因为Main线程去处理新提交的任务去了,就无法处理新的请求了)。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略

      • 此策略适合处理大量计算任务的任务类型
        • 多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕
      • 此策略可以做分流,请求可以分别缓存在线程池工作队列->工作线程(一般是main线程)->TCP 层->客户端 达成了一定程度的可伸缩功能
      1
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  // 如果线程池还在RUNNING状态  if (!e.isShutdown()) {    // 提交任务的当前线程直接执行任务    r.run();  }}
    • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。(直接丢弃掉,甚至不会抛出异常

      1
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  // 直接为空}
    • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求(所谓的抛弃最早的未处理的任务请求,就是抛弃下一个待处理的任务,处于头部的任务),丢弃后再次尝试提交新的任务

      1
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  if (!e.isShutdown()) {    // 直接兀自取出下一个任务    e.getQueue().poll();    // 重新提交任务    e.execute(r);  }}
    • 对于以上几种饱和策略的理解补充:

  • 任务队列(线程安全的阻塞队列)

    • 如果execute是线程池的任务调度器,是作为整个线程池的入口的话,那么任务缓冲队列可以看作是整个线程池的神经中枢,作为神经中枢,其起到了解耦任务与线程的操作,之所以有这一层缓冲,才有了所谓的调度—其实是一个生产者、消费者模式,其中生产者(一般是创建线程池的线程,main线程)投放任务,消费者(线程池中的工作线程)取任务去执行
    • BlockingQueue除了提供阻塞方法之外一般也会同时提供超时阻塞版本以及非阻塞版本,线程池中三种类型的方法都有使用
    • 显然是一个队列模型,构造函数要求的任务队列类型是BlockingQueue<Runnable>,但是这个是一个接口类,真正可以使用的实现类有如下几种(具体的描述看自己总结的线程安全的容器这个文章):
      • LinkedBlockingQueue 无界队列,所谓的无界队列就是队列长度默认设置为Integr.MAX_VALUE,使用工具类创建线程池时,newFixedThreadPool与newSingleThreadExecutor都是使用的此类型的队列
      • ArrayBlockingQueue 有界队列,一般自己使用的就是此队列,数组实现,一旦创建大小不可更改
      • DelayedWorkQueue 定时任务的线程池使用的是此队列,按照任务的下一次执行的时间的早晚进行排序
      • SynchronousQueue 直接提交,也就是不把任务存储,而知直接提交给线程,newCachedTHreadPool使用的就是这个
      • 除此之外还有多种任务队列可供使用,包括优先级队列,双端队列等等
    • 在普通线程池中,提交任务使用的是boolean offer(E e),是一个非阻塞的方法(即队列满了后直接返回false),消费任务使用的是E take() throws InterruptedException;E poll(long timeout, TimeUnit unit) throws InterruptedException;前者是阻塞的,后者是超时阻塞(keepAliveTime)
      • 一定注意区分阻塞方法与非阻塞的区别不在于是否要等待锁,锁肯定是要等待的,不得不阻塞的,重要的是在队列满了,或者队列为空时是否要阻塞
  • 线程工厂

    • 在创建线程池的时候,还可以传入一个重要的参数就是线程工厂,默认情况下的线程池创建线程的过程都是其内部的DefaultThreadFactory,但是如果要用自定义的方式创建线程,以使用同一个模板批量的创建线程的话,就需要用到这个线程工厂的参数
任务调度
  • 任务调度是整个线程池的入口,是整个线程池的核心所在,而这个任务调度对应的实际上就是execute方法

    • execute作为任务调度方法的大致运作流程是根据线程池的运行状态,工作线程的数量与运行策略来决定新提交的任务的三种可能的去向:
      • 直接申请线程执行
      • 缓冲到阻塞队列中
      • 使用配置好的拒绝策略,拒绝任务的执行
  • 线程池中最重要的方法一定是任务的提交执行方法,又由于submit内部实际调用了execute方法,所以直接查看ThreadPoolExecutor的execute方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


    // 获取当前线程池内正在运行的线程数量
    private static int workerCountOf(int c) {

    return c & CAPACITY;

    }


    // 任务队列
    private final BlockingQueue<Runnable> workQueue;



    public void execute(Runnable command) {

    // 首先肯定是检查的任务的有效性,如果为null就要报空指针异常

    if (command == null)

    throw new NullPointerException();

    /*

    ​ \* Proceed in 3 steps:

    ​ *

    ​ \* 1. If fewer than corePoolSize threads are running, try to

    ​ \* start a new thread with the given command as its first

    ​ \* task. The call to addWorker atomically checks runState and

    ​ \* workerCount, and so prevents false alarms that would add

    ​ \* threads when it shouldn't, by returning false.

    ​ *

    ​ \* 2. If a task can be successfully queued, then we still need

    ​ \* to double-check whether we should have added a thread

    ​ \* (because existing ones died since last checking) or that

    ​ \* the pool shut down since entry into this method. So we

    ​ \* recheck state and if necessary roll back the enqueuing if

    ​ \* stopped, or start a new thread if there are none.

    ​ *

    ​ \* 3. If we cannot queue task, then we try to add a new

    ​ \* thread. If it fails, we know we are shut down or saturated

    ​ \* and so reject the task.

    ​ */





    // 检查完提交的任务的有效性之后就要执行如上英文注释的三个步骤的处理了

    // 获得线程池的状态与当前运行的线程的数量的记录(ThreadPoolExecutor类中定义了五种线程池状态)
    // ctl更像是一个线程池的运行时上下文的状态维护变量

    int c = ctl.get();

    // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize

    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。

    if (workerCountOf(c) < corePoolSize) {

    if (addWorker(command, true))

    return;
    // addWorker失败,重新获得线程池状态,以进行下一步判断
    ​ c = ctl.get();

    ​ }

    // 2.如果当前之行的线程数量大于等于 corePoolSize 或者addWorker失败(失败的原因可能是有效线程的数量已经大于corePoolSize,所以需要缓存任务,在addWorker函数中确实有这样的逻辑)后就会走到这里

    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会将任务加入到任务队列中,并且判断是否能加入到任务队列中

    if (isRunning(c) && workQueue.offer(command)) {

    // 成功将任务添加到任务队列中

    // 再次检查线程池中的线程状态,并再次检查线程池中是否有可用的线程,因为自从上一次检查后,可能有线程已经完成了工作或者线程池已经shutdown了

    int recheck = ctl.get();

    // 如果线程池状态不是running状态,就要从任务队列中移除任务,相当于一次回滚,并执行构造函数中参数指定的饱和策略

    if (! isRunning(recheck) && remove(command))
    // 执行回滚
    ​ reject(command);

    // 如果当前线程池是running状态并且工作线程为0(之前运行的工作线程被回收了,corePoolSize也有可能被回收)就新创建一个线程,其中worker的初始任务为null
    //
    else if (workerCountOf(recheck) == 0)

    ​ addWorker(null, false);

    ​ }

    // 3. 任务队列已经满了,或者线程池已经不是running状态了 因此做最后的尝试,在创建一个新的线程试试,如果创建失败,表示线程池已经满了,因此执行饱和策略
    else if (!addWorker(command, false))

    ​ reject(command);

    }
    • 事实上,源代码的注释上就已经说的很清楚了。

      • 文字说明
        • 首先要保证的是,线程池状态必须是RUNNING状态才会接受新的任务,如果不是RUNNING状态,会直接执行拒绝策略reject函数
        • 如果当前线程池中的有效线程池数量小于corePoolSize说明还可以无脑添加新的线程并使其执行新的任务(这也说明corePoolSize数量的线程也是懒创建的,不是默认就自动维护这么多数量的线程
        • 如果大于的话,就要尝试将新的任务缓存到任务队列中,如果任务队列没有满就加到任务队列,如果满了,没办法只能继续扩张线程数量了,此时判断当前的有效线程数量是否小于maximumPoolSize,如果小于,则创建新的线程并用来执行新的任务;如果有效线程数量已经大于maximumPoolSize,只能去执行拒绝策略了
      • 不要忽略源码中的double-check机制,在成功将任务加到缓存队列中后,要double-check线程池的状态是否还是RUNNING状态,如果不是及时回滚提交的任务,执行拒绝策略,除此之外还要检查工作线程是否已经全部被回收,如果全部被回收,需要创建一个空的Worker以备用,不能让线程池为空(如果线程池一直为空的话,下一个任务相当于纯创建一个线程,无法发挥线程池的性能优势了)
线程管理与任务的获取
  • 线程池为了方便的掌握线程的状态与维护线程的周期,设计了工作线程对象Worker

  • Worker实现了Runnable的接口(使得Worker可以作为线程任务被执行,相当于将提交的任务做了包装,毕竟线程执行任务的行为需要Worker干预与FutureTask一样)和继承了AQS类(控制线程的中断,维护线程的生命周期)

    • 在Worker中实现了任务与完成任务的线程的结合
    1
    private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable{  final Thread thread;  Runnable firstTask;  // ...}
    • Worker中最重要的两个成员变量

      • thread,在Worker的构造函数中被创建,使用的是ThreadPoolExecutor创建时传入的threadFactory去执行线程的构建

      • firstTask,firstTask用来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,执行完firstTask后再去队列中取后续的任务如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务

        • 上边的描述针对的是一般的线程池,对于定时任务线程池来说,添加核心线程时,firstTask也是为null
        image-20210423201541482
  • addWorker函数(任务调度时创建核心线程执行任务或者创建非核心线程,并启动工作线程)

    1
    // 全局锁,并发操作必备private final ReentrantLock mainLock = new ReentrantLock();// 跟踪线程池的最大大小,应该只有在持有全局锁mainLock的前提下才访问此属性private int largestPoolSize;// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合private final HashSet<Worker> workers = new HashSet<>();//获取线程池状态private static int runStateOf(int c)     { return c & ~CAPACITY; }//判断线程池的状态是否为 Runningprivate static boolean isRunning(int c) {  return c < SHUTDOWN;}// 返回true表示创建并启动线程成功// firstaTask就是这个线程的初始任务// 第二个参数为true表示新的worker也就是工作线程是尝试加到corePool中还是maximumPool中private boolean addWorker(Runnable firstTask, boolean core) {        // retry标志位常用于多循环嵌套的流程控制        retry:        for (;;) {           // 获取线程池状态            int c = ctl.get();            int rs = runStateOf(c);                        // 如果状态>= SHUTDOWN 表示线程池是正在关闭(SHUTDOWN)或者已经关闭(>SHUTDOWN)状态的处置方法:            // 如果此时是shutdown状态,并且没有分配初始任务,并且任务队列不为空,则是允许创建新的worker的(此时新创建的worker用来在SHUTDOWN状态下,执行任务队列中剩余的任务),违反任一则是不允许的,比如线程池已经关闭(>SHUTDOWN)或者是SHUTDOWN状态,但是附加了自己的初始任务,是不允许的,只能执行队列中剩余的任务,或者队列已经为空了,不再需要新的worker了,也会创建失败            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            for (;;) {               // 获取线程池中的线程数量                int wc = workerCountOf(c);                // 判断当前线程池的数量与哪个值比较,如果已经达到最终的最大值CAPACITY,立即返回false                // 否则根据参数判断与哪个值比较,是最小值还是最大值比较,如果目标是创建核心线程,就和corePoolSize比较,如果已经达到设计大小了,就创建失败,如果目标是创建非核心线程,就和maximumPoolSize比较                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                // 上一步判断后还可以添加worker,就使用CAS增加worker计数                if (compareAndIncrementWorkerCount(c))                    // 跳出整个循环                    break retry;                // CAS 失败                c = ctl.get();                // 如果线程池状态发生了改变,                if (runStateOf(c) != rs)                    // 从头开始执行整个外部的for循环,重新根据线程池状态进行判决                    continue retry;                // 省略的else表示的就是CAS失败的原因是线程数量被同步修改了,只需要重新执行内部的for循环,根据线程数量进行判决即可            }        }          // 线程数量成功更新,    // 初始化工作线程启动成功标志        boolean workerStarted = false;     // 初始化工作线程创建成功标志        boolean workerAdded = false;        Worker w = null;        try {            // 创建worker实例            w = new Worker(firstTask);            // 获得worker持有的线程实例            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;               // 加锁                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    // 获取线程池状态                    int rs = runStateOf(ctl.get());          // < shutdown也就是running状态下执行操作                      // 或者是在SHUTDOWN状态下,并且firstTask为空时执行下述操作                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        // 判断线程是否已经启动了,若已经启动了,抛出异常                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                       // 将新创建的worker实例添加到worker集合中,是一个HashSet集合                        workers.add(w);                       // 更新largestPoolSize                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                       // 设置worker添加成功标志位                        workerAdded = true;                    }                } finally {                    // 释放锁                    mainLock.unlock();                }                if (workerAdded) {                    // 启动worker                    // 实际上执行的是runWorker函数,参数为worker实例                    t.start();                    // 设置worker启动成功标志位                    workerStarted = true;                }            }        } finally {            if (! workerStarted)               // worker启动失败,要执行回滚               // 从工作线程集合中移除新添加的Worker实例               // 线程池状态中线程池数量-1               // tryTerminate              addWorkerFailed(w);        }        // 返回worker是否启动的状态        return workerStarted;  }
    • 线程池中的Worker存储在HashSet结构中,只能在加锁的时候可以访问此线程集

      • 线程池通过维护workers这个集合来维护线程不被回收,当需要回收时,只需要将其引用消除,也就是将Worker对象消除即可,jvm会完成后续的回收(详见线程回收小节)
    • 线程池本身如何保证在运行时保证线程安全呢—–使用ReentrantLock,同时Condition用来支持awaitTermination的实现

      1
      private final ReentrantLock mainLock = new ReentrantLock();private final Condition termination = mainLock.newCondition();public boolean awaitTermination(long timeout, TimeUnit unit)        throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (;;) {                if (runStateAtLeast(ctl.get(), TERMINATED))                    return true;                if (nanos <= 0)                    return false;              	// 在tryTerminate方法中被唤醒                nanos = termination.awaitNanos(nanos);            }        } finally {            mainLock.unlock();        }    }
    • addWorker源码中,也能看见线程池状态与线程池数量共同决定流程走向的场景,这就是为什么要把这两个状态维护在一个变量中的原因

    • 流程图

    image-20210426092911659
  • runWorker函数,worker开始执行任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    /*
    * 如何addWorker中启动线程的语句t.start()转到了runWorker方法呢?
    */

    // Worker的线程实例是在Worker构造函数中完成初始化的,注意,传入newThread的是this,也就是Worker实例本身被当做一个Runnable任务提交到了线程中,所以调用线程实例的start方法时,就会执行Runnable任务也就是Worker实例的run方法

    Worker(Runnable firstTask) {
    setState(-1); // 将AQS计数设置为-1,目的是为了在worker初始化导致runWorker被执行的期间内不被中断
    this.firstTask = firstTask;
    // thread成员变量由默认的或者指定的线程工厂创建,传入的Runnable参数是Worker实例本身
    this.thread = getThreadFactory().newThread(this);
    }

    // 新线程启动后执行此方法
    public void run() {
    runWorker(this);
    }



    // addWorker:
    // 创建worker实例
    w = new Worker(firstTask);
    final Thread t = w.thread;

    // 开启新线程后执行Runnbale参数的run方法,也就是Worker实例的run方法
    t.start()

    /*
    * 上述代码是addWorker中的启动线程的代码
    * 下边的代码是runWorker中的代码
    */


    // 实际在新线程中执行的方法
    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 首先保存初始任务。再清空初始任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // worker 允许中断,此时worker的状态是空闲状态,可以被回收(中断)

    // 初始化线程异常退出标志位
    boolean completedAbruptly = true;
    try {
    // 执行初始化任务,或者while循环不断的阻塞以从任务队列获得新的任务,除非getTask返回null,表示已经无法获得任务,需要执行线程回收
    while (task != null || (task = getTask()) != null) {
    // 成功的获取了任务
    // 开始执行任务,不允许中断,此时线程是非空闲状态
    w.lock();

    // 执行recheck 如果线程池已经关闭了,并且当前线程还没有中断,就要执行对当前线程的中断,否则要保证当前线程不是中断状态
    if ((runStateAtLeast(ctl.get(), STOP) ||
    (Thread.interrupted() &&
    runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    wt.interrupt();
    try {
    // 钩子函数,默认的钩子函数的函数体为空,可以去构造ThreadPoolExecutor的子类去复写此钩子函数
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
    // 执行任务
    task.run();
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    // 钩子函数与beforeExecute同理
    afterExecute(task, thrown);
    }
    } finally {
    task = null;
    // worker的完成的任务数量加1,注意此时是线程安全的
    w.completedTasks++;
    // 释放锁
    w.unlock();
    }
    }
    // 线程不是因为异常退出的,而是因为无法获得任务导致退出的
    completedAbruptly = false;
    } finally {
    // while循环已无法通过getTask获得新的任务了,具体的原因参考后续的getTask方法
    // 执行线程回收
    // 如果是因为task执行时出现异常,completedAbruptly为true,否则为false
    processWorkerExit(w, completedAbruptly);
    }
    }
    • 总的来说,Worker启动后的命运就是孜孜不倦的获取任务与执行任务,一旦不能及时的接受任务就会被销毁,这就是打工人?

    • 如何addWorker中启动线程的语句t.start()转到了runWorker方法呢?这一块比较绕,可以直接看代码的注释

      • 这里就体现了Worker类实现Runnable接口的作用,就是将自己包装为可执行任务,以便于干预线程执行任务的过程
    • 注意方法中出现的对于Worker的声明周期维护

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      // 1. Worker初始化的时候设置state为-1,避免工作线程初始化到正常执行任务这段时间内被回收
      Worker(Runnable firstTask) {
      setState(-1);
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
      }

      // 2. 进入runWorker函数后,此时线程需要自己努力获取任务并执行任务了,如果一直获取不到,是可能被回收的,因此要释放锁

      // 3. 如果领导了任务,要设置加锁,表示自己在工作了,不要回收自己

      // 4. 任务执行完毕或者执行任务出现异常要释放锁
    • 注意两个钩子函数,可以在子类中复写以实现其功能

      • beforeExecute
      • afterExecute
    • 导致出现线程回收的原因有两个

      • 任务执行过程中出现异常,注意每次catch后,都会记录下异常,然后重新抛出异常,导致跳出最外边的try代码块
      • 无法从队列中获取新的任务,跳出while循环具体原因参考geTask方法
  • getTask方法,也就是从队列中获取任务的方法也是很重要的,主要功能是核心线程获取任务或保持阻塞,非核心线程获取任务,或超时返回null,进而线程生命周期结束

    • 在这个函数中,充分体现了核心线程与非核心线程的区别,以及keepAlive参数起作用的机制;同时也注意allowCoreThreadTimeOut参数的作用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    private Runnable getTask() {
    // 获取任务超时的标志位
    boolean timedOut = false;

    for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);


    //线程池状态是STOP之后的状态,表示已经不处理任务了,或者是SHUTDOWN时,任务队列已经为空,想处理也没的处理了,就直接返回null,worker会被直接回收
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    // 工作线程数量-1
    decrementWorkerCount();
    return null;
    }

    int wc = workerCountOf(c);

    //是否超时控制,allowCoreThreadTimeOut默认false,代表不对核心线程做超时限制,对于超出核心线程的线程需要控制超时
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


    //当线程数大于最大线程数,即线程池已经满了,或者需要做超时控制且上次获取任务就已经超时这两个任一的条件下
    //且线程数大于1或者队列为空,尝试将线程数减一并返回null
    if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
    return null;
    // 失败重试,重新根据线程池状态与线程池中线程数量做判断
    continue;
    }

    try {
    //当需要超时控制时,在keepAliveTime时间内没有获取到任务的话会设置超时标志位,如果没有超时限制,则调用take获取任务,此时线程是阻塞等待获取任务的
    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
    if (r != null)
    return r;
    timedOut = true;
    } catch (InterruptedException retry) {
    // 阻塞等待获取任务时,整个worker并没有加锁,也就是被认为是空闲状态,可能会被回收掉
    timedOut = false;
    }
    }
    }
    • 这里需要补充的就是任务队列的poll与take方法虽然名称差异比较大,但是唯一的差异在于前者是加了超时时间,后者是阻塞

    • 注意很关键的一句判断boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;,直接决定了对于核心线程与非核心线程的不同的对待方式

      image-20210422141443980

    • getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收

      • 比如在下边这段代码中

        1
        if ((wc > maximumPoolSize || (timed && timedOut))    && (wc > 1 || workQueue.isEmpty())) {  if (compareAndDecrementWorkerCount(c))    return null;  // 失败重试,重新根据线程池状态与线程池中线程数量做判断  continue;}
        • 当前线程池内的线程数量超过最大值会进行线程回收
        • 存在超时设置,并且上一次获取任务已经超时时,如果任务队列还有不少任务,线程数量又恰好只有一个,是不会对当前这个独苗进行回收的,而是再试试
    • decrementWorkerCountcompareAndDecrementWorkerCount二者的区别是什么

      • decrementWorkerCount内部是在循环调用compareAndDecrementWorkerCount,换句话说就是,必须要尝试将工作线程数量-1,因为确实不需要此线程了,而compareAndDecrementWorkerCount直接拿来用,只是尝试一次将工作线程-1,如果失败的话,就要重新根据状态做出可能与之前不同的判断
  • 线程回收,processWorkerExit

    • 实际上,关于线程回收,是有两种场景的:1. 主动的线程回收,比如processWorkerExit函数这样的(runWorker函数中调用) 2. 探查式的回收,或者说是被动的回收,比如interruptIdleWorkers(shutdown函数、tryTerminate方法中调用)3. 强制回收

    • 主动回收:在runWorker函数中,如果无法再获得任务,就会跳出执行此线程回收函数,实际上线程池中线程的回收依赖的是JVM的自动回收,线程池要做的只是把线程的引用消除而已

      1
      private void processWorkerExit(Worker w, boolean completedAbruptly) {        // 一个标志位,是否是因为发生线程异常,所以进入的此方法        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted            // 工作线程数-1            decrementWorkerCount();        final ReentrantLock mainLock = this.mainLock;        // 加锁,因为要进行审计计数了        mainLock.lock();        try {            // 统计此worker的完成的任务数目            completedTaskCount += w.completedTasks;           // 从线程池中移除此线程            // 执行remove方法完毕后,实际上已经完成了线程的回收,但是由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程-----即所谓的线程状态自适应的过程            workers.remove(w);        } finally {            mainLock.unlock();        }         // 尝试中断、回收空闲线程        tryTerminate();        int c = ctl.get();        if (runStateLessThan(c, STOP)) {            if (!completedAbruptly) {                // 线程池状态是RUNNING或SHUTDOWN状态并且并非因为异常导致线程关闭的情况下                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;                if (min == 0 && ! workQueue.isEmpty())                    min = 1;                // 如果线程够用,就直接返回,否则还要添加一个worker到线程池                if (workerCountOf(c) >= min)                    return; // replacement not needed            }          // 如果因为线程异常导致的线程关闭的话,还需要再向线程池中补充一个worker          // 或者是此时线程数量不能满足最小要求时也要再添加一个worker            addWorker(null, false);        }    }
      • workers访问时要加锁
      • 在工作线程回收的方法中,比如getTask方法与本方法中总能发现对于线程池的线程的自适应调整过程
    • 被动回收:上述代码中提到的tryTerminate方法,也就是在某worker结束生命周期后判断线程池是否要关闭线程池以及回收空闲线程,以便有效的管理线程池的生命周期,在所有可能线程池可能已经终止的地方都调用了此方法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      final void tryTerminate() {
      for (;;) {
      int c = ctl.get();
      //当线程池状态是RUNNING(状态正常)或者已经TIDYING或者已经TERMINATED(线程已经快关闭了)或者SHUTDOWN且还有任务没有被执行(SHUTDOWN状态需要处理完队列中的任务),直接返回
      if (isRunning(c) ||
      runStateAtLeast(c, TIDYING) ||
      (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
      return;
      // 当前线程池状态是STOP状态或是SHUTDOWN状态但任务列表为空时,如果线程数量不为0,需要最多终止1个空闲的线程,上边所述的stop状态或者shutdown状态并且queue为空统称为终止流程开始的状态
      // 如果线程数不为0,则中断一个阻塞等待任务的空闲的工作线程
      if (workerCountOf(c) != 0) {
      // 尝试中断最多一个阻塞等待任务的空闲的工作线程
      interruptIdleWorkers(ONLY_ONE);
      return;
      }
      // 如果当前工作线程数量为0就准备关闭线程池
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      // 尝试设置线程池状态为tidying状态
      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
      try {
      // 如果设置成功调用此钩子方法
      terminated();
      } finally {
      // 钩子方法执行完毕后,设置状态为TERMINATED,并设置线程数量为0
      ctl.set(ctlOf(TERMINATED, 0));
      // 通知调用awaitTermination的主线程,已经进入了TERMINATION状态
      termination.signalAll();
      }
      return;
      }
      } finally {
      mainLock.unlock();
      }
      // CAS失败的话,就重新根据状态进行判断
      }
      }
      • 基本流程如下
        • 如果线程池状态正常、线程池为SHUTDOWN状态但是还有存量任务要执行(嘛事没有)或者已经要结束了(TYDING、TERMINATE)(已经凉透了)则直接返回
        • 如果是STOP状态或者SHUTDOWN状态但是已经没有存量任务了,此时判断工作线程数量是否为0,不为0则尝试回收一个空闲线程,为0的话则说明要转TYDING状态与TERMINATED状态了,直接设置状态为TYDING,执行termintaed钩子函数,再设置为TERMINATED状态,唤醒awaitTermination的等待
      • 调用了tryTerminate方法的地方有
        • addWorkerFailed
        • processWorkerExit
        • shutdown
        • shutdownNow
        • remove从队列中移除某任务
        • purge从队列中移除所有被取消的任务
      • 在被动回收过程中,最重要的就是能了解线程的当前状态,在主动回收中尚且可以知道线程是需要回收的,但是被动回收时实际上并不清楚线程池中线程的状态,Worker通过继承AQS,使用AQS来实现不可重入的独占锁(使用AQS的独占模式)这个功能
        • 没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态
        • lock方法一旦获取了独占锁,表示当前线程正在执行任务中,如果正在执行任务,则不应该中断线程
        • 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断
      image-20210427175959020
    • 被动回收:interruptIdleWorkers,中断空闲线程,使其不再阻塞等待任务,最主要的是在shutdown函数、tryTerminate中调用

      1
      private void interruptIdleWorkers(boolean onlyOne) {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    for (Worker w : workers) {      Thread t = w.thread;      // 判断线程是否已经被中断,是的话就什么都不做      // 若未被中断,还要尝试获取worker的锁,此时如果worker如果已经通过lock方法获取了锁,则因为其不可重入的特性,导致此处为false,即对该worker不做任务处理      // 使用tryLock方法来判断线程池中的线程是否是空闲状态      if (!t.isInterrupted() && w.tryLock()) {        try {          // 执行线程中断          t.interrupt();        } catch (SecurityException ignore) {        } finally {          // worker释放锁          w.unlock();        }      }      // 如果未true,最多只会中断一个空闲线程,也可能一个线程也没有中断      // 如果为false,则会持续遍历全部的worker,并尝试中断所有的空闲的线程      if (onlyOne)        break;    }  } finally {    mainLock.unlock();  }}public void shutdown() {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    checkShutdownAccess();    advanceRunState(SHUTDOWN);    interruptIdleWorkers();    onShutdown(); // hook for ScheduledThreadPoolExecutor  } finally {    mainLock.unlock();  }  tryTerminate();}// shutdown中调用了中断所有空闲线程的方法private void interruptIdleWorkers() {  interruptIdleWorkers(false);}
      • 使用Worker的tryLock函数尝试获取锁,如果对应的Worker正在执行任务,则获取锁失败,否则获取成功,表明线程空闲,则执行中断

      • 在shutdown函数中是回收全部的空闲线程,tryTerminate是尝试回收一个

      • 只有在线程池终止流程开始状态下(线程池状态准备转入TIDYING状态,但是还有空闲线程的时候),传入的参数为true,其余调用都是false,也就是中断所有的空闲线程

        • 为什么仅在tryTerminate方法中,传入的参数为true,也就是最多中断一个空闲的线程呢?(解释的不是很清除,自己不是很懂…….)

          • 当前线程池状态是STOP状态或是SHUTDOWN状态但任务列表为空时,如果线程数量还不为0,这说明,有可能是剩余的所有线程都是阻塞,而不能传递shutdown的指令,在线程池终止流程开始的状态下,必须最多使一个阻塞在等待获取任务的线程中断,才能传播shutdown信号,以免所有的线程陷入等待而无法关闭线程池

          • 中断一个空闲线程,也能保证在线程池已经是SHUTDOWN状态后,新来的Worker也能最终退出

          • 综上,为了保证线程池未来最终能够终止,总是仅中断一个空闲的工作程序就足够了,但是shutdown会中断所有空闲的工作程序,以便多余的工作程序迅速退出

          • 参考interruptIdleWorkers的注释

          Interrupts threads that might be waiting for tasks (as indicated by not being locked) so they can check for termination or configuration changes. Ignores SecurityExceptions (in which case some threads may remain uninterrupted).
          Params:
          onlyOne – If true, interrupt at most one worker. This is called only from tryTerminate when termination is otherwise enabled but there are still other workers. In this case, at most one waiting worker is interrupted to propagate shutdown signals in case all threads are currently waiting. Interrupting any arbitrary thread ensures that newly arriving workers since shutdown began will also eventually exit. To guarantee eventual termination, it suffices to always interrupt only one idle worker, but shutdown() interrupts all idle workers so that redundant workers exit promptly, not waiting for a straggler task to finish.

    • 强制回收:interruptWorkers shutdownNow中调用,所有线程都要回收

      1
      public List<Runnable> shutdownNow() {  List<Runnable> tasks;  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    checkShutdownAccess();    advanceRunState(STOP);    interruptWorkers();    tasks = drainQueue();  } finally {    mainLock.unlock();  }  tryTerminate();  return tasks;}private void interruptWorkers() {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {    for (Worker w : workers)      // 遍历所有工作线程进行回收      w.interruptIfStarted();  } finally {    mainLock.unlock();  }}// 定义在worker类中,粗暴的打断所有的已经执行过runWorker方法的workervoid interruptIfStarted() {  Thread t;  // getState() >= 0即state != -1,也就是不是刚初始化的Worker,而是已经运行runWorker的Worker  // 直接在线程层面执行中断,而不管worker此时是否是正在运行的状态(不用去获取worker的锁)  if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {    try {      t.interrupt();    } catch (SecurityException ignore) {    }  }}
      • 执行shutdownNow函数后要立即中断所有正在工作的线程,因此不去尝试获取锁而是直接执行线程的中断
  • 从AQS的角度理解Worker的生命周期

    image-20210717144933245

    Worker使用的是AQS的独占模式,使用独占的特性来判断Worker本身是空闲状态(未上锁)还是工作状态(上锁)

    1
    //1. worker初始化Worker(Runnable firstTask) {  setState(-1); // 设置AQS计数标志为-1,其目的是为了防止初始化到runWorker执行这段时间内被中断  this.firstTask = firstTask;  this.thread = getThreadFactory().newThread(this);}// 2. runWorker函数final void runWorker(Worker w) {  Thread wt = Thread.currentThread();  Runnable task = w.firstTask;  w.firstTask = null;  // 至此worker是被以被中断的,也就是进入了空闲状态  w.unlock();  // ...  w.lock();}// worker释放锁public void unlock()      { release(1); }// 独占模式下释放资源public final boolean release(int arg) {  if (tryRelease(arg)) {    Node h = head;    if (h != null && h.waitStatus != 0)      unparkSuccessor(h);    return true;  }  return false;}protected boolean tryRelease(int unused) {  // 设置独占的线程为null  setExclusiveOwnerThread(null);  // 设置状态为0  setState(0);  return true;}// worker上锁public void lock()        { acquire(1); }public final void acquire(int arg) {  if (!tryAcquire(arg) &&      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    selfInterrupt();}// worker的实现,其实根本没有用到参数--1 因为规定就是状态1为上锁的状态,所以直接用的常量1protected boolean tryAcquire(int unused) {  // 尝试获得worker的锁,必须保证锁状态的旧状态是0,才能设置状态为1  if (compareAndSetState(0, 1)) {    // 设置当前线程为独占线程    setExclusiveOwnerThread(Thread.currentThread());    return true;  }  return false;}// interruptIdleWorkers函数执行时尝试中断空闲的线程,会通过尝试获取锁的方法来判断线程的状态// 在tryAcquire方法中尝试设置状态为1,但是状态的当前值应是0(即执行unlock()之后),才能设置成功// 这一点也保证了,在Worker初始化设置状态为-1到runWorker的状态设置为0时,是能够保证不被中断的public boolean tryLock()  { return tryAcquire(1); }

ScheduledThreadPoolExecutor线程池原理

  • 上边说过了,定时任务线程池的核心入口就是delayedExecute,但是在说这个关键的入口方法之前,不得说下,调用方法前对于提交的任务的包装,包装这一块设计到的类比较多,先用一张类图大致把握

    • 之所以在定时任务线程池中需要着重讲解任务的包装是因为其不同于普通线程池中对于任务的包装主要是用来实现Future的相关功能实现异步的获取任务执行的返回结果或者是异常,定时任务线程池中对于任务的包装可以直接用来实现定时执行逻辑

    image-20210425151258617

  • 首先包装为ScheduledFutureTask

    1
    private class ScheduledFutureTask<V>            extends FutureTask<V> implements RunnableScheduledFuture<V> {private long time;private final long period;// 用于包装schedule(Runnable)提交的任务// result为null,ns是纳秒为单位的,要触发执行任务的系统时间ScheduledFutureTask(Runnable r, V result, long ns) {  super(r, result);  this.time = ns;  this.period = 0;  this.sequenceNumber = sequencer.getAndIncrement();}  // 包装schedule(Callable)提交的任务// ns是纳秒为单位的,要触发执行任务的系统时间ScheduledFutureTask(Callable<V> callable, long ns) {  super(callable);  this.time = ns;  this.period = 0;  this.sequenceNumber = sequencer.getAndIncrement();}    // 包装scheduleWithFixedDelay和scheduleAtFixedRate提交的任务// result 为null// ns是纳秒为单位的,下一次要触发执行任务的系统时间// period是以纳秒为单位的任务循环周期ScheduledFutureTask(Runnable r, V result, long ns, long period) {  super(r, result);  this.time = ns;  this.period = period;  this.sequenceNumber = sequencer.getAndIncrement();}// Worker开始执行任务时执行此方法,要区分是一次性执行的任务,还是周期执行的任务public void run() {  // 首先判断是不是周期性执行的任务  boolean periodic = isPeriodic();  // 判断当前的线程池能否执行定时任务,如果不能则取消任务  if (!canRunInCurrentRunState(periodic))    cancel(false);  else if (!periodic)    // 如果不是周期性任务,也就是一次性的定时任务的话,直接执行提交的任务    ScheduledFutureTask.super.run();  // 如果是周期性执行的任务,首先执行提交的任务,并将任务的状态重置为初始化状态,以备下一次执行  else if (ScheduledFutureTask.super.runAndReset()) {    // 执行完毕后计算下一次执行的时间    setNextRunTime();    // 重新提交当前的任务到延时队列中,用于下一个周期的执行    reExecutePeriodic(outerTask);  }}   // 定时循环任务在执行完一次后重新提交执行,所谓的提交就是直接访达延迟队列中  void reExecutePeriodic(RunnableScheduledFuture<?> task) {    if (canRunInCurrentRunState(true)) {      super.getQueue().add(task);      if (!canRunInCurrentRunState(true) && remove(task))        task.cancel(false);      else        ensurePrestart();    }  }// 计算下一次要执行任务的时间// time表示下一次执行任务的时间,period是用来计算time的周期时间// 此方法中体现出scheduleAtFixedRate与scheduleWithFixedDelay的区别private void setNextRunTime() {  long p = period;  if (p > 0)    // scheduleAtFixedRate    // 在第一次执行完任务后,下一次要执行的时间就是完全按照周期来执行,不管到底什么时候执行完的,之后的每次执行都是如此    time += p;  else    // scheduleWithFixedDelay    // 第一次执行完任务后,下一次要执行的时间是以当前时间为基准计算的,也就是上一次完成任务的时间为基准计算的,之后的每次执行都是如此    time = triggerTime(-p);}      // 下边的两个关键方法用于在DelayedWorkQueue中进行任务的排序与延迟的计算// 用于在延迟队列中按照下一次触发的顺序进行排序public int compareTo(Delayed other) {  if (other == this) // compare zero if same object    return 0;  if (other instanceof ScheduledFutureTask) {    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;    long diff = time - x.time;    if (diff < 0)      return -1;    else if (diff > 0)      return 1;    // 触发时间一致的,按照提交的顺序来    else if (sequenceNumber < x.sequenceNumber)      return -1;    else      return 1;  }  long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);  return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}// 计算从当前时刻到下次执行任务还有多长时间public long getDelay(TimeUnit unit) {  return unit.convert(time - now(), NANOSECONDS);}    }
    • 与FutureTask(确实继承了FutureTask)和Worker一样都实现Runnable接口,以便于控制run方法内任务的执行
      • 应该说这样的设计使用了静态代理的技术
    • 注意两个重要的属性
      • time 下次执行定时任务的时间
      • period 用来计算下次执行任务的周期时间,显然period是否为0是schedule方法与另外两方法的最大区别
        • 除此之外的另一个区别就是outterTask,后边有介绍
    • 延迟任务队列的正常工作是保证定时任务能够正常调度的核心,对应的就是关键的两个方法,compareTo方法与getDelay方法
      • ScheduleFutureTask实现了Comparable接口,其目的就在于要在延迟任务队列中,按照下次执行的时间顺序排列,先执行的排在前边
      • 当Woker取下一个要执行的任务时,如果任务队列不为空,那么还要等待下一个任务到了执行时间之后,再交给线程池去执行,等多场时间?用getDealy计算
      • 应该可以这样说,从大体上看,延迟队列可以按照延迟时间进行排序+延迟队列中可以使用getDelay方法来控制获取任务的时延–这两个特性是直观上的延迟任务线程池起作用的关键
      • DelayedWorkQueue的源码在后边,主要就会提交任务和获取任务的三个方法
    • scheduleWithFixedDelayscheduleAtFixedRate在实现时的区别就在于此次包装过程中,前者传入的周期是unit.toNanos(-delay)而后者是unit.toNanos(perioid)
      • 其原理在于setNextRunTime方法中,详见方法注释
        • scheduleAtFixedRate是直接对上一次的time进行累加
        • scheduleAtFixedDelay是按照当前的时间计算下一次的time
    • 在此次包装过程中,定时循环任务与一次行的定时任务在实现上除了period之外还有一个区别就是outerTask
      • 定时循环任务会持有此属性,以便能够在本轮任务执行完毕后,将当前的任务重新提交到延迟队列中,以备下一轮周期的执行,参考reExecutePeriodic方法
  • 其次包装为RunnableScheduleFuture

    1
    protected <V> RunnableScheduledFuture<V> decorateTask(  Runnable runnable, RunnableScheduledFuture<V> task) {  return task;}protected <V> RunnableScheduledFuture<V> decorateTask(  Callable<V> callable, RunnableScheduledFuture<V> task) {  return task;}
    • 实际上是直接返回RunnableScheduledFuture,但是没有看懂为什么要用这样的一个方法类型提升
  • 定时任务线程池的入口方法delayedExecute

    1
    private void delayedExecute(RunnableScheduledFuture<?> task) {  // 1. 判断线程池是不是shutdown状态,如果是执行拒绝策略  if (isShutdown())    reject(task);  else {    // 2. 首先就是向DelayedWorkQueue中添加任务    super.getQueue().add(task);    // 3. 不管是一般的线程池还是执行定时任务的线程池,都会在向队列中添加完任务后执行re-check    if (isShutdown() &&        !canRunInCurrentRunState(task.isPeriodic()) &&        remove(task))      task.cancel(false);    else      // 4. 如果通过了recheck,执行此方法      // 确保线程池内有线程运行      ensurePrestart();  }}void ensurePrestart() {  int wc = workerCountOf(ctl.get());  // 对于Executors创建的线程池来说,核心线程数量为0,所以会保证有非核心线程执行  if (wc < corePoolSize)    addWorker(null, true);  else if (wc == 0)    addWorker(null, false);}
    • 如果线程池状态不是SHUTDOWN的话,直接向队列中添加任务,而没有直接让线程去执行任务的场景
  • addWorker开始,后续的就是标准的线程池的线程管理与任务获取的流程了,也就是说定时任务线程池与一般线程池的主要区别在于任务调度部分,而连接任务管理与线程管理的通道–延时队列也需要大致了解下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

    // 任务调度时提交任务的方法就是add方法
    public boolean add(Runnable e) {
    return offer(e);
    }

    public boolean offer(Runnable x) {
    if (x == null)
    throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    int i = size;
    if (i >= queue.length)
    grow();
    size = i + 1;
    if (i == 0) {
    queue[0] = e;
    setIndex(e, 0);
    } else {
    // 按照排序规则,选择合适的位置插入到队列中
    siftUp(i, e);
    }
    if (queue[0] == e) {
    leader = null;
    available.signal();
    }
    } finally {
    lock.unlock();
    }
    return true;
    }
    // 按照排序规则,选择合适的位置插入到队列中
    private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
    int parent = (k - 1) >>> 1;
    RunnableScheduledFuture<?> e = queue[parent];
    // 按照RunnableScheduledFuture的time属性进行排序
    if (key.compareTo(e) >= 0)
    break;
    queue[k] = e;
    setIndex(e, k);
    k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
    }

    // getTask中,核心线程取任务(无超时时间)
    // 如果当前不能获取,就阻塞等待
    public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    for (;;) {
    RunnableScheduledFuture<?> first = queue[0];
    if (first == null)
    available.await();
    else {
    // 调用getDelay方法得到需要延时等待的时间
    long delay = first.getDelay(NANOSECONDS);
    if (delay <= 0)
    return finishPoll(first);
    first = null; // don't retain ref while waiting
    if (leader != null)
    available.await();
    else {
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
    available.awaitNanos(delay);
    } finally {
    if (leader == thisThread)
    leader = null;
    }
    }
    }
    }
    } finally {
    if (leader == null && queue[0] != null)
    available.signal();
    lock.unlock();
    }
    }

    // getTask中,非核心线程取任务或则核心线程获取任务(允许超时回收)
    public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    for (;;) {
    RunnableScheduledFuture<?> first = queue[0];
    if (first == null) {
    if (nanos <= 0)
    return null;
    else
    nanos = available.awaitNanos(nanos);
    } else {
    long delay = first.getDelay(NANOSECONDS);
    if (delay <= 0)
    return finishPoll(first);
    if (nanos <= 0)
    return null;
    first = null; // don't retain ref while waiting
    if (nanos < delay || leader != null)
    nanos = available.awaitNanos(nanos);
    else {
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
    long timeLeft = available.awaitNanos(delay);
    nanos -= delay - timeLeft;
    } finally {
    if (leader == thisThread)
    leader = null;
    }
    }
    }
    }
    } finally {
    if (leader == null && queue[0] != null)
    available.signal();
    lock.unlock();
    }
    }

    }
    • DelayedWorkQueue的内部存储是RunnableScheduledFuture类型的数组
    • 提交任务与获取任务用的是同一把锁,但是考虑到其内部要对执行时间做排序,使用同一把锁似乎也是应该的

线程池数量大小的确定

线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。

很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。

上下文切换:

多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换(时间片的轮转调度,线程状态在Runnable状态中,在Running和Ready之间转换)

上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。

Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。

类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。

如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源(这里说的不是争取锁,而是争取CPU时间片,是由操作系统调度的),这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

第一,线程的初始化,切换,销毁等操作会消耗不小的 cpu 资源,使得 cpu 利用率一直维持在较高水平。第二,线程数较大时,任务会短时间迅速执行,任务的集中执行也会给 cpu 造成较大的压力。第三, 任务的集中支持,会让 cpu 的使用率呈现锯齿状,即短时间内 cpu 飙高,然后迅速下降至闲置状态,cpu 使用的不合理,应该减小线程数,让任务在队列等待,使得 cpu 的使用率应该持续稳定在一个合理,平均的数值范围。所以 cpu 在够用时,不宜过大,不是越大越好。可以通过上线后,观察机器的 cpu 使用率和 cpu 负载两个参数来判断线程数是否合理

有一个简单并且适用面比较广的公式:

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

ForkJoin线程池

  • 继承AbstractExecutorService,属于Executor框架的一员
  • 此线程池用来把大量数据的计算进行拆分(比如一个超大数组的求和),分配给线程池中的多个线程并行去执行,有并行计算那味了

  • Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。还有stream的许多操作底层都用了ForkJoin线程池
  • ForkJoinPool 支持任务窃取机制,能够让所有的线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的情况,所以性能很好
  • ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争

补充

  1. shutdown vs shutdownNow

    1. shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
    2. shutdownNow() :关闭线程池,线程的状态变为 STOP线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 任务列表
  2. isTerminated vs isShutdown

    1. isShutDown

      1
      2
      3
      4
      5
      6
      7
      public boolean isShutdown() {
      return ! isRunning(ctl.get());
      }

      private static boolean isRunning(int c) {
      return c < SHUTDOWN;
      }
      • 只要是非RUNNING的状态都返回true
    2. isTerminated

      1. 线程池在tryTerminate方法中进入TIDYING状态后再进入TERMINATED状态
      1
      2
      3
      public boolean isTerminated() {
      return runStateAtLeast(ctl.get(), TERMINATED);
      }
  3. 注意几个钩子函数的使用(一般就是留给子类去使用)

    1. onShutdown
      1. 在ScheduledThreadPoolExecutor中调用,用来在执行shutdown方法后清除延迟队列中的任务
    2. terminated
      1. tryTerminate中执行完此函数后,由TYDIING状态转为TERMINATED状态
      2. 默认是一个空函数
    3. futureTask 完成任务后执行done钩子函数
    4. runWorker中的beforeExecute与afterExecute 默认是空函数
      1. 如果做定时间隔任务时,前边的任务出现异常,后续的任务会继续执行吗? 答:一旦出现异常,当前的任务与后续的任务都不会再执行,而是卡住,可以通过自定义afterExecute钩子方法来处理异常,保证抛出异常的任务取消,而其他任务继续执行
    5. 还能想到的就是LinkedHashMap覆写了HashMap中的钩子函数实现双向链表删除的维护与维持访问顺序等

参考

  1. Java线程池实现原理及其在美团业务中的实践–美团技术团队
  2. Java线程池学习总结
  3. Java线程池