抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

摘要:本文学习了Atomic原子操作类常用的类和方法。

环境

Windows 10 企业版 LTSC 21H2
Java 1.8

1 原子操作类

原子操作类指的是java.util.concurrent.atomic包中的类,可以分成六种:

  • 普通类型原子类:AtomicInteger,AtomicBoolean,AtomicLong,AtomicReference。
  • 数组类型原子类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray。
  • 复合类型原子类:AtomicStampedReference,AtomicMarkableReference。
  • 对象属性原子类:AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater。
  • 累加操作原子类:DoubleAccumulator,DoubleAdder,LongAccumulator,LongAdder。
  • 累加操作基础类:Striped64,Number。

1.1 普通类型原子类

AtomicInteger,AtomicBoolean,AtomicLong,AtomicReference是操作标量类型的原子类,其内部实现使用volatile关键字和native方法,从而避免了synchronized的高开销。

场景:

  • AtomicInteger和AtomicLong用于保证整数类型操作的线程安全。
  • AtomicBoolean可以作为中断标识,用于停止线程。
  • AtomicReference用于保证引用类型操作的线程安全,也可以封装多个共享变量,保证多个共享变量操作的线程安全。

使用AtomicInteger完成多线程自增操作:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Demo {
public AtomicInteger atomicInteger = new AtomicInteger();

public static void main(String[] args) throws InterruptedException {
Demo demo = new Demo();
// 使用CountDownLatch计数10个线程,等待线程执行结束
CountDownLatch countDownLatch = new CountDownLatch(10);
// 10个线程进行循环累加100次
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100; j++) {
demo.addPlusPlus();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
// main获取到的result:1000
System.out.println(Thread.currentThread().getName() + "获取到的result:" + demo.get());
}

public void addPlusPlus() {
atomicInteger.incrementAndGet();
}

public int get() {
return atomicInteger.get();
}
}

1.2 数组类型原子类

AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray是操作数组类型的原子类,对数组提供了支持,其内部没有维持volatile变量,而是全部由native方法实现。

场景:

  • AtomicIntegerArray和AtomicLongArray用于保证整数类型数组操作的线程安全。
  • AtomicReferenceArray用于保证引用类型数组操作的不安全问题。

1.3 复合类型原子类

AtomicStampedReference,AtomicMarkableReference是复合类型的原子类,将某种值和引用关联起来。

场景:

  • AtomicMarkableReference将单个布尔值与引用关联起来,维护带有标记位的对象引用,可以原子更新带有标记位的引用类型。
  • AtomicStampedReference将整数值与引用关联起来,维护带有版本号的对象引用,可以原子更新带有版本号的引用类型,可以解决使用CAS出现的ABA问题。

使用AtomicStampedReference类进行更新:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Demo {
public static void main(String[] args) {
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(0, 0);
new Thread(() -> {
// 线程1进入
System.out.println(Thread.currentThread().getName() + "进入");
// 线程1记录时间戳
int stamp = atomicStampedReference.getStamp();
// 线程1被挂起,等待其他线程执行
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 线程1更新时发现时间戳不一致,更新失败
System.out.println(Thread.currentThread().getName() + "将0更新为1:" +
atomicStampedReference.compareAndSet(0, 1, stamp, stamp + 1));
}, "t1").start();
new Thread(() -> {
// 线程2进入
System.out.println(Thread.currentThread().getName() + "进入");
// 线程2记录时间戳
int stamp = atomicStampedReference.getStamp();
// 线程2更新成功
System.out.println(Thread.currentThread().getName() + "将0更新为1:" +
atomicStampedReference.compareAndSet(0, 1, stamp, stamp + 1));
}, "t2").start();
new Thread(() -> {
// 线程3进入
System.out.println(Thread.currentThread().getName() + "进入");
// 线程3记录时间戳
int stamp = atomicStampedReference.getStamp();
// 线程3更新成功
System.out.println(Thread.currentThread().getName() + "将1更新为0:" +
atomicStampedReference.compareAndSet(1, 0, stamp, stamp + 1));
}, "t3").start();
}
}

1.4 对象属性原子类

AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater是操作对象属性的原子类,基于反射的原理,可以提供对关联字段类型的访问,用于对类中的volatile字段进行原子操作。

场景:

  • AtomicIntegerFieldUpdater和AtomicLongFieldUpdater用于解决多线程环境下整数类型属性操作的不安全问题。
  • AtomicReferenceArray用于解决多线程环境下引用类型属性操作的不安全问题。

要求:

  • 更新的属性必须使用volatile修饰符。
  • 因为操作对象属性的原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置类和属性。

优势:

  • 使用AtomicInteger获取结果值需要调用get()方法,但是AtomicIntegerFieldUpdater获取属性值可以通过对象获取,减少性能消耗。
  • 使用AtomicIntegerFieldUpdater作为类的静态成员,多个对象可以共同使用,但是AtomicInteger对象不允许多个对象共同使用,造成资源浪费。

使用AtomicIntegerFieldUpdater完成多线程自增操作:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Demo {
public static AtomicIntegerFieldUpdater atomicIntegerFieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(Demo.class, "count");
// 使用volatile关键字修饰属性
public volatile int count = 0;

public static void main(String[] args) throws InterruptedException {
Demo demo = new Demo();
// 使用CountDownLatch计数10个线程,等待线程执行结束
CountDownLatch countDownLatch = new CountDownLatch(10);
// 10个线程进行循环累加100次
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100; j++) {
demo.addPlusPlus();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
// main获取到的result:1000
System.out.println(Thread.currentThread().getName() + "获取到的result:" + demo.get());
}

public void addPlusPlus() {
atomicIntegerFieldUpdater.incrementAndGet(this);
}

public int get() {
return count;
}
}

1.5 累加操作相关类

DoubleAccumulator,DoubleAdder,LongAccumulator,LongAdder是累加操作的原子类,是JDK1.8引进的并发新技术,可以看做AtomicLong和AtomicDouble的部分加强类型。

Striped64,Number是累加操作的基础类,是JDK1.8引进的并发新技术,是累加操作原子类的父类。

对比:

  • 在低并发的场景下,AtomicLong和LongAdder的性能相似,并且AtomicLong提供了更加丰富的功能。
  • 在高并发的场景下,多个线程同时进行自旋操作,会出现大量失败并不断自旋的场景,此时AtomicLong的自旋会成为瓶颈,所以LongAdder具有更好的性能,但是代价是消耗更多的内存空间。

缺点:

  • LongAdder只提供了加减法操作,功能过于单一,更多地用于收集统计数据,而不是细粒度的同步控制。
  • LongAdder的sum()方法并不精确,所以不能保证强一致性(在任何时刻查询到的都是最近更新的数据),只能保证最终一致性(最终更新的数据都会被查询到)。

原理:

  • LongAdder类和DoubleAdder类都继承自Striped64类,其底层代码调用来自于Striped64类的longAccumulate()方法和doubleAccumulate()方法。
  • Striped64类的基本思路是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样发生冲突的概率就小很多。如果要获取整体的value值,只要将各个槽中的变量值累加返回即可。

2 分散热点

在多线程高并发的情况下,为了避免自旋锁带来极大的性能开销,在Striped64类中使用了分散热点机制。

多线程自旋争抢的是value值的修改权,可以将value值看做热点,通过将value值分散到数组中实现热点分散,使用类似于哈希碰撞的机制,让每个线程通过计算得到数组中的位置,各个线程只对自己位置的value值进行CAS操作,降低了自旋发生争抢的概率。

Striped64类的计数方法在ConcurrentHashMap中也有使用,ConcurrentHashMap中的baseCount对应着Striped64中的base变量,而counterCells则对应着Striped64中的cells数组,他们的实现是一样的。

3 生产者消费者

使用原子操作类实现生产者消费者:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Demo {
public static void main(String[] args) throws InterruptedException {
Resource r = new Resource(new ArrayBlockingQueue<>(3));
new Thread(() -> {
try {
r.product();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "生产者").start();
new Thread(() -> {
try {
r.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "消费者").start();
Thread.sleep(5000);
r.stop();
}
}

class Resource {
private volatile boolean state = true;
private AtomicInteger count = new AtomicInteger(0);
private BlockingQueue<String> queue;

public Resource(BlockingQueue<String> queue) {
this.queue = queue;
}

public void product() throws InterruptedException {
String data;
while (state) {
data = count.incrementAndGet() + "";
boolean result = queue.offer(data, 2L, TimeUnit.SECONDS);
if (result) {
System.out.println(Thread.currentThread().getName() + " product " + data);
} else {
System.out.println(Thread.currentThread().getName() + " product 超时");
}
Thread.sleep(1000);
}
System.out.println(Thread.currentThread().getName() + "停止");
}

public void consume() throws InterruptedException {
String data;
while (state) {
data = queue.poll(2L, TimeUnit.SECONDS);
if (data == null || data.equals("")) {
System.out.println(Thread.currentThread().getName() + " consume 超时");
} else {
System.out.println(Thread.currentThread().getName() + " consume " + data);
}
Thread.sleep(1000);
}
System.out.println(Thread.currentThread().getName() + "停止");
}

public void stop() {
state = false;
}
}

评论