前言
Java 21(LTS)正式引入了虚拟线程(Virtual Threads),这是自Java 5引入
java.util.concurrent
以来,Java并发模型最大的一次变革。虚拟线程的目标不是替代平台线程,而是让”一个请求一个线程”的编程模型能够高效扩展到数百万并发的场景。本文将深入探讨虚拟线程的底层原理、使用方式、注意事项和迁移策略。
为什么需要虚拟线程
graph TB
subgraph 传统模型
R1[请求1] --> PT1[平台线程1]
R2[请求2] --> PT2[平台线程2]
R3[请求3] --> PT3[平台线程3]
RN[请求N] --> PTN[平台线程N]
PTN -.->|线程数受限于OS| LIMIT[上限: 数千个]
end
subgraph 虚拟线程模型
VR1[请求1] --> VT1[虚拟线程1]
VR2[请求2] --> VT2[虚拟线程2]
VR3[请求3] --> VT3[虚拟线程3]
VRM[请求M] --> VTM[虚拟线程M]
VTM -.->|JVM管理调度| NOLIMIT[轻松百万级]
VT1 --> CT1[载体线程1]
VT2 --> CT1
VT3 --> CT2[载体线程2]
VTM --> CT2
end
style LIMIT fill:#f66,stroke:#333
style NOLIMIT fill:#6f6,stroke:#333
传统的平台线程(Platform
Thread)是操作系统线程的薄包装,每个线程占用约1MB的栈空间。当应用需要处理大量并发I/O操作时,线程数量成为瓶颈。常见的解决方案是使用异步编程(CompletableFuture、Reactor),但这牺牲了代码的可读性和调试体验。
虚拟线程解决了这个矛盾:保持同步编程的简洁性,同时获得异步编程的吞吐量。
平台线程 vs 虚拟线程
| 底层实现 |
OS线程 1:1 映射 |
JVM管理,M:N映射到载体线程 |
| 栈空间 |
固定 ~1MB |
动态增长,初始仅几百字节 |
| 创建成本 |
高(系统调用) |
低(纯Java对象) |
| 数量上限 |
数千 |
数百万 |
| 调度 |
OS内核调度 |
JVM ForkJoinPool调度 |
| 适用场景 |
CPU密集型 |
I/O密集型 |
| 线程池 |
需要且推荐 |
不需要池化 |
虚拟线程核心原理
载体线程(Carrier Thread)机制
sequenceDiagram
participant VT as 虚拟线程
participant CT as 载体线程(ForkJoinPool)
participant IO as I/O操作
VT->>CT: 1. 挂载(mount)到载体线程
CT->>CT: 2. 执行业务代码
CT->>IO: 3. 发起阻塞I/O
Note over VT,CT: 4. 虚拟线程卸载(unmount)<br/>载体线程空闲
CT->>CT: 5. 载体线程执行其他虚拟线程
IO-->>VT: 6. I/O完成
VT->>CT: 7. 重新挂载到(可能不同的)载体线程
CT->>CT: 8. 继续执行后续代码
虚拟线程的核心机制是continuation(延续)。当虚拟线程遇到阻塞操作时:
- JVM将虚拟线程的栈帧保存为continuation对象(堆内存)
- 虚拟线程从载体线程上卸载(unmount)
- 载体线程可以去执行其他虚拟线程
- 当阻塞操作完成后,虚拟线程重新挂载到一个可用的载体线程上继续执行
这个过程对开发者完全透明,代码看起来就是同步阻塞的,但底层实现了非阻塞的效果。
基础使用
创建虚拟线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class VirtualThreadBasics {
public static void main(String[] args) throws Exception {
Thread vt1 = Thread.ofVirtual() .name("vt-1") .start(() -> { System.out.println("Running on: " + Thread.currentThread()); System.out.println("Is virtual: " + Thread.currentThread().isVirtual()); }); vt1.join();
Thread vt2 = Thread.startVirtualThread(() -> { System.out.println("Quick virtual thread"); }); vt2.join();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { IntStream.range(0, 100_000).forEach(i -> executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return i; }) ); } } }
|
百万并发演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class MillionThreadsDemo {
public static void main(String[] args) throws Exception { Instant start = Instant.now();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 1_000_000; i++) { final int id = i; futures.add(executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return "Task-" + id + " done"; })); }
int completed = 0; for (Future<String> future : futures) { future.get(); completed++; } System.out.println("Completed: " + completed); }
Duration elapsed = Duration.between(start, Instant.now()); System.out.println("Total time: " + elapsed.toSeconds() + "s"); } }
|
线程固定(Thread Pinning)问题
线程固定是虚拟线程最重要的性能陷阱。当虚拟线程无法从载体线程上卸载时,会”固定”在载体线程上,阻塞整个载体线程。
graph TB
subgraph 正常情况
VT1[虚拟线程A] -->|阻塞I/O| UNMOUNT[卸载]
UNMOUNT --> FREE[载体线程空闲]
FREE --> VT2[执行其他虚拟线程]
end
subgraph 线程固定
VT3[虚拟线程B] -->|synchronized块内阻塞| PINNED[固定!]
PINNED --> BLOCKED[载体线程被阻塞]
BLOCKED --> STUCK[其他虚拟线程无法运行]
end
style PINNED fill:#f66,stroke:#333
style BLOCKED fill:#f66,stroke:#333
两种导致线程固定的情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class ThreadPinningExamples {
private final Object lock = new Object();
public void badSynchronized() { synchronized (lock) { try { Thread.sleep(Duration.ofSeconds(1)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
private final ReentrantLock reentrantLock = new ReentrantLock();
public void goodReentrantLock() { reentrantLock.lock(); try { Thread.sleep(Duration.ofSeconds(1)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { reentrantLock.unlock(); } }
}
|
检测线程固定
通过JVM参数可以检测线程固定:
1 2 3 4 5
| java -Djdk.tracePinnedThreads=full MyApp
java -Djdk.tracePinnedThreads=short MyApp
|
结构化并发(Structured
Concurrency)
JDK 21引入的结构化并发(Preview)让并发任务的管理更加清晰和安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
public record UserProfile(User user, List<Order> orders) {}
public UserProfile fetchUserProfile(long userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<User> userTask = scope.fork(() -> userService.findById(userId)); Subtask<List<Order>> ordersTask = scope.fork(() -> orderService.findByUserId(userId));
scope.join(); scope.throwIfFailed();
return new UserProfile(userTask.get(), ordersTask.get()); } }
public String fetchFromFastestMirror(String resourceId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> mirrorA.fetch(resourceId)); scope.fork(() -> mirrorB.fetch(resourceId)); scope.fork(() -> mirrorC.fetch(resourceId));
scope.join();
return scope.result(); } }
|
性能基准测试
以下是一个模拟I/O密集型场景的基准测试,对比平台线程池和虚拟线程的性能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| public class VirtualThreadBenchmark {
private static final int TASK_COUNT = 100_000; private static final Duration IO_DELAY = Duration.ofMillis(100);
static String simulateIOWork(int taskId) { try { Thread.sleep(IO_DELAY); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "Result-" + taskId; }
static long benchmarkPlatformThreads() throws Exception { long start = System.nanoTime();
try (var executor = Executors.newFixedThreadPool(200)) { List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < TASK_COUNT; i++) { final int id = i; futures.add(executor.submit(() -> simulateIOWork(id))); } for (Future<String> f : futures) { f.get(); } }
return (System.nanoTime() - start) / 1_000_000; }
static long benchmarkVirtualThreads() throws Exception { long start = System.nanoTime();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < TASK_COUNT; i++) { final int id = i; futures.add(executor.submit(() -> simulateIOWork(id))); } for (Future<String> f : futures) { f.get(); } }
return (System.nanoTime() - start) / 1_000_000; }
public static void main(String[] args) throws Exception { benchmarkPlatformThreads(); benchmarkVirtualThreads();
long platformTime = benchmarkPlatformThreads(); long virtualTime = benchmarkVirtualThreads();
System.out.println("Platform threads (200 pool): " + platformTime + "ms"); System.out.println("Virtual threads: " + virtualTime + "ms"); System.out.printf("Speedup: %.2fx%n", (double) platformTime / virtualTime); } }
|
典型测试结果(10万任务,每个100ms I/O延迟):
| 平台线程池 |
200 |
~50秒 |
| 平台线程池 |
1000 |
~10秒 |
| 虚拟线程 |
不限 |
~0.3秒 |
Spring Boot 集成
Spring Boot 3.2+ 配置
1 2 3 4 5
| spring: threads: virtual: enabled: true
|
这会让Tomcat使用虚拟线程处理每个HTTP请求,等效于:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Configuration public class VirtualThreadConfig {
@Bean public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() { return protocolHandler -> { protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor()); }; }
@Bean public AsyncTaskExecutor applicationTaskExecutor() { return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor()); } }
|
实际应用:高并发API服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @RestController @RequestMapping("/api/aggregation") public class AggregationController {
private final UserService userService; private final ProductService productService; private final RecommendService recommendService;
@GetMapping("/homepage/{userId}") public ResponseEntity<HomepageData> getHomepage(@PathVariable Long userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userTask = scope.fork(() -> userService.getProfile(userId)); var productsTask = scope.fork(() -> productService.getHotProducts()); var recommendTask = scope.fork(() -> recommendService.getRecommendations(userId));
scope.join(); scope.throwIfFailed();
return ResponseEntity.ok(new HomepageData( userTask.get(), productsTask.get(), recommendTask.get() )); } } }
|
迁移指南与注意事项
适合迁移的场景
- I/O密集型应用:Web服务、微服务网关、数据库操作密集的应用
- 大量并发连接:WebSocket服务、长轮询服务
- 批处理任务:需要并行处理大量独立任务的场景
不适合的场景
- CPU密集型计算:虚拟线程不会加速计算任务,反而增加调度开销
- 依赖ThreadLocal大量数据:虚拟线程数量可能很大,ThreadLocal会占用过多内存
- 依赖synchronized且难以修改的代码:如使用了大量synchronized的第三方库
迁移检查清单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
public class MigrationChecklist {
private final ReentrantLock lock = new ReentrantLock(); public void safeMethod() { lock.lock(); try { blockingCall(); } finally { lock.unlock(); } }
private final Semaphore semaphore = new Semaphore(10); public void rateLimitedCall() throws Exception { semaphore.acquire(); try { externalServiceCall(); } finally { semaphore.release(); } }
private static final ScopedValue<UserContext> CONTEXT = ScopedValue.newInstance();
public void handleRequest(UserContext ctx) { ScopedValue.runWhere(CONTEXT, ctx, () -> { processRequest(); }); }
}
|
最佳实践总结
- 不要池化虚拟线程。虚拟线程创建极其廉价,每个任务创建一个新的虚拟线程即可。
- 用Semaphore限制并发。如果需要限制对外部资源的并发访问,使用Semaphore而非固定大小线程池。
- 替换synchronized为ReentrantLock。避免线程固定问题,尤其是在锁内有阻塞I/O操作时。
- 注意ThreadLocal内存。百万虚拟线程各持有一份ThreadLocal副本可能导致内存问题,考虑使用ScopedValue。
- 监控载体线程。通过JFR(Java Flight
Recorder)监控载体线程的使用情况和线程固定事件。
总结
虚拟线程是Java并发编程的一次范式转变。它让开发者可以用简单的同步代码编写高并发应用,不再需要在”代码可读性”和”系统吞吐量”之间做妥协。关键要点:
- 虚拟线程是JVM管理的轻量级线程,通过M:N调度映射到少量载体线程上
- 阻塞I/O时虚拟线程自动卸载,不会浪费载体线程资源
- 需要注意synchronized导致的线程固定问题,优先使用ReentrantLock
- 结构化并发让并行任务的管理更加清晰和安全
- Spring Boot 3.2+只需一行配置即可启用虚拟线程
- 虚拟线程最适合I/O密集型场景,对CPU密集型任务没有优势