Monday, December 28, 2015

Producer Consumer using BlockingQueue


A blocking queue is a queue that blocks when you try to dequeue from it and the queue is empty, or if you try to enqueue items to it and the queue is already full.
A thread trying to dequeue from an empty queue is blocked until some other thread inserts an item into the queue. A thread trying to enqueue an item in a full queue is blocked until some other thread makes space in the queue, either by dequeuing one or more items or clearing the queue completely.

take() method of BlockingQueue will block if Queue is empty and thread trying to dequeue from an empty queue is blocked until some other thread inserts an item into the queue.

put() method of BlockingQueue will block if Queue is full and thread trying to enqueue an item in a full queue is blocked until some other thread makes space in the queue.


A BlockingQueue with one thread putting into it, and another thread taking from it.

Java BlockingQueue doesn’t accept null values and throw NullPointerException if you try to store null value in the queue.

Java provides several BlockingQueue implementations such as ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue etc.
Actual Working of Blocking Queue
A thread trying to dequeue from an empty queue is blocked until some other thread inserts an item into the queue. A thread trying to enqueue an item in a full queue is blocked until some other thread makes space in the queue, either by dequeuing one or more items or clearing the queue completely.

Important Methods to remember

/*
OFFER()
Add message in queue without waiting if queue has    space and return false if queue is full- No Exception is thrown queue.offer(msg);
Add()
Add message to queue till the time queue has space.
Throws Exception when queue is full queue.add(msg);
PUT()
Add message to queue and wait if queue is full. As soon as queue has space, it starts putting those elements in queue again
*/

Below Program for understanding implementation of BlockingQueue using ArrayBlockingQueue implementation. 

package com.concurrent.jdk5;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Message {
     String message;

     public Message(String str) {
          this.message = str;
     }

     public String getMessage() {
          return message;
     }

}

class Producer implements Runnable {

     BlockingQueue<Message> queue;

     public Producer(BlockingQueue<Message> queue) {
          this.queue = queue;
     }

     @Override
     public void run() {
          for (int i = 0; i < 100; i++) {

              /* Addding Message in Queue and then sleep */
              try {
                   Message msg = new Message("" + i);
                   Thread.sleep(i);
                  
                   queue.put(msg);
                   System.out.println("Produced "+msg.getMessage());
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
          }   
              //adding exit message
             Message msg = new Message("EXIT");
             try {
                 queue.put(msg);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         
     }

}

class Consumer implements Runnable {

     BlockingQueue<Message> queue;

     public Consumer(BlockingQueue<Message> queue) {
          this.queue = queue;
     }

     @Override
     public void run() {
          try {
              Message msg;
              while(!((msg=queue.take()).getMessage().equals("EXIT"))){
                   Thread.sleep(10);
                   System.out.println("Message Consumed " + msg.getMessage());
              }
              System.out.println("All Messages Produced , Consumed and Finished");
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
     }

}

public class BlockingQueueImpl {

     public static void main(String[] args) throws InterruptedException {
          BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
          Thread producer = new Thread(new Producer(queue));
          Thread consumer = new Thread(new Consumer(queue));
          producer.start();
          consumer.start();
          producer.join();
          consumer.join();
     }
}

Output
Produced 0
Produced 1
Produced 2
Produced 3
Message Consumed 0
Produced 4
Produced 5
Message Consumed 1
--
--
Produced 96
Message Consumed 96
Produced 97
Message Consumed 97
Produced 98
Message Consumed 98
Produced 99
Message Consumed 99
All Messages Produced , Consumed and Finished 

References

http://www.journaldev.com/1034/java-blockingqueue-example

No comments:

Post a Comment