CyclicBarrier的使用及分析(jdk1.8)

开始是想和CountDownLatch写在一篇的 但是怕篇幅太长就分开来写了,CyclicBarrier从字面上理解是循环阻碍,它是一个同步辅助类,允许一个或多个线程互相等待,直到到达某个点,并且可以多次循环。下面还是先看一个实例。

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000);
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
线程Thread-0正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-3正在写入数据...
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

通过输出可以看出当所有线程都写入完毕后才开始继续运行。

源码分析

构造函数

1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

CyclicBarrier提供了两个构造方法,但最后调的都是这个,就是做个参数效验,然后赋值没什么难点,barrierAction参数是都到了共同点之后可以先执行一个动作。

属性介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private int count;
private final int parties;
//表示parties个线程到达barrier时,执行的动作
private final Runnable barrierCommand;
//一个可重入锁,CyclicBarrier的主要加锁方式。
private final ReentrantLock lock = new ReentrantLock();
//通过lock得到的一个状态变量
private final Condition trip = lock.newCondition();
//通过构造器传入的参数,表示总的等待线程的数量。
private final int parties;
//当屏障正常打开后运行的进程,通过最后一个调用await的线程来执行。
private final Runnable barrierCommand;
//当前的Generation。每当屏障失效或者开闸之后都会自动替换掉。从而实现重置的功能。
private Generation generation = new Generation();

内部类

1
2
3
private static class Generation {
boolean broken = false;
}

Generation是CyclicBarrier的一个私有内部类,他只有一个成员变量来标识当前的barrier是否已”损坏”。

常用方法

await
1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

await是最常用也是最重要的方法,调用await()的线程会等待直到有足够数量的线程调用await--也就是开闸状态。
await是由内部方法dowait来实现的,下面我们来看看这个方法的内部实现。

dowait
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 保存当前的generation
final Generation g = generation;
// 若当前generation已损坏,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 将计数器-1
int index = --count;
// 如果index等于0了,则意味着有parties个线程到达barrier。
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果barrierCommand不为null,则执行该动作。
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待线程,并更新generation。
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 当前线程一直阻塞,直到有parties个线程到达barrier或当前线程被中断或超时这3者之一发生,当前线程才继续执行。
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 如果不是超时等待,则调用awati()进行等待;否则,调用awaitNanos()进行等待。
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果等待过程中,线程被中断,则执行下面的函数。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 如果当前generation已经损坏,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果generation已经换代,则返回index。
if (g != generation)
return index;
// 如果是超时等待,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放独占锁
lock.unlock();
}
}

dowait()的作用就是让当前线程阻塞,直到有parties个线程到达barrier或当前线程被中断 或超时这三者之一发生,当前线程才继续执行。

在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代。当有parties个线程到达barrier,generation就会被更新换代。如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier。

breakBarrier
1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

breakBarrier()会设置当前中断标记broken为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化count;最后,通过signalAll()唤醒CyclicBarrier上所有的等待线程。

nextGeneration

将count计数器-1,即–count;然后判断是不是有parties个线程到达barrier,即index是不是为0。
当index=0时,如果barrierCommand不为null,则执行该barrierCommand,barrierCommand就是我们创建CyclicBarrier时,传入的Runnable对象。然后,调用nextGeneration()进行换代工作,nextGeneration()的源码如下:

1
2
3
4
5
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}

首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化count;最后,更新generation的值。
在for(;;)循环中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。

reset
1
2
3
4
5
6
7
8
9
10
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

reset方法先break当执行breakBarrier操作(如果有线程在barrier上等待,调用reset会导致BrokenBarrierException),再更新generation对象。