Cyclibarrier is used for creating stages in a concurrent programms.
You can watch me live coding a cyclicbarrier on youtube.
In this blog, I am using Cyclibarrier to allow threads to calculate sum, then calculate the sum of the sums generated by all individual threads.
final CyclicBarrier sumBarrier = new CyclicBarrier(numOfThreads, () -> System.out.println("Sum of The matrix -> " + sum(store)));
In the above code, a cyclicbarrier is created with number of barrier equal to numOfThreads and a runnable is provided to calculate the sum generated by all the threads.
private int sum(final int[] digits) { int sum = 0; for(int digit : digits) sum += digit; return sum; }
Each thread calculates the sum and calls the await function to signal that the work is done.
class RowSum implements Runnable{ final int[] forSum; final int[] store; final int storeIndex; public RowSum(final int[] forSum, final int[] store, final int storeIndex){ this.forSum = forSum; this.store = store; this.storeIndex = storeIndex; } @Override public void run(){ store[storeIndex] = sum(forSum); try{ System.out.println(Thread.currentThread().getName() + " has hit the barrier "); sumBarrier.await(); } catch(BrokenBarrierException | InterruptedException ex) { ex.printStackTrace(); } } }
Once all the threads have completed the work the runnable passed to the cyclicbarrier , this runnable prints the final sum.
Source Code :-
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.BrokenBarrierException; public class CyclicBarrierTest { private int sum(final int[] digits) { int sum = 0; for(int digit : digits) sum += digit; return sum; } final int numOfThreads = 10; final int[] store = new int[numOfThreads]; final CyclicBarrier sumBarrier = new CyclicBarrier(numOfThreads, () -> System.out.println("Sum of The matrix -> " + sum(store))); class RowSum implements Runnable{ final int[] forSum; final int[] store; final int storeIndex; public RowSum(final int[] forSum, final int[] store, final int storeIndex){ this.forSum = forSum; this.store = store; this.storeIndex = storeIndex; } @Override public void run(){ store[storeIndex] = sum(forSum); try{ System.out.println(Thread.currentThread().getName() + " has hit the barrier "); sumBarrier.await(); } catch(BrokenBarrierException | InterruptedException ex) { ex.printStackTrace(); } } } public static void main(String... args) throws InterruptedException{ final CyclicBarrierTest obj = new CyclicBarrierTest(); final int[][] matrix = new int[obj.numOfThreads][]; int value = 1; for(int i = 0; i < obj.numOfThreads; ++i){ matrix[i] = new int[1_000]; for(int j = 0; j < 1_000; ++j) matrix[i][j] = value++; } final Thread[] workerThreads = new Thread[obj.numOfThreads]; for(int i = 0; i < obj.numOfThreads; ++i) workerThreads[i] = new Thread(obj.new RowSum(matrix[i], obj.store, i)); for(Thread worker : workerThreads) worker.start(); for(Thread worker : workerThreads) worker.join(); } }