强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第10章:Java 虚拟线程 —— Loom 项目的新篇章

第10章:Java 虚拟线程 —— Loom 项目的新篇章

10.1 为什么 Java 需要虚拟线程?

Java 生态中的主流异步方案(CompletableFuture、Reactor、RxJava)虽然解决了性能问题,但带来了巨大的认知负担调试困难

传统线程模型的困境

// 传统方式:一个请求一个线程
// 问题:10,000 个并发请求 = 10,000 个线程 = ~10GB 内存
ExecutorService pool = Executors.newFixedThreadPool(200);

pool.submit(() -> {
    var user = db.findUser(userId);        // 阻塞 50ms
    var orders = db.findOrders(userId);    // 阻塞 50ms
    var recs = callRecommendation(userId); // 阻塞 100ms
    return buildResponse(user, orders, recs);
});

异步方式的困境

// 异步方式:代码复杂度爆炸
CompletableFuture.supplyAsync(() -> db.findUser(userId))
    .thenCompose(user -> 
        CompletableFuture.allOf(
            db.findOrders(userId),
            callRecommendation(userId)
        ).thenApply(results -> buildResponse(user, results))
    )
    .exceptionally(err -> handleError(err));

Project Loom 的答案

Project Loom 的目标:让简单的阻塞代码也能高效运行。

// 虚拟线程:阻塞但高效
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> {
        var user = db.findUser(userId);        // 阻塞,但不占用平台线程
        var orders = db.findOrders(userId);    // 阻塞,但不占用平台线程
        var recs = callRecommendation(userId); // 阻塞,但不占用平台线程
        return buildResponse(user, orders, recs);
    });
}

10.2 虚拟线程基础

创建虚拟线程

// 方式一:直接创建
Thread vt = Thread.ofVirtual().name("my-vt").start(() -> {
    System.out.println("运行在虚拟线程: " + Thread.currentThread());
});

// 方式二:使用 ExecutorService(推荐)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 100_000; i++) {
        final int taskId = i;
        executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1));
            return taskId;
        });
    }
} // 自动等待所有任务完成

// 方式三:虚拟线程工厂
ThreadFactory factory = Thread.ofVirtual().factory();
Thread vt = factory.newThread(() -> doWork());
vt.start();

虚拟线程 vs 平台线程

特性平台线程(Platform Thread)虚拟线程(Virtual Thread)
实现1:1 映射 OS 线程M:N 映射到载体线程
栈大小~1MB(固定)~几百字节初始(按需增长)
创建开销~1ms~几μs
最大数量数千数百万
阻塞代价高(OS 线程阻塞)低(自动卸载载体线程)
适用场景CPU 密集型I/O 密集型

载体线程(Carrier Thread)

虚拟线程的工作原理:

     虚拟线程 1     虚拟线程 2     虚拟线程 3
     (运行中)       (阻塞中)       (运行中)
         │               │               │
         │               │  (挂载到栈)   │
         ▼               ▼               ▼
     ┌─────────┐    ┌─────────┐    ┌─────────┐
     │载体线程 0│    │载体线程 1│    │载体线程 2│
     │(OS线程) │    │(OS线程) │    │(OS线程) │
     └─────────┘    └─────────┘    └─────────┘

当虚拟线程阻塞时(如 I/O、sleep、锁):
  → 虚拟线程的栈帧被复制到堆内存
  → 载体线程被释放去执行其他虚拟线程
  → 阻塞结束后,虚拟线程被重新调度到载体线程

10.3 结构化并发(Structured Concurrency)

概念

结构化并发确保:

  • 子任务的生命周期不超过父任务
  • 父任务可以看到所有子任务的结果
  • 取消父任务时,所有子任务也被取消
// Java 21 预览(StructuredTaskScope)
import jdk.incubator.concurrent.StructuredTaskScope;

Response handle(Request request) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // 启动子任务
        Subtask<User> userTask = scope.fork(() -> findUser(request.userId()));
        Subtask<List<Order>> orderTask = scope.fork(() -> fetchOrders(request.userId()));
        Subtask<Recs> recsTask = scope.fork(() -> getRecommendations(request.userId()));

        // 等待所有子任务完成或失败
        scope.join().throwIfFailed();

        // 获取结果(此时所有子任务都已完成)
        return new Response(userTask.get(), orderTask.get(), recsTask.get());
    }
}

ShutdownOnFailure vs ShutdownOnSuccess

策略行为适用场景
ShutdownOnFailure任一失败则取消所有需要所有结果
ShutdownOnSuccess任一成功则取消其余竞速(race)
// 竞速模式:从多个源获取数据,使用第一个成功的
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    scope.fork(() -> fetchFromPrimary());
    scope.fork(() -> fetchFromBackup1());
    scope.fork(() -> fetchFromBackup2());

    String result = scope.join().result();
    // 第一个成功的结果,其余被取消
}

10.4 虚拟线程的陷阱

陷阱一:Pinning(钉住载体线程)

当虚拟线程在 synchronized 块中执行阻塞操作时,载体线程会被"钉住"(pinned),无法释放。

// ❌ 问题代码:synchronized 块中的阻塞
synchronized (lock) {
    // 阻塞操作会导致载体线程被 pin
    db.query(sql);  // 这会 pin 住载体线程!
}

// ✅ 解决方案:使用 ReentrantLock
lock.lock();
try {
    db.query(sql);  // 不会 pin 住载体线程
} finally {
    lock.unlock();
}

Pinning 检测

# 启动时添加诊断参数
java -Djdk.tracePinnedThreads=short MyApp

陷阱二:ThreadLocal 的内存问题

// ❌ 每个虚拟线程创建 ThreadLocal 副本 — 可能耗尽内存
static ThreadLocal<Connection> connLocal = ThreadLocal.withInitial(() -> createConnection());

// 在 100 万个虚拟线程中,创建 100 万个 Connection!
for (int i = 0; i < 1_000_000; i++) {
    vtExecutor.submit(() -> {
        Connection conn = connLocal.get();  // 💥
        conn.query(sql);
    });
}

// ✅ 使用 ScopedValue(Java 21 预览)
private static final ScopedValue<Connection> SCOPED_CONN = ScopedValue.newInstance();

ScopedValue.runWhere(SCOPED_CONN, createConnection(), () -> {
    Connection conn = SCOPED_CONN.get();
    conn.query(sql);
});

陷阱三:不要池化虚拟线程

// ❌ 错误:池化虚拟线程没有意义
ExecutorService pool = Executors.newFixedThreadPool(100); // 100 个虚拟线程的池

// ✅ 正确:每个任务一个虚拟线程
ExecutorService vt = Executors.newVirtualThreadPerTaskExecutor();

10.5 与传统异步方案对比

代码对比:数据库查询

// 方式一:传统线程池
ExecutorService pool = Executors.newFixedThreadPool(200);
Future<User> future = pool.submit(() -> db.findUser(id));
User user = future.get(); // 阻塞

// 方式二:CompletableFuture(响应式)
CompletableFuture<User> cf = CompletableFuture.supplyAsync(
    () -> db.findUser(id), ioPool
);
cf.thenApply(user -> process(user))
   .exceptionally(err -> handleError(err));

// 方式三:虚拟线程(推荐)
try (var vt = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<User> future = vt.submit(() -> db.findUser(id));
    User user = future.get(); // 阻塞但高效
}

性能对比

方案10K 并发请求内存代码复杂度调试难度
固定线程池 (200)需排队,延迟高~200MB
CompletableFuture高并发,高吞吐~100MB
虚拟线程高并发,高吞吐~50MB

10.6 Spring Boot + 虚拟线程

@Configuration
public class VirtualThreadConfig {

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandler() {
        return handler -> {
            handler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}

10.7 业务场景:微服务网关

public class GatewayService {
    private final HttpClient client = HttpClient.newBuilder()
        .executor(Executors.newVirtualThreadPerTaskExecutor())
        .build();

    public GatewayResponse handleRequest(GatewayRequest request) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // 并发调用多个微服务
            var userSvc = scope.fork(() -> 
                callService(client, "user-service", request.userId()));
            var orderSvc = scope.fork(() -> 
                callService(client, "order-service", request.userId()));
            var recSvc = scope.fork(() -> 
                callService(client, "rec-service", request.userId()));

            scope.join().throwIfFailed();

            return new GatewayResponse(
                userSvc.get(), orderSvc.get(), recSvc.get()
            );
        } catch (Exception e) {
            throw new GatewayException("聚合失败", e);
        }
    }
}

10.8 本章小结

要点说明
虚拟线程M:N 映射,轻量级,数百万级并发
结构化并发子任务生命周期受父任务管理
Pinningsynchronized 中的阻塞会钉住载体线程
ThreadLocal虚拟线程中慎用,考虑 ScopedValue
适用场景I/O 密集型,如数据库查询、HTTP 调用

下一章预告:Erlang 的轻量级进程模型是 Actor 模型的先驱,它如何实现"九个九"的可用性?


扩展阅读