# 随机数、写时复制列表与原子操作
作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
# 一、随机数
# Random类及其局限性
在 Java 中,Random 类用于生成伪随机数,其核心原理是基于种子(Seed)的计算。
# 用法举例
@Test
public void testRandom() {
Random random = new Random();
for (int i = 0; i < 10; i++) {
// 随机输出10个0~5之间的随机数
System.out.println(random.nextInt(5));
}
}
2
3
4
5
6
7
8
在单线程环境下,每次调用 nextInt() 都会基于 前一次的种子值 计算新的种子,从而生成随机数。然而,在多线程环境下,Random 可能会遇到种子竞争问题,导致不同线程生成相同的随机数。
# 工作机制
nextInt(int bound) 方法的实现逻辑如下:
public int nextInt(int bound) {
// 1. 参数校验
if (bound <= 0)
throw new IllegalArgumentException(BadBound);
// 2. 计算新的种子值
int r = next(31);
int m = bound - 1;
// 3. 计算随机数
if ((bound & m) == 0) { // bound 是否为 2 的幂
r = (int) ((bound * (long) r) >> 31);
} else {
int u;
while ((u = next(31)) - (r = u % bound) + m < 0);
}
return r;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
在 单线程环境 下,这种方式可以保证较好的随机性。但是在 多线程环境 下,多个线程可能会同时读取相同的种子值,然后执行步骤 2 计算新的种子,这会导致多个线程生成相同的随机数。
为了解决这个问题,Java 采用 CAS(Compare And Swap) 机制来确保每次只有一个线程能成功更新种子,其他线程必须重新获取新的种子后再计算随机数。
# 线程安全性
Random 使用 AtomicLong 变量存储种子,并通过 CAS 操作保证多线程环境下的随机性:
protected int next(int bits) {
long nextseed;
AtomicLong seed = this.seed;
do {
// 1. 读取当前种子值
long oldseed = seed.get();
// 2. 计算新的种子
nextseed = (oldseed * multiplier + addend) & mask;
// 3. CAS 操作更新种子
} while (!seed.compareAndSet(oldseed, nextseed));
// 4. 生成随机数
return (int) (nextseed >>> (48 - bits));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
在多线程环境下:
- 线程 A 读取
oldseed,计算nextseed,尝试更新种子。 - 线程 B 也可能读取相同的
oldseed,计算nextseed,但在CAS操作时可能失败。 - 失败的线程会重新获取新的
oldseed并重新计算,直到CAS操作成功。
通过这种方式,Random 确保了种子的唯一性,从而避免多个线程生成相同的随机数。
虽然 Random 使用了 CAS 机制,但在高并发环境下仍然可能导致大量线程竞争同一个种子变量,自旋影响性能。
# ThreadLocalRandom
ThreadLocalRandom 是 Java 7 引入的一个随机数生成类,专门为多线程环境设计,它通过为每个线程提供独立的随机数生成器来避免线程间的竞争,从而提高了在多线程场景下的性能。
# 用法举例
@Test
public void testThreadLocalRandom() {
for (int i = 0; i < 10; i++) {
// 生成 0~5 之间的随机整数
System.out.println(ThreadLocalRandom.current().nextInt(5));
}
}
2
3
4
5
6
7
ThreadLocalRandom.current() 获取当前线程的 ThreadLocalRandom 实例。
nextInt(int bound) 用来生成一个指定范围的随机整数。
# 源码解释
ThreadLocalRandom.current()方法解析public static ThreadLocalRandom current() { if (U.getInt(Thread.currentThread(), PROBE) == 0) localInit(); return instance; }1
2
3
4
5这里的实现主要包含两部分:
U.getInt()和localInit()。下面是对每一部分的详细分析。U.getInt(Thread.currentThread(), PROBE)
U.getInt()是sun.misc.Unsafe类的一个方法,用于通过 Unsafe API 获取线程的某个字段的值。在这里,它获取当前线程的PROBE字段。PROBE是一个线程局部变量(由ThreadLocal实现),用来表示该线程的状态,通常它用来做初始化检查或标记。U.getInt(Thread.currentThread(), PROBE)返回当前线程的PROBE值。如果值为0,说明该线程尚未完成初始化,因此需要进行初始化操作。
作用:
- 它的作用是检查当前线程是否已经初始化过
ThreadLocalRandom。如果当前线程没有初始化(PROBE == 0),则会调用localInit()来进行初始化。
localInit()
localInit()是用来初始化线程局部随机数生成器的关键方法,它为每个线程分配唯一的随机种子和标识符:static final void localInit() { int p = probeGenerator.addAndGet(PROBE_INCREMENT); // 获取下一个 probe 值 int probe = (p == 0) ? 1 : p; // skip 0,避免 probe 值为 0 long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT)); // 生成并获取新的种子 Thread t = Thread.currentThread(); U.putLong(t, SEED, seed); // 将种子值存储到当前线程的 SEED 字段中 U.putInt(t, PROBE, probe); // 将 probe 值存储到当前线程的 PROBE 字段中 }1
2
3
4
5
6
7
8
next()方法当调用
nextInt(bound)方法时,ThreadLocalRandom会基于当前线程的种子生成一个符合条件的随机整数public int nextInt(int bound) { // 1. 参数检查 if (bound <= 0) throw new IllegalArgumentException("bound must be positive"); // 2. 生成随机数 int r = mix32(nextSeed()); // 3. 优化bound为2的幂次方时的处理 int m = bound - 1; if ((bound & m) == 0) r &= m; // 4. 处理非2的幂次方时 else { // If not a power of two, reject over-represented candidates for (int u = r >>> 1; u + m - (r = u % bound) < 0; u = mix32(nextSeed()) >>> 1) ; } return r; }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 相对于Random类优点
- 每个线程独立的种子:每个线程通过
ThreadLocal存储自己的种子,避免了线程之间的竞争和冲突。传统的Random类在多线程环境下会因为共享种子而导致多个线程产生相同的随机数,而ThreadLocalRandom则消除了这种问题。 - 避免了锁竞争:传统的
Random类如果在多线程环境下使用,需要同步机制来保证线程安全,导致多个线程可能会在访问时自旋等待(例如,Random通过synchronized来控制对种子的更新)。而ThreadLocalRandom则通过ThreadLocal为每个线程提供独立的种子,因此避免了自旋等待和同步带来的性能损耗。 - 提高并发性能:每个线程使用独立的随机数种子,可以并行生成随机数,避免了线程间的相互干扰和锁竞争,从而提高了并发性能。在高并发场景下,
ThreadLocalRandom的性能比传统的Random类要好得多。 - 种子的初始化:
ThreadLocalRandom在每个线程第一次使用时会自动初始化种子,并将种子保存在当前线程的本地变量中,这样后续的随机数生成都可以基于该线程自己的种子进行。
# 二、原子操作
在 Java 的并发编程中,java.util.concurrent.atomic 包提供了一组基于 CAS(Compare-And-Swap)操作的原子类,它们在无锁环境下保证了线程安全性。
# AtomicInteger
# 用法举例
// 通过构造器赋value值
private static final AtomicInteger count = new AtomicInteger(0);
@Test
public void testAtomicInteger () {
// 创建10个线程,每个线程执行1000次自增
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
count.incrementAndGet(); // 线程安全的自增
}
});
threads[i].start();
}
// 等待所有线程执行完
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 输出最终计数值,理论上应该是 10000
System.out.println("最终计数值: " + count.get());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 源码解析
核心成员变量
private static final Unsafe U = Unsafe.getUnsafe(); private static final long VALUE; static { try { VALUE = U.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; public AtomicInteger(int initialValue) { value = initialValue; }1
2
3
4
5
6
7
8
9
10
11
12
13解析
U:通过Unsafe实现底层内存操作。Unsafe直接使用需要判断调用类是否由Bootstrap ClassLoader 加载, 而AtomicInteger及Unsafe属于java.base模块,而java.base里的类是由 引导类加载器(Bootstrap ClassLoader) 加载的, 所以此处可以直接调用。public static Unsafe getUnsafe() { Class<?> caller = Reflection.getCallerClass(); // 判断是否是 Bootstrap ClassLoader 加载 if (!VM.isSystemDomainLoader(caller.getClassLoader())) throw new SecurityException("Unsafe access denied"); return theUnsafe; }1
2
3
4
5
6
7value:真正存储数据的volatile变量,保证可见性。在构造时可以赋默认值VALUE:记录value字段在AtomicInteger对象内存中的偏移量,供CAS操作使用。
getAndIncrement()public final int getAndIncrement() { return U.getAndAddInt(this, VALUE, 1); } @HotSpotIntrinsicCandidate public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!weakCompareAndSetInt(o, offset, v, v + delta)); return v; }1
2
3
4
5
6
7
8
9
10
11
12解析
Unsafe.getAndAddInt:- 使用
CAS进行原子加法。 - 保证多个线程并发执行时,不会发生数据竞争。
- 使用
性能优化:
避免
synchronized造成的线程阻塞,提高效率。缺点
高并发的场景下, 多个线程竞争更新同一个原子变量, 只有一个线程CAS会成功, 大量线程在竞争失败后会自旋尝试CAS操作, 浪费性能。
# LongAdder
# 用法举例
private final LongAdder counter = new LongAdder();
@Test
public void testLongAdder () throws InterruptedException {
int threadCount = 10;
int incrementsPerThread = 1000;
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
counter.increment(); // 线程安全的自增
}
});
threads[i].start();
}
// 等待所有线程执行完毕
for (Thread thread : threads) {
thread.join();
}
// 获取最终结果
System.out.println("最终计数值: " + counter.sum()); // 预期是 10000
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 源码解析
public class LongAdder extends Striped64 implements Serializable {
// 维护的基础值,在低并发时直接更新
transient volatile long base;
// 维护的 Cell 数组,在高并发时用于分散竞争
transient volatile Cell[] cells;
/**
* 累加指定值
*/
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
// 尝试直接更新 base 变量,失败(高并发或已经初始化 cells[])则进入细粒度处理
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x))) {
// 进入 longAccumulate 方法,进一步处理冲突或扩容 cells[]
longAccumulate(x, null, uncontended);
}
}
}
/**
* 递增 1
*/
public void increment() {
add(1L);
}
/**
* 递减 1
*/
public void decrement() {
add(-1L);
}
/**
* 计算当前总和
*/
public long sum() {
Cell[] cs = cells;
long sum = base;
if (cs != null) {
for (Cell c : cs) {
if (c != null) sum += c.value;
}
}
return sum;
}
/**
* Cell 类:用于存储分段计数值,继承 AtomicLong 并添加缓存填充
*/
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) {
value = x;
}
boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
valueOffset = UNSAFE.objectFieldOffset(Cell.class.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
解析
低并发时的优化
在 低并发 情况下,
LongAdder直接使用base变量进行CAS(无锁原子操作)。if ((cs = cells) != null || !casBase(b = base, b + x)) { ... }1cells == null:表示当前竞争较少,还未启用cells[]机制。casBase(b = base, b + x):尝试直接更新base,如果成功,说明无竞争,直接返回。
低并发时,
LongAdder采用AtomicLong类似的CAS机制,避免不必要的复杂操作,提高性能。高并发时的优化
当多个线程同时更新
LongAdder,直接操作base会导致CAS失败,触发cells[]机制:if (cs == null || (m = cs.length - 1) < 0 || (c = cs[getProbe() & m]) == null || !(uncontended = c.cas(v = c.value, v + x))) { longAccumulate(x, null, uncontended); }1
2
3
4
5具体优化点:
- 线程哈希映射
c = cs[getProbe() & m]:每个线程通过getProbe()计算索引,访问cells[]数组的某个Cell,减少热点竞争。
Cell机制Cell类类似AtomicLong,但增加@sun.misc.Contended注解,防止 伪共享(多个Cell共享同一缓存行导致性能下降)。
- 失败重试机制
c.cas(v = c.value, v + x)失败时,会进入longAccumulate()进行 重新探测或扩容。
高并发时,
LongAdder采用cells[]进行分段累加,避免CAS失败造成的频繁重试。- 线程哈希映射
cells[]的扩容机制当
cells[]竞争仍然激烈(即Cell.cas()失败多次),会进行扩容:if (cells.length < NCPU) { expandCells(); }1
2
3cells[]初始化时大小为 2,随着冲突增加,cells[]会 按 2 的指数增长,直到NCPU(CPU 核心数)。- 扩容后,线程会重新分布到新的
Cell,减少竞争。
cells[]采用懒加载 + 自适应扩容,减少内存开销,提高吞吐量。
# 三、写时复制列表
CopyOnWriteArrayList 适用于 读多写少 的并发场景,它的特点是 写入时复制,即 写操作会创建新数组,而读操作无锁,所以读取性能极高,但写入成本较高。
# 用法举例
@Test
public void testCopyWriteArrayList() throws InterruptedException {
List<String> list = new CopyOnWriteArrayList<>();
// 添加元素
list.add("Java");
list.add("Python");
list.add("C++");
// 线程 1:读取
Thread readerThread = new Thread(() -> {
for (String s : list) {
System.out.println("读取:" + s);
}
});
// 线程 2:修改
Thread writerThread = new Thread(() -> {
list.add("Go");
System.out.println("写入:Go");
});
readerThread.start();
writerThread.start();
readerThread.join();
writerThread.join();
System.out.println("最终列表:" + list);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 源码解析
构造函数
// 每个 CopyOnWriteArrayList 内部维护一个 Object 数组 private transient volatile Object[] array; // 无参构造函数:初始化一个空数组 public CopyOnWriteArrayList() { setArray(new Object[0]); } // 通过已有数组创建 CopyOnWriteArrayList,拷贝数组内容,确保数据安全 public CopyOnWriteArrayList(E[] toCopyIn) { setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); } // 通过 Collection 创建 CopyOnWriteArrayList public CopyOnWriteArrayList(Collection<? extends E> c) { Object[] es = (c.getClass() == CopyOnWriteArrayList.class) ? ((CopyOnWriteArrayList<?>) c).getArray() : c.toArray(); // 确保数组类型一致 if (es.getClass() != Object[].class) { es = Arrays.copyOf(es, es.length, Object[].class); } setArray(es); }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27解析
- 无参构造:创建一个空数组,初始化
CopyOnWriteArrayList。 - 数组构造:使用
Arrays.copyOf()深拷贝 输入数组,避免外部修改影响内部存储。 - 集合构造:
- 如果
c是CopyOnWriteArrayList,直接取其数组,提高效率。 - 否则,调用
toArray(),并确保数组类型一致(防止toArray()返回Object[]以外的类型)。
- 如果
- 无参构造:创建一个空数组,初始化
添加元素
add有多个重载方法, 以add(E e)为例
public boolean add(E e) { // 加锁 synchronized (lock) { // 获取当前实例数组 Object[] es = getArray(); int len = es.length; // 复制原数组创建新的数组, 并对新数组进行扩容 es = Arrays.copyOf(es, len + 1); es[len] = e; // 替换添加前的数组 setArray(es); return true; } }1
2
3
4
5
6
7
8
9
10
11
12
13
14解析
加锁
使用
synchronized (lock)确保线程安全,防止多个线程同时修改array。增加元素
复制当前数组, 创建新数组, 进行扩容, 再增加元素。
思考2个问题
- 为什么使用synchronized, 不适用ReentrantLock?
- ReentrantLock采用cas自旋锁, 当线程的锁竞争激烈时, 相对于切换线程自旋的消耗更少, 但是CopyOnWriteArrayList本身适合读多写少的场景, 锁的竞争不会过于激烈, synchronized的写法更简洁。
- 为什么不直接在实例维护的数组上进行值的追加, 要复制一个新的数组扩容后再将array指向新的数组?
- 由于
array由volatile修饰,其作用仅在于保证可见性,但无法保证操作的原子性。如果直接在原数组上追加元素,在高并发场景下,可能会导致其它线程读取到不完整的数据,引发数据不一致问题。而通过创建新的数组进行扩容并赋值,确保getArray()获取的始终是稳定的、未被修改的旧数据,从而保证读写分离,避免并发读写冲突。
- 由于
- 为什么使用synchronized, 不适用ReentrantLock?
获取指定位置元素
get(int index)用于获取指定索引处的元素,若索引超出范围,则抛出IndexOutOfBoundsException。public E get(int index) { return elementAt(getArray(), index); } final Object[] getArray() { return array; } static <E> E elementAt(Object[] a, int index) { return (E) a[index]; }1
2
3
4
5
6
7
8
9
10
11解析
从源码来看,
get方法分为两步:getArray()获取当前数组的引用。elementAt()根据索引返回数组中的元素。
由于
getArray()方法没有加锁,同一时间可能有多个线程并发执行,导致 弱一致性问题。示例: 假设当前数组
array = [1,2,3],线程 A 和 线程 B 并发执行:- 情况 1(A 先执行
getArray(),B 后执行remove(0))- 线程 A 调用
get(0),执行getArray(),但尚未返回数组。 - 线程 B 执行
remove(0),数组变为[2,3]。 - 线程 A 继续执行
elementAt(),结果get(0) = 2,与原始数组不一致。
- 线程 A 调用
- 情况 2(A 先完整获取数组,B 再执行
remove(0))- 线程 A 调用
get(0),getArray()返回[1,2,3]。 - 线程 B 执行
remove(0),数组变为[2,3]。 - 线程 A 继续执行
elementAt(),仍然返回1,与当前数组状态不一致。
- 线程 A 调用
总结
CopyOnWriteArrayList采用 写时复制机制,get()操作不会加锁,可能读到已过时的数据,但不会影响程序稳定性。- 这是一种 弱一致性 设计,适用于 读多写少 的场景,确保
get()操作尽可能高效。
修改指定元素
public E set(int index, E element) { synchronized (lock) { Object[] es = getArray(); E oldValue = elementAt(es, index); if (oldValue != element) { es = es.clone(); es[index] = element; setArray(es); } return oldValue; } }1
2
3
4
5
6
7
8
9
10
11
12
13删除元素
public E remove(int index) { synchronized (lock) { Object[] es = getArray(); int len = es.length; E oldValue = elementAt(es, index); int numMoved = len - index - 1; Object[] newElements; if (numMoved == 0) newElements = Arrays.copyOf(es, len - 1); else { newElements = new Object[len - 1]; System.arraycopy(es, 0, newElements, 0, index); System.arraycopy(es, index + 1, newElements, index, numMoved); } setArray(newElements); return oldValue; } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
修改 和 删除方法较为简单, 读者自己看看, 同样也是弱一致性, 虽然多个线程没法同时执行set, 但是可以执行set的同时其它线程执行remove, 可能会修改一个已经删除的元素。
# 思考题
为什么不直接普通的List写时加锁?
如果只对写操作加锁, 可能会读取到扩容时的不完整数据; 如果对读写操作同时加锁, 读性能差; CopyOnWriteArrayList每次获取数据时都是获取副本, 不会有这样的并发安全问题。