Sunday, 10 November 2013

Core JMS using ActiveMQ and Multithreading

JMS using threads

Ques:- How can multiple JMS producers and multiple JMS consumers be run using threads?


Answer:- In the example below, we create multiple producers and multiple consumers by implementing the Runnable interface and running each one in its own thread.

System Requirements:-

  • Eclipse, or any other Java IDE.

  • JDK 1.7.0_03

  • Required jar: activemq-all-5.4.3.jar

  • Apache ActiveMQ 5.4.3 (apache-activemq-5.4.3)

Note: - A running Apache ActiveMQ broker is required to execute this example (the code connects to tcp://localhost:61616). For the ActiveMQ setup, please follow the below link:-


Steps for creating an Eclipse Java project that implements Core JMS using Apache ActiveMQ with multithreading:-


  • Create a Java project named JMSUsingActiveMQUsingThreads.
  • Create a package named com.gaurav.jms.activemq.threads in the src directory.
 



  • Create an ActiveMQMessageProducerThread.java in the above specified package.



package com.gaurav.jms.activemq.threads;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQMessageProducerThread implements Runnable {
      @Override
      public void run() {
            try {
                  // Creating a connection factory for ActiveMQ
                  ActiveMQConnectionFactory activeMQConFactory = new ActiveMQConnectionFactory(
                              "tcp://localhost:61616");

                  // Creating a connection
                  Connection con = activeMQConFactory.createConnection();
                  con.start();

                  // Creating a session;
                  Session session = con
                              .createSession(false, Session.AUTO_ACKNOWLEDGE);

                  // Creating the destination (a queue in this example)
                  Destination dest = session.createQueue("TestWelcomeActiveMQQueue");

                  // Creating a MessageProducer using the session for the queue
                  MessageProducer msgProducer = session.createProducer(dest);
                  msgProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                  // Creating a message for sending in the queue
                  String strMessage = "WELCOME GAURAV BY "
                              + Thread.currentThread().getName();
                  TextMessage txtMessage = session.createTextMessage(strMessage);

                  System.out.println("Sent message : " + strMessage.hashCode()
                              + " : " + Thread.currentThread().getName());
                  msgProducer.send(txtMessage);

                  // closing the resources
                  msgProducer.close();
                  session.close();
                  con.close();

            } catch (Exception e) {
                  System.out.println("Exception thrown : " + e);
                  e.printStackTrace();
            }
      }

}

/* NON_PERSISTENT means the broker does not write the message to its persistent store, so a message can be lost if the broker stops before it is delivered. */


  • Create an ActiveMQMessageConsumerThread.java in the above specified package.


package com.gaurav.jms.activemq.threads;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQMessageConsumerThread implements Runnable,
            ExceptionListener {
      @Override
      public void run() {
            try {
                  // Creating a connection factory for ActiveMQ
                  ActiveMQConnectionFactory activeMQConFactory = new ActiveMQConnectionFactory(
                              "tcp://localhost:61616");

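                  // Creating a connection and registering this class as its
                  // ExceptionListener, so that onException() is actually called
                  Connection con = activeMQConFactory.createConnection();
                  con.setExceptionListener(this);
                  con.start();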

                  // Creating a session;
                  Session session = con
                              .createSession(false, Session.AUTO_ACKNOWLEDGE);

                  // Creating the destination (a queue in this example)
                  Destination dest = session.createQueue("TestWelcomeActiveMQQueue");

                  // Creating a MessageConsumer using the session for the queue
                  MessageConsumer msgConsumer = session.createConsumer(dest);

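                  // Waiting up to 5 seconds for a message; receive() returns null
                  // if nothing arrives within the timeout
                  Message message = msgConsumer.receive(5000);

                  if (message instanceof TextMessage) {
                        TextMessage txtMessage = (TextMessage) message;
                        String strMessage = txtMessage.getText();
                        System.out.println("Received Message from queue is : "
                                    + strMessage);
                  } else if (message == null) {
                        System.out.println("No message received within 5 seconds");
                  } else {
                        System.out.println("Received : " + message);
                  }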

                  // closing the resources
                  msgConsumer.close();
                  session.close();
                  con.close();

            } catch (Exception e) {
                  System.out.println("Exception thrown : " + e);
                  e.printStackTrace();
            }
      }

      @Override
      public synchronized void onException(JMSException jmsEx) {
            // Only reports the problem; the connection is closed in run()
            System.out.println("JMS Exception occurred : " + jmsEx);
      }
}
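
The consumer above calls receive(5000), which returns null when no message arrives within the timeout. On a queue, each message is also delivered to only one of the competing consumers. The sketch below illustrates both points without a broker: a java.util.concurrent.BlockingQueue is used as a stand-in for the ActiveMQ queue, and poll(timeout) plays the role of receive(timeout). The class and method names are hypothetical, and the lambdas assume Java 8 or later (on the JDK 1.7 listed above, anonymous Runnable classes would do the same job).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReceiveTimeoutSketch {

    // Starts consumerCount threads that each poll the queue once, waits for
    // all of them, and returns how many timed out without a message.
    static int countTimeouts(BlockingQueue<String> queue, int consumerCount,
            long timeoutMillis) {
        AtomicInteger timeouts = new AtomicInteger();
        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            Thread consumer = new Thread(() -> {
                String name = Thread.currentThread().getName();
                try {
                    // Like receive(timeout): null means nothing arrived in time.
                    String text = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (text == null) {
                        timeouts.incrementAndGet();
                        System.out.println(name + " timed out, no message");
                    } else {
                        System.out.println(name + " received: " + text);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + i);
            consumer.start();
            consumers.add(consumer);
        }
        for (Thread consumer : consumers) {
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return timeouts.get();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("WELCOME GAURAV BY Thread-" + i);
        }
        // Seven consumers compete for five messages, so two should time out.
        System.out.println("timeouts: " + countTimeouts(queue, 7, 200));
    }
}
```

In the real consumer the same situation shows up as a null Message, so it is worth checking for null before using the result of receive(5000).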


  • Create an ActiveMQProducerConsumerCaller.java in the above specified package.

package com.gaurav.jms.activemq.threads;

public class ActiveMQProducerConsumerCaller {
      public static void main(String[] args) throws Exception {
            // Starting five producer threads; each one sends a single message
            executeThread(new ActiveMQMessageProducerThread(), false);
            executeThread(new ActiveMQMessageProducerThread(), false);
            executeThread(new ActiveMQMessageProducerThread(), false);
            executeThread(new ActiveMQMessageProducerThread(), false);
            executeThread(new ActiveMQMessageProducerThread(), false);
            // Giving the producers time to send before the consumers start
            Thread.sleep(3000);
            // Starting five consumer threads; each one receives a single message
            executeThread(new ActiveMQMessageConsumerThread(), false);
            executeThread(new ActiveMQMessageConsumerThread(), false);
            executeThread(new ActiveMQMessageConsumerThread(), false);
            executeThread(new ActiveMQMessageConsumerThread(), false);
            executeThread(new ActiveMQMessageConsumerThread(), false);

            Thread.sleep(3000);
      }

      public static void executeThread(Runnable runnable, boolean daemonThreadFlag) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(daemonThreadFlag);
            thread.start();
      }
}
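
The caller above relies on Thread.sleep(3000) to give the producers time to finish before the consumers start. A possible alternative, sketched below, is to keep a reference to each thread and call join(), which waits exactly as long as needed. The class and method names are hypothetical, and the placeholder tasks in main only print a line; in this project they would be replaced by new ActiveMQMessageProducerThread() and new ActiveMQMessageConsumerThread(). The lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;

class JoinInsteadOfSleep {

    // Starts `count` threads that all run the given task and returns them.
    static List<Thread> startAll(Runnable task, int count, String namePrefix) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Thread thread = new Thread(task, namePrefix + "-" + i);
            thread.start();
            threads.add(thread);
        }
        return threads;
    }

    // Blocks until every thread in the list has finished.
    static void joinAll(List<Thread> threads) {
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "interrupted while waiting for " + thread.getName(), e);
            }
        }
    }

    // Runs all producer threads to completion, then all consumer threads.
    static void runInPhases(Runnable producer, Runnable consumer, int count) {
        joinAll(startAll(producer, count, "producer"));
        joinAll(startAll(consumer, count, "consumer"));
    }

    public static void main(String[] args) {
        // Placeholder tasks standing in for the JMS producer and consumer.
        Runnable producer = () -> System.out.println(
                "sending from " + Thread.currentThread().getName());
        Runnable consumer = () -> System.out.println(
                "receiving in " + Thread.currentThread().getName());
        runInPhases(producer, consumer, 5);
    }
}
```

Sharing one Runnable instance across several threads is safe here only because the tasks keep no state in fields, which is also true of the producer and consumer classes in this post.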

  • Run ActiveMQProducerConsumerCaller.java by selecting the option Run As > Java Application.


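Result of a successful execution of ActiveMQProducerConsumerCaller.java (the order of the lines can differ between runs, because the threads are scheduled independently):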

Sent message : 1075196385 : Thread-1
Sent message : 1075196388 : Thread-4
Sent message : 1075196387 : Thread-3
Sent message : 1075196386 : Thread-2
Sent message : 1075196384 : Thread-0
Received Message from queue is : WELCOME GAURAV BY Thread-1
Received Message from queue is : WELCOME GAURAV BY Thread-2
Received Message from queue is : WELCOME GAURAV BY Thread-4
Received Message from queue is : WELCOME GAURAV BY Thread-3
Received Message from queue is : WELCOME GAURAV BY Thread-0
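
The number printed in each "Sent message" line is strMessage.hashCode(), that is, the hash code of the message text, not a JMS message ID. The five texts differ only in the final digit of the thread name, which is why the five numbers are consecutive. A small sketch (the class and method names are hypothetical) that reproduces the relationship without a broker:

```java
class SentMessageHash {

    // Builds the same text the producer sends for a given thread name.
    static String messageFor(String threadName) {
        return "WELCOME GAURAV BY " + threadName;
    }

    public static void main(String[] args) {
        // Prints each message text next to its hash code.
        for (int i = 0; i < 5; i++) {
            String text = messageFor("Thread-" + i);
            System.out.println(text + " -> " + text.hashCode());
        }
    }
}
```

If an identifier for the sent message itself is wanted, TextMessage.getJMSMessageID() can be read after send() returns.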
 


