在 Java 并发领域,我们解决并发安全问题最粗暴的方式就是使用 synchronized 关键字了,但它是一种独占形式的锁,属于悲观锁机制,性能会大打折扣。volatile 貌似也是一个不错的选择,但 volatile 只能保持变量的可见性,并不保证变量的原子性操作。
CAS 全称是 compare and swap,即比较并交换,它是一种原子操作,同时 CAS 是一种乐观机制。java.util.concurrent 包很多功能都是建立在 CAS 之上,如 ReenterLock 内部的 AQS,各种原子类,其底层都用 CAS来实现原子操作。
如何解决并发安全问题
在我们认识 CAS 之前,我们是通过什么来解决并发带来的安全问题呢?volatile 关键字可以保证变量的可见性,但保证不了原子性,synchronized 关键字利用 JVM 字节码层面来实现同步机制,它是一个悲观锁机制。
public class AddTest {
public volatile int i;
public void add() {
i++;
}
}
通过javap -c AddTest
看看 add 方法的字节码指令:
public void add();
Code:
0: aload_0
1: dup
2: getfield #2 // Field i:I
5: iconst_1
6: iadd
7: putfield #2 // Field i:I
10: return
i++
被拆分成了几个指令:
- 执行
getfield
拿到原始 i; - 执行
iadd
进行加 1 操作; - 执行
putfield
写把累加后的值写回 i。
以上例子来源于:https://www.jianshu.com/p/fb6e91b013cc
当线程 1 执行到加 1 步骤时,由于还没有执行赋值改变变量的值,这时候并不会刷新主内存区中的变量,如果此时线程 2 正好要拷贝该变量的值到自己私有缓存中,问题就出现了,当线程 2 拷贝完以后,线程1正好执行赋值运算,立马更新主内存区的值,那么此时线程 2 的副本就是旧的了,脏读又出现了。
怎么解决这个问题呢?
在 add 方法加上 synchronized 修饰解决。
public class AddTest {
public volatile int i;
public synchronized void add() {
i++;
}
}
现在完美解决了并发安全问题了,但是这样做性能也会大打折扣。
下面我们来看看 JDK 自带的 CAS 方案。
CAS 底层原理
CAS 的思想很简单:三个参数,一个当前内存值 V、旧的预期值 A、即将更新的值 B,当且仅当预期值 A 和内存值 V 相同时,将内存值修改为 B 并返回 true,否则什么都不做,并返回 false。
上面那段话来源于官方解析
我们拿 AtomicInteger 类来分析,先来看看 AtomicInteger 静态代码块片段:
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
// 省略部分代码
}
这里用到了 sun.misc.Unsafe 类,它可以提供硬件级别的原子操作,它可以获取某个属性在内存中的位置,也可以修改对象的字段值,只不过该类对一般开发而言,很少会用到,其底层是用 C/C++ 实现的,所以它的方式都是被 native 关键字修饰过的。
可以看得出 AtomicInteger 类存储的值是在 value 字段中,并且获取了 Unsafe 实例,在静态代码块中,还获取了 value 字段在内存中的偏移量 valueOffset。
接下来我们看个例子:
public class AddIntTest {
public AtomicInteger i;
public void add() {
i.getAndIncrement();
}
}
如上,getAndIncrement() 方法底层利用 CAS 技术保证了并发安全。
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
getAndAddInt 方法:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));// 自旋
return var5;
}
var5 通过 this.getIntVolatile(var1, var2)
方法获取,是个 native 方法,其目的是获取 var1 在 var2 偏移量的值,其中 var1 就是 AtomicInteger, var2 是 valueOffset 值。
那么 CAS 核心重点来了,compareAndSwapInt 就是实现 CAS 的核心方法,其原理是如果 var1 中的 value 值和 var5 相等,就证明没有其他线程改变过这个变量,那么就把 value 值更新为 var5 + var4
,其中 var4 是更新的增量值;反之如果没有更新,那么 CAS 就一直采用自旋的方式继续进行操作(其实就是个 while 循环),这一步也是一个原子操作。
举例分析:
- 设定 AtomicInteger 的 value 原始值为 A,从 Java 内存模型得知,线程 1 和线程 2 各自持有一份 value 的副本,值都是 A。
- 线程 1 通过
getIntVolatile(var1, var2)
拿到 value 值 A,这时线程 1 被挂起。 - 线程 2 也通过
getIntVolatile(var1, var2)
方法获取到 value 值 A,并执行compareAndSwapInt
方法比较内存值也为 A,成功修改内存值为 B。 - 这时线程 1 恢复执行
compareAndSwapInt
方法比较,发现自己手里的值 A 和内存的值 B 不一致,说明该值已经被其它线程提前修改过了。 - 线程 1 重新执行
getIntVolatile(var1, var2)
再次获取 value 值,因为变量 value 被 volatile 修饰,所以其它线程对它的修改,线程 A 总是能够看到,线程A继续执行compareAndSwapInt
进行比较替换,直到成功。
以上举例1、2、3、4、5步骤来源于:https://www.jianshu.com/p/fb6e91b013cc
compareAndSwapInt 方法是一个本地方法:
public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);
Java 并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的。Java 代码需通过 JNI 才能调用,位于 unsafe.cpp,查看源码:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
其实我并没有看懂这段代码的意思(内心崩溃),网上查阅之后,大概知道如下意思:
- 先想办法拿到变量 value 在内存中的地址。
- 通过
Atomic::cmpxchg
实现比较替换,其中参数 x 是即将更新的值,参数 e 是原内存的值。
以上1、2解析步骤来源于:https://www.jianshu.com/p/fb6e91b013cc
CAS 常见问题
ABA 问题
CAS 看起来很爽,但它也有缺点,那就是“ABA”问题。
例如线程 1 从内存位置 V 取出 A,这时候线程 2 也从内存位置 V 取出 A,此时线程 1 处于挂起状态,线程 2 将位置 V 的值改成 B,最后再改成 A,这时候线程 1 再执行,发现位置 V 的值没有变化,尽管线程 1 也更改成功了,但是不代表这个过程就是没有问题的。
举例分析:
现有一个用单向链表实现的栈,栈顶元素为 A,A.next 为 B,期望用 CAS 将栈顶替换成 B。
有线程 1 获取了元素 A,此时线程 1 被挂起,线程 2 也获取了元素 A,并将 A、B 出栈,再 push D、C、A,这时线程 1 恢复执行 CAS,因为此时栈顶元素依然为 A,线程 1 执行成功,栈顶元素变成了 B,但 B.next 为 null,这就会导致 C、D 被丢掉了。
这个例子充分说明了 CAS 的 ABA 问退带来的隐患,通常,我们的乐观锁实现中都会带一个 version 字段来记录更改的版本,避免并发操作带来的问题。在 Java 中,AtomicStampedReference 也实现了这个处理方式。
AtomicStampedReference 的内部类 Pair:
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
如上,每个 Pair 维护一个值,其中 reference 维护对象的引用,stamp 维护修改的版本号。
compareAndSet 方法:
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
从 compareAndSet 方法得知,如果要更改内存中的值,不但要值相同,还要版本号相同。
举例分析:
public class AtomicStampedReferenceTest {
// 初始值为1,版本号为0
private static AtomicStampedReference<Integer> a = new AtomicStampedReference<>(1, 0);
// 计数器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) {
new Thread(() -> {
System.out.println("线程名字:" + Thread.currentThread() + ", 当前 value = " + a.getReference());
// 获取当前版本号
int stamp = a.getStamp();
// 计数器阻塞,直到计数器为0,才执行
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程名字:" + Thread.currentThread() + ",CAS操作结果: " + a.compareAndSet(1, 2, stamp, stamp + 1));
}, "线程1").start();
// 线程2
new Thread(() -> {
// 将 value 值改成 2
a.compareAndSet(1, 2, a.getStamp(), a.getStamp() + 1);
System.out.println("线程名字" + Thread.currentThread() + "value = " + a.getReference());
// 将 value 值又改成 1
a.compareAndSet(2, 1, a.getStamp(), a.getStamp() + 1);
System.out.println("线程名字" + Thread.currentThread() + "value = " + a.getReference());
// 线程计数器
countDownLatch.countDown();
}, "线程2").start();
}
}
这里我用 CountDownLatch 计数器实现线程先后执行顺序,线程2先执行完后,线程1才开始执行。
打印结果:
线程名字:Thread[线程1,5,main], 当前 value = 1
线程名字Thread[线程2,5,main]value = 2
线程名字Thread[线程2,5,main]value = 1
线程名字:Thread[线程1,5,main],CAS操作结果: false
自旋问题
从源码可以知道所说的自选无非就是操作结果失败后继续循环操作,这种操作也称之为自旋锁,是一种乐观锁机制,一般来说都会给一个限定的自选次数,防止进入死循环。
自旋锁的优点是不需要休眠当前线程,因为自旋锁使用者一般保持锁时间非常短,因此选择自旋而不是休眠当前线程是提高并发性能的关键点,这是因为减少了很多不必要的线程上下文切换开销。
但是,如果 CAS 一直操作不成功,会造成长时间原地自旋,会给 CPU 带来非常大的执行开销。