# 随机数、写时复制列表与原子操作

作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)


# 一、随机数

# Random类及其局限性

在 Java 中,Random 类用于生成伪随机数,其核心原理是基于种子(Seed)的计算。

# 用法举例

@Test
public void testRandom() {
    Random random = new Random();
    for (int i = 0; i < 10; i++) {
        // 随机输出10个0~5之间的随机数
        System.out.println(random.nextInt(5));
    }
}
1
2
3
4
5
6
7
8

在单线程环境下,每次调用 nextInt() 都会基于 前一次的种子值 计算新的种子,从而生成随机数。然而,在多线程环境下,Random 可能会遇到种子竞争问题,导致不同线程生成相同的随机数。

# 工作机制

nextInt(int bound) 方法的实现逻辑如下:

public int nextInt(int bound) {
    // 1. 参数校验
    if (bound <= 0)
        throw new IllegalArgumentException(BadBound);

    // 2. 计算新的种子值
    int r = next(31);
    int m = bound - 1;

    // 3. 计算随机数
    if ((bound & m) == 0) {  // bound 是否为 2 的幂
        r = (int) ((bound * (long) r) >> 31);
    } else {
        int u;
        while ((u = next(31)) - (r = u % bound) + m < 0);
    }
    return r;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

单线程环境 下,这种方式可以保证较好的随机性。但是在 多线程环境 下,多个线程可能会同时读取相同的种子值,然后执行步骤 2 计算新的种子,这会导致多个线程生成相同的随机数

为了解决这个问题,Java 采用 CAS(Compare And Swap) 机制来确保每次只有一个线程能成功更新种子,其他线程必须重新获取新的种子后再计算随机数。

# 线程安全性

Random 使用 AtomicLong 变量存储种子,并通过 CAS 操作保证多线程环境下的随机性:

protected int next(int bits) {
    long nextseed;
    AtomicLong seed = this.seed;
    do {
        // 1. 读取当前种子值
        long oldseed = seed.get();
        // 2. 计算新的种子
        nextseed = (oldseed * multiplier + addend) & mask;
        // 3. CAS 操作更新种子
    } while (!seed.compareAndSet(oldseed, nextseed));

    // 4. 生成随机数
    return (int) (nextseed >>> (48 - bits));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

在多线程环境下:

  1. 线程 A 读取 oldseed,计算 nextseed,尝试更新种子。
  2. 线程 B 也可能读取相同的 oldseed,计算 nextseed,但在 CAS 操作时可能失败。
  3. 失败的线程会重新获取新的 oldseed 并重新计算,直到 CAS 操作成功。

通过这种方式,Random 确保了种子的唯一性,从而避免多个线程生成相同的随机数。

虽然 Random 使用了 CAS 机制,但在高并发环境下仍然可能导致大量线程竞争同一个种子变量,自旋影响性能。

# ThreadLocalRandom

ThreadLocalRandom 是 Java 7 引入的一个随机数生成类,专门为多线程环境设计,它通过为每个线程提供独立的随机数生成器来避免线程间的竞争,从而提高了在多线程场景下的性能。

# 用法举例

@Test
public void testThreadLocalRandom() {
    for (int i = 0; i < 10; i++) {
        // 生成 0~5 之间的随机整数
        System.out.println(ThreadLocalRandom.current().nextInt(5));
    }
}
1
2
3
4
5
6
7

ThreadLocalRandom.current() 获取当前线程的 ThreadLocalRandom 实例。

nextInt(int bound) 用来生成一个指定范围的随机整数。

# 源码解释

  1. ThreadLocalRandom.current() 方法解析

    public static ThreadLocalRandom current() {
        if (U.getInt(Thread.currentThread(), PROBE) == 0)
            localInit();
        return instance;
    }
    
    1
    2
    3
    4
    5

    这里的实现主要包含两部分:U.getInt()localInit()。下面是对每一部分的详细分析。

    1. U.getInt(Thread.currentThread(), PROBE)

      U.getInt()sun.misc.Unsafe 类的一个方法,用于通过 Unsafe API 获取线程的某个字段的值。在这里,它获取当前线程的 PROBE 字段。

      • PROBE 是一个线程局部变量(由 ThreadLocal 实现),用来表示该线程的状态,通常它用来做初始化检查或标记。
      • U.getInt(Thread.currentThread(), PROBE) 返回当前线程的 PROBE 值。如果值为 0,说明该线程尚未完成初始化,因此需要进行初始化操作。

      作用

      • 它的作用是检查当前线程是否已经初始化过 ThreadLocalRandom。如果当前线程没有初始化(PROBE == 0),则会调用 localInit() 来进行初始化。
    2. localInit()

      localInit() 是用来初始化线程局部随机数生成器的关键方法,它为每个线程分配唯一的随机种子和标识符:

      static final void localInit() {
          int p = probeGenerator.addAndGet(PROBE_INCREMENT);  // 获取下一个 probe 值
          int probe = (p == 0) ? 1 : p; // skip 0,避免 probe 值为 0
          long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));  // 生成并获取新的种子
          Thread t = Thread.currentThread();
          U.putLong(t, SEED, seed);  // 将种子值存储到当前线程的 SEED 字段中
          U.putInt(t, PROBE, probe);  // 将 probe 值存储到当前线程的 PROBE 字段中
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
  2. next() 方法

    当调用 nextInt(bound) 方法时,ThreadLocalRandom 会基于当前线程的种子生成一个符合条件的随机整数

    public int nextInt(int bound) {
        // 1. 参数检查
        if (bound <= 0)
            throw new IllegalArgumentException("bound must be positive");
    
        // 2. 生成随机数
        int r = mix32(nextSeed());
        // 3. 优化bound为2的幂次方时的处理
        int m = bound - 1;
        if ((bound & m) == 0)
            r &= m;
        // 4. 处理非2的幂次方时
        else { // If not a power of two, reject over-represented candidates
            for (int u = r >>> 1; u + m - (r = u % bound) < 0; u = mix32(nextSeed()) >>> 1)
                ;
        }
        return r;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

# 相对于Random类优点

  • 每个线程独立的种子:每个线程通过 ThreadLocal 存储自己的种子,避免了线程之间的竞争和冲突。传统的 Random 类在多线程环境下会因为共享种子而导致多个线程产生相同的随机数,而 ThreadLocalRandom 则消除了这种问题。
  • 避免了锁竞争:传统的 Random 类如果在多线程环境下使用,需要同步机制来保证线程安全,导致多个线程可能会在访问时自旋等待(例如,Random 通过 synchronized 来控制对种子的更新)。而 ThreadLocalRandom 则通过 ThreadLocal 为每个线程提供独立的种子,因此避免了自旋等待和同步带来的性能损耗。
  • 提高并发性能:每个线程使用独立的随机数种子,可以并行生成随机数,避免了线程间的相互干扰和锁竞争,从而提高了并发性能。在高并发场景下,ThreadLocalRandom 的性能比传统的 Random 类要好得多。
  • 种子的初始化ThreadLocalRandom 在每个线程第一次使用时会自动初始化种子,并将种子保存在当前线程的本地变量中,这样后续的随机数生成都可以基于该线程自己的种子进行。

# 二、原子操作

在 Java 的并发编程中,java.util.concurrent.atomic 包提供了一组基于 CAS(Compare-And-Swap)操作的原子类,它们在无锁环境下保证了线程安全性。

# AtomicInteger

# 用法举例

// 通过构造器赋value值
private static final AtomicInteger count = new AtomicInteger(0);

@Test
public void testAtomicInteger () {
    // 创建10个线程,每个线程执行1000次自增
    Thread[] threads = new Thread[10];

    for (int i = 0; i < 10; i++) {
        threads[i] = new Thread(() -> {
            for (int j = 0; j < 1000; j++) {
                count.incrementAndGet(); // 线程安全的自增
            }
        });
        threads[i].start();
    }

    // 等待所有线程执行完
    for (Thread thread : threads) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 输出最终计数值,理论上应该是 10000
    System.out.println("最终计数值: " + count.get());

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 源码解析

  1. 核心成员变量

    private static final Unsafe U = Unsafe.getUnsafe();
    private static final long VALUE;
    static {
        try {
            VALUE = U.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    private volatile int value;
    
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    解析

    • U:通过 Unsafe 实现底层内存操作。Unsafe直接使用需要判断调用类是否由Bootstrap ClassLoader 加载, 而AtomicIntegerUnsafe 属于 java.base 模块,而 java.base 里的类是由 引导类加载器(Bootstrap ClassLoader) 加载的, 所以此处可以直接调用。

      public static Unsafe getUnsafe() {
          Class<?> caller = Reflection.getCallerClass();
          // 判断是否是 Bootstrap ClassLoader 加载
          if (!VM.isSystemDomainLoader(caller.getClassLoader()))
              throw new SecurityException("Unsafe access denied");
          return theUnsafe;
      }
      
      1
      2
      3
      4
      5
      6
      7
    • value:真正存储数据的 volatile 变量,保证可见性。在构造时可以赋默认值

    • VALUE:记录 value 字段在 AtomicInteger 对象内存中的偏移量,供 CAS 操作使用。

  2. getAndIncrement()

    public final int getAndIncrement() {
        return U.getAndAddInt(this, VALUE, 1);
    }
    
    @HotSpotIntrinsicCandidate
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
          v = getIntVolatile(o, offset);
        } while (!weakCompareAndSetInt(o, offset, v, v + delta));
          return v;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    解析

    • Unsafe.getAndAddInt:

      • 使用 CAS 进行原子加法。
      • 保证多个线程并发执行时,不会发生数据竞争。
    • 性能优化

      避免 synchronized 造成的线程阻塞,提高效率。

    • 缺点

      高并发的场景下, 多个线程竞争更新同一个原子变量, 只有一个线程CAS会成功, 大量线程在竞争失败后会自旋尝试CAS操作, 浪费性能。

# LongAdder

# 用法举例

private final LongAdder counter = new LongAdder();

@Test
public void testLongAdder () throws InterruptedException {
    int threadCount = 10;
    int incrementsPerThread = 1000;

    Thread[] threads = new Thread[threadCount];

    for (int i = 0; i < threadCount; i++) {
        threads[i] = new Thread(() -> {
            for (int j = 0; j < incrementsPerThread; j++) {
                counter.increment(); // 线程安全的自增
            }
        });
        threads[i].start();
    }

    // 等待所有线程执行完毕
    for (Thread thread : threads) {
        thread.join();
    }

    // 获取最终结果
    System.out.println("最终计数值: " + counter.sum()); // 预期是 10000
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 源码解析

public class LongAdder extends Striped64 implements Serializable {
    
    // 维护的基础值,在低并发时直接更新
    transient volatile long base;
    
    // 维护的 Cell 数组,在高并发时用于分散竞争
    transient volatile Cell[] cells;

    /**
     * 累加指定值
     */
    public void add(long x) {
        Cell[] cs; long b, v; int m; Cell c;
        // 尝试直接更新 base 变量,失败(高并发或已经初始化 cells[])则进入细粒度处理
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[getProbe() & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x))) {
                // 进入 longAccumulate 方法,进一步处理冲突或扩容 cells[]
                longAccumulate(x, null, uncontended);
            }
        }
    }

    /**
     * 递增 1
     */
    public void increment() {
        add(1L);
    }

    /**
     * 递减 1
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * 计算当前总和
     */
    public long sum() {
        Cell[] cs = cells;
        long sum = base;
        if (cs != null) {
            for (Cell c : cs) {
                if (c != null) sum += c.value;
            }
        }
        return sum;
    }

    /**
     * Cell 类:用于存储分段计数值,继承 AtomicLong 并添加缓存填充
     */
    @sun.misc.Contended
    static final class Cell {
        volatile long value;

        Cell(long x) {
            value = x;
        }

        boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                valueOffset = UNSAFE.objectFieldOffset(Cell.class.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

解析

  1. 低并发时的优化

    低并发 情况下,LongAdder 直接使用 base 变量进行 CAS(无锁原子操作)。

    if ((cs = cells) != null || !casBase(b = base, b + x)) { ... }
    
    1
    • cells == null:表示当前竞争较少,还未启用 cells[] 机制。
    • casBase(b = base, b + x):尝试直接更新 base,如果成功,说明无竞争,直接返回。

    低并发时,LongAdder 采用 AtomicLong 类似的 CAS 机制,避免不必要的复杂操作,提高性能。

  2. 高并发时的优化

    当多个线程同时更新 LongAdder,直接操作 base 会导致 CAS 失败,触发 cells[] 机制:

    if (cs == null || (m = cs.length - 1) < 0 ||
        (c = cs[getProbe() & m]) == null ||
        !(uncontended = c.cas(v = c.value, v + x))) {
        longAccumulate(x, null, uncontended);
    }
    
    1
    2
    3
    4
    5

    具体优化点:

    1. 线程哈希映射
      • c = cs[getProbe() & m]:每个线程通过 getProbe() 计算索引,访问 cells[] 数组的某个 Cell,减少热点竞争。
    2. Cell 机制
      • Cell 类类似 AtomicLong,但增加 @sun.misc.Contended 注解,防止 伪共享(多个 Cell 共享同一缓存行导致性能下降)。
    3. 失败重试机制
      • c.cas(v = c.value, v + x) 失败时,会进入 longAccumulate() 进行 重新探测或扩容

    高并发时,LongAdder 采用 cells[] 进行分段累加,避免 CAS 失败造成的频繁重试。

  3. cells[] 的扩容机制

    cells[] 竞争仍然激烈(即 Cell.cas() 失败多次),会进行扩容:

    if (cells.length < NCPU) {
        expandCells();
    }
    
    1
    2
    3
    • cells[] 初始化时大小为 2,随着冲突增加,cells[]按 2 的指数增长,直到 NCPU(CPU 核心数)。
    • 扩容后,线程会重新分布到新的 Cell,减少竞争。

    cells[] 采用懒加载 + 自适应扩容,减少内存开销,提高吞吐量。


# 三、写时复制列表

CopyOnWriteArrayList 适用于 读多写少 的并发场景,它的特点是 写入时复制,即 写操作会创建新数组,而读操作无锁,所以读取性能极高,但写入成本较高。

# 用法举例

@Test
public void testCopyWriteArrayList() throws InterruptedException {
    List<String> list = new CopyOnWriteArrayList<>();

    // 添加元素
    list.add("Java");
    list.add("Python");
    list.add("C++");

    // 线程 1:读取
    Thread readerThread = new Thread(() -> {
        for (String s : list) {
            System.out.println("读取:" + s);
        }
    });

    // 线程 2:修改
    Thread writerThread = new Thread(() -> {
        list.add("Go");
        System.out.println("写入:Go");
    });

    readerThread.start();
    writerThread.start();

    readerThread.join();
    writerThread.join();

    System.out.println("最终列表:" + list);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 源码解析

  1. 构造函数

    // 每个 CopyOnWriteArrayList 内部维护一个 Object 数组
    private transient volatile Object[] array;
    
    // 无参构造函数:初始化一个空数组
    public CopyOnWriteArrayList() {
        setArray(new Object[0]);
    }
    
    // 通过已有数组创建 CopyOnWriteArrayList,拷贝数组内容,确保数据安全
    public CopyOnWriteArrayList(E[] toCopyIn) {
        setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
    }
    
    // 通过 Collection 创建 CopyOnWriteArrayList
    public CopyOnWriteArrayList(Collection<? extends E> c) {
        Object[] es = (c.getClass() == CopyOnWriteArrayList.class)
                ? ((CopyOnWriteArrayList<?>) c).getArray()
                : c.toArray();
    
        // 确保数组类型一致
        if (es.getClass() != Object[].class) {
            es = Arrays.copyOf(es, es.length, Object[].class);
        }
    
        setArray(es);
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27

    解析

    1. 无参构造:创建一个空数组,初始化 CopyOnWriteArrayList
    2. 数组构造:使用 Arrays.copyOf() 深拷贝 输入数组,避免外部修改影响内部存储。
    3. 集合构造
      • 如果 cCopyOnWriteArrayList,直接取其数组,提高效率。
      • 否则,调用 toArray(),并确保数组类型一致(防止 toArray() 返回 Object[] 以外的类型)。
  2. 添加元素

    add有多个重载方法, 以add(E e)为例

    public boolean add(E e) {
        // 加锁
        synchronized (lock) {
            // 获取当前实例数组
            Object[] es = getArray();
            int len = es.length;
            // 复制原数组创建新的数组, 并对新数组进行扩容
            es = Arrays.copyOf(es, len + 1);
            es[len] = e;
            // 替换添加前的数组
            setArray(es);
            return true;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

    解析

    1. 加锁

      使用 synchronized (lock) 确保线程安全,防止多个线程同时修改 array

    2. 增加元素

      复制当前数组, 创建新数组, 进行扩容, 再增加元素。

    3. 思考2个问题

      1. 为什么使用synchronized, 不适用ReentrantLock?
        • ReentrantLock采用cas自旋锁, 当线程的锁竞争激烈时, 相对于切换线程自旋的消耗更少, 但是CopyOnWriteArrayList本身适合读多写少的场景, 锁的竞争不会过于激烈, synchronized的写法更简洁。
      2. 为什么不直接在实例维护的数组上进行值的追加, 要复制一个新的数组扩容后再将array指向新的数组?
        • 由于 arrayvolatile 修饰,其作用仅在于保证可见性,但无法保证操作的原子性。如果直接在原数组上追加元素,在高并发场景下,可能会导致其它线程读取到不完整的数据,引发数据不一致问题。而通过创建新的数组进行扩容并赋值,确保 getArray() 获取的始终是稳定的、未被修改的旧数据,从而保证读写分离,避免并发读写冲突。
  3. 获取指定位置元素

    get(int index) 用于获取指定索引处的元素,若索引超出范围,则抛出 IndexOutOfBoundsException

    public E get(int index) {
        return elementAt(getArray(), index);
    }
    
    final Object[] getArray() {
        return array;
    }
    
    static <E> E elementAt(Object[] a, int index) {
        return (E) a[index];
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    解析

    从源码来看,get 方法分为两步:

    1. getArray() 获取当前数组的引用。
    2. elementAt() 根据索引返回数组中的元素。

    由于 getArray() 方法没有加锁,同一时间可能有多个线程并发执行,导致 弱一致性问题

    示例: 假设当前数组 array = [1,2,3],线程 A 和 线程 B 并发执行:

    • 情况 1(A 先执行 getArray(),B 后执行 remove(0))
      1. 线程 A 调用 get(0),执行 getArray(),但尚未返回数组。
      2. 线程 B 执行 remove(0),数组变为 [2,3]
      3. 线程 A 继续执行 elementAt(),结果 get(0) = 2,与原始数组不一致。
    • 情况 2(A 先完整获取数组,B 再执行 remove(0))
      1. 线程 A 调用 get(0)getArray() 返回 [1,2,3]
      2. 线程 B 执行 remove(0),数组变为 [2,3]
      3. 线程 A 继续执行 elementAt(),仍然返回 1,与当前数组状态不一致。

    总结

    • CopyOnWriteArrayList 采用 写时复制机制get() 操作不会加锁,可能读到已过时的数据,但不会影响程序稳定性。
    • 这是一种 弱一致性 设计,适用于 读多写少 的场景,确保 get() 操作尽可能高效。
  4. 修改指定元素

    public E set(int index, E element) {
        synchronized (lock) {
            Object[] es = getArray();
            E oldValue = elementAt(es, index);
    
            if (oldValue != element) {
                es = es.clone();
                es[index] = element;
                setArray(es);
            }
            return oldValue;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
  5. 删除元素

    public E remove(int index) {
        synchronized (lock) {
            Object[] es = getArray();
            int len = es.length;
            E oldValue = elementAt(es, index);
            int numMoved = len - index - 1;
            Object[] newElements;
            if (numMoved == 0)
                newElements = Arrays.copyOf(es, len - 1);
            else {
                newElements = new Object[len - 1];
                System.arraycopy(es, 0, newElements, 0, index);
                System.arraycopy(es, index + 1, newElements, index,
                                 numMoved);
            }
            setArray(newElements);
            return oldValue;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19

修改 和 删除方法较为简单, 读者自己看看, 同样也是弱一致性, 虽然多个线程没法同时执行set, 但是可以执行set的同时其它线程执行remove, 可能会修改一个已经删除的元素。

# 思考题

为什么不直接普通的List写时加锁?

如果只对写操作加锁, 可能会读取到扩容时的不完整数据; 如果对读写操作同时加锁, 读性能差; CopyOnWriteArrayList每次获取数据时都是获取副本, 不会有这样的并发安全问题。