跳到主要内容
📖 本章预览

本章为预览版本,展示部分核心内容。完整内容包含详细源码解析、实战代码和面试要点,加入知识星球即可解锁全部章节。

第八章:线程在等 IO 任务却在排队 — 标准线程池的致命缺陷

本章目标:理解标准线程池在 IO 密集型场景下的延迟问题,掌握 EagerDtpExecutor 通过 TaskQueue.offer() 返回 false 反转入队优先级的核心技巧,理解 OrderedDtpExecutor 的 Hash 路由 + ChildExecutor 串行消费设计。

标准线程池 vs EagerDtpExecutor


8.1 标准线程池的问题

标准 ThreadPoolExecutor 的行为是:

核心线程满 → 入队 → 队列满 → 创建非核心线程 → 拒绝

这个策略在 CPU 密集型场景下没问题。但在 IO 密集型场景(网络请求、DB 查询)下,线程大部分时间在等待 IO,入队反而增加了延迟。

理想行为是:优先创建新线程处理,而不是排队等待。

DynamicTP 提供了 4 种线程池变体,每种针对特定场景优化,且都继承自 DtpExecutor,自动享受动态配置、监控、告警能力。


8.2 EagerDtpExecutor — 反转入队优先级

关键在自定义的 TaskQueue,通过重写 offer() 方法改变入队行为:

public class TaskQueue extends VariableLinkedBlockingQueue<Runnable> {

private transient EagerDtpExecutor executor;

@Override
public boolean offer(@NonNull Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException(
"The task queue does not have executor.");
}

// 情况1:已达最大线程数,无法再创建线程,只能入队
if (executor.getPoolSize() == executor.getMaximumPoolSize()) {
return super.offer(runnable);
}

// 情况2:有空闲线程,入队让空闲线程处理
if (executor.getSubmittedTaskCount() <= executor.getPoolSize()) {
return super.offer(runnable);
}

// 情况3:关键!还没到 max,返回 false 让线程池创建新线程
if (executor.getPoolSize() < executor.getMaximumPoolSize()) {
return false;
}

// 兜底:入队
return super.offer(runnable);
}

/**
* 强制入队,用于并发竞争时的兜底
*/
public boolean force(Runnable o, long timeout, TimeUnit unit)
throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown.");
}
return super.offer(o, timeout, unit);
}
}

核心原理:ThreadPoolExecutor.execute() 内部调用 workQueue.offer(command),如果返回 false,线程池会尝试创建新线程。TaskQueue 利用这个机制,在线程数未达 max 时故意返回 false,触发线程创建。


8.3 并发竞争处理

多线程同时提交任务时可能出现竞态条件:TaskQueue 返回 false → 线程池尝试创建线程 → 但此时其他线程已经把线程数推到 max → 抛 RejectedExecutionException

EagerDtpExecutor 用 submittedTaskCount 追踪未完成任务数,并在拒绝时尝试强制入队:

public class EagerDtpExecutor extends DtpExecutor {

// 已提交但未完成的任务数
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// 提交前计数 +1
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// 被拒绝了,但可能是并发竞争导致的
if (getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue) getQueue();
try {
// 尝试强制入队(绕过 offer() 的返回 false 逻辑)
if (!queue.force(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();

🔒 解锁完整内容

本章剩余内容需要解锁后查看

以上仅为本章部分预览内容,完整内容包含更多深度源码解析、实战代码和面试要点。

加入知识星球你将获得:

  • ✅ 全部 12 章完整内容 + 持续更新
  • ✅ 配套源码 + 实战项目
  • ✅ 一对一答疑 + 面试辅导
  • ✅ 简历优化 + 内推机会

📚 本章完整目录

以下为本章完整目录结构,加入知识星球即可解锁全部内容。

8.4 OrderedDtpExecutor — 顺序执行

8.5 其他变体

8.6 所有变体都继承 DtpExecutor

8.7 变体总览

8.8 本章涉及的设计模式