本文旨在系统地阐述线程同步这一关键概念,并介绍一系列Java提供的用于实现线程同步的实用工具类,如 CountDownLatch、CyclicBarrier、Semaphore和 Condition等。
同时,文章中提供了相关习题,确保大家能够扎实掌握这些并发工具类的应用,巩固所学知识,提升解决实际问题的能力。
最后,本文还将带领大家深入研究上述工具类背后的源代码,剖析其实现机制和内部运作原理。
基本介绍
线程同步是指保证多个线程以有序的方式访问共享资源,避免数据竞争,即多个线程同时读、写一个内存位置,从而导致程序行为不可预测。
除了synchronized 关键字和内置的对象锁机制之外,Java 并发包(JUC, java.util.concurrent)还提供了多种高级的线程同步工具类。接下来,我们将详细介绍这些工具类。
Condition
Condition 是 Java 并发包 (java.util.concurrent.locks) 中的一个接口,它的使用方式和对象锁时一样的,通常与ReentrantLock一起使用,允许一个或多个线程在某个条件为真之前阻塞等待,并且可以在该条件满足时被其他线程唤醒。
1 2 3 4 5 6 7 8 9 10 11
|
public interface Condition { void await() throws InterruptedException; void signal(); void signalAll(); }
|
总结:Condition支持多条件等待队列、等待/通知方法及显式锁管理,相比synchronized更灵活、功能更强,适合复杂并发场景。
CountDownLatch
CountDownLatch 是 Java 并发包(java.util.concurrent)中的一个同步辅助类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
它的核心是一个计数器,该计数器被初始化为一个正整数,而countDown()方法会使其减一。若计数器递减至零,则之前所有调用 await()被阻塞的线程都会被唤醒并继续执行。
1 2 3 4 5 6 7 8 9 10
| public class CountDownLatch { public CountDownLatch(int count) {} public long getCount() {} public void countDown() {} public void await() throws InterruptedException {} }
|
CyclicBarrier
CyclicBarrier是 Java 并发包(java.util.concurrent)中的另一个同步辅助类,它将阻塞每个到达屏障点的线程,直到所有线程都到达屏障点。与CountDownLatch 不同的是,CyclicBarrier 可以被重复使用,这也是为什么它的名字中有“Cyclic”(循环的)这个词。
1 2 3 4 5 6 7 8 9 10 11 12
| public class CyclicBarrier { public int getParties() {} public int getNumberWaiting() {} public int await() throws InterruptedException, BrokenBarrierException {} public void reset() {} public boolean isBroken() {} }
|
总结:CyclicBarrier 适用于需要多次重复同步的情况,CountDownLatch 则更适合用于一次性的同步场景。
Semaphore
Semaphore(信号量)用于控制同时访问特定资源的线程数量,它使用计数值来表示可用的许可(Permits)数量。每个线程在进入临界区时获取一个或多个许可,在离开临界区时释放这些许可。如果所有许可都被占用,则会阻塞当前线程,排队直到有许可。
此外,Semaphore支持公平和非公平两种模式。非公平模式下,线程等待许可时会发生插队现象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class Semaphore implements java.io.Serializable { public void acquire() throws InterruptedException {} public void acquire(int permits) throws InterruptedException {} public boolean tryAcquire() {} public void release() {} public void release(int permits) {} public int availablePermits() {} }
|
习题训练
阅读 Leetcode 1117 题意 , 并思考应该选用上述的哪种 API工具。
下面提供一个使用对象锁解题的示例,作为参考。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
class H2O {
int flag = 2; public H2O() {}
public void hydrogen(Runnable releaseHydrogen) throws InterruptedException { synchronized (this) { while (flag == 0) this.wait(); releaseHydrogen.run(); flag--; this.notifyAll(); } }
public void oxygen(Runnable releaseOxygen) throws InterruptedException { synchronized (this) { while (flag != 0) this.wait(); releaseOxygen.run(); flag = 2; this.notifyAll(); } } }
|
答案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
class H2O {
int flag = 2; final ReentrantLock LOCK = new ReentrantLock(); Condition condition = LOCK.newCondition();
public H2O() {}
public void hydrogen(Runnable releaseHydrogen) throws InterruptedException { LOCK.lock(); try { while (flag == 0) condition.await(); releaseHydrogen.run(); flag--; condition.signalAll(); } finally { LOCK.unlock(); } }
public void oxygen(Runnable releaseOxygen) throws InterruptedException { LOCK.lock(); try { while (flag != 0) condition.await(); releaseOxygen.run(); flag = 2; condition.signalAll(); } finally { LOCK.unlock(); } }
public static void main(String[] args) throws InterruptedException { H2O h2O = new H2O(); h2O.hydrogen(() -> {}); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
class H2O { Semaphore hydrogenSemaphore = new Semaphore(2); Semaphore oxygenSemaphore = new Semaphore(0);
public H2O() {}
public void hydrogen(Runnable releaseHydrogen) throws InterruptedException { hydrogenSemaphore.acquire(); releaseHydrogen.run(); if (hydrogenSemaphore.availablePermits() == 0) oxygenSemaphore.release(); }
public void oxygen(Runnable releaseOxygen) throws InterruptedException { oxygenSemaphore.acquire(); releaseOxygen.run(); hydrogenSemaphore.release(2); } }
|
源码探究
CountDownLatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class CountDownLatch { private final Sync sync;
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public void countDown() { sync.releaseShared(1); } }
|
可以看到CountDownLatch核心方法都是间接调用 Sync实例,而Sync实例是基于AQS抽象类实现的共享锁,
AQS是Java并发包的基石,简单说明下,它提供了一套模板方法,由此衍生出许多的同步工具,知道这些就足够了,后续会单独讲。这里用到的acquireSharedInterruptibly()和releaseShared()代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public abstract class AbstractQueuedSynchronizer { public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } }
|
前者在会先用tryAcquireShared()尝试获取锁,失败后进入阻塞队列。后者在tryReleaseShared()返回true时,释放阻塞队列中的线程。
模板方法下,tryAcquireShared()和tryReleaseShared()都是Sync来实现的,实现代码如下。首先,构造函数时初始化state值。若state非零,线程调用tryAcquireShared()后阻塞。若tryReleaseShared()将state减少至零,则返回true,释放前面阻塞的所有线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() {return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
|
CyclicBarrier
它的设计思路比较直接,从属性和构造方法可以看出设计思路。当线程到达屏障点后,它首先会在Condition的队列中阻塞。当有parties个线程到达屏障点,也就是count自减至零时,恢复队列中阻塞的所有线程,并执行barrierCommand任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private final Runnable barrierCommand; private final int parties; private int count;
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } }
|
下面看看核心方法await(),代码是简化过的,不过逻辑也比较简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { int index = --count; if (index == 0) { final Runnable command = barrierCommand; if (command != null) command.run(); nextGeneration(); return 0; } for (;;) { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } } } finally { lock.unlock(); } } private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
|
除此之外,CyclicBarrier会在超时等待、中断、异常时变为破碎状态,我们需要使用reset()方法进行恢复,否则在调用await()等方法时会抛出BrokenBarrierException异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| private static class Generation { boolean broken = false; }
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); nextGeneration(); } finally { lock.unlock(); } }
|