个人博客

http://www.milovetingting.cn

Java多线程(四)

前言

本文为学习Java相关知识所作笔记,参考以下资料:https://github.com/Snailclimb/JavaGuide ,感谢原作者的分享!

CyclicBarrier、 CountDownLatch、 Semaphore 的用法

CountDownLatch(线程计数器 )

CountDownLatch 类位于 java.util.concurrent 包下,利用它可以实现类似计数器的功能。比如有一个任务 A,它要等待其他 4 个任务执行完毕之后才能执行,此时就可以利用 CountDownLatch来实现这种功能了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final CountDownLatch latch = new CountDownLatch(2);
new Thread(){
public void run() {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
};}.start();
new Thread(){
public void run() {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
};}.start();
System.out.println("等待 2 个子线程执行完毕...");
latch.await();
System.out.println("2 个子线程已经执行完毕");
System.out.println("继续执行主线程");
}

CyclicBarrier(回环栅栏-等待至 barrier 状态再全部同时执行)

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后, CyclicBarrier 可以被重用。我们暂且把这个状态就叫做barrier,当调用 await()方法之后,线程就处于 barrier 了

CyclicBarrier 中最重要的方法就是 await 方法,它有 2 个重载版本:

  1. public int await(): 用来挂起当前线程,直至所有线程都到达 barrier 状态再同时执行后续任
    务;

  2. public int await(long timeout, TimeUnit unit): 让这些线程等待至一定的时间,如果还有
    线程没有到达 barrier 状态就直接让到达 barrier 的线程执行后续任务。

具体使用如下, 另外 CyclicBarrier 是可以重用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作
System.out.println("线程 "+Thread.currentThread().getName()+"写入数据完
毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务,比如数据操作");
}
}
}

Semaphore(信号量-控制同时访问的线程个数)

Semaphore 翻译成字面意思为 信号量, Semaphore 可以控制同时访问的线程个数, 通过acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可

Semaphore 类中比较重要的几个方法:

  1. public void acquire(): 用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。

  2. public void acquire(int permits):获取 permits 个许可

  3. public void release() { } :释放许可。注意,在释放许可之前,必须先获获得许可。

  4. public void release(int permits) { }:释放 permits 个许可

上面 4 个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法

  1. public boolean tryAcquire():尝试获取一个许可,若获取成功,则立即返回 true,若获取失败,则立即返回 false

  2. public boolean tryAcquire(long timeout, TimeUnit unit):尝试获取一个许可,若在指定的时间内获取成功,则立即返回 true,否则则立即返回 false

  3. public boolean tryAcquire(int permits):尝试获取 permits 个许可,若获取成功,则立即返回 true,若获取失败,则立即返回 false

  4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit): 尝试获取 permits个许可,若在指定的时间内获取成功,则立即返回 true,否则则立即返回 false

  5. 还可以通过 availablePermits()方法得到可用的许可数目。

例子:若一个工厂有 5 台机器,但是有 8 个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过 Semaphore 来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

CountDownLatch 和 CyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同; CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行; 而 CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;另外, CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的。

Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限

volatile 关键字的作用(变量可见性、禁止重排序)

Java 语言提供了一种稍弱的同步机制,即 volatile 变量,用来确保将变量的更新操作通知到其他线程。 volatile 变量具备两种特性, volatile 变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取 volatile 类型的变量时总会返回最新写入的值。

变量可见性

其一是保证该变量对所有线程可见,这里的可见性指的是当一个线程修改了变量的值,那么新的值对于其他线程是可以立即获取的

禁止重排序

volatile 禁止了指令重排。

比 sychronized 更轻量级的同步锁

在访问 volatile 变量时不会执行加锁操作,因此也就不会使执行线程阻塞,因此 volatile 变量是一种比 sychronized 关键字更轻量级的同步机制。 volatile 适合这种场景:一个变量被多个线程共享,线程直接给这个变量赋值

pic

当对非 volatile 变量进行读写的时候,每个线程先从内存拷贝变量到 CPU 缓存中。如果计算机有多个 CPU,每个线程可能在不同的 CPU 上被处理,这意味着每个线程可以拷贝到不同的 CPUcache 中。而声明变量是 volatile 的, JVM 保证了每次读变量都从内存中读,跳过 CPU cache这一步。

适用场景

值得说明的是对 volatile 变量的单次读/写操作可以保证原子性的,如 long 和 double 类型变量,但是并不能保证 i++这种操作的原子性,因为本质上 i++是读、写两次操作。在某些场景下可以代替 Synchronized。但是,volatile 的不能完全取代 Synchronized 的位置,只有在一些特殊的场景下,才能适用 volatile。总的来说,必须同时满足下面两个条件才能保证在并发环境的线程安全:

  1. 对变量的写操作不依赖于当前值(比如 i++),或者说是单纯的变量赋值(booleanflag = true) 。

  2. 该变量没有包含在具有其他变量的不变式中, 也就是说,不同的 volatile 变量之间,不能互相依赖。 只有在状态真正独立于程序内其他内容时才能使用 volatile。

如何在两个线程之间共享数据

Java 里面进行多线程通信的主要方式就是共享内存的方式,共享内存主要的关注点有两个:可见性和有序性原子性。 Java 内存模型(JMM)解决了可见性和有序性的问题,而锁解决了原子性的问题, 理想情况下我们希望做到“同步”和“互斥”。 有以下常规实现方法:

将数据抽象成一个类,并将数据的操作作为这个类的方法

将数据抽象成一个类,并将对这个数据的操作作为这个类的方法,这么设计可以和容易做到同步,只要在方法上加” synchronized“

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class MyData {
private int j=0;
public synchronized void add(){
j++;
System.out.println("线程"+Thread.currentThread().getName()+"j 为: "+j);
}
public synchronized void dec(){
j--;
System.out.println("线程"+Thread.currentThread().getName()+"j 为: "+j);
}
public int getData(){
return j;
}
}

public class AddRunnable implements Runnable{
MyData data;
public AddRunnable(MyData data){
this.data= data;
}
public void run() {
data.add();
}
}

public class DecRunnable implements Runnable {
MyData data;
public DecRunnable(MyData data){
this.data = data;
}
public void run() {
data.dec();
}
}
public static void main(String[] args) {
MyData data = new MyData();
Runnable add = new AddRunnable(data);
Runnable dec = new DecRunnable(data);
for(int i=0;i<2;i++){
new Thread(add).start();
new Thread(dec).start();
}
}

Runnable 对象作为一个类的内部类

将 Runnable 对象作为一个类的内部类,共享数据作为这个类的成员变量,每个线程对共享数据的操作方法也封装在外部类,以便实现对数据的各个操作的同步和互斥,作为内部类的各个 Runnable 对象调用外部类的这些方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class MyData {
private int j=0;
public synchronized void add(){
j++;
System.out.println("线程"+Thread.currentThread().getName()+"j 为: "+j);
}
public synchronized void dec(){
j--;
System.out.println("线程"+Thread.currentThread().getName()+"j 为: "+j);
}
public int getData(){
return j;
}
}

public class TestThread {
public static void main(String[] args) {
final MyData data = new MyData();
for(int i=0;i<2;i++){
new Thread(new Runnable(){
public void run() {
data.add();
}
}).start();
new Thread(new Runnable(){
public void run() {
data.dec();
}
}).start();
}
}
}

ThreadLocal 作用(线程本地存储)

ThreadLocal,很多地方叫做线程本地变量,也有些地方叫做线程本地存储, ThreadLocal 的作用是提供线程内的局部变量, 这种变量在线程的生命周期内起作用, 减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度

ThreadLocalMap(线程的一个属性)

  1. 每个线程中都有一个自己的 ThreadLocalMap 类对象,可以将线程自己的对象保持到其中,各管各的,线程可以正确的访问到自己的对象。

  2. 将一个共用的 ThreadLocal 静态实例作为 key,将不同对象的引用保存 到不同线程的ThreadLocalMap 中,然后在线程执行的各处通过这个静态 ThreadLocal 实例的 get()方法取得自己线程保存的那个对象,避免了将这个对象作为参数传递的麻烦。

  3. ThreadLocalMap 其实就是线程里面的一个属性,它在 Thread 类中定义

    ThreadLocal.ThreadLocalMap threadLocals = null;

pic

使用场景

最常见的 ThreadLocal 使用场景为 用来解决 数据库连接、 Session 管理等。

1
2
3
4
5
6
7
8
9
10
11
12
13
private static final ThreadLocal threadSession = new ThreadLocal();
public static Session getSession() throws InfrastructureException {
Session s = (Session) threadSession.get();
try {
if (s == null) {
s = getSessionFactory().openSession();
threadSession.set(s);
}
} catch (HibernateException ex) {
throw new InfrastructureException(ex);
}
return s;
}

synchronized 和 ReentrantLock 的区别

两者的共同点

  1. 都是用来协调多线程对共享对象、变量的访问

  2. 都是可重入锁,同一线程可以多次获得同一个锁

  3. 都保证了可见性和互斥性

两者的不同点

  1. ReentrantLock 显示的获得、释放锁, synchronized 隐式获得释放锁

  2. ReentrantLock 可响应中断、可轮回, synchronized 是不可以响应中断的,为处理锁的不可用性提供了更高的灵活性

  3. ReentrantLock 是 API 级别的, synchronized 是 JVM 级别的

  4. ReentrantLock 可以实现公平锁

  5. ReentrantLock 通过 Condition 可以绑定多个条件

  6. 底层实现不一样, synchronized 是同步阻塞,使用的是悲观并发策略, lock 是同步非阻塞,采用的是乐观并发策略

  7. Lock 是一个接口,而 synchronized 是 Java 中的关键字, synchronized 是内置的语言实现。

  8. synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而 Lock 在发生异常时,如果没有主动通过 unLock()去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁。

  9. Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用 synchronized 时,等待的线程会一直等待下去,不能够响应中断。

  10. 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。

  11. Lock 可以提高多个线程进行读操作的效率,既就是实现读写锁等。

ConcurrentHashMap 并发

减小锁粒度

减小锁粒度是指缩小锁定对象的范围,从而减小锁冲突的可能性,从而提高系统的并发能力。减小锁粒度是一种削弱多线程锁竞争的有效手段, 这种技术典型的应用是 ConcurrentHashMap(高性能的 HashMap)类的实现。对于 HashMap 而言,最重要的两个方法是 get 与 set 方法,如果我们对整个 HashMap 加锁,可以得到线程安全的对象,但是加锁粒度太大。 Segment 的大小也被称为 ConcurrentHashMap 的并发度。

ConcurrentHashMap 分段锁

ConcurrentHashMap,它内部细分了若干个小的 HashMap,称之为段(Segment)。 默认情况下一个 ConcurrentHashMap 被进一步细分为 16 个段,既就是锁的并发度。

如果需要在 ConcurrentHashMap 中添加一个新的表项,并不是将整个 HashMap 加锁,而是首先根据 hashcode 得到该表项应该存放在哪个段中,然后对该段加锁,并完成 put 操作。在多线程环境中,如果多个线程同时进行 put操作,只要被加入的表项不存放在同一个段中,则线程间可以做到真正的并行。

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。 Segment 是一种可重入锁 ReentrantLock,在 ConcurrentHashMap 里扮演锁的角色, HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组, Segment 的结构和 HashMap类似,是一种数组和链表结构, 一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 守护一个HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。

pic

Java 中用到的线程调度

抢占式调度

抢占式调度指的是每条线程执行的时间、线程的切换都由系统控制,系统控制指的是在系统某种运行机制下,可能每条线程都分同样的执行时间片,也可能是某些线程执行的时间片较长,甚至某些线程得不到执行的时间片。在这种机制下,一个线程的堵塞不会导致整个进程堵塞

协同式调度

协同式调度指某一线程执行完后主动通知系统切换到另一线程上执行,这种模式就像接力赛一样,一个人跑完自己的路程就把接力棒交接给下一个人,下个人继续往下跑。线程的执行时间由线程本身控制,线程切换可以预知,不存在多线程同步问题,但它有一个致命弱点:如果一个线程编写有问题,运行到一半就一直堵塞,那么可能导致整个系统崩溃

JVM 的线程调度实现(抢占式调度)

java 使用的线程调使用抢占式调度, Java 中线程会按优先级分配 CPU 时间片运行, 且优先级越高越优先执行,但优先级高并不代表能独自占用执行时间片,可能是优先级高得到越多的执行时间片,反之,优先级低的分到的执行时间少但不会分配不到执行时间

线程让出 cpu 的情况

  1. 当前运行线程主动放弃 CPU, JVM 暂时放弃 CPU 操作(基于时间片轮转调度的 JVM 操作系统不会让线程永久放弃 CPU,或者说放弃本次时间片的执行权),例如调用 yield()方法。

  2. 当前运行线程因为某些原因进入阻塞状态,例如阻塞在 I/O 上。

  3. 当前运行线程结束,即运行完 run()方法里面的任务。

进程调度算法

优先调度算法

  1. 先来先服务调度算法(FCFS)

    当在作业调度中采用该算法时,每次调度都是从后备作业队列中选择一个或多个最先进入该队列的作业,将它们调入内存,为它们分配资源、创建进程,然后放入就绪队列。在进程调度中采用 FCFS 算法时,则每次调度是从就绪队列中选择一个最先进入该队列的进程,为之分配处理机,使之投入运行。该进程一直运行到完成或发生某事件而阻塞后才放弃处理机, 特点是:算法比较简单,可以实现基本上的公平

  2. 短作业(进程)优先调度算法

    短作业优先(SJF)的调度算法是从后备队列中选择一个或若干个估计运行时间最短的作业,将它们调入内存运行。而短进程优先(SPF)调度算法则是从就绪队列中选出一个估计运行时间最短的进程,将处理机分配给它,使它立即执行并一直执行到完成,或发生某事件而被阻塞放弃处理机时再重新调度。 该算法未照顾紧迫型作业。

高优先权优先调度算法

为了照顾紧迫型作业,使之在进入系统后便获得优先处理,引入了最高优先权优先(FPF)调度算法。当把该算法用于作业调度时,系统将从后备队列中选择若干个优先权最高的作业装入内存。当用于进程调度时,该算法是把处理机分配给就绪队列中优先权最高的进程。

  1. 非抢占式优先权算法

在这种方式下,系统一旦把处理机分配给就绪队列中优先权最高的进程后,该进程便一直执行下去,直至完成;或因发生某事件使该进程放弃处理机时。这种调度算法主要用于批处理系统中;也可用于某些对实时性要求不严的实时系统中。

  1. 抢占式优先权调度算法

在这种方式下,系统同样是把处理机分配给优先权最高的进程,使之执行。 但在其执行期间,只要又出现了另一个其优先权更高的进程,进程调度程序就立即停止当前进程(原优先权最高的进程)的执行,重新将处理机分配给新到的优先权最高的进程。显然,这种抢占式的优先权调度算法能更好地满足紧迫作业的要求,故而常用于要求比较严格的实时系统中,以及对性能要求较高的批处理和分时系统中

  1. 高响应比优先调度算法

在批处理系统中,短作业优先算法是一种比较好的算法,其主要的不足之处是长作业的运行得不到保证。如果我们能为每个作业引入前面所述的动态优先权,并使作业的优先级随着等待时间的增加而以速率 a 提高,则长作业在等待一定的时间后,必然有机会分配到处理机。该优先权的变化规律可描述为

pic

(1) 如果作业的等待时间相同,则要求服务的时间愈短,其优先权愈高,因而该算法有利于短作业。

(2) 当要求服务的时间相同时,作业的优先权决定于其等待时间,等待时间愈长,其优先权愈高,因而它实现的是先来先服务。

(3) 对于长作业,作业的优先级可以随等待时间的增加而提高,当其等待时间足够长时,其优先级便可升到很高,从而也可获得处理机。简言之,该算法既照顾了短作业,又考虑了作业到达的先后次序,不会使长作业长期得不到服务。因此,该算法实现了一种较好的折衷。当然,在利用该算法时,每要进行调度之前,都须先做响应比的计算,这会增加系统开销。

基于时间片的轮转调度算法

  1. 时间片轮转法

    在早期的时间片轮转法中,系统将所有的就绪进程按先来先服务的原则排成一个队列,每次调度时,把 CPU 分配给队首进程,并令其执行一个时间片。时间片的大小从几 ms 到几百 ms。 当执行的时间片用完时,由一个计时器发出时钟中断请求,调度程序便据此信号来停止该进程的执行,并将它送往就绪队列的末尾;然后,再把处理机分配给就绪队列中新的队首进程,同时也让它执行一个时间片。 这样就可以保证就绪队列中的所有进程在一给定的时间内均能获得一时间片的处理机执行时间

  2. 多级反馈队列调度算法

    (1) 应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。例如,第二个队列的时间片要比第一个队列的时间片长一倍,……,第 i+1 个队列的时间片要比第 i 个队列的时间片长一倍。

    (2) 当一个新进程进入内存后,首先将它放入第一队列的末尾,按 FCFS 原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按 FCFS 原则等待调度执行;如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去,当一个长作业(进程)从第一队列依次降到第 n 队列后,在第 n 队列便采取按时间片轮转的方式运行。

    (3) 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第 1~(i-1)队列均空时,才会调度第 i 队列中的进程运行。如果处理机正在第 i 队列中为某进程服务时,又有新进程进入优先权较高的队列(第 1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的处理机,即由调度程序把正在运行的进程放回到第 i 队列的末尾,把处理机分配给新到的高优先权进程。在多级反馈队列调度算法中,如果规定第一个队列的时间片略大于多数人机交互所需之处理时间时,便能够较好的满足各种类型用户的需要。

什么是 CAS(比较并交换-乐观锁机制-锁自旋)

概念及特性

CAS(Compare And Swap/Set)比较并交换, CAS 算法的过程是这样:它包含 3 个参数CAS(V,E,N)。 V 表示要更新的变量(内存值), E 表示预期值(旧的), N 表示新值。当且仅当 V 值等于 E 值时,才会将 V 的值设为 N,如果 V 值和 E 值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后, CAS 返回当前 V 的真实值

CAS 操作是抱着乐观的态度进行的(乐观锁),它总是认为自己可以成功完成操作。 当多个线程同时使用 CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS 操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理

原子包 java.util.concurrent.atomic(锁自旋)

JDK1.5 的原子包: java.util.concurrent.atomic 这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性, 即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由 JVM 从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。

相对于对于 synchronized 这种阻塞算法, CAS 是非阻塞算法的一种常见实现。 由于一般 CPU 切换时间比 CPU 指令集操作更加长, 所以 J.U.C 在性能上有了很大的提升。如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class AtomicInteger extends Number implements java.io.Serializable {
private volatile int value;
public final int get() {
return value;
}
public final int getAndIncrement() {
for (;;) { //CAS 自旋,一直尝试,直达成功
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
}

getAndIncrement 采用了 CAS 操作,每次从内存中读取数据然后将此数据和+1 后的结果进行CAS 操作,如果成功就返回结果,否则重试直到成功为止。而 compareAndSet 利用 JNI 来完成CPU 指令的操作。

ABA 问题

CAS 会导致“ABA 问题”。 CAS 算法实现一个重要前提需要取出内存中某时刻的数据,而在下时刻比较并替换,那么在这个时间差类会导致数据的变化。

比如说一个线程 one 从内存位置 V 中取出 A,这时候另一个线程 two 也从内存中取出 A,并且two 进行了一些操作变成了 B,然后 two 又将 V 位置的数据变成 A,这时候线程 one 进行 CAS 操作发现内存中仍然是 A,然后 one 操作成功。尽管线程 one 的 CAS 操作成功,但是不代表这个过程就是没有问题的

部分乐观锁的实现是通过版本号(version)的方式来解决 ABA 问题,乐观锁每次在执行数据的修改操作时,都会带上一个版本号,一旦版本号和数据的版本号一致就可以执行修改操作并对版本号执行+1 操作,否则就执行失败。因为每次操作的版本号都会随之增加,所以不会出现 ABA 问题,因为版本号只会增加不会减少。

什么是 AQS(抽象的队列同步器)

AbstractQueuedSynchronizer 类如其名,抽象的队列式的同步器, AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。

pic

它维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里 volatile 是核心关键词,具体 volatile 的语义,在此不述。 state 的访问方式有三种:

getState()

setState()

compareAndSetState()

AQS 定义两种资源共享方式

Exclusive 独占资源-ReentrantLock

Exclusive(独占,只有一个线程能执行,如 ReentrantLock)

Share 共享资源-Semaphore/CountDownLatch

Share(共享,多个线程可同时执行,如 Semaphore/CountDownLatch)。

AQS 只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现, AQS 这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过 state 的 get/set/CAS)之所以没有定义成abstract ,是 因 为独 占模 式 下 只 用实现 tryAcquire-tryRelease ,而 共享 模 式 下 只用 实 现tryAcquireShared-tryReleaseShared。如果都定义成 abstract,那么每个模式也要去实现另一模式下的接口。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等), AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

1. isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它。

2. tryAcquire(int):独占方式。尝试获取资源,成功则返回 true,失败则返回 false。

3. tryRelease(int):独占方式。尝试释放资源,成功则返回 true,失败则返回 false。

4. tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败; 0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

5. tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回 false。

同步器的实现是 ABS 核心(state 资源状态计数)

同步器的实现是 ABS 核心,以 ReentrantLock 为例, state 初始化为 0,表示未锁定状态。 A 线程lock()时,会调用 tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前, A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。

以 CountDownLatch 以例,任务分为 N 个子线程去执行, state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的, 每个子线程执行完后 countDown()一次, state会 CAS 减 1。等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,然后主调用线程就会从 await()函数返回,继续后余动作。

ReentrantReadWriteLock 实现独占和共享两种方式

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 tryAcquiretryRelease、 tryAcquireShared-tryReleaseShared 中的一种即可。 但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock