线程同步

  本文旨在系统地阐述线程同步这一关键概念,并介绍一系列Java提供的用于实现线程同步的实用工具类,如 CountDownLatchCyclicBarrierSemaphoreCondition等。

  同时,文章中提供了相关习题,确保大家能够扎实掌握这些并发工具类的应用,巩固所学知识,提升解决实际问题的能力。

  最后,本文还将带领大家深入研究上述工具类背后的源代码,剖析其实现机制和内部运作原理。




基本介绍

  线程同步是指保证多个线程以有序的方式访问共享资源,避免数据竞争,即多个线程同时读、写一个内存位置,从而导致程序行为不可预测。

  除了synchronized 关键字和内置的对象锁机制之外,Java 并发包(JUC, java.util.concurrent)还提供了多种高级的线程同步工具类。接下来,我们将详细介绍这些工具类。




Condition

  Condition 是 Java 并发包 (java.util.concurrent.locks) 中的一个接口,它的使用方式和对象锁时一样的,通常与ReentrantLock一起使用,允许一个或多个线程在某个条件为真之前阻塞等待,并且可以在该条件满足时被其他线程唤醒。

1
2
3
4
5
6
7
8
9
10
11
/**
* Condition 是一个接口,AQS 的内部实现类为 ConditionObject
*/
public interface Condition {
// 在当前条件下阻塞
void await() throws InterruptedException;
// 唤醒一个等待在条件上的线程
void signal();
// 唤醒所有等待在条件上的线程
void signalAll();
}

总结:Condition支持多条件等待队列、等待/通知方法及显式锁管理,相比synchronized更灵活、功能更强,适合复杂并发场景。




CountDownLatch

  CountDownLatch 是 Java 并发包(java.util.concurrent)中的一个同步辅助类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

  它的核心是一个计数器,该计数器被初始化为一个正整数,而countDown()方法会使其减一。若计数器递减至零,则之前所有调用 await()被阻塞的线程都会被唤醒并继续执行。

1
2
3
4
5
6
7
8
9
10
public class CountDownLatch {
// 构造方法,初始化正整数的计数值count
public CountDownLatch(int count) {}
// 获取当前计数值count
public long getCount() {}
// 将计数值count减一。若count归零,所有因await()方法被阻塞的线程将被唤醒并继续执行
public void countDown() {}
// 阻塞当前线程,等待计数值归零
public void await() throws InterruptedException {}
}



CyclicBarrier
  CyclicBarrier是 Java 并发包(java.util.concurrent)中的另一个同步辅助类,它将阻塞每个到达屏障点的线程,直到所有线程都到达屏障点。与CountDownLatch 不同的是,CyclicBarrier 可以被重复使用,这也是为什么它的名字中有“Cyclic”(循环的)这个词。

1
2
3
4
5
6
7
8
9
10
11
12
public class CyclicBarrier {
// 查询总共需要多少线程到达屏障点才能解除阻塞
public int getParties() {}
// 返回当前已到达屏障点的线程
public int getNumberWaiting() {}
// 使当前线程在屏障点阻塞等待,一旦所有线程都到达了屏障点,所有阻塞等待的线程将会阻塞并立即释放
public int await() throws InterruptedException, BrokenBarrierException {}
// 将屏障重置,重置后的屏障可以再次使用,就像新创建的一样。进行重置时,还在屏障点等待的线程将会抛出BrokenBarrierException异常。
public void reset() {}
// 检查CyclicBarrier是否破损。如果曾经有线程异常、中断或超时而未能成功完成等待,则屏障会被标记为破损。一旦破损,后续调用await()将立即抛出BrokenBarrierException,除非屏障被重置。
public boolean isBroken() {}
}

总结:CyclicBarrier 适用于需要多次重复同步的情况,CountDownLatch 则更适合用于一次性的同步场景。




Semaphore

  Semaphore(信号量)用于控制同时访问特定资源的线程数量,它使用计数值来表示可用的许可(Permits)数量。每个线程在进入临界区时获取一个或多个许可,在离开临界区时释放这些许可。如果所有许可都被占用,则会阻塞当前线程,排队直到有许可。
  此外,Semaphore支持公平非公平两种模式。非公平模式下,线程等待许可时会发生插队现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Semaphore implements java.io.Serializable {
// 获取一个许可。若没有可用许可,则会阻塞当前线程,直到有许可可用
public void acquire() throws InterruptedException {}
// 获取若干个许可。若没有足够的可用许可,则会阻塞当前线程,直到有许可可用
public void acquire(int permits) throws InterruptedException {}
// 尝试获取一个许可,返回是否获取成功
public boolean tryAcquire() {}
// 释放一个许可
public void release() {}
// 释放若干个许可
public void release(int permits) {}
// 返回当前可用的许可数量
public int availablePermits() {}
}



习题训练

阅读 Leetcode 1117 题意 , 并思考应该选用上述的哪种 API工具。

下面提供一个使用对象锁解题的示例,作为参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 使用 对象锁 解题
*/
class H2O {

int flag = 2;
public H2O() {}

public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
synchronized (this) {
while (flag == 0) this.wait();
releaseHydrogen.run();
flag--;
this.notifyAll();
}
}

public void oxygen(Runnable releaseOxygen) throws InterruptedException {
synchronized (this) {
while (flag != 0) this.wait();
releaseOxygen.run();
flag = 2;
this.notifyAll();
}
}
}



答案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Condition与对象锁很像,解法也是类似的
*/
class H2O {

int flag = 2;
final ReentrantLock LOCK = new ReentrantLock();
Condition condition = LOCK.newCondition();

public H2O() {}

public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
LOCK.lock();
try {
while (flag == 0) condition.await();
releaseHydrogen.run();
flag--;
condition.signalAll();
} finally {
LOCK.unlock();
}
}

public void oxygen(Runnable releaseOxygen) throws InterruptedException {
LOCK.lock();
try {
while (flag != 0) condition.await();
releaseOxygen.run();
flag = 2;
condition.signalAll();
} finally {
LOCK.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
H2O h2O = new H2O();
h2O.hydrogen(() -> {});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 使用 Semaphore 解题
*/
class H2O {
Semaphore hydrogenSemaphore = new Semaphore(2);
Semaphore oxygenSemaphore = new Semaphore(0);

public H2O() {}

public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
hydrogenSemaphore.acquire();
releaseHydrogen.run();
if (hydrogenSemaphore.availablePermits() == 0) oxygenSemaphore.release();
}

public void oxygen(Runnable releaseOxygen) throws InterruptedException {
oxygenSemaphore.acquire();
releaseOxygen.run();
hydrogenSemaphore.release(2);
}
}



源码探究


CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CountDownLatch {
// 基于AQS实现的共享锁
private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void countDown() {
sync.releaseShared(1);
}
}

  可以看到CountDownLatch核心方法都是间接调用 Sync实例,而Sync实例是基于AQS抽象类实现的共享锁,

  AQS是Java并发包的基石,简单说明下,它提供了一套模板方法,由此衍生出许多的同步工具,知道这些就足够了,后续会单独讲。这里用到的acquireSharedInterruptibly()releaseShared()代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 尝试获取锁失败后,将线程放入阻塞队列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean releaseShared(int arg) {
// 尝试释放锁成功后,恢复阻塞队列中的所有线程
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

  前者在会先用tryAcquireShared()尝试获取锁,失败后进入阻塞队列。后者在tryReleaseShared()返回true时,释放阻塞队列中的线程。

  模板方法下,tryAcquireShared()tryReleaseShared()都是Sync来实现的,实现代码如下。首先,构造函数时初始化state值。若state非零,线程调用tryAcquireShared()后阻塞。若tryReleaseShared()将state减少至零,则返回true,释放前面阻塞的所有线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) { setState(count); }
int getCount() {return getState(); }
// state为零时才能获取到锁
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
// 将aqs的state值减一
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}



CyclicBarrier

  它的设计思路比较直接,从属性和构造方法可以看出设计思路。当线程到达屏障点后,它首先会在Condition的队列中阻塞。当有parties个线程到达屏障点,也就是count自减至零时,恢复队列中阻塞的所有线程,并执行barrierCommand任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final Runnable barrierCommand;
private final int parties;
private int count;

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
}

下面看看核心方法await(),代码是简化过的,不过逻辑也比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 复用方法
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int index = --count;
// <1> 若count自减至零,也就是所有线程到达屏障点时
if (index == 0) {
// 执行到达屏障点后的任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 唤醒阻塞线程,恢复属性初始值
nextGeneration();
return 0;
}
// <2> 阻塞等待
for (;;) {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}

  除此之外,CyclicBarrier会在超时等待、中断、异常时变为破碎状态,我们需要使用reset()方法进行恢复,否则在调用await()等方法时会抛出BrokenBarrierException异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 用于保存破损状态的静态内部类
private static class Generation {
boolean broken = false;
}
// CyclicBarrier进入破碎状态
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
// 恢复CyclicBarrier
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier();
nextGeneration();
} finally {
lock.unlock();
}
}