Java · #java#stream#functional-programming#lambda

Java Stream API高级用法与性能优化

2025.02.05 Java 10 min 4.0k
// 目录 · contents

引言

Java 8引入的Stream API彻底改变了Java开发者处理集合数据的方式。它提供了一种声明式的、函数式的数据处理范式,让代码更加简洁、可读。然而,Stream API远不止filtermapcollect这些基础操作——自定义Collector、并行流的正确使用、性能陷阱的规避,这些进阶主题往往决定了你能否在生产环境中真正用好它。

本文将从Stream的核心原理出发,逐步深入到高级用法和性能优化,帮助你从”会用”提升到”用好”Stream API。

Stream处理流程

graph LR
    Source[数据源<br/>Collection/Array/Generator] --> Pipeline[流水线]

    subgraph Pipeline[中间操作链 - 惰性求值]
        A[filter] --> B[map]
        B --> C[sorted]
        C --> D[distinct]
    end

    Pipeline --> Terminal[终端操作<br/>collect/reduce/forEach]
    Terminal --> Result[最终结果]

    style Source fill:#e3f2fd
    style Terminal fill:#e8f5e9
    style Result fill:#fff3e0

Stream的核心特性是惰性求值——中间操作不会立即执行,只有当终端操作被调用时,整个流水线才会启动处理。这意味着多个中间操作会被融合(loop fusion)为一次遍历,大幅减少了迭代次数。

Stream的创建方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class StreamCreation {

public static void main(String[] args) {
// 1. 从集合创建
List<String> names = List.of("Alice", "Bob", "Charlie");
Stream<String> fromList = names.stream();

// 2. 从数组创建
int[] numbers = {1, 2, 3, 4, 5};
IntStream fromArray = Arrays.stream(numbers);

// 3. Stream.of 创建
Stream<String> fromOf = Stream.of("a", "b", "c");

// 4. Stream.generate 无限流(需配合limit使用)
Stream<UUID> uuids = Stream.generate(UUID::randomUUID).limit(10);

// 5. Stream.iterate 迭代生成
Stream<Long> fibonacci = Stream.iterate(
new long[]{0, 1},
pair -> new long[]{pair[1], pair[0] + pair[1]}
).map(pair -> pair[0]).limit(20);

// 6. IntStream.range 区间流
IntStream range = IntStream.rangeClosed(1, 100);

// 7. 文件行流(自动关闭资源)
try (Stream<String> lines = Files.lines(Path.of("data.txt"))) {
lines.filter(line -> !line.isBlank())
.forEach(System.out::println);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

// 8. Pattern正则分割流
Stream<String> words = Pattern.compile("\\s+")
.splitAsStream("hello world java stream");
}
}

中间操作与终端操作

常用中间操作组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class IntermediateOperations {

record Employee(String name, String department, double salary, int age) {}

public static void main(String[] args) {
List<Employee> employees = List.of(
new Employee("张三", "技术部", 25000, 28),
new Employee("李四", "技术部", 30000, 35),
new Employee("王五", "市场部", 20000, 30),
new Employee("赵六", "技术部", 28000, 32),
new Employee("钱七", "市场部", 22000, 27),
new Employee("孙八", "人事部", 18000, 26)
);

// flatMap: 将嵌套结构展平
List<List<Integer>> nested = List.of(
List.of(1, 2, 3),
List.of(4, 5),
List.of(6, 7, 8, 9)
);
List<Integer> flat = nested.stream()
.flatMap(Collection::stream)
.toList();
// [1, 2, 3, 4, 5, 6, 7, 8, 9]

// peek: 调试利器,不影响流水线但可观察元素
List<Employee> techHighSalary = employees.stream()
.filter(e -> "技术部".equals(e.department()))
.peek(e -> System.out.println("过滤后: " + e.name()))
.filter(e -> e.salary() > 26000)
.peek(e -> System.out.println("高薪: " + e.name()))
.sorted(Comparator.comparingDouble(Employee::salary).reversed())
.toList();

// takeWhile / dropWhile (Java 9+)
List<Integer> sorted = List.of(1, 2, 3, 5, 8, 13, 21);
List<Integer> lessThan10 = sorted.stream()
.takeWhile(n -> n < 10)
.toList(); // [1, 2, 3, 5, 8]

List<Integer> from10 = sorted.stream()
.dropWhile(n -> n < 10)
.toList(); // [13, 21]

// mapMulti (Java 16+): 替代filter+map或flatMap的轻量方案
List<Integer> evenDoubled = List.of(1, 2, 3, 4, 5).stream()
.<Integer>mapMulti((n, consumer) -> {
if (n % 2 == 0) {
consumer.accept(n * 2);
}
})
.toList(); // [4, 8]
}
}

Collectors深入

graph TD
    Collectors[Collectors 工具类] --> Basic[基础收集]
    Collectors --> Grouping[分组收集]
    Collectors --> Partition[分区收集]
    Collectors --> Reducing[归约收集]
    Collectors --> Downstream[下游收集器]

    Basic --> toList[toList]
    Basic --> toSet[toSet]
    Basic --> toMap[toMap]
    Basic --> joining[joining]

    Grouping --> groupingBy[groupingBy]
    Partition --> partitioningBy[partitioningBy]

    Downstream --> counting[counting]
    Downstream --> summarizing[summarizingDouble]
    Downstream --> mapping[mapping]
    Downstream --> collectingAndThen[collectingAndThen]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class CollectorsAdvanced {

record Employee(String name, String department, double salary, int age) {}

public static void main(String[] args) {
List<Employee> employees = getEmployees();

// 1. groupingBy + 下游收集器组合
// 按部门分组,统计每个部门的薪资情况
Map<String, DoubleSummaryStatistics> salaryStatsByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::department,
Collectors.summarizingDouble(Employee::salary)
));
// {技术部=DoubleSummaryStatistics{count=3, sum=83000, min=25000, avg=27666.67, max=30000}}

// 2. 多级分组:按部门再按年龄段分组
Map<String, Map<String, List<Employee>>> multiGroup = employees.stream()
.collect(Collectors.groupingBy(
Employee::department,
Collectors.groupingBy(e -> e.age() < 30 ? "青年" : "中年")
));

// 3. toMap处理key冲突:取薪资较高者
Map<String, Employee> highestSalaryByDept = employees.stream()
.collect(Collectors.toMap(
Employee::department,
Function.identity(),
BinaryOperator.maxBy(Comparator.comparingDouble(Employee::salary))
));

// 4. collectingAndThen: 收集后再转换
List<Employee> unmodifiableList = employees.stream()
.filter(e -> e.salary() > 20000)
.collect(Collectors.collectingAndThen(
Collectors.toList(),
Collections::unmodifiableList
));

// 5. teeing (Java 12+): 同时用两个收集器处理,然后合并结果
record SalaryRange(double min, double max) {}
SalaryRange range = employees.stream()
.collect(Collectors.teeing(
Collectors.minBy(Comparator.comparingDouble(Employee::salary)),
Collectors.maxBy(Comparator.comparingDouble(Employee::salary)),
(min, max) -> new SalaryRange(
min.map(Employee::salary).orElse(0.0),
max.map(Employee::salary).orElse(0.0)
)
));

// 6. reducing: 自定义归约
Optional<Employee> highestPaid = employees.stream()
.collect(Collectors.reducing(
BinaryOperator.maxBy(Comparator.comparingDouble(Employee::salary))
));
}
}

自定义Collector

当内置的Collector不能满足需求时,可以通过实现Collector接口来创建自定义收集器。

sequenceDiagram
    participant Stream as Stream元素
    participant Supplier as supplier()<br/>创建容器
    participant Accumulator as accumulator()<br/>累加元素
    participant Combiner as combiner()<br/>合并容器
    participant Finisher as finisher()<br/>最终转换

    Supplier->>Accumulator: 1. 创建空容器
    Stream->>Accumulator: 2. 逐个添加元素
    Stream->>Accumulator: 2. 逐个添加元素
    Note over Combiner: 3. 并行流时合并多个容器
    Accumulator->>Combiner: 容器A
    Accumulator->>Combiner: 容器B
    Combiner->>Finisher: 4. 对合并结果做最终转换
    Finisher->>Finisher: 5. 返回最终结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 自定义Collector: 将流元素收集为不可变的ImmutableList(示例用ArrayList模拟)
* 并在过程中过滤掉null值
*/
public class NonNullListCollector<T> implements Collector<T, List<T>, List<T>> {

@Override
public Supplier<List<T>> supplier() {
// 创建累加容器
return ArrayList::new;
}

@Override
public BiConsumer<List<T>, T> accumulator() {
// 累加逻辑:跳过null元素
return (list, element) -> {
if (element != null) {
list.add(element);
}
};
}

@Override
public BinaryOperator<List<T>> combiner() {
// 合并两个容器(并行流场景)
return (left, right) -> {
left.addAll(right);
return left;
};
}

@Override
public Function<List<T>, List<T>> finisher() {
// 最终转换为不可变列表
return Collections::unmodifiableList;
}

@Override
public Set<Characteristics> characteristics() {
// 不包含IDENTITY_FINISH因为finisher做了转换
return Set.of(Characteristics.CONCURRENT);
}

public static <T> NonNullListCollector<T> toNonNullList() {
return new NonNullListCollector<>();
}
}

// 使用自定义Collector
List<String> result = Stream.of("hello", null, "world", null, "java")
.collect(NonNullListCollector.toNonNullList());
// ["hello", "world", "java"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* 自定义Collector: 滑动窗口收集器
* 将流元素按指定窗口大小分组
*/
public class SlidingWindowCollector<T>
implements Collector<T, List<List<T>>, List<List<T>>> {

private final int windowSize;
private final int step;
private int count = 0;

public SlidingWindowCollector(int windowSize, int step) {
this.windowSize = windowSize;
this.step = step;
}

@Override
public Supplier<List<List<T>>> supplier() {
return ArrayList::new;
}

@Override
public BiConsumer<List<List<T>>, T> accumulator() {
return (windows, element) -> {
// 判断是否需要开启新窗口
if (count % step == 0 || windows.isEmpty()) {
windows.add(new ArrayList<>());
}
count++;
// 向所有未满的窗口添加元素
for (List<T> window : windows) {
if (window.size() < windowSize) {
window.add(element);
}
}
};
}

@Override
public BinaryOperator<List<List<T>>> combiner() {
return (left, right) -> {
left.addAll(right);
return left;
};
}

@Override
public Function<List<List<T>>, List<List<T>>> finisher() {
return Function.identity();
}

@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.IDENTITY_FINISH);
}

public static <T> SlidingWindowCollector<T> sliding(int windowSize, int step) {
return new SlidingWindowCollector<>(windowSize, step);
}
}

并行流的原理与陷阱

并行流工作原理

graph TD
    Source[数据源] --> Fork1[Fork: 分割任务]
    Fork1 --> Sub1[子任务1<br/>处理1-250]
    Fork1 --> Sub2[子任务2<br/>处理251-500]
    Fork1 --> Sub3[子任务3<br/>处理501-750]
    Fork1 --> Sub4[子任务4<br/>处理751-1000]

    Sub1 --> Join1[Join: 合并结果]
    Sub2 --> Join1
    Sub3 --> Join2[Join: 合并结果]
    Sub4 --> Join2

    Join1 --> FinalJoin[最终合并]
    Join2 --> FinalJoin
    FinalJoin --> Result[最终结果]

    style Source fill:#e3f2fd
    style FinalJoin fill:#e8f5e9
    style Result fill:#fff3e0

并行流底层使用ForkJoinPool,默认使用公共的ForkJoinPool,线程数等于Runtime.getRuntime().availableProcessors() - 1

并行流的正确使用与常见陷阱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class ParallelStreamPitfalls {

// 陷阱1: 在并行流中使用有状态的操作
public static void statefulBug() {
List<Integer> result = new ArrayList<>(); // 非线程安全!

// 错误做法:并行流写入共享可变集合
IntStream.rangeClosed(1, 1000)
.parallel()
.forEach(result::add); // 会丢失元素或抛出异常

System.out.println(result.size()); // 很可能不是1000

// 正确做法:使用收集器
List<Integer> correctResult = IntStream.rangeClosed(1, 1000)
.parallel()
.boxed()
.toList(); // 线程安全的收集
}

// 陷阱2: 数据源不适合并行拆分
public static void badDataSource() {
// LinkedList拆分代价高,不适合并行流
LinkedList<Integer> linkedList = new LinkedList<>(List.of(1, 2, 3, 4, 5));
linkedList.stream().parallel(); // 性能可能比顺序流更差

// ArrayList、数组、IntStream.range等拆分效率高,适合并行
int[] array = {1, 2, 3, 4, 5};
Arrays.stream(array).parallel(); // 高效拆分
}

// 陷阱3: 操作本身开销太小,并行带来的线程调度开销反而更大
public static void tooLightweight() {
List<Integer> numbers = IntStream.rangeClosed(1, 100).boxed().toList();

// 错误:简单操作不值得并行
int sum1 = numbers.parallelStream()
.mapToInt(Integer::intValue)
.sum();

// 正确:IO密集或计算密集型操作才值得并行
List<String> results = urls.parallelStream()
.map(url -> fetchFromRemote(url)) // IO密集操作
.toList();
}

// 正确使用:自定义ForkJoinPool隔离任务
public static void customForkJoinPool() throws Exception {
List<Integer> data = IntStream.rangeClosed(1, 10000).boxed().toList();

// 使用自定义线程池,避免阻塞公共ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);
try {
List<Integer> result = customPool.submit(() ->
data.parallelStream()
.filter(n -> heavyComputation(n))
.toList()
).get();
} finally {
customPool.shutdown();
}
}

private static boolean heavyComputation(int n) {
// 模拟耗时计算
try { Thread.sleep(1); } catch (InterruptedException ignored) {}
return n % 2 == 0;
}
}

性能基准测试

使用JMH(Java Microbenchmark Harness)对不同Stream操作进行基准测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(1)
public class StreamBenchmark {

@Param({"1000", "100000", "1000000"})
private int size;

private List<Integer> arrayList;
private LinkedList<Integer> linkedList;
private int[] array;

@Setup
public void setup() {
array = new int[size];
arrayList = new ArrayList<>(size);
linkedList = new LinkedList<>();
for (int i = 0; i < size; i++) {
array[i] = ThreadLocalRandom.current().nextInt(1000);
arrayList.add(array[i]);
linkedList.add(array[i]);
}
}

@Benchmark
public int forLoop() {
int sum = 0;
for (int i = 0; i < arrayList.size(); i++) {
if (arrayList.get(i) > 500) {
sum += arrayList.get(i) * 2;
}
}
return sum;
}

@Benchmark
public int sequentialStream() {
return arrayList.stream()
.filter(n -> n > 500)
.mapToInt(n -> n * 2)
.sum();
}

@Benchmark
public int parallelStream() {
return arrayList.parallelStream()
.filter(n -> n > 500)
.mapToInt(n -> n * 2)
.sum();
}

@Benchmark
public int primitiveStream() {
return Arrays.stream(array)
.filter(n -> n > 500)
.map(n -> n * 2)
.sum();
}
}

典型测试结果(数据仅供参考):

方法 1,000元素 100,000元素 1,000,000元素
for循环 1.2 us 120 us 1,200 us
顺序Stream 2.1 us 180 us 1,800 us
并行Stream 45 us 95 us 650 us
原始类型Stream 0.8 us 80 us 800 us

关键结论: - 小数据量下,传统for循环最快,并行流因线程调度开销反而最慢。 - 大数据量下,并行流的优势才能体现。 - 使用原始类型流IntStreamLongStreamDoubleStream)避免装箱/拆箱,性能提升显著。

实际项目中的最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class StreamBestPractices {

// 1. 优先使用方法引用提高可读性
public List<String> getActiveUserNames(List<User> users) {
return users.stream()
.filter(User::isActive)
.map(User::getName)
.sorted()
.toList();
}

// 2. 避免在Stream中产生副作用
// 错误示例
public void badPractice(List<Order> orders) {
AtomicInteger count = new AtomicInteger(0);
orders.stream()
.filter(Order::isPaid)
.forEach(order -> {
count.incrementAndGet(); // 副作用!
sendNotification(order); // 副作用!
});
}

// 正确示例
public void goodPractice(List<Order> orders) {
List<Order> paidOrders = orders.stream()
.filter(Order::isPaid)
.toList();

// 副作用操作放在Stream外部
paidOrders.forEach(this::sendNotification);
int count = paidOrders.size();
}

// 3. 对于复杂的流操作,抽取为独立方法
public Map<String, List<OrderSummary>> getOrderReport(List<Order> orders) {
return orders.stream()
.filter(this::isValidOrder)
.map(this::toOrderSummary)
.collect(Collectors.groupingBy(OrderSummary::category));
}

private boolean isValidOrder(Order order) {
return order.isPaid()
&& order.getAmount().compareTo(BigDecimal.ZERO) > 0
&& order.getCreatedAt().isAfter(LocalDateTime.now().minusDays(30));
}

private OrderSummary toOrderSummary(Order order) {
return new OrderSummary(
order.getId(),
order.getCategory(),
order.getAmount(),
order.getCreatedAt()
);
}

// 4. 使用Optional与Stream结合
public Optional<User> findFirstAdmin(List<User> users) {
return users.stream()
.filter(u -> u.getRole() == Role.ADMIN)
.filter(User::isActive)
.findFirst();
}

// 5. Stream不可复用,需要多次使用时用Supplier
public void reuseStream(List<Integer> data) {
Supplier<Stream<Integer>> streamSupplier = () -> data.stream()
.filter(n -> n > 0);

long count = streamSupplier.get().count();
Optional<Integer> max = streamSupplier.get().max(Integer::compareTo);
List<Integer> list = streamSupplier.get().toList();
}
}

总结

Java Stream API是一套功能强大的数据处理工具,但要在生产环境中用好它需要深入理解其原理和边界:

  1. 惰性求值是Stream的核心设计,理解它才能写出高效的流水线。
  2. 内置Collector涵盖了绝大多数收集需求,teeingcollectingAndThen等高级组合非常实用。
  3. 自定义Collector为复杂场景提供了灵活性,核心是理解supplier-accumulator-combiner-finisher四步模型。
  4. 并行流并非总是更快——小数据量、非CPU密集操作、不适合拆分的数据源都不应该使用并行流。
  5. 原始类型流能显著减少装箱开销,在数值计算场景中应优先使用。
  6. 保持Stream操作的无副作用性,将副作用操作提取到流水线之外。

掌握这些要点,你就能在项目中自信地使用Stream API,写出既优雅又高效的代码。

作者 · authorzt
发布 · date2025-02-05
篇幅 · length4.0k 字 · 10 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论