Java · #java#virtual-threads#loom#jdk21

Java虚拟线程(Project Loom)深度解析

2025.04.16 Java 8 min 3.4k
// 目录 · contents

前言

Java 21(LTS)正式引入了虚拟线程(Virtual Threads),这是自Java 5引入 java.util.concurrent 以来,Java并发模型最大的一次变革。虚拟线程的目标不是替代平台线程,而是让”一个请求一个线程”的编程模型能够高效扩展到数百万并发的场景。本文将深入探讨虚拟线程的底层原理、使用方式、注意事项和迁移策略。

为什么需要虚拟线程

graph TB
    subgraph 传统模型
        R1[请求1] --> PT1[平台线程1]
        R2[请求2] --> PT2[平台线程2]
        R3[请求3] --> PT3[平台线程3]
        RN[请求N] --> PTN[平台线程N]
        PTN -.->|线程数受限于OS| LIMIT[上限: 数千个]
    end

    subgraph 虚拟线程模型
        VR1[请求1] --> VT1[虚拟线程1]
        VR2[请求2] --> VT2[虚拟线程2]
        VR3[请求3] --> VT3[虚拟线程3]
        VRM[请求M] --> VTM[虚拟线程M]
        VTM -.->|JVM管理调度| NOLIMIT[轻松百万级]

        VT1 --> CT1[载体线程1]
        VT2 --> CT1
        VT3 --> CT2[载体线程2]
        VTM --> CT2
    end

    style LIMIT fill:#f66,stroke:#333
    style NOLIMIT fill:#6f6,stroke:#333

传统的平台线程(Platform Thread)是操作系统线程的薄包装,每个线程占用约1MB的栈空间。当应用需要处理大量并发I/O操作时,线程数量成为瓶颈。常见的解决方案是使用异步编程(CompletableFuture、Reactor),但这牺牲了代码的可读性和调试体验。

虚拟线程解决了这个矛盾:保持同步编程的简洁性,同时获得异步编程的吞吐量。

平台线程 vs 虚拟线程

特性 平台线程 虚拟线程
底层实现 OS线程 1:1 映射 JVM管理,M:N映射到载体线程
栈空间 固定 ~1MB 动态增长,初始仅几百字节
创建成本 高(系统调用) 低(纯Java对象)
数量上限 数千 数百万
调度 OS内核调度 JVM ForkJoinPool调度
适用场景 CPU密集型 I/O密集型
线程池 需要且推荐 不需要池化

虚拟线程核心原理

载体线程(Carrier Thread)机制

sequenceDiagram
    participant VT as 虚拟线程
    participant CT as 载体线程(ForkJoinPool)
    participant IO as I/O操作

    VT->>CT: 1. 挂载(mount)到载体线程
    CT->>CT: 2. 执行业务代码
    CT->>IO: 3. 发起阻塞I/O
    Note over VT,CT: 4. 虚拟线程卸载(unmount)<br/>载体线程空闲
    CT->>CT: 5. 载体线程执行其他虚拟线程
    IO-->>VT: 6. I/O完成
    VT->>CT: 7. 重新挂载到(可能不同的)载体线程
    CT->>CT: 8. 继续执行后续代码

虚拟线程的核心机制是continuation(延续)。当虚拟线程遇到阻塞操作时:

  1. JVM将虚拟线程的栈帧保存为continuation对象(堆内存)
  2. 虚拟线程从载体线程上卸载(unmount)
  3. 载体线程可以去执行其他虚拟线程
  4. 当阻塞操作完成后,虚拟线程重新挂载到一个可用的载体线程上继续执行

这个过程对开发者完全透明,代码看起来就是同步阻塞的,但底层实现了非阻塞的效果。

基础使用

创建虚拟线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class VirtualThreadBasics {

public static void main(String[] args) throws Exception {

// 方式1:Thread.ofVirtual()
Thread vt1 = Thread.ofVirtual()
.name("vt-1")
.start(() -> {
System.out.println("Running on: " + Thread.currentThread());
System.out.println("Is virtual: " + Thread.currentThread().isVirtual());
});
vt1.join();

// 方式2:Thread.startVirtualThread() 快捷方法
Thread vt2 = Thread.startVirtualThread(() -> {
System.out.println("Quick virtual thread");
});
vt2.join();

// 方式3:Executors.newVirtualThreadPerTaskExecutor()
// 推荐在服务端应用中使用
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 100_000).forEach(i ->
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return i;
})
);
} // executor.close() 会等待所有任务完成
}
}

百万并发演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MillionThreadsDemo {

public static void main(String[] args) throws Exception {
Instant start = Instant.now();

// 创建100万个虚拟线程,每个线程sleep 1秒
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = new ArrayList<>();

for (int i = 0; i < 1_000_000; i++) {
final int id = i;
futures.add(executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return "Task-" + id + " done";
}));
}

// 等待所有任务完成
int completed = 0;
for (Future<String> future : futures) {
future.get();
completed++;
}
System.out.println("Completed: " + completed);
}

Duration elapsed = Duration.between(start, Instant.now());
System.out.println("Total time: " + elapsed.toSeconds() + "s");
// 100万个线程,每个sleep 1秒,总耗时约2-3秒(而非100万秒)
}
}

线程固定(Thread Pinning)问题

线程固定是虚拟线程最重要的性能陷阱。当虚拟线程无法从载体线程上卸载时,会”固定”在载体线程上,阻塞整个载体线程。

graph TB
    subgraph 正常情况
        VT1[虚拟线程A] -->|阻塞I/O| UNMOUNT[卸载]
        UNMOUNT --> FREE[载体线程空闲]
        FREE --> VT2[执行其他虚拟线程]
    end

    subgraph 线程固定
        VT3[虚拟线程B] -->|synchronized块内阻塞| PINNED[固定!]
        PINNED --> BLOCKED[载体线程被阻塞]
        BLOCKED --> STUCK[其他虚拟线程无法运行]
    end

    style PINNED fill:#f66,stroke:#333
    style BLOCKED fill:#f66,stroke:#333

两种导致线程固定的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ThreadPinningExamples {

private final Object lock = new Object();

/**
* 情况1:synchronized块中执行阻塞操作 -- 会导致线程固定!
*/
public void badSynchronized() {
synchronized (lock) {
// 虚拟线程被固定在载体线程上
try {
Thread.sleep(Duration.ofSeconds(1));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

/**
* 修复方案:使用ReentrantLock替代synchronized
*/
private final ReentrantLock reentrantLock = new ReentrantLock();

public void goodReentrantLock() {
reentrantLock.lock();
try {
// 虚拟线程可以正常卸载
Thread.sleep(Duration.ofSeconds(1));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
reentrantLock.unlock();
}
}

/**
* 情况2:native方法/JNI调用中的阻塞 -- 也会导致线程固定
* 这种情况通常无法避免,但影响较小
*/
}

检测线程固定

通过JVM参数可以检测线程固定:

1
2
3
4
5
# 线程固定时打印堆栈
java -Djdk.tracePinnedThreads=full MyApp

# 线程固定时只打印简短信息
java -Djdk.tracePinnedThreads=short MyApp

结构化并发(Structured Concurrency)

JDK 21引入的结构化并发(Preview)让并发任务的管理更加清晰和安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 结构化并发:并行获取用户信息和订单列表
* 两个子任务中任何一个失败,另一个会被自动取消
*/
public record UserProfile(User user, List<Order> orders) {}

public UserProfile fetchUserProfile(long userId) throws Exception {

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

// 并行发起两个子任务
Subtask<User> userTask = scope.fork(() ->
userService.findById(userId));
Subtask<List<Order>> ordersTask = scope.fork(() ->
orderService.findByUserId(userId));

// 等待所有子任务完成
scope.join();
// 如果有任务失败,抛出异常
scope.throwIfFailed();

// 两个任务都成功,组合结果
return new UserProfile(userTask.get(), ordersTask.get());
}
}

/**
* ShutdownOnSuccess: 任一子任务成功即返回,取消其余任务
* 适用于从多个源获取相同数据,取最快的结果
*/
public String fetchFromFastestMirror(String resourceId) throws Exception {

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {

scope.fork(() -> mirrorA.fetch(resourceId));
scope.fork(() -> mirrorB.fetch(resourceId));
scope.fork(() -> mirrorC.fetch(resourceId));

scope.join();

return scope.result(); // 返回最先成功的结果
}
}

性能基准测试

以下是一个模拟I/O密集型场景的基准测试,对比平台线程池和虚拟线程的性能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class VirtualThreadBenchmark {

private static final int TASK_COUNT = 100_000;
private static final Duration IO_DELAY = Duration.ofMillis(100);

/**
* 模拟I/O操作
*/
static String simulateIOWork(int taskId) {
try {
Thread.sleep(IO_DELAY);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result-" + taskId;
}

/**
* 固定大小平台线程池
*/
static long benchmarkPlatformThreads() throws Exception {
long start = System.nanoTime();

try (var executor = Executors.newFixedThreadPool(200)) {
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < TASK_COUNT; i++) {
final int id = i;
futures.add(executor.submit(() -> simulateIOWork(id)));
}
for (Future<String> f : futures) {
f.get();
}
}

return (System.nanoTime() - start) / 1_000_000;
}

/**
* 虚拟线程
*/
static long benchmarkVirtualThreads() throws Exception {
long start = System.nanoTime();

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < TASK_COUNT; i++) {
final int id = i;
futures.add(executor.submit(() -> simulateIOWork(id)));
}
for (Future<String> f : futures) {
f.get();
}
}

return (System.nanoTime() - start) / 1_000_000;
}

public static void main(String[] args) throws Exception {
// 预热
benchmarkPlatformThreads();
benchmarkVirtualThreads();

// 正式测试
long platformTime = benchmarkPlatformThreads();
long virtualTime = benchmarkVirtualThreads();

System.out.println("Platform threads (200 pool): " + platformTime + "ms");
System.out.println("Virtual threads: " + virtualTime + "ms");
System.out.printf("Speedup: %.2fx%n", (double) platformTime / virtualTime);
}
}

典型测试结果(10万任务,每个100ms I/O延迟):

方案 线程池大小 耗时
平台线程池 200 ~50秒
平台线程池 1000 ~10秒
虚拟线程 不限 ~0.3秒

Spring Boot 集成

Spring Boot 3.2+ 配置

1
2
3
4
5
# application.yml
spring:
threads:
virtual:
enabled: true # 一行配置启用虚拟线程

这会让Tomcat使用虚拟线程处理每个HTTP请求,等效于:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
public class VirtualThreadConfig {

@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}

// 异步任务也使用虚拟线程
@Bean
public AsyncTaskExecutor applicationTaskExecutor() {
return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
}
}

实际应用:高并发API服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@RestController
@RequestMapping("/api/aggregation")
public class AggregationController {

private final UserService userService;
private final ProductService productService;
private final RecommendService recommendService;

/**
* 聚合接口:并行调用多个下游服务
* 使用虚拟线程后,每个请求都有自己的线程
* 内部的并行调用也使用虚拟线程,不会耗尽线程池
*/
@GetMapping("/homepage/{userId}")
public ResponseEntity<HomepageData> getHomepage(@PathVariable Long userId)
throws Exception {

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

var userTask = scope.fork(() -> userService.getProfile(userId));
var productsTask = scope.fork(() -> productService.getHotProducts());
var recommendTask = scope.fork(() ->
recommendService.getRecommendations(userId));

scope.join();
scope.throwIfFailed();

return ResponseEntity.ok(new HomepageData(
userTask.get(),
productsTask.get(),
recommendTask.get()
));
}
}
}

迁移指南与注意事项

适合迁移的场景

  1. I/O密集型应用:Web服务、微服务网关、数据库操作密集的应用
  2. 大量并发连接:WebSocket服务、长轮询服务
  3. 批处理任务:需要并行处理大量独立任务的场景

不适合的场景

  1. CPU密集型计算:虚拟线程不会加速计算任务,反而增加调度开销
  2. 依赖ThreadLocal大量数据:虚拟线程数量可能很大,ThreadLocal会占用过多内存
  3. 依赖synchronized且难以修改的代码:如使用了大量synchronized的第三方库

迁移检查清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 迁移前的检查清单
*/
public class MigrationChecklist {

// 1. 将synchronized替换为ReentrantLock
// Before:
// synchronized (this) { blockingCall(); }

// After:
private final ReentrantLock lock = new ReentrantLock();
public void safeMethod() {
lock.lock();
try {
blockingCall();
} finally {
lock.unlock();
}
}

// 2. 避免线程池限流 -- 使用Semaphore替代
// Before:
// ExecutorService pool = Executors.newFixedThreadPool(10);

// After:
private final Semaphore semaphore = new Semaphore(10);
public void rateLimitedCall() throws Exception {
semaphore.acquire();
try {
externalServiceCall();
} finally {
semaphore.release();
}
}

// 3. 检查ThreadLocal使用
// 虚拟线程中ThreadLocal仍然可用,但注意内存
// 优先使用ScopedValue(Preview in JDK 21)
private static final ScopedValue<UserContext> CONTEXT = ScopedValue.newInstance();

public void handleRequest(UserContext ctx) {
ScopedValue.runWhere(CONTEXT, ctx, () -> {
processRequest();
});
}

// 4. 不要池化虚拟线程
// 虚拟线程是廉价的,不需要池化
// 始终使用 newVirtualThreadPerTaskExecutor()
}

最佳实践总结

  1. 不要池化虚拟线程。虚拟线程创建极其廉价,每个任务创建一个新的虚拟线程即可。
  2. 用Semaphore限制并发。如果需要限制对外部资源的并发访问,使用Semaphore而非固定大小线程池。
  3. 替换synchronized为ReentrantLock。避免线程固定问题,尤其是在锁内有阻塞I/O操作时。
  4. 注意ThreadLocal内存。百万虚拟线程各持有一份ThreadLocal副本可能导致内存问题,考虑使用ScopedValue。
  5. 监控载体线程。通过JFR(Java Flight Recorder)监控载体线程的使用情况和线程固定事件。

总结

虚拟线程是Java并发编程的一次范式转变。它让开发者可以用简单的同步代码编写高并发应用,不再需要在”代码可读性”和”系统吞吐量”之间做妥协。关键要点:

  • 虚拟线程是JVM管理的轻量级线程,通过M:N调度映射到少量载体线程上
  • 阻塞I/O时虚拟线程自动卸载,不会浪费载体线程资源
  • 需要注意synchronized导致的线程固定问题,优先使用ReentrantLock
  • 结构化并发让并行任务的管理更加清晰和安全
  • Spring Boot 3.2+只需一行配置即可启用虚拟线程
  • 虚拟线程最适合I/O密集型场景,对CPU密集型任务没有优势
作者 · authorzt
发布 · date2025-04-16
篇幅 · length3.4k 字 · 8 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论