• 让天下没有难学的技术
    多数学员都来自推荐,这就是口碑的力量

JUC系列之CyclicBarrier详解

CyclicBarrier:回环栅栏(有人也称之为循环屏障),通过他可以让一组线程等待至某个状态(屏障点)之后再全部同时执行,同时他还有一个特点,所有线程都被释放了以后,CyclicBarrier还可以被重用。

废话不多说,一切以实践为主,以下是我写的一个例子,供大家理解。

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{System.out.println(“人满发车了”);});

System.out.println(“一车3人,车满发车”);

for (int i = 0; i < 10; i++) {

int finalI = i;

new Thread(()->{

System.out.println(“第”+ finalI +”个人上车了”);

try {

cyclicBarrier.await();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

}).start();

}

再来看一下执行结果:

从结果可以看出来,车上每上3个人,就会人满发车,最后第9个人上车了,但是由于不满3个人,所以线程一直没发车,直到等到3个人满时才会发车。

  1. 先看下CyclicBarrier的结构有哪些方法和属性

//栅栏代

private static class Generation {

//是否要打破

boolean broken = false;

}

 

//资源独占锁

private final ReentrantLock lock = new ReentrantLock();

//条件等待队列

private final Condition trip = lock.newCondition();

//拦截的线程数

private final int parties;

//到达屏障点(换代前),执行的方法

private final Runnable barrierCommand;

//栅栏代

private Generation generation = new Generation();

//线程计数器

private int count;

CyclicBarrier提供了2个构造方法。

 

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

//标识屏障拦截的线程数量,相当于一个副本,作用是在达到屏障后进行重置count操作

this.parties = parties;

//线程计数器

this.count = parties;

//达到屏障点后执行的方法

this.barrierCommand = barrierAction;

}

 

public CyclicBarrier(int parties) {

this(parties, null);

}

  1. 接着我们分析一下CyclicBarrier的await()方法

 

public int await() throws InterruptedException, BrokenBarrierException {

try {

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen

}

}

 

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException

//实际用的是一把独占锁

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

 

if (g.broken)

throw new BrokenBarrierException();

//线程是否中断,如果中断的话,需要打破栅栏,并抛出中断异常

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

 

int index = –count;

if (index == 0) {  // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

//到达屏障点时,执行需要处理的业务

if (command != null)

command.run();

ranAction = true;

//执行下一代屏障,在这儿会将条件等待队列的元素转到同步队列,并将之前阻塞的线程唤醒,

nextGeneration();

return 0;

} finally {

//是否要打破栅栏代

if (!ranAction)

breakBarrier();

}

}

 

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

if (!timed)

//在线程计数器count不等于0时,会走到Condition.await()

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We’re about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

// “belong” to subsequent execution.

Thread.currentThread().interrupt();

}

}

 

if (g.broken)

throw new BrokenBarrierException();

 

if (g != generation)

return index;

 

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

 

 

/**

* Implements interruptible condition wait.

* <ol>

* <li> If current thread is interrupted, throw InterruptedException.

* <li> Save lock state returned by {@link #getState}.

* <li> Invoke {@link #release} with saved state as argument,

*      throwing IllegalMonitorStateException if it fails.

* <li> Block until signalled or interrupted.

* <li> Reacquire by invoking specialized version of

*      {@link #acquire} with saved state as argument.

* <li> If interrupted while blocked in step 4, throw InterruptedException.

* </ol>

*/

public final void await() throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

//添加到条件等待队列中

Node node = addConditionWaiter();

//释放当前拿到的ReentrantLock锁

int savedState = fullyRelease(node);

int interruptMode = 0;

//判断是否在同步队列上

while (!isOnSyncQueue(node)) {

//不在的话,就阻塞当前队列

LockSupport.park(this);

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter != null) // clean up if cancelled

unlinkCancelledWaiters();

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

 

/**

* Adds a new waiter to wait queue.

* @return its new wait node

*/

private Node addConditionWaiter() {

//第一次进来时,lastWaiter为Null

Node t = lastWaiter;

// If lastWaiter is cancelled, clean out.

//如果最后一个节点不为空而且不是条件等待状态

if (t != null && t.waitStatus != Node.CONDITION) {

//剔除已取消的等待队列,并重新给t复制lastWaiter

unlinkCancelledWaiters();

t = lastWaiter;

}

//新创建一个thread当前线程的Node

Node node = new Node(Thread.currentThread(), Node.CONDITION);

//lastWaiter为空的话,新增firstWaiter为当前节点;lastWaiter不为空的话,新增t.nextWaiter为当前节点;

if (t == null)

firstWaiter = node;

else

t.nextWaiter = node;

//将lastWaiter指向当前节点

lastWaiter = node;

return node;

}

 

/**

* Invokes release with current state value; returns saved state.

* Cancels node and throws exception on failure.

* @param node the condition node for this wait

* @return previous sync state

*/

final int fullyRelease(Node node) {

boolean failed = true;

try {

//拿到当前的锁状态标识state

int savedState = getState();

//释放锁

if (release(savedState)) {

failed = false;

return savedState;

} else {

throw new IllegalMonitorStateException();

}

} finally {

if (failed)

node.waitStatus = Node.CANCELLED;

}

}

 

/**

* Unlinks cancelled waiter nodes from condition queue.

* Called only while holding lock. This is called when

* cancellation occurred during condition wait, and upon

* insertion of a new waiter when lastWaiter is seen to have

* been cancelled. This method is needed to avoid garbage

* retention in the absence of signals. So even though it may

* require a full traversal, it comes into play only when

* timeouts or cancellations occur in the absence of

* signals. It traverses all nodes rather than stopping at a

* particular target to unlink all pointers to garbage nodes

* without requiring many re-traversals during cancellation

* storms.

*/

private void unlinkCancelledWaiters() {

Node t = firstWaiter;

Node trail = null;

while (t != null) {

Node next = t.nextWaiter;

if (t.waitStatus != Node.CONDITION) {

//出队操作,由于条件等待队列是个单向链表,所以只需要将t.nextWaiter=null即可完成出队操作

t.nextWaiter = null;

if (trail == null)

firstWaiter = next;

else

trail.nextWaiter = next;

if (next == null)

lastWaiter = trail;

}

else

trail = t;

t = next;

}

}

 

/**

* Updates state on barrier trip and wakes up everyone.

* Called only while holding lock.

*/

private void nextGeneration() {

// signal completion of last generation

//唤醒所有队列

trip.signalAll();

// set up next generation

//重置计数器

count = parties;

//新创建一个屏障

generation = new Generation();

}

 

/**

* Moves all threads from the wait queue for this condition to

* the wait queue for the owning lock.

*

* @throws IllegalMonitorStateException if {@link #isHeldExclusively}

*         returns {@code false}

*/

public final void signalAll() {

//判断是否为当前独占线程

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

Node first = firstWaiter;

if (first != null)

doSignalAll(first);

}

 

/**

* Removes and transfers all nodes.

* @param first (non-null) the first node on condition queue

*/

private void doSignalAll(Node first) {

//直接清空条件等待队列,设计精髓所在

lastWaiter = firstWaiter = null;

do {

//保存first的下一个节点

Node next = first.nextWaiter;

//first出单项链表

first.nextWaiter = null;

//转换到同步队列Sync并依次唤醒

transferForSignal(first);

//first指向下一个节点

first = next;

} while (first != null);

}

 

/**

* Transfers a node from a condition queue onto sync queue.

* Returns true if successful.

* @param node the node

* @return true if successfully transferred (else the node was

* cancelled before signal)

*/

final boolean transferForSignal(Node node) {

/*

* If cannot change waitStatus, the node has been cancelled.

*/

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;

 

/*

* Splice onto queue and try to set waitStatus of predecessor to

* indicate that thread is (probably) waiting. If cancelled or

* attempt to set waitStatus fails, wake up to resync (in which

* case the waitStatus can be transiently and harmlessly wrong).

*/

//当前节点入队同步队列Sync,并返回node的前驱节点

Node p = enq(node);

int ws = p.waitStatus;

//CAS将node的前驱节点p的waitStatus修改为待唤醒

if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

LockSupport.unpark(node.thread);

return true;

}

 

/**

* Inserts node into queue, initializing if necessary. See picture above.

* @param node the node to insert

* @return node’s predecessor

*/

private Node enq(final Node node) {

//用for(;;)必须保证入队成功

for (;;) {

//t指向队尾

Node t = tail;

//如果队尾为空,说明队列为空

if (t == null) { // Must initialize

//CAS操作将head指向一个空Node,并将tail也指向空Node

if (compareAndSetHead(new Node()))

tail = head;

} else {

//如果队尾不为空,则将当前节点的前驱指向队尾

node.prev = t;

//CAS操作将tail指向当前节点

if (compareAndSetTail(t, node)) {

//由于是个双向链表,将t的后继节点指向当前节点

t.next = node;

return t;

}

}

}

}

 

await()源码也跟着走了一遍,以下是我根据源码画了一些关键节点的流程图,便于理解。

 

  1. 再来看一下我们的reset()方法,可以重置我们的计数器,并释放所有线程

 

/**

* Resets the barrier to its initial state.  If any parties are

* currently waiting at the barrier, they will return with a

* {@link BrokenBarrierException}. Note that resets <em>after</em>

* a breakage has occurred for other reasons can be complicated to

* carry out; threads need to re-synchronize in some other way,

* and choose one to perform the reset.  It may be preferable to

* instead create a new barrier for subsequent use.

*/

public void reset() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

//打破栅栏

breakBarrier();   // break the current generation

nextGeneration(); // start a new generation

} finally {

lock.unlock();

}

}

 

/**

* Sets current barrier generation as broken and wakes up everyone.

* Called only while holding lock.

*/

private void breakBarrier() {

//将是否打破标识置为true

generation.broken = true;

//重置计数器

count = parties;

//重置后,会唤醒之前阻塞的所有的线程

trip.signalAll();

}

到此为止,CyclicBarrier的两个重要的方法已分析完,其他的一些方法相对简单,就不在这儿分析了,大家有兴趣的可以去看下源码,也可以私信我进行探讨。

 

我们来总结一下:

说到底,底层为CyclicBarrier是基于ReentrantLock和Condition实现的。

 

CountDownLatch也可以实现一组线程等待至某个状态之后再全部同时执行的需求。

那到底CyclicBarrier与CountDownLatch有啥区别呢?

 

CountDownLatch的计数器只能使用一次,而CyclicBarrier可以通过reset()使用多次.

CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不 同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。 CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。

CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。

CyclicBarrier是通过ReentrantLock的”独占锁”和Conditon来实现一组线程的阻塞唤 醒的,而CountDownLatch则是通过AQS的“共享锁”实现。

 

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注