# Stream 流操作解析
作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
Java 8 引入的 Stream 是一种函数式编程风格的集合操作工具,允许我们以声明式方式处理数据。
它并不是数据结构,而是一种数据计算的抽象,支持链式操作、惰性求值以及并行处理。
理解 Stream 的底层原理,有助于提升性能优化和高级框架设计能力。
# 一、Stream 基础概念
# Stream 的定义
- 非数据结构:不存储数据,仅描述计算过程
- 一次性消费:类似迭代器,用完即失效
- 支持并行:基于
ForkJoinPool实现
# Stream 创建方式
List<String> list = List.of("A", "B", "C");
Stream<String> stream1 = list.stream(); // 顺序流
Stream<String> stream2 = list.parallelStream(); // 并行流
Stream<Integer> stream3 = Stream.of(1, 2, 3); // 静态创建
Stream<Double> stream4 = Stream.generate(Math::random); // 无限流(生成)
Stream<Integer> stream5 = Stream.iterate(0, n -> n + 1); // 无限流(迭代)
1
2
3
4
5
6
7
2
3
4
5
6
7
# 二、Stream 操作解析
# 中间操作(惰性求值)
中间操作是惰性执行的,只有触发终结操作时才会真正执行整个操作链。
List<String> result = List.of("Tom", "Jerry", "Tom")
.stream()
.filter(name -> name.length() > 3)
.distinct()
.sorted()
.limit(2)
.collect(Collectors.toList());
1
2
3
4
5
6
7
2
3
4
5
6
7
| 操作 | 描述 |
|---|---|
filter | 过滤元素 |
map | 映射成另一个类型 |
flatMap | 扁平化嵌套结构 |
distinct | 去重 |
sorted | 排序(支持 Comparator) |
limit | 限制前 N 个元素 |
skip | 跳过前 N 个元素 |
# 终结操作
List<String> list = List.of("A", "BB", "CCC");
long count = list.stream().count();
Optional<String> first = list.stream().findFirst();
list.stream().forEach(System.out::println); // 不推荐副作用操作, 没法保证顺序,也不方便调试
1
2
3
4
5
6
7
2
3
4
5
6
7
什么是副作用(Side Effect)?
副作用是指函数执行后,对外部世界产生了影响,例如:
- 修改了外部变量;
- 打印日志 / 控制台输出;
- 操作文件 / 数据库;
- 改变了集合内容。
| 操作 | 描述 |
|---|---|
count | 统计数量 |
findFirst | 获取第一个元素 |
collect | 转换成集合、Map 等 |
forEach | 遍历(注意副作用) |
reduce | 归约操作,如求和 |
anyMatch | 是否存在满足条件的元素 |
# collect 高级用法
// 分组
Map<String, List<User>> groupByGender = users.stream()
.collect(Collectors.groupingBy(User::getGender));
// 统计数量
Map<String, Long> countByGender = users.stream()
.collect(Collectors.groupingBy(User::getGender, Collectors.counting()));
// 转 Map(处理 key 冲突)
Map<String, Integer> nameToAge = users.stream()
.collect(Collectors.toMap(User::getName, User::getAge, (a, b) -> b));
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 三、Stream 底层原理
Stream API 并不是“黑魔法”,它是一套惰性计算、基于流水线的中间操作组合 + 最终执行触发的设计。核心关注:
Stream的创建与封装结构(Pipeline + Sink)Stream的执行机制(链式装配,惰性求值)- 并行流是如何工作的(ForkJoin + Spliterator 分割)
# Stream 的创建
Stream<T> stream = list.stream();
1
底层:
StreamSupport.stream(list.spliterator(), false);
1
| 方法 | 说明 |
|---|---|
StreamSupport.stream(...) | 创建串行或并行流 |
Spliterator<T> | 可拆分迭代器,替代 Iterator |
AbstractPipeline | 所有流的父类(中间管道) |
# Stream 的核心类图
BaseStream<T, Stream<T>>
└── Stream<T>
└── AbstractPipeline<T, R, S>
├── ReferencePipeline<T, R> // 引用类型流(String、对象)
├── IntPipeline / LongPipeline // 基本类型流
1
2
3
4
5
6
2
3
4
5
6
AbstractPipeline是中间操作的核心结构,维护前后流的引用(形成 pipeline 链)。每个操作都构建一个新的
Pipeline实例,链式引用上下游。
# Stream 执行流程解析(Pull-Push 模型)
List<String> result = list.stream()
.filter(s -> s.length() > 3)
.map(String::toUpperCase)
.collect(Collectors.toList());
1
2
3
4
2
3
4
源码流程简图:
构建阶段(构建流水线) 每个中间操作(如
filter,map)都会构建一个新的AbstractPipeline子类,并维护前一个 pipeline 的引用。终结阶段(collect/forEach)触发执行链 进入
TerminalOp.evaluate(...),调用wrapAndCopyInto()方法:final <P_IN, R> R evaluate(...) { return op.evaluateSequential(helper, spliterator); }1
2
3最终调用:
copyInto(wrapSink(sink), spliterator); // Sink 被包裹,形成链式调用1
# Sink:数据流的“接收者”
- 每个中间操作都会生成一个
Sink,本质上是元素处理器(如 map/flatMap/filter)。 - 所有
Sink被构建为“装饰链” → 每个元素按链条顺序传递。
示例链条(伪代码):
Source → Sink_Filter → Sink_Map → Sink_Collector
1
push()模式将元素压入链条头部;- 每个
Sink接收元素处理后再传递给下一个 Sink。
← 注解机制