金三银四面试突击必问Java面试之线程池核心源码深度剖析 !

好的,我们来深入剖析 ThreadPoolExecutor 的核心源码,帮助你理解其内部工作原理,这在面试中常被考察。以下分析基于OpenJDK的实现(以常见版本为例,思路一致)。


核心成员变量

ThreadPoolExecutor 的关键状态由 ctl 一个 int 型变量维护,它同时存储了线程池状态工作线程数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

通过位运算拆分:

private static final int COUNT_BITS = Integer.SIZE - 3; // 29位用于计数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 最大线程数

// 状态存储在高3位
private static final int RUNNING    = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000
private static final int STOP       =  1 << COUNT_BITS; // 001
private static final int TIDYING    =  2 << COUNT_BITS; // 010
private static final int TERMINATED =  3 << COUNT_BITS; // 011

  • 状态转换RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED

任务队列与工作线程

  1. 任务队列 (workQueue)

    private final BlockingQueue<Runnable> workQueue;
    

    • 存放待执行的任务(如 LinkedBlockingQueue, SynchronousQueue)。
  2. 工作线程 (workers)

    private final HashSet<Worker> workers = new HashSet<>();
    

    • Worker 继承 AQS(实现非重入锁),本质是 Runnable,封装了实际执行任务的线程。

核心方法:execute(Runnable command)

这是提交任务的入口:

public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // 当前线程数 < 核心线程数
        if (addWorker(command, true)) // 尝试新建核心线程
            return;
        c = ctl.get(); // 若失败则重新获取ctl
    }
    
    if (isRunning(c) && workQueue.offer(command)) { // 任务入队
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command)) // 二次检查状态
            reject(command); // 若已关闭则拒绝任务
        else if (workerCountOf(recheck) == 0) // 无工作线程时需新建非核心线程
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // 队列已满,尝试新建非核心线程
        reject(command); // 若线程数已达最大值则拒绝
}

关键点

  • 优先创建核心线程(即使队列未满)。
  • 队列满时才创建非核心线程(不超过 maximumPoolSize)。
  • 二次检查:防止任务入队后线程池突然关闭。

核心方法:addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {
    // 1. 循环CAS增加线程数
    while (true) {
        int c = ctl.get();
        if (runStateAtLeast(c, SHUTDOWN)) // 状态校验
            return false;
        
        int wc = workerCountOf(c);
        if (wc >= (core ? corePoolSize : maximumPoolSize)) // 数量校验
            return false;
        
        if (compareAndIncrementWorkerCount(c)) // CAS增加线程数
            break;
    }

    // 2. 创建Worker并启动线程
    Worker w = new Worker(firstTask);
    Thread t = w.thread;
    if (t != null) {
        workers.add(w); // 加入线程集合
        t.start(); // 启动线程
    }
    return true;
}

注意Worker 的构造函数中会调用 thread.start(),最终执行 runWorker(this)


核心方法:runWorker(Worker w)

这是工作线程的执行循环:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // 循环获取任务
            w.lock(); // 加锁保证任务执行不被中断
            if (runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted())
                wt.interrupt(); // 处理线程中断
            try {
                beforeExecute(wt, task); // 钩子方法
                task.run(); // 执行任务
                afterExecute(task, null); // 钩子方法
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // 线程退出处理
    }
}

关键点

  • getTask():从队列中阻塞或超时获取任务(核心线程可无限等待,非核心线程超时后销毁)。
  • 锁机制Worker 的锁用于保证任务执行期间不被线程池中断。

任务获取:getTask()

private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        // 状态检查...
        boolean timed = allowCoreThreadTimeOut || workerCountOf(c) > corePoolSize;
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 非核心线程超时获取
                workQueue.take(); // 核心线程阻塞获取
            if (r != null) return r;
            timedOut = true; // 获取超时
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

线程回收逻辑

  • allowCoreThreadTimeOut=true,核心线程也会超时销毁。
  • 非核心线程在 keepAliveTime 内未获取到任务则销毁。

拒绝策略

当线程池饱和(队列满且线程数达最大值)或已关闭时,触发拒绝策略:

final void reject(Runnable command) {
    handler.rejectedExecution(command, this); // 调用RejectedExecutionHandler
}

JDK 内置策略:

  1. AbortPolicy(默认):抛出 RejectedExecutionException
  2. CallerRunsPolicy:由提交任务的线程直接执行。
  3. DiscardPolicy:静默丢弃。
  4. DiscardOldestPolicy:丢弃队列头部的任务并重试。

总结

  1. 状态压缩ctl 高3位状态 + 低29位线程数。
  2. 任务调度:核心线程优先创建 → 任务入队 → 非核心线程创建 → 拒绝策略。
  3. 线程回收:通过 getTask() 中的超时机制实现。
  4. Worker设计:继承AQS实现任务执行锁,避免中断干扰。

掌握这些核心逻辑,面试时便能从容应对线程池源码的提问!

Logo

立足具身智能前沿赛道,致力于搭建全球化、开源化、全栈式技术交流与实践共创平台。

更多推荐