September 11, 2020

Use Cyclibarrier In Java Programming Language

Cyclibarrier is used for creating stages in a concurrent programms.

You can watch me live coding a cyclicbarrier on youtube.

In this blog, I am using Cyclibarrier to allow threads to calculate sum, then calculate the sum of the sums generated by all individual threads.

final CyclicBarrier sumBarrier = new CyclicBarrier(numOfThreads, () -> System.out.println("Sum of The matrix -> " + sum(store)));

In the above code, a cyclicbarrier is created with number of barrier equal to numOfThreads and a runnable is provided to calculate the sum generated by all the threads.

private int sum(final int[] digits) {
    int sum = 0;
for(int digit : digits) sum += digit;
return sum;
}

Each thread calculates the sum and calls the await function to signal that the work is done.

class RowSum implements Runnable{
    final int[] forSum;
final int[] store;
final int   storeIndex;

public RowSum(final int[] forSum, final int[] store, final int storeIndex){
    this.forSum = forSum;
    this.store = store;
    this.storeIndex = storeIndex;
}

@Override
public void run(){

    store[storeIndex] = sum(forSum);

    try{ 
        System.out.println(Thread.currentThread().getName() + " has hit the barrier ");
        sumBarrier.await();
    } catch(BrokenBarrierException | InterruptedException ex) {
        ex.printStackTrace();
    }

}
}

Once all the threads have completed the work the runnable passed to the cyclicbarrier , this runnable prints the final sum.

Source Code :-

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class CyclicBarrierTest {
    
    private int sum(final int[] digits) {
        int sum = 0;
    for(int digit : digits) sum += digit;
    return sum;
    }
    
    final int numOfThreads = 10;
    final int[] store = new int[numOfThreads];

    final CyclicBarrier sumBarrier = new CyclicBarrier(numOfThreads, () -> System.out.println("Sum of The matrix -> " + sum(store)));

    class RowSum implements Runnable{
        final int[] forSum;
    final int[] store;
    final int   storeIndex;

    public RowSum(final int[] forSum, final int[] store, final int storeIndex){
        this.forSum = forSum;
        this.store = store;
        this.storeIndex = storeIndex;
    }

    @Override
    public void run(){

        store[storeIndex] = sum(forSum);

        try{ 
        System.out.println(Thread.currentThread().getName() + " has hit the barrier ");
            sumBarrier.await();
            } catch(BrokenBarrierException | InterruptedException ex) {
            ex.printStackTrace();
        }

    }
    }

    public static void main(String... args) throws InterruptedException{
        final CyclicBarrierTest obj = new CyclicBarrierTest();
    final int[][] matrix = new int[obj.numOfThreads][];

        int value = 1;
    for(int i = 0; i < obj.numOfThreads; ++i){
        matrix[i] = new int[1_000];
        for(int j = 0; j < 1_000; ++j) matrix[i][j] = value++;
    }

    final Thread[] workerThreads = new Thread[obj.numOfThreads];

    for(int i = 0; i < obj.numOfThreads; ++i) workerThreads[i] = new Thread(obj.new RowSum(matrix[i], obj.store, i));
    for(Thread worker : workerThreads) worker.start();
    for(Thread worker : workerThreads) worker.join();
    }

}