ThreadPool in Java

Sorting out Java thread pools

ThreadPoolExecutor
===

ThreadPoolExecutor is an [ExecutorService](https://zhida.zhihu.com/search?content_id=253097539&content_type=Article&match_order=1&q=ExecutorService&zhida_source=entity) that executes each submitted task using one of the threads in the pool. It is usually configured via the Executors factory methods.

### Overview

Thread pools address two different problems:

  1. They usually improve performance when executing large numbers of asynchronous tasks, because per-task invocation overhead is reduced.
  2. They provide a means of bounding and managing the resources, including threads, consumed when executing tasks.

Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.

To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. More convenient Executors factory methods also exist, for example:

  • [Executors.newCachedThreadPool](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html%23newCachedThreadPool) (unbounded thread pool, with automatic thread reclamation)
  • [Executors.newFixedThreadPool](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html%23newFixedThreadPool) (fixed-size thread pool)
  • [Executors.newSingleThreadExecutor](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html%23newSingleThreadExecutor) (single background thread)

In practice, however, these factory methods are not recommended; it is better to call the ThreadPoolExecutor constructor directly and spell out all the parameters explicitly.
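For example, a minimal sketch of such an explicit construction (the sizes, queue capacity, and rejection policy below are illustrative choices, not recommendations from this article):

```java
import java.util.concurrent.*;

public class ExplicitPoolExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4,                                          // corePoolSize
                8,                                          // maximumPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime + unit
                new ArrayBlockingQueue<>(100),              // bounded work queue
                Executors.defaultThreadFactory(),           // thread factory
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy

        executor.execute(() ->
                System.out.println("task running on " + Thread.currentThread().getName()));
        executor.shutdown();
    }
}
```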

### How a thread pool processes tasks

(Figure source: Meituan Tech Team blog)

ThreadPoolExecutor provides the following constructors:

```java
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
```

To summarize, the full constructor takes seven parameters. What does each one do?

*   `corePoolSize`: the number of core threads
*   `maximumPoolSize`: the maximum number of threads
*   `keepAliveTime`: how long excess idle threads are kept alive
*   `unit`: the time unit for `keepAliveTime`
*   `workQueue`: the task queue
*   `threadFactory`: the thread factory (optional)
*   `handler`: the rejection policy (optional)

You can supply a custom thread factory, for example:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadFactory implements ThreadFactory {
    // AtomicInteger keeps the numbering correct even if threads are created concurrently
    private final AtomicInteger threadCount = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        // Name threads "Custom-Thread-1", "Custom-Thread-2", and so on
        Thread thread = new Thread(r, "Custom-Thread-" + threadCount.incrementAndGet());
        // Make it a non-daemon (user) thread
        thread.setDaemon(false);
        return thread;
    }
}
```

Common task rejection handlers:

  • ThreadPoolExecutor.AbortPolicy (the default): throws a RejectedExecutionException.
  • ThreadPoolExecutor.[CallerRunsPolicy](https://zhida.zhihu.com/search?content_id=253097539&content_type=Article&match_order=1&q=CallerRunsPolicy&zhida_source=entity): the thread that submitted the task runs the rejected task itself.
  • ThreadPoolExecutor.[DiscardPolicy](https://zhida.zhihu.com/search?content_id=253097539&content_type=Article&match_order=1&q=DiscardPolicy&zhida_source=entity): silently ignores the rejected task, without throwing.
  • ThreadPoolExecutor.[DiscardOldestPolicy](https://zhida.zhihu.com/search?content_id=253097539&content_type=Article&match_order=1&q=DiscardOldestPolicy&zhida_source=entity): discards the oldest task in the queue, then retries submitting the current task.

The member fields are as follows.

```java
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop.  The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads.  Such errors are rather common due to
     * the need to allocate a native stack in Thread.start, and users
     * will want to perform clean pool shutdown to clean up.  There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /* The context to be used when executing the finalizer, or null. */
    private final AccessControlContext acc;
```
### Lifecycle management

The run state of a thread pool is not something the user sets explicitly; it is maintained internally as the pool runs. Internally the pool uses a single variable to hold two values: the run state (runState) and the number of worker threads (workerCount). In the implementation, these two key parameters are kept together, as the following code shows:

```java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
```

The `ctl` field, an AtomicInteger, controls both the run state of the pool and the number of effective worker threads. It packs two pieces of information into one value: the run state (runState) in the high 3 bits and the worker count (workerCount) in the low 29 bits, and the two do not interfere with each other. Storing both values in a single variable avoids inconsistencies when making decisions that depend on both, without having to hold a lock just to keep them in sync; reading the source, you will find many places that need to check the run state and the thread count at the same time. The pool also exposes several methods that let users query the current run state and thread count, all implemented with bit operations, which are much faster than computing the values separately.
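A standalone sketch of the same bit layout, re-declaring the constants because the real fields and helpers are private to ThreadPoolExecutor:

```java
public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits: max worker count
    static final int RUNNING    = -1 << COUNT_BITS;       // high 3 bits: run state

    static int ctlOf(int rs, int wc) { return rs | wc; }          // pack
    static int runStateOf(int c)     { return c & ~CAPACITY; }    // unpack state
    static int workerCountOf(int c)  { return c & CAPACITY; }     // unpack count

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                        // RUNNING state, 5 workers
        System.out.println(runStateOf(c) == RUNNING);     // true
        System.out.println(workerCountOf(c));             // 5
    }
}
```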

The internal helpers for extracting the lifecycle state and the worker count are shown below:

```java
private static int runStateOf(int c)     { return c & ~CAPACITY; } // extract the current run state
private static int workerCountOf(int c)  { return c & CAPACITY; }  // extract the current worker count
private static int ctlOf(int rs, int wc) { return rs | wc; }       // pack state and worker count into ctl
```

ThreadPoolExecutor has five run states: RUNNING (accept new tasks and process queued tasks), SHUTDOWN (accept no new tasks, but process queued tasks), STOP (accept no new tasks, process no queued tasks, and interrupt in-progress tasks), TIDYING (all tasks have terminated and workerCount is zero; the terminated() hook runs), and TERMINATED (terminated() has completed).

(Figure source: Meituan Tech Team blog)

The lifecycle transitions are as follows:

(Figure source: Meituan Tech Team blog)

### Core and maximum pool sizes

ThreadPoolExecutor automatically adjusts the pool size according to two bounds:

  • corePoolSize (see [getCorePoolSize](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23getCorePoolSize))
  • maximumPoolSize (see [getMaximumPoolSize](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23getMaximumPoolSize))

When a new task is submitted via execute(Runnable) and fewer than corePoolSize threads are running, a new thread is created to handle the task, even if other worker threads are idle. If more than corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only when the queue is full.

By setting corePoolSize and maximumPoolSize to the same value, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks.

Normally, the core and maximum pool sizes are set only at construction time, but they can also be changed dynamically using:

  • [setCorePoolSize](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23setCorePoolSize)
  • [setMaximumPoolSize](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23setMaximumPoolSize)

### On-demand construction

By default, even core threads are only created and started when new tasks arrive, but this can be overridden dynamically using:

  • [prestartCoreThread](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23prestartCoreThread)
  • [prestartAllCoreThreads](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23prestartAllCoreThreads)

You will probably want to prestart threads if you construct the pool with a non-empty queue.
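A minimal sketch (the pool parameters are arbitrary):

```java
import java.util.concurrent.*;

public class PrestartDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        pool.prestartAllCoreThreads();   // start all core threads now instead of lazily
        // pool.prestartCoreThread();    // or start just one idle core thread

        System.out.println("threads started: " + pool.getPoolSize()); // 4
        pool.shutdown();
    }
}
```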

### Creating new threads

New threads are created via a ThreadFactory. If none is specified, Executors.defaultThreadFactory is used, which creates threads that all belong to the same ThreadGroup, have NORM_PRIORITY priority, and are non-daemon. By supplying a different ThreadFactory, you can change the thread names, thread group, priority, daemon status, and so on.

If the ThreadFactory fails to create a thread when asked (by returning null), the executor will continue, but it might not be able to execute any tasks. Threads should possess the "modifyThread" RuntimePermission; if worker threads or other threads using the pool lack this permission, service may be degraded.

### Keep-alive time

If the pool currently has more than corePoolSize threads, excess threads that have been idle for longer than the keepAliveTime (see [getKeepAliveTime(TimeUnit)](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23getKeepAliveTime)) are terminated.

This provides a way to reduce resource consumption when the pool is not being actively used; if the pool becomes busier later, new threads will be constructed. The parameter can also be changed dynamically via [setKeepAliveTime(long, TimeUnit)](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23setKeepAliveTime).

By default, the keep-alive policy applies only when there are more than corePoolSize threads. The method [allowCoreThreadTimeOut(boolean)](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23allowCoreThreadTimeOut) can extend this time-out policy to core threads as well, as long as keepAliveTime is non-zero.
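A small sketch of both knobs (the parameter values are illustrative):

```java
import java.util.concurrent.*;

public class KeepAliveDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        // Change the keep-alive time at runtime
        pool.setKeepAliveTime(10L, TimeUnit.SECONDS);
        // Let idle core threads time out too (keepAliveTime must be non-zero)
        pool.allowCoreThreadTimeOut(true);

        pool.shutdown();
    }
}
```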

### Queuing

Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

  • If fewer than corePoolSize threads are running, the executor always prefers adding a new thread rather than queuing the task.
  • If corePoolSize or more threads are running, the executor always prefers queuing the task rather than adding a new thread.
  • If the task cannot be queued, a new thread is created unless that would exceed maximumPoolSize, in which case the task is rejected.

Common queuing strategies:

  1. Direct handoff: a good default choice is SynchronousQueue, which hands tasks directly to threads without otherwise holding them. This policy avoids lockups when handling sets of requests with internal dependencies.
  2. Unbounded queue: an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) causes new tasks to wait in the queue whenever all corePoolSize threads are busy (so maximumPoolSize has no effect).
  3. Bounded queue: a bounded queue (for example an ArrayBlockingQueue) helps prevent resource exhaustion when used with a finite maximumPoolSize.

### Rejected tasks

New tasks submitted via execute are *rejected* when the executor has been shut down, or when the executor uses finite bounds for both the maximum thread count and the work queue capacity and is saturated.

In either case, the execute method invokes its handler's RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) method. Four predefined handler policies are provided:

  1. AbortPolicy: the handler throws a runtime RejectedExecutionException.
  2. CallerRunsPolicy: the thread that invoked execute runs the task itself.
  3. DiscardPolicy: a task that cannot be executed is simply dropped.
  4. DiscardOldestPolicy: if the executor is not shut down, the task at the head of the work queue is dropped, and execution of the new task is retried.

Other kinds of RejectedExecutionHandler classes can also be defined and used.
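For instance, a minimal sketch of a custom handler that logs the rejection and then falls back to caller-runs behavior (the class name and log format are made up for illustration):

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Logs the rejection, then runs the task on the submitting thread, like CallerRunsPolicy
public class LogAndCallerRunsHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Task rejected, pool size = " + executor.getPoolSize()
                + ", queue size = " + executor.getQueue().size());
        if (!executor.isShutdown()) {
            r.run();
        }
    }
}
```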

### Hook methods

This class provides protected, overridable hook methods: beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable), which are called before and after the execution of each task. They can be used to manipulate the execution environment, for example reinitializing ThreadLocals, gathering statistics, or adding log entries. In addition, the terminated method can be overridden to perform special processing once the executor has fully terminated.

If hook or callback methods throw exceptions, internal worker threads may in turn fail and terminate abruptly.
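As a sketch of this extension point, afterExecute is commonly overridden to surface task exceptions, including those of tasks submitted via submit, whose exceptions otherwise stay hidden inside the returned Future (the class name here is made up):

```java
import java.util.concurrent.*;

public class LoggingThreadPoolExecutor extends ThreadPoolExecutor {
    public LoggingThreadPoolExecutor(int core, int max, long keepAlive, TimeUnit unit,
                                     BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For submit(), the task is a FutureTask and t is null;
        // the real exception is stored inside the Future, so unwrap it here.
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.err.println("Task failed: " + t);
        }
    }
}
```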

### Queue maintenance

The method [getQueue()](https://link.zhihu.com/?target=https%3A//docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html%23getQueue) allows access to the work queue for monitoring and debugging. Using this method for any other purpose is strongly discouraged.

### Finalization

A pool that is no longer referenced by the program and has no remaining threads is shut down automatically. If you want unused pools to be reclaimed even when users forget to call shutdown, you must arrange for unused threads to eventually die.

### Extension example

Most extensions of this class override one or more of the protected hook methods. For example, here is a subclass that adds a simple pause/resume feature:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    // The javadoc example writes this constructor as "PausableThreadPoolExecutor(...)";
    // one concrete form is spelled out here so the class compiles.
    public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                      long keepAliveTime, TimeUnit unit,
                                      BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            while (isPaused) unpaused.await();
        } catch (InterruptedException ie) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}
```
20 common thread pool questions
===

Thread pool creation and configuration
---------------------------

*   **How to choose the right type of thread pool**

*   `FixedThreadPool`: for scenarios that need a fixed number of threads
*   `CachedThreadPool`: for workloads with an unpredictable number of tasks that need flexible thread management
*   `ScheduledThreadPool`: for tasks that must run at a scheduled time or periodically
*   `SingleThreadExecutor`: for tasks that must be executed by a single thread

*   **How to configure thread pool parameters sensibly**

*   `corePoolSize`: the number of threads always kept in the pool
*   `maximumPoolSize`: the maximum number of threads the pool may create
*   `keepAliveTime`: how long excess idle threads (beyond the core size) wait for new tasks before terminating
*   `workQueue`: the blocking queue that holds pending tasks; common choices are `LinkedBlockingQueue` and `ArrayBlockingQueue`
*   `threadFactory`: the factory used to create new threads; lets you customize thread names, priorities, and so on
*   `rejectedExecutionHandler`: the policy applied when the pool cannot execute a task

*   **Problems caused by badly tuned parameters**

*   **corePoolSize too high**: large numbers of threads occupy system resources and degrade performance
*   **maximumPoolSize too high**: may exhaust system resources or even crash the system
*   **keepAliveTime too short**: threads are created and destroyed frequently, increasing overhead

Using and managing thread pools
-------------------------------

*   **How to submit tasks to a thread pool**

*   Use `execute(Runnable command)` to submit a task
*   Use `submit(Callable task)` to submit a task with a return value

*   **How to handle exceptions thrown by tasks**

*   Catch exceptions inside the task and log them
*   Use the `Future` returned by `submit` and check for an exception after completion (see the sketch after this list)

*   **How to shut down a thread pool**

*   Use `shutdown()` for a graceful shutdown that lets already-submitted tasks finish
*   Use `shutdownNow()` to attempt to stop all actively executing tasks immediately and return the tasks that never started
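A minimal sketch of checking a task's outcome through its Future (the task body and pool sizing are made-up examples):

```java
import java.util.concurrent.*;

public class FutureExceptionDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        Future<Integer> result = pool.submit(() -> {
            if (Math.random() < 0.5) throw new IllegalStateException("task failed");
            return 42;
        });

        try {
            System.out.println("result = " + result.get());    // blocks until done
        } catch (ExecutionException e) {
            System.err.println("task threw: " + e.getCause()); // the task's exception
        } finally {
            pool.shutdown();
        }
    }
}
```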

Thread pool performance and tuning
----------------------------------

*   **Thread pool overload**

*   **Cause**: too many threads in the pool exhaust system resources
*   **Fix**: tune the pool parameters to cap the number of threads, or distribute the work across multiple nodes

*   **Work queue too large or too small**

*   **Too small**: tasks are rejected frequently, delaying execution
*   **Too large**: may cause out-of-memory errors and adds latency
*   **Fix**: size the queue according to the business requirements and pick an appropriate rejection policy

*   **Thread pool resource leaks**

*   **Cause**: tasks use external resources (database connections, file handles) and fail to release them promptly
*   **Fix**: make sure every task releases all the resources it used when it finishes

Managing the pool's threads
---------------------------

*   **Thread starvation deadlock**

*   **Cause**: threads hold pool resources while waiting for a condition that no other thread will ever signal
*   **Fix**: set sensible task timeouts and avoid blocking indefinitely

*   **Thread leaks**

*   **Cause**: a thread fails to handle an exception properly and stays stuck in some state forever
*   **Fix**: catch exceptions inside tasks and make sure threads can always finish normally

*   **How non-core threads die off after a delay**

*   When the pool is running, the current thread count exceeds corePoolSize, and fetching a task times out after `keepAliveTime`, the non-core thread is allowed to terminate

Task scheduling in the pool
---------------------------

*   **How core threads are kept alive**

*   The blocking queue's `take()` method blocks indefinitely, so a `Worker`'s `run()` method stays blocked until a task arrives; after executing it, the worker blocks again waiting for the next task, which keeps core threads alive

*   **How to let core threads be released**

*   Set `allowCoreThreadTimeOut` to `true`; a core thread that fails to obtain a task within `keepAliveTime` is then also removed from the pool

*   **Can a non-core thread become a core thread?**

*   The `Worker` class has no flag marking a thread as core or non-core; the distinction is purely logical, based on how many threads the pool currently holds

Concurrency safety
------------------

*   **How does the pool guarantee concurrency safety?**

*   The pool state and worker count are maintained with atomic classes such as `AtomicInteger` (the `ctl` field)

*   **Running several tasks concurrently and collecting the results**

*   `CompletableFuture` can be used to run multiple tasks concurrently and register a callback once they have all completed (see the sketch after this list)
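A minimal sketch with CompletableFuture (pool sizing and task bodies are illustrative):

```java
import java.util.concurrent.*;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1, pool);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2, pool);

        // Runs once both tasks have completed
        CompletableFuture.allOf(a, b)
                .thenRun(() -> System.out.println("sum = " + (a.join() + b.join())))
                .join();

        pool.shutdown();
    }
}
```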

Monitoring and debugging
------------------------

*   **How to monitor the state of a thread pool**

*   Use `ThreadPoolExecutor` methods such as `getActiveCount()` and `getCompletedTaskCount()` to observe the pool's runtime state (see the sketch after this list)

*   **How to debug thread pool problems**

*   Use logging and exception capture to debug issues inside the pool
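A small monitoring sketch using those getters (the report format and pool parameters are made up):

```java
import java.util.concurrent.*;

public class PoolMonitor {
    // Prints a one-line snapshot of the pool's state
    static void report(ThreadPoolExecutor pool) {
        System.out.printf("active=%d, poolSize=%d, largest=%d, queued=%d, completed=%d%n",
                pool.getActiveCount(), pool.getPoolSize(), pool.getLargestPoolSize(),
                pool.getQueue().size(), pool.getCompletedTaskCount());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 8; i++) {
            pool.execute(() -> {
                try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            });
        }
        report(pool);
        pool.shutdown();
    }
}
```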

Rejection policies
------------------

*   **How to choose a rejection policy**

*   **AbortPolicy**: throws `RejectedExecutionException` to signal that the task was rejected
*   **CallerRunsPolicy**: the submitting thread runs the rejected task itself
*   **DiscardPolicy**: silently drops the rejected task
*   **DiscardOldestPolicy**: drops the task that has been waiting longest in the queue, then retries the new task

Thread pools
===

Creating and destroying threads is relatively expensive, so creating a large number of threads inevitably hurts program performance. Thread pools apply the idea of pooling to solve this problem.

A thread pool sets up a group of threads. Submitted tasks join a queue inside the pool, the pool's threads execute tasks from that queue, and when a thread finishes a task it takes the next one from the queue.

Two common flavors are the fixed thread pool (FixedThreadPool) and the dynamic thread pool (CachedThreadPool): the former has a fixed size, while the latter destroys idle threads and adjusts its size automatically as needed.

However, based on development conventions and experience, both should be avoided in favor of constructing the pool by hand with `new ThreadPoolExecutor`. When tasks arrive faster than they can be processed, a FixedThreadPool lets them pile up in its unbounded queue until memory is exhausted (OOM, Out Of Memory), while a CachedThreadPool keeps creating threads, which also exhausts memory and causes OOM; the excessive thread switching additionally incurs a serious performance penalty.

In most cases, what we want is: keep a certain number of threads (the core threads) when idle; when busy, create more threads up to a configured maximum; place tasks that cannot be handled yet into a bounded task queue; trigger a rejection policy when that queue is full; and, once the pool quiets down, destroy threads until only the core threads remain.

How it works
----

The pool keeps up to corePoolSize pooled threads ready for incoming tasks (created lazily by default, or up front if prestarted). When a task is submitted, the pool dispatches an idle thread to handle it. Once all of those threads are busy (i.e. the thread count has reached corePoolSize), any further submitted task is placed in the workQueue blocking queue; as threads finish their current tasks and free up, they take tasks from workQueue in order. The workQueue itself is bounded, and when it is full the pool starts **creating** new threads (note: creating, not reusing existing ones) to execute tasks. If tasks keep arriving until the thread count reaches the maximumPoolSize limit, the pool applies the handler rejection policy to the overflow.
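A small sketch that makes this flow visible; with 1 core thread, a maximum of 2 threads, and a queue of capacity 2, the fifth of five quickly submitted tasks is rejected (the sizes are deliberately tiny and purely illustrative):

```java
import java.util.concurrent.*;

public class PoolFlowDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 1; i <= 5; i++) {
            final int id = i;
            try {
                pool.execute(() -> {
                    System.out.println("task " + id + " on " + Thread.currentThread().getName());
                    try { Thread.sleep(500); } catch (InterruptedException ignored) { }
                });
            } catch (RejectedExecutionException e) {
                // 1 running + 2 queued + 1 extra thread = 4 accepted, so task 5 is rejected
                System.out.println("task " + id + " rejected");
            }
        }
        pool.shutdown();
    }
}
```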

A thread created beyond the core (i.e. within the maximumPoolSize limit) that stays idle for keepAliveTime without being given a task is reclaimed.

This reveals the core idea behind thread pools: we want roughly corePoolSize threads doing useful work most of the time, for high resource utilization; the blocking queue exists to avoid constantly creating and destroying threads; and the maximum pool size together with the rejection policy protect availability. The combination smooths out load spikes ("peak shaving and valley filling") while avoiding excessive thread churn, improving system stability.

Its biggest limitation is the lack of any persistence for pending work. Compared with an MQ, a thread pool's durability is weak: if the threads exit abnormally, the process restarts, or the machine dies, queued tasks are simply lost. A thread pool is also confined to a single node, so in distributed systems a message queue is usually the better choice.

Thread pool parameters
-----

```java


public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
```

corePoolSize: the number of core threads kept in the pool (created lazily by default as tasks arrive).

maximumPoolSize: the maximum number of threads the pool allows.

keepAliveTime: thread keep-alive time; when the pool holds n threads with corePoolSize <= n <= maximumPoolSize, a thread whose idle time exceeds this value is reclaimed.

unit: the time unit for keepAliveTime.

workQueue: the task queue, a blocking queue holding tasks waiting to be executed; note that all of the implementations below are thread-safe.

  1. ArrayBlockingQueue: a bounded, array-backed blocking queue that orders elements FIFO (first in, first out).
  2. LinkedBlockingQueue: a linked-list-based, (by default) unbounded FIFO blocking queue, with higher throughput than ArrayBlockingQueue (thanks to constant-time linked-list insertion and removal).
  3. SynchronousQueue: a blocking queue that stores no elements; every insert must wait for a corresponding remove, otherwise it blocks. Its throughput is higher than LinkedBlockingQueue (lock-free algorithm).
  4. PriorityBlockingQueue: a blocking queue that supports priority ordering.

handler: the rejection policy, applied when the pool has reached its maximum thread count and the queue is full, so a newly submitted task cannot be accepted.

  1. AbortPolicy: throw an exception (the default).
  2. CallerRunsPolicy: run the task on the thread that called execute.
  3. DiscardOldestPolicy: drop the task at the head of the queue and execute the current task.
  4. DiscardPolicy: silently drop the task.

Types
----

### Single-thread pool

```java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
      new ThreadPoolExecutor(1, 1,                              /* core and maximum pool size are both 1 */
                             0L, TimeUnit.MILLISECONDS,         /* keep-alive time for excess idle threads */
                             new LinkedBlockingQueue<Runnable>() /* unbounded queue */));
}
```

This pool has exactly one thread, so submitted tasks are executed sequentially, in submission order.

### Fixed thread pool

```java


public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,             /* core and maximum pool size are the same */
                                  0L, TimeUnit.MILLISECONDS,      /* keep-alive time for excess idle threads */
                                  new LinkedBlockingQueue<Runnable>() /* unbounded queue */);
}
```

Once the number of threads reaches corePoolSize, the threads are not released even when the pool has no runnable tasks.

### Cached thread pool

```java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,    /* core size 0, effectively unbounded maximum */
                                  60L, TimeUnit.SECONDS,   /* excess idle threads live for one minute */
                                  new SynchronousQueue<Runnable>() /* synchronous handoff queue */);
}
```

The number of threads is not fixed; it adjusts automatically as needed, and threads idle beyond the keep-alive time are reclaimed. Because of the synchronous handoff queue, tasks are started in the order in which they are submitted.

Core methods
----

### submit()

Submits a task to the pool; internally it ends up going through execute(). A Callable submitted via submit is wrapped in a FutureTask, which is then handed to Executor.execute to await execution; what ultimately runs is the FutureTask's run method.
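A minimal sketch of submit() returning a Future (the task body is made up):

```java
import java.util.concurrent.*;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        // submit wraps the Callable in a FutureTask and hands it to execute()
        Future<String> future = pool.submit(() -> "hello from " + Thread.currentThread().getName());

        System.out.println(future.get());   // blocks until the task has run
        pool.shutdown();
    }
}
```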

### execute()

The path a task takes through the pool is `execute -> addWorker -> runWorker`.

execute is the only task-execution method on the Executor interface; it hands a Runnable to the pool to await scheduling (Callable tasks go through submit, which wraps them in a FutureTask first).

### shutdown()

Shuts down the pool: it stops accepting new tasks, while previously submitted tasks continue to run.

Finer points
----

### IO-bound vs. CPU-bound

The most common tuning question is choosing reasonable values for corePoolSize and maximumPoolSize, and the answer differs between IO-bound and CPU-bound workloads. Set them too small and the CPU cannot be fully utilized, and you risk thread starvation or heavy create/destroy churn; set them too large and context-switching overhead grows, resources are wasted, and frequent scheduling hurts throughput.

For IO-bound workloads, you can raise corePoolSize and maximumPoolSize to improve throughput; for CPU-bound workloads, you can lower them to reduce context-switching overhead.

In short, CPU-bound workloads want as few threads as necessary, while IO-bound workloads want as many threads as they can usefully keep busy.
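A commonly quoted starting point, not a rule from this article: about N_cpu threads for CPU-bound pools and roughly N_cpu × (1 + wait/compute) for IO-bound pools; always validate by measurement.

```java
public class PoolSizing {
    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();

        // CPU-bound: about one thread per core (sometimes nCpu + 1)
        int cpuBoundSize = nCpu;

        // IO-bound: scale by how long tasks wait compared to how long they compute.
        // The 9:1 wait-to-compute ratio below is an illustrative assumption.
        double waitToCompute = 9.0;
        int ioBoundSize = (int) (nCpu * (1 + waitToCompute));

        System.out.println("CPU-bound pool size: " + cpuBoundSize);
        System.out.println("IO-bound pool size:  " + ioBoundSize);
    }
}
```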

### Why the Alibaba coding guidelines forbid creating pools with Executors

The Alibaba development guidelines state explicitly that thread pools should be created with ThreadPoolExecutor rather than Executors, because the former forces us to spell out every parameter, making the pool's behavior explicit and avoiding subtle mistakes. The static methods in Executors are convenient but provide too little control over things like the pool size, queue size, thread names, and thread priorities, so ThreadPoolExecutor should be used instead.

Usage example
----

```java


public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5, 5, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    // ExecutorService executor = Executors.newFixedThreadPool(5); // effectively the same as above

    for (int i = 0; i < 10; i++) {
        final int taskId = i; // a lambda can only capture (effectively) final locals
        executor.execute(() -> System.out.println(taskId + " is running"));
    }

    executor.shutdown(); // let the JVM exit once the submitted tasks finish
}
```