# Stream 流操作解析

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


Java 8 引入的 Stream 是一种函数式编程风格的集合操作工具,允许我们以声明式方式处理数据。
它并不是数据结构,而是一种数据计算的抽象,支持链式操作、惰性求值以及并行处理。
理解 Stream 的底层原理,有助于提升性能优化和高级框架设计能力。

# 一、Stream 基础概念

# Stream 的定义

  • 非数据结构:不存储数据,仅描述计算过程
  • 一次性消费:类似迭代器,用完即失效
  • 支持并行:基于 ForkJoinPool 实现

# Stream 创建方式

List<String> list = List.of("A", "B", "C");

Stream<String> stream1 = list.stream();                    // 顺序流
Stream<String> stream2 = list.parallelStream();           // 并行流
Stream<Integer> stream3 = Stream.of(1, 2, 3);              // 静态创建
Stream<Double> stream4 = Stream.generate(Math::random);   // 无限流(生成)
Stream<Integer> stream5 = Stream.iterate(0, n -> n + 1);  // 无限流(迭代)
1
2
3
4
5
6
7

# 二、Stream 操作解析

# 中间操作(惰性求值)

中间操作是惰性执行的,只有触发终结操作时才会真正执行整个操作链。

List<String> result = List.of("Tom", "Jerry", "Tom")
    .stream()
    .filter(name -> name.length() > 3)
    .distinct()
    .sorted()
    .limit(2)
    .collect(Collectors.toList());
1
2
3
4
5
6
7
操作 描述
filter 过滤元素
map 映射成另一个类型
flatMap 扁平化嵌套结构
distinct 去重
sorted 排序(支持 Comparator)
limit 限制前 N 个元素
skip 跳过前 N 个元素

# 终结操作

List<String> list = List.of("A", "BB", "CCC");

long count = list.stream().count();

Optional<String> first = list.stream().findFirst();

list.stream().forEach(System.out::println); // 不推荐副作用操作, 没法保证顺序,也不方便调试
1
2
3
4
5
6
7

什么是副作用(Side Effect)?

副作用是指函数执行后,对外部世界产生了影响,例如:

  • 修改了外部变量;
  • 打印日志 / 控制台输出;
  • 操作文件 / 数据库;
  • 改变了集合内容。
操作 描述
count 统计数量
findFirst 获取第一个元素
collect 转换成集合、Map 等
forEach 遍历(注意副作用)
reduce 归约操作,如求和
anyMatch 是否存在满足条件的元素

# collect 高级用法

// 分组
Map<String, List<User>> groupByGender = users.stream()
    .collect(Collectors.groupingBy(User::getGender));

// 统计数量
Map<String, Long> countByGender = users.stream()
    .collect(Collectors.groupingBy(User::getGender, Collectors.counting()));

// 转 Map(处理 key 冲突)
Map<String, Integer> nameToAge = users.stream()
    .collect(Collectors.toMap(User::getName, User::getAge, (a, b) -> b));
1
2
3
4
5
6
7
8
9
10
11

# 三、Stream 底层原理

Stream API 并不是“黑魔法”,它是一套惰性计算、基于流水线的中间操作组合 + 最终执行触发的设计。核心关注:

  • Stream 的创建与封装结构(Pipeline + Sink)
  • Stream 的执行机制(链式装配,惰性求值)
  • 并行流是如何工作的(ForkJoin + Spliterator 分割)

# Stream 的创建

Stream<T> stream = list.stream();
1

底层:

StreamSupport.stream(list.spliterator(), false);
1
方法 说明
StreamSupport.stream(...) 创建串行或并行流
Spliterator<T> 可拆分迭代器,替代 Iterator
AbstractPipeline 所有流的父类(中间管道)

# Stream 的核心类图

BaseStream<T, Stream<T>>
  └── Stream<T>
         └── AbstractPipeline<T, R, S>
                 ├── ReferencePipeline<T, R>      // 引用类型流(String、对象)
                 ├── IntPipeline / LongPipeline  // 基本类型流

1
2
3
4
5
6
  • AbstractPipeline 是中间操作的核心结构,维护前后流的引用(形成 pipeline 链)。

  • 每个操作都构建一个新的 Pipeline 实例,链式引用上下游。

# Stream 执行流程解析(Pull-Push 模型)

List<String> result = list.stream()
    .filter(s -> s.length() > 3)
    .map(String::toUpperCase)
    .collect(Collectors.toList());
1
2
3
4

源码流程简图:

  1. 构建阶段(构建流水线) 每个中间操作(如 filter, map)都会构建一个新的 AbstractPipeline 子类,并维护前一个 pipeline 的引用。

  2. 终结阶段(collect/forEach)触发执行链 进入 TerminalOp.evaluate(...),调用 wrapAndCopyInto() 方法:

    final <P_IN, R> R evaluate(...) {
        return op.evaluateSequential(helper, spliterator);
    }
    
    1
    2
    3

    最终调用:

    copyInto(wrapSink(sink), spliterator); // Sink 被包裹,形成链式调用
    
    1

# Sink:数据流的“接收者”

  • 每个中间操作都会生成一个 Sink,本质上是元素处理器(如 map/flatMap/filter)。
  • 所有 Sink 被构建为“装饰链” → 每个元素按链条顺序传递。

示例链条(伪代码):

Source → Sink_Filter → Sink_Map → Sink_Collector
1
  • push() 模式将元素压入链条头部;
  • 每个 Sink 接收元素处理后再传递给下一个 Sink。