Thread – Part 2

Synchronizers

A synchronizer is any object that determines whether threads arriving at it should be forced to wait or allowed to proceed based on its state. Examples of synchronizers are semaphores, barriers, and latches.

CountDownLatch

A latch acts just like a gate. When it is closed, it will not allow any thread to pass. But when it is open, it will allow all threads to pass. The gate will open only when the latch reaches a certain state (the terminal state) and till then it will remain closed. Once the latch reaches that terminal state, it remains open forever.

CountDownLatch is the commonly used latch implementation. When we create a CountDownLatch, we pass a positive number. This is used as a counter. This counter indicates how many events must happen to open the gate. When an event happens, we call the countDown() method to decrease the counter value by 1. When this counter reaches 0, the gate opens.

If we pass a negative value while creating the CountDownLatch, at runtime it will throw an IllegalArgumentException.

Let’s consider one interview process. Assume this interview consists of a technical round, a managerial round and a HR round. Only when all three rounds are clear, a decision will be made whether to select or not. So this decision making step must wait until all three rounds are done.

In our code, each round will be represented by individual threads. At the end of the run() method of each thread, we will call the countDown() method to indicate that the round is over. Thread.sleep() will be used to imitate the time taken for the interview process.

So, first create a Runnable implementation.

public class InterviewRound implements Runnable {
    private final String round;
    private final CountDownLatch latch;

    public InterviewRound(String round, CountDownLatch latch) {
        this.round = round;
        this.latch = latch;
    }

    public void run() {
        System.out.println("Conducting " + round + " round...");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        latch.countDown();
    }
}

Now, create a CountDownLatch with count 3 (same as total interview rounds). Then Start 3 rounds and wait for all those rounds to be completed.

public class ThreadTester {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        new Thread(new InterviewRound("technical", latch)).start();
        new Thread(new InterviewRound("managerial", latch)).start();
        new Thread(new InterviewRound("HR", latch)).start();
        latch.await();
        System.out.println("Time to take the decision.");
    }
}

Output:

Conducting technical round...
Conducting managerial round...
Conducting HR round...
Time to take the decision.

As you can see from the output, the await() method waits for the completion of all three rounds. Once all the three rounds are completed, latch opens and prints the message: Time to take the decision.

Semaphore

A Semaphore is used to limit the number of threads that want to access a shared resource. To achieve this, Semaphore uses a counter that defines at most how many threads can access the shared resource. When we create a Semaphore, we pass this counter.

Consider this counter as a permit. 

  1. If the count of permits is greater than zero, then a thread can acquire a permit by calling acquire() method. This will decrease the available permit by 1.
  2. If no permit is available, acquire() method blocks until one is available.
  3. When a thread is done with its activity, it can release the permit. This will increase the available permit by 1.

Let’s consider an interview panel where only 2 panelists are available and there are 5 candidates. So, initially 2 candidates can enter for the interview and others must wait. Once an interview is over, another candidate can enter for the interview and it goes on like this until all 5 candidates are interviewed.

In our code, we will create separate threads for separate interviews. Before starting an interview, we will call the acquire() method to acquire the permit. Once the interview is over, we’ll release the permit to allow another candidate to come in for the interview. Thread.sleep() will be used to imitate the time taken for the interview process.

public class Interview implements Runnable {
    private final String candidate;
    private final Semaphore sem;

    public Interview(String candidate, Semaphore sem) {
        this.candidate = candidate;
        this.sem = sem;
    }

    @Override
    public void run() {
        System.out.println(LocalTime.now() + " : Starting interview of : " + candidate);
        try {
            Thread.sleep(5000);
            sem.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadTester {
    public static void main(String[] args) throws InterruptedException {
        Semaphore sem = new Semaphore(2);
        for (int i = 0; i < 5; i++) {
            String candidateId = "Candidate-" + (i+1);
            sem.acquire();
            new Thread(new Interview(candidateId, sem)).start();
        }
    }
}

Output:

11:57:40.705 : Starting interview of : Candidate-1
11:57:40.705 : Starting interview of : Candidate-2
11:57:45.730 : Starting interview of : Candidate-3
11:57:45.730 : Starting interview of : Candidate-4
11:57:50.740 : Starting interview of : Candidate-5

As you can see –

  1. First 2 candidates are interviewed at 11:57:40
  2. Once those interviews are over, next 2 candidates are interviewed at 11:57:45
  3. For the last candidate, the interview started at 11:57:50
CyclicBarrier

We have seen latches which enable waiting for a group of activities to be completed. Latches can be used only once. Once a latch is open, it cannot be closed. 

CyclicBarrier is quite similar to lathes, but it can be reused multiple times. When we create a CyclicBarrier, we specify the number of threads that must wait for each other before continuing execution. When a thread completes its execution, it calls the await() method and waits for other threads. Once all the threads call the await() method, that indicates all the threads have completed their execution and only then the CyclicBarrier gives the way for threads to proceed.

To understand this, let’s consider one interview process. Assume this interview consists of a technical round, a managerial round and a HR round. Even if one round is over, the interviewer must wait for the other two rounds to be completed. Only when all three rounds are over, then only interviewers can proceed and take a decision whether to select or not. So before taking the final decision, all three interviewers must wait for each other.

In our code, each round will be represented by individual threads. At the end of the run() method of each thread, we will call the await() method to wait for other threads. Thread.sleep() will be used to imitate the time taken for the interview process. Also, we will create a Runnable implementation to define what needs to be done once all the three rounds are done and the wait is over.

public class AfterCompletion implements Runnable {  
    @Override  
    public void run() {  
        System.out.println(LocalTime.now() + " - All rounds are done");
    }  
}
public class InterviewRound implements Runnable {
    private final String round;
    private final CyclicBarrier barrier;

    public InterviewRound(String round, CyclicBarrier barrier) {
        this.round = round;
        this.barrier = barrier;
    }

    public void run() {
        System.out.println("Conducting " + round + " round...");
        try {
            Thread.sleep(5000);
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadTester {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(3, new AfterCompletion());
        System.out.println(LocalTime.now() + " - Starting interview process");
        new Thread(new InterviewRound("technical", barrier)).start();
        new Thread(new InterviewRound("managerial", barrier)).start();
        new Thread(new InterviewRound("HR", barrier)).start();
    }
}

Output:

15:52:27.149 - Starting interview process
Conducting technical round...
Conducting HR round...
Conducting managerial round...
15:52:32.170 - All rounds are done

As you can see from the output, all the 3 threads wait for each other and once all three rounds are done, our code prints the message: All rounds are done.

That’s it for now. Hope you have enjoyed this tutorial. If you have any doubt, please ask in the comment section. I will try to answer that as soon as possible. Till then, bye bye.