Wednesday, 15 May 2013

Producer-Consumer problem in Java



Producer Consumer Design Pattern 

 Question :- What is the Producer-Consumer problem?


Answer :- The producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. A producer continuously generates data and places it into a buffer while, at the same time, a consumer continuously removes data from that buffer. We have to make sure that the producer does not try to add data to the buffer when it is full, and that the consumer does not try to remove data from the buffer when it is empty.

 

The Producer Consumer problem can be solved using either of the two approaches below.
  • By implementing the Producer Consumer Design Pattern with wait() and notify()
  • By implementing the Producer Consumer Design Pattern with Blocking Queue

The Producer Consumer Design Pattern reduces coupling between the producer and the consumer by separating the work of generating data from the work of processing it. When the buffer is full, the producer either goes to sleep or discards data. The next time the consumer removes an item from the buffer, it notifies the producer, which starts to fill the buffer again. In the same way, the consumer goes to sleep if it finds the buffer empty. The next time the producer puts data into the buffer, it wakes up the sleeping consumer.
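The sleep and wake-up behaviour described above can be sketched as a small bounded buffer. This is only an illustrative sketch, not the code used later in this post: the names BoundedBuffer and BoundedBufferDemo are hypothetical, and a capacity of 2 is assumed for the demo.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical fixed-capacity buffer guarded by its own monitor.
class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Producer side: sleep while the buffer is full.
    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake any consumer waiting for data
    }

    // Consumer side: sleep while the buffer is empty.
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake any producer waiting for space
        return item;
    }
}

class BoundedBufferDemo {
    // Runs one producer and one consumer over a buffer of capacity 2 (assumed)
    // and returns the values in the order the consumer received them.
    static List<Integer> run(int count) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            for (int i = 0; i < count; i++) {
                received.add(buffer.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // Single producer + FIFO buffer + single consumer: [0, 1, 2, 3, 4]
        System.out.println(run(5));
    }
}
```

The wait() calls sit inside while loops so that a thread re-checks the buffer state every time it wakes up, whether it was woken by notifyAll() or by a spurious wakeup.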

Advantages of using the Producer Consumer Design Pattern
  • The producer and the consumer are independent of each other. The producer does not need to know who the consumers are or how many of them there are.
  • They (producer and consumer) can work at different paces. If a consumer is slow in consuming the produced data, more consumers can be introduced.
  • The producer's job is to generate a piece of data, place it into the buffer and start again. At the same time, the consumer consumes the data (i.e., removes it from the buffer) one piece at a time.
  • The pattern improves readability and maintainability, because producer and consumer code are developed and maintained separately.
  • Producer and consumer code can be developed independently; each only needs to know about the shared object.
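As a rough illustration of the first two points, the sketch below lets several consumers drain one shared queue while the producing loop stays unaware of them. The class name MultiConsumerDemo, the queue capacity of 4, and the STOP marker (a common "poison pill" shutdown convention, assumed here and not part of this post's examples) are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class MultiConsumerDemo {
    // Hypothetical end-of-stream marker; real items are assumed non-negative.
    private static final int STOP = -1;

    // Starts `consumers` consumer threads, produces 0..items-1,
    // and returns the sum of everything the consumers received.
    static int run(int items, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(4);
        AtomicInteger sum = new AtomicInteger();
        Thread[] workers = new Thread[consumers];

        for (int c = 0; c < consumers; c++) {
            workers[c] = new Thread(() -> {
                try {
                    // Each consumer keeps taking until it sees a STOP marker.
                    for (int v = queue.take(); v != STOP; v = queue.take()) {
                        sum.addAndGet(v);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[c].start();
        }

        try {
            // The producing loop only knows about the queue, not the consumers.
            for (int i = 0; i < items; i++) {
                queue.put(i);
            }
            // Shutdown: one marker per consumer, queued after all real items.
            for (int c = 0; c < consumers; c++) {
                queue.put(STOP);
            }
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(run(10, 3)); // 0 + 1 + ... + 9 = 45
    }
}
```

Changing the second argument of run() changes the number of consumers without touching the producing loop, which is the decoupling the list above describes.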

By the Implementation of Producer Consumer Design Pattern with wait() and notify()

Example to show how to use Producer Consumer Design Pattern with wait() and notify()

Producer.java 

package com.gaurav.corejave.multithreading;

class Producer extends Thread {
    private SharedObject sharedObject;

    public Producer(SharedObject sharedObj) {
        sharedObject = sharedObj;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            sharedObject.produce(i);
            System.out.println("PRODUCER PRODUCED # : " + i);
            try {
                sleep((int) (Math.random() * 100));
            } catch (InterruptedException ite) {
                System.out.println("THE EXCEPTION IN THE PRODUCER CLASS IS : - "+ite.getMessage());
            }
        }
    }
}

Consumer.java

package com.gaurav.corejave.multithreading;

class Consumer extends Thread {
    private SharedObject sharedObject;

    public Consumer(SharedObject sharedObj) {
        sharedObject = sharedObj;
    }

    public void run() {
        int value = 0;
        for (int i = 0; i < 10; i++) {
            value = sharedObject.consume();
            System.out.println("CONSUMER CONSUMED # : " + value);
        }
    }
}

SharedObject.java

package com.gaurav.corejave.multithreading;

import java.util.logging.Level;
import java.util.logging.Logger;

class SharedObject {
    private int naturalNumber;
    // true while a produced value is waiting to be consumed
    private boolean sharedObjectAvailability = false;

    public synchronized int consume() {
        // Re-check the flag after every wake-up until a value is available.
        while (!sharedObjectAvailability) {
            try {
                wait();
            } catch (InterruptedException ite) {
                Logger.getLogger(SharedObject.class.getName()).log(Level.SEVERE, null, ite);
            }
        }
        sharedObjectAvailability = false;
        notifyAll(); // wake the producer waiting for the slot to empty
        return naturalNumber;
    }

    public synchronized void produce(int numericValue) {
        // Wait until the previous value has been consumed.
        while (sharedObjectAvailability) {
            try {
                wait();
            } catch (InterruptedException ite) {
                Logger.getLogger(SharedObject.class.getName()).log(Level.SEVERE, null, ite);
            }
        }
        naturalNumber = numericValue;
        sharedObjectAvailability = true;
        notifyAll(); // wake the consumer waiting for a value
    }
}

ProducerConsumerTest.java

package com.gaurav.corejave.multithreading;

public class ProducerConsumerTest {
    public static void main(String[] args) {
        SharedObject sharedObject = new SharedObject();
        Producer producer = new Producer(sharedObject);
        Consumer consumer = new Consumer(sharedObject);
        producer.start();
        consumer.start();
    }
}

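Result (the println calls run outside the synchronized methods, so the interleaving of the two threads' lines can differ between runs):-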

PRODUCER PRODUCED # : 0
CONSUMER CONSUMED # : 0
PRODUCER PRODUCED # : 1
CONSUMER CONSUMED # : 1
PRODUCER PRODUCED # : 2
CONSUMER CONSUMED # : 2
PRODUCER PRODUCED # : 3
CONSUMER CONSUMED # : 3
PRODUCER PRODUCED # : 4
CONSUMER CONSUMED # : 4
PRODUCER PRODUCED # : 5
CONSUMER CONSUMED # : 5
PRODUCER PRODUCED # : 6
CONSUMER CONSUMED # : 6
PRODUCER PRODUCED # : 7
CONSUMER CONSUMED # : 7
PRODUCER PRODUCED # : 8
CONSUMER CONSUMED # : 8
PRODUCER PRODUCED # : 9
CONSUMER CONSUMED # : 9


By the Implementation of Producer Consumer Design Pattern with Blocking Queue
  • The introduction of BlockingQueue in Java 5 made this pattern easier to implement. BlockingQueue is an interface, and Java 5 provides several implementations, such as ArrayBlockingQueue and LinkedBlockingQueue, both of which order elements first in, first out (FIFO).
  • The producer consumer design pattern describes two parties, the producer and the consumer, that share a common fixed-size buffer; a blocking queue can serve as that buffer.
  • BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and that wait for space to become available in the queue when storing an element. When inserting into a queue that is already full, or retrieving from one that is empty, we can also specify a limit on how long to wait.
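The timed operations mentioned in the last point can be tried in isolation. The following is a small sketch, assuming a queue of capacity 1 and a 100 ms timeout; the class name TimedQueueDemo is hypothetical, while offer(e, timeout, unit) and poll(timeout, unit) are the standard BlockingQueue methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedQueueDemo {
    // Returns what each timed call returned, joined with commas.
    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            // Space is available, so the element is accepted immediately.
            boolean first = queue.offer("a", 100, TimeUnit.MILLISECONDS);
            // The queue is full and nobody removes anything: gives up after 100 ms.
            boolean second = queue.offer("b", 100, TimeUnit.MILLISECONDS);
            // One element is present, so it is returned immediately.
            String head = queue.poll(100, TimeUnit.MILLISECONDS);
            // The queue is now empty: poll returns null after 100 ms.
            String none = queue.poll(100, TimeUnit.MILLISECONDS);
            return first + "," + second + "," + head + "," + none;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,false,a,null
    }
}
```

By contrast, put() and take() wait indefinitely, so the timed variants are the ones to reach for when a thread must not block forever.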

Example to show how to use Producer Consumer Design Pattern with Blocking Queue.  

ProducerInBlockingQueue.java   

package com.gaurav.corejave.multithreading;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerInBlockingQueue extends Thread {
    BlockingQueue<String> blockingQueue;

    ProducerInBlockingQueue() {
        blockingQueue = new ArrayBlockingQueue<String>(3);
    }

    public void run() {
        for (int i = 0; i <= 10; i++) {
            try {
                System.out.println("PRODUCED # " + i);
                blockingQueue.put("" + i);
                Thread.sleep(1);
            } catch (InterruptedException ite) {
                System.out.println("THE EXCEPTION IN PRODUCER CLASS IS : - "+ite.getMessage());
            }
        }
    }
}

ConsumerInBlockingQueue.java

package com.gaurav.corejave.multithreading;

import java.util.concurrent.TimeUnit;

class ConsumerInBlockingQueue extends Thread {
    ProducerInBlockingQueue producerInBlockingQueue;

    ConsumerInBlockingQueue(ProducerInBlockingQueue prodInBlockingQueue) {
        this.producerInBlockingQueue = prodInBlockingQueue;
    }

    public void run() {
        for (int i = 0; i <= 10; i++) {
            try {
                System.out.println("CONSUMED # "
                        + producerInBlockingQueue.blockingQueue.poll(5,
                                TimeUnit.SECONDS));
                Thread.sleep(2000);
            } catch (InterruptedException ite) {
                System.out.println("THE EXCEPTION IN CONSUMER CLASS IS : - "+ite.getMessage());
            }
        }
    }
}


ProducerConsumerInBlockingQueue.java

package com.gaurav.corejave.multithreading;

public class ProducerConsumerInBlockingQueue {

    public static void main(String[] args) {
        // Both classes extend Thread, so they can be started directly
        // instead of being wrapped in another Thread.
        ProducerInBlockingQueue producer = new ProducerInBlockingQueue();
        ConsumerInBlockingQueue consumer = new ConsumerInBlockingQueue(producer);
        producer.start();
        consumer.start();
    }
}

Result:-

PRODUCED # 0
CONSUMED # 0
PRODUCED # 1
PRODUCED # 2
PRODUCED # 3
PRODUCED # 4
CONSUMED # 1
PRODUCED # 5
CONSUMED # 2
PRODUCED # 6
CONSUMED # 3
PRODUCED # 7
CONSUMED # 4
PRODUCED # 8
CONSUMED # 5
PRODUCED # 9
CONSUMED # 6
PRODUCED # 10
CONSUMED # 7
CONSUMED # 8
CONSUMED # 9
CONSUMED # 10
 
