JMS using threads
Ques:- How to use multiple producers and multiple consumers using threads?
Answer:- Here in the below example, we will create multiple producers and multiple consumers by implementing runnable interface.
System Requirements:-
- Eclipse Editor or any other.
- JDK 1.7.0_03
- Required jars(activemq-all-5.4.3.jar).
- Apache-activemq-5.4.3
Note: - Apache
Active MQ Setup is required for the execution of this example. For doing the Active
MQ Setup please follow the below link:-
Steps for creating Eclipse java
project for implementing Core JMS using Apache ActiveMQ with mutlithreading:-
- Create a java project named JMSUsingActiveMQUsingThreads.
- Create a package names com.gaurav.jms.activemq.threads in the src directory
- Create an ActiveMQMessageProducerThread.java in the above specified package.
package com.gaurav.jms.activemq.threads;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQMessageProducerThread implements
Runnable {
@Override
public void
run() {
try {
// Creating
a connection factory for ActiveMQ
ActiveMQConnectionFactory
activeMQConFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
// Creating
a connection
Connection
con = activeMQConFactory.createConnection();
con.start();
// Creating
a session;
Session
session = con
.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// Creating
a destination using Topic or Queue
Destination
dest = session.createQueue("TestWelcomeActiveMQQueue");
// creating
a MessageProducer using the session to the topic or
// queue.
MessageProducer
msgProducer = session.createProducer(dest);
msgProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Creating
a message for sending in the queue
String
strMessage = "WELCOME GAURAV BY "
+
Thread.currentThread().getName();
TextMessage
txtMessage = session.createTextMessage(strMessage);
System.out.println("Sent
message : " + strMessage.hashCode()
+
" : " + Thread.currentThread().getName());
msgProducer.send(txtMessage);
// closing
the resources
msgProducer.close();
session.close();
con.close();
} catch
(Exception e) {
System.out.println("Exception
thrown : " + e);
e.printStackTrace();
}
}
}
/* NON_PERSISTENT means no need for database specific persistent
*/
- Create an ActiveMQMessageConsumerThread.java in the above specified package.
package com.gaurav.jms.activemq.threads;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQMessageConsumerThread implements
Runnable,
ExceptionListener
{
@Override
public void
run() {
try {
// Creating
a connection factory for ActiveMQ
ActiveMQConnectionFactory
activeMQConFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
// Creating
a connection
Connection
con = activeMQConFactory.createConnection();
con.start();
// Creating
a session;
Session
session = con
.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// Creating
a destination using Topic or Queue
Destination
dest = session.createQueue("TestWelcomeActiveMQQueue");
// creating
a MessageProducer using the session to the topic or
// queue.
MessageConsumer
msgConsumer = session.createConsumer(dest);
Message
message = msgConsumer.receive(5000);
if
(message instanceof TextMessage) {
TextMessage
txtMessage = (TextMessage) message;
String
strMessage = txtMessage.getText();
System.out.println("Received
Message from queue is : "
+
strMessage);
} else
{
System.out.println("Received
: " + message);
}
// closing
the resources
msgConsumer.close();
session.close();
con.close();
} catch
(Exception e) {
System.out.println("Exception
thrown : " + e);
e.printStackTrace();
}
}
@Override
public synchronized
void onException(JMSException jmsEx) {
System.out.println("JMS
Exception Occured, So closing the client ");
}
}
- Create an ActiveMQProducerConsumerCaller.java in the above specified package.
package com.gaurav.jms.activemq.threads;
public class ActiveMQProducerConsumerCaller {
public static
void main(String args[]) throws Exception {
executeThread(new
ActiveMQMessageProducerThread(), false);
executeThread(new
ActiveMQMessageProducerThread(), false);
executeThread(new
ActiveMQMessageProducerThread(), false);
executeThread(new
ActiveMQMessageProducerThread(), false);
executeThread(new
ActiveMQMessageProducerThread(), false);
Thread.sleep(3000);
executeThread(new
ActiveMQMessageConsumerThread(), false);
executeThread(new
ActiveMQMessageConsumerThread(), false);
executeThread(new
ActiveMQMessageConsumerThread(), false);
executeThread(new
ActiveMQMessageConsumerThread(), false);
executeThread(new
ActiveMQMessageConsumerThread(), false);
Thread.sleep(3000);
}
public static
void executeThread(Runnable runnable, boolean daemonThreadFlag) {
Thread thread = new
Thread(runnable);
thread.setDaemon(daemonThreadFlag);
thread.start();
}
}
- Execute the ActiveMQProducerConsumerCaller.java by selecting the option Run as Java Application.
Result of ActiveMQProducerConsumerCaller.java
successful execution.
Sent message : 1075196385 : Thread-1
Sent message : 1075196388 : Thread-4
Sent message : 1075196387 : Thread-3
Sent message : 1075196386 : Thread-2
Sent message : 1075196384 : Thread-0
Received Message from queue is : WELCOME GAURAV BY
Thread-1
Received Message from queue is : WELCOME GAURAV BY
Thread-2
Received Message from queue is : WELCOME GAURAV BY
Thread-4
Received Message from queue is : WELCOME GAURAV BY
Thread-3
Received Message from queue is : WELCOME GAURAV BY
Thread-0
Great ! Much helpful ! Thank you.
ReplyDelete