A CyclicBarrier lets you split a concurrent program into stages: a set of threads wait for each other at a common barrier point before any of them continues.
You can watch me live coding a CyclicBarrier example on YouTube.
[embed]https://youtu.be/zLi6utR20GQ[/embed]
In this blog post, I use a CyclicBarrier to let each thread calculate the sum of one row of a matrix, and then calculate the sum of the sums produced by all the individual threads.
final CyclicBarrier sumBarrier = new CyclicBarrier(numOfThreads, () -> System.out.println("Sum of The matrix -> " + sum(store)));
In the above code, a CyclicBarrier is created with the number of parties equal to numOfThreads, and a Runnable (the barrier action) is provided to add up the sums produced by all the threads.
private int sum(final int[] digits) {
    int sum = 0;
    for (int digit : digits) sum += digit;
    return sum;
}
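To see the barrier action in isolation, here is a minimal sketch. The class name BarrierActionDemo and the helper countActionRuns are hypothetical and not part of the program in this post. It starts a few threads that do nothing except meet at the barrier, and counts how often the action fires. The action should run exactly once per trip, after the last party has called await().

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierActionDemo {

    // Hypothetical helper: starts `parties` threads that meet at one barrier
    // and returns how many times the barrier action ran.
    static int countActionRuns(final int parties) throws InterruptedException {
        final AtomicInteger actionRuns = new AtomicInteger();
        final CyclicBarrier barrier =
                new CyclicBarrier(parties, () -> actionRuns.incrementAndGet());

        final Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; ++i) {
            workers[i] = new Thread(() -> {
                try {
                    barrier.await(); // blocks until all parties have arrived
                } catch (BrokenBarrierException | InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
        }
        for (Thread worker : workers) worker.start();
        for (Thread worker : workers) worker.join();
        return actionRuns.get();
    }

    public static void main(String... args) throws InterruptedException {
        // prints "Barrier action ran 1 time(s)"
        System.out.println("Barrier action ran " + countActionRuns(4) + " time(s)");
    }
}
```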
Each thread calculates the sum of its row, stores it, and calls await() to signal that its work is done. The call blocks until all the other threads have reached the barrier as well.
class RowSum implements Runnable {
    final int[] forSum;
    final int[] store;
    final int storeIndex;

    public RowSum(final int[] forSum, final int[] store, final int storeIndex) {
        this.forSum = forSum;
        this.store = store;
        this.storeIndex = storeIndex;
    }

    @Override
    public void run() {
        store[storeIndex] = sum(forSum);
        try {
            System.out.println(Thread.currentThread().getName() + " has hit the barrier ");
            sumBarrier.await();
        } catch (BrokenBarrierException | InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}
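A small detail about await(): it returns the arrival index of the calling thread, where getParties() - 1 means first to arrive and 0 means last. The sketch below is only an illustration (ArrivalIndexDemo and arrivalIndexes are hypothetical names, and RowSum above ignores the return value). It collects the index each thread got back and sorts them.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class ArrivalIndexDemo {

    // Hypothetical helper: returns, in sorted order, the values that await()
    // handed back to each of the `parties` threads.
    static int[] arrivalIndexes(final int parties) throws InterruptedException {
        final CyclicBarrier barrier = new CyclicBarrier(parties);
        final int[] indexes = new int[parties];

        final Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; ++i) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                try {
                    // parties - 1 for the first thread to arrive, 0 for the last
                    indexes[slot] = barrier.await();
                } catch (BrokenBarrierException | InterruptedException ex) {
                    indexes[slot] = -1; // marks a thread that never passed the barrier
                }
            });
        }
        for (Thread worker : workers) worker.start();
        for (Thread worker : workers) worker.join();

        Arrays.sort(indexes);
        return indexes;
    }

    public static void main(String... args) throws InterruptedException {
        // prints "[0, 1, 2]"
        System.out.println(Arrays.toString(arrivalIndexes(3)));
    }
}
```

Which thread gets which index depends on scheduling, so only the sorted set of values is predictable.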
Once all the threads have reached the barrier, the runnable passed to the CyclicBarrier is run, and it prints the final sum.
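The barrier is called cyclic because it resets after every trip, so the same instance can separate several stages. The program in this post only uses one stage. As a rough sketch of how more stages could look (StagedBarrierDemo, runStages and the per-stage work are made up for illustration), each worker below adds to its own slot in every stage, and the barrier action records the total at the end of each stage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class StagedBarrierDemo {

    // Hypothetical helper: runs `stages` rounds with `parties` workers and
    // returns the total recorded by the barrier action after each round.
    static List<Integer> runStages(final int parties, final int stages) throws InterruptedException {
        final int[] store = new int[parties];
        final List<Integer> totals = new ArrayList<>();

        // The action runs once per stage, before any waiting thread is released.
        final CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            int total = 0;
            for (int value : store) total += value;
            totals.add(total);
        });

        final Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; ++i) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                try {
                    for (int stage = 0; stage < stages; ++stage) {
                        store[slot] += slot + 1; // made-up work for this stage
                        barrier.await();         // wait for the others, then go to the next stage
                    }
                } catch (BrokenBarrierException | InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
        }
        for (Thread worker : workers) worker.start();
        for (Thread worker : workers) worker.join();
        return totals;
    }

    public static void main(String... args) throws InterruptedException {
        // prints "[6, 12]"
        System.out.println(runStages(3, 2));
    }
}
```

With 3 workers, the slots hold 1, 2, 3 after the first stage (total 6) and 2, 4, 6 after the second (total 12).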
Source code:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    // Adds up all the values in the given array.
    private int sum(final int[] digits) {
        int sum = 0;
        for (int digit : digits) sum += digit;
        return sum;
    }

    final int numOfThreads = 10;
    // One slot per worker thread for the sum of its row.
    final int[] store = new int[numOfThreads];
    // The barrier action runs once, after all numOfThreads workers have called await().
    final CyclicBarrier sumBarrier = new CyclicBarrier(numOfThreads, () -> System.out.println("Sum of The matrix -> " + sum(store)));

    class RowSum implements Runnable {
        final int[] forSum;
        final int[] store;
        final int storeIndex;

        public RowSum(final int[] forSum, final int[] store, final int storeIndex) {
            this.forSum = forSum;
            this.store = store;
            this.storeIndex = storeIndex;
        }

        @Override
        public void run() {
            // Store this row's sum, then wait for the other workers.
            store[storeIndex] = sum(forSum);
            try {
                System.out.println(Thread.currentThread().getName() + " has hit the barrier ");
                sumBarrier.await();
            } catch (BrokenBarrierException | InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String... args) throws InterruptedException {
        final CyclicBarrierTest obj = new CyclicBarrierTest();

        // Build a matrix with one row of 1,000 values per thread, filled with 1, 2, 3, ...
        final int[][] matrix = new int[obj.numOfThreads][];
        int value = 1;
        for (int i = 0; i < obj.numOfThreads; ++i) {
            matrix[i] = new int[1_000];
            for (int j = 0; j < 1_000; ++j) matrix[i][j] = value++;
        }

        // One worker per row; each writes its result into its own slot of obj.store.
        final Thread[] workerThreads = new Thread[obj.numOfThreads];
        for (int i = 0; i < obj.numOfThreads; ++i) workerThreads[i] = new Thread(obj.new RowSum(matrix[i], obj.store, i));
        for (Thread worker : workerThreads) worker.start();
        for (Thread worker : workerThreads) worker.join();
    }
}