JMS - ActiveMQ Message Priority Example
Priority levels are a powerful feature of JMS messages. They help build robust applications in which, for example, peak traffic does not block important messages (sent with a higher priority) from getting through the queue.
The following post explains the basics of JMS priority and illustrates them with a code sample using ActiveMQ and Maven.
Setting JMS Message Priority Levels
Message priority levels can be used to instruct the JMS provider to deliver urgent messages first. The message’s priority is contained in the JMSPriority header. There are ten levels of priority, ranging from 0 (lowest) to 9 (highest). If you do not specify a priority level, the default level is set to 4.
You can set the priority level in either of two ways:
- You can use the setPriority() method of the MessageProducer interface to set the priority level for all messages sent by that producer. For example, the following call sets a priority level of ‘7’ for a producer:
producer.setPriority(7);
- You can use the long form of the send() or the publish() method to set the priority level for a specific message. The third argument sets the priority level. For example, the following send call sets the priority level for a message to ‘3’:
producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 0);
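Setting the priority directly on the JMS message using the setJMSPriority() method of the Message interface does not work: the provider overwrites the JMSPriority header when the message is sent, so the priority of the producer (or the value passed to send()) is used instead.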
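The precedence between the three ways of setting a priority can be sketched with a toy model. The classes below (ToyMessage, ToyProducer, PriorityPrecedenceDemo) are hypothetical stand-ins written for illustration only; they are not the JMS API and only mimic the behaviour described above.

```java
// Toy model of the precedence rules. These classes are hypothetical and
// are NOT part of the JMS API; they only mimic the behaviour described above.
class ToyMessage {
    int priority = 4; // JMS default priority level
}

class ToyProducer {
    private int defaultPriority = 4;

    // Producer-level priority: applies to every message sent with the short form.
    void setPriority(int priority) {
        this.defaultPriority = priority;
    }

    // Short form: the producer's priority overwrites whatever the message carried.
    void send(ToyMessage message) {
        message.priority = defaultPriority;
    }

    // Long form: the explicit argument wins, for this message only.
    void send(ToyMessage message, int priority) {
        message.priority = priority;
    }
}

class PriorityPrecedenceDemo {
    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();
        ToyMessage message = new ToyMessage();

        message.priority = 9;      // set directly on the message
        producer.send(message);    // overwritten by the producer default
        System.out.println(message.priority); // prints 4

        producer.setPriority(7);
        producer.send(message);
        System.out.println(message.priority); // prints 7

        producer.send(message, 3);
        System.out.println(message.priority); // prints 3
    }
}
```

The point of the model is that the value placed on the message before sending never survives the send call.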
It is worth looking at what the Java Message Service 1.1 Specification says about priority and its levels, as it gives a better understanding of what can be achieved:
JMS defines a ten-level priority value, with 0 as the lowest priority and 9 as
the highest. In addition, clients should consider priorities 0-4 as gradations
of normal priority and priorities 5-9 as gradations of expedited priority.
JMS does not require that a provider strictly implement priority ordering of
messages; however, it should do its best to deliver expedited messages ahead of
normal messages.
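The split between normal and expedited priorities in the quote above can be captured in a few lines of plain Java. This is a minimal sketch; the PriorityLevels class and its gradeOf() method are names invented for this illustration and do not exist in JMS.

```java
// Hypothetical helper (not part of JMS) that encodes the ranges from the spec:
// 0-4 are gradations of normal priority, 5-9 gradations of expedited priority.
class PriorityLevels {
    static final int MIN = 0;
    static final int MAX = 9;
    static final int DEFAULT = 4;

    enum Grade { NORMAL, EXPEDITED }

    static Grade gradeOf(int priority) {
        if (priority < MIN || priority > MAX) {
            throw new IllegalArgumentException(
                "priority must be between " + MIN + " and " + MAX + ": " + priority);
        }
        return priority <= 4 ? Grade.NORMAL : Grade.EXPEDITED;
    }
}
```

For example, PriorityLevels.gradeOf(PriorityLevels.DEFAULT) yields NORMAL, while gradeOf(7) yields EXPEDITED.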
When implementing JMS priority, it is important to realize that correct configuration of the JMS provider and of the consumers/producers is key to getting higher-priority messages delivered before lower-priority ones. As the quote above shows, the JMS specification does not require a provider to implement strict priority ordering of messages.
For example, on ActiveMQ a number of settings need to be made in order to support message priority. A typical example is lowering the consumer prefetch to 1 to ensure that high-priority messages are fetched from the store ahead of lower-priority ones. However, this sort of trade-off can have significant performance implications, so always test your scenarios thoroughly.
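To see why the prefetch size matters, consider the following simplified in-memory model. It assumes a broker that keeps pending messages in priority order and a consumer whose prefetch buffer is plain FIFO. This is only a sketch for intuition: the PrefetchSimulation class is hypothetical and does not reproduce ActiveMQ's actual dispatch logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Simplified, hypothetical model: priority-ordered broker, FIFO consumer buffer.
class PrefetchSimulation {
    static final class Msg {
        final String text;
        final int priority;
        final long seq; // send order, used as tie-breaker

        Msg(String text, int priority, long seq) {
            this.text = text;
            this.priority = priority;
            this.seq = seq;
        }
    }

    // Broker side: highest priority first, then oldest first.
    private final PriorityQueue<Msg> pending = new PriorityQueue<>(
        Comparator.comparingInt((Msg m) -> -m.priority).thenComparingLong(m -> m.seq));
    // Consumer side: messages already handed over, consumed in arrival order.
    private final Deque<Msg> buffer = new ArrayDeque<>();
    private final int prefetch;
    private long nextSeq = 0;

    PrefetchSimulation(int prefetch) {
        this.prefetch = prefetch;
    }

    void send(String text, int priority) {
        pending.add(new Msg(text, priority, nextSeq++));
        dispatch();
    }

    // Hand messages to the consumer until its prefetch buffer is full.
    private void dispatch() {
        while (buffer.size() < prefetch && !pending.isEmpty()) {
            buffer.addLast(pending.poll());
        }
    }

    String receive() {
        Msg m = buffer.pollFirst();
        dispatch();
        return m == null ? null : m.text;
    }

    // Three low-priority messages are sent first, then one urgent message.
    static List<String> run(int prefetch) {
        PrefetchSimulation sim = new PrefetchSimulation(prefetch);
        sim.send("low1", 1);
        sim.send("low2", 1);
        sim.send("low3", 1);
        sim.send("urgent", 9);
        List<String> order = new ArrayList<>();
        for (String t = sim.receive(); t != null; t = sim.receive()) {
            order.add(t);
        }
        return order;
    }
}
```

In this model, PrefetchSimulation.run(3) returns [low1, low2, low3, urgent] because the three low-priority messages already sit in the consumer buffer when the urgent one arrives, whereas run(1) returns [low1, urgent, low2, low3]. The price of the smaller prefetch is one broker round trip per message, which is the performance trade-off mentioned above.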
Check out the following link if you would like to know more about how ActiveMQ message priorities work.
ActiveMQ Message Priority Example
The example below uses Maven and assumes that an ActiveMQ message broker is installed and running.
Tools used:
- ActiveMQ 5.15
- Maven 3.5
Let’s illustrate JMS priority by creating a simple producer with two different send() methods. The first method will send a message to a queue with the default priority level and the second method will accept an additional parameter specifying the priority to be set on the message. We will then create a consumer to read the messages from the queue and observe in which order they are read.
First, let’s look at the Maven POM file below, which contains the dependencies needed for Logback, JUnit and Apache ActiveMQ.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.codenotfound</groupId>
<artifactId>jms-activemq-priority</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>JMS - ActiveMQ Message Priority Example</name>
<description>JMS - ActiveMQ Message Priority Example</description>
<url>https://www.codenotfound.com/jms-activemq-message-priority-example.html</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<logback.version>1.2.3</logback.version>
<slf4j.version>1.7.25</slf4j.version>
<junit.version>4.12</junit.version>
<activemq.version>5.15.2</activemq.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
</properties>
<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- JUnit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Next is the Producer class, which contains the two send() methods: one that applies the default priority and one that applies a custom priority. The class also contains two methods for opening/closing a connection to the message broker as well as a method for creating the message producer.
package com.codenotfound.jms.priority;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Producer {
private static final Logger LOGGER =
LoggerFactory.getLogger(Producer.class);
private Connection connection;
private Session session;
private MessageProducer producer;
public void openConnection() throws JMSException {
// Create a new connection factory
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
connection = connectionFactory.createConnection();
}
public void closeConnection() throws JMSException {
connection.close();
}
public void createProducer(String queue) throws JMSException {
// Create a session for sending messages
session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination to which messages will be sent
Destination destination = session.createQueue(queue);
// Create a MessageProducer for sending the messages
producer = session.createProducer(destination);
}
public void send(String text) throws JMSException {
TextMessage message = session.createTextMessage(text);
producer.send(message);
LOGGER.info("{} sent with default priority(=4)", text);
}
public void send(String text, int priority) throws JMSException {
TextMessage message = session.createTextMessage(text);
// Note: setting the priority directly on the JMS Message does not work
// as in that case the priority of the producer is taken
producer.send(message, DeliveryMode.PERSISTENT, priority, 0);
LOGGER.info("{} sent with priority={}", text, priority);
}
}
For receiving the messages, a Consumer class is created with two methods for opening/closing a connection to the message broker. In addition, the class provides a method to create a message consumer for a specific queue and a method to receive a single message.
Note that instead of using the default connection URL of the ActiveMQ broker, we have specified the 'messagePrioritySupported' property in order to ensure that the messages are prioritized on the consumer side.
package com.codenotfound.jms.priority;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Consumer {
private static final Logger LOGGER =
LoggerFactory.getLogger(Consumer.class);
private Connection connection;
private Session session;
private MessageConsumer consumer;
public void openConnection() throws JMSException {
// Create a new connection factory
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(
"tcp://0.0.0.0:61616?jms.messagePrioritySupported=true");
connection = connectionFactory.createConnection();
}
public void closeConnection() throws JMSException {
connection.close();
}
public void createConsumer(String queue) throws JMSException {
// Create a session for receiving messages
session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination from which messages will be received
Destination destination = session.createQueue(queue);
// Create a MessageConsumer for receiving messages
consumer = session.createConsumer(destination);
// Start the connection in order to receive messages
connection.start();
}
public String receive(int timeout) throws JMSException {
// Read a message from the destination; receive() returns null on timeout
Message message = consumer.receive(timeout);
if (message == null) {
LOGGER.info("no message received within {} ms", timeout);
return null;
}
// Cast the message to the correct type
TextMessage input = (TextMessage) message;
// Retrieve the message content
String text = input.getText();
LOGGER.info("{} received", text);
return text;
}
}
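The connection URL used by the consumer carries its settings as query parameters. As a small sketch, the helper below assembles such a URL from a map of options. BrokerUrls and withOptions() are hypothetical names, the localhost host is an assumption, and the 'jms.prefetchPolicy.queuePrefetch' option is an ActiveMQ connection option that the example in this post does not set; it is shown only to illustrate how the prefetch mentioned earlier could be lowered.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for appending connection options to a broker URL.
class BrokerUrls {
    static String withOptions(String baseUrl, Map<String, String> options) {
        if (options.isEmpty()) {
            return baseUrl;
        }
        StringJoiner query = new StringJoiner("&");
        options.forEach((key, value) -> query.add(key + "=" + value));
        // Append to an existing query string if the URL already has one.
        String separator = baseUrl.contains("?") ? "&" : "?";
        return baseUrl + separator + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("jms.messagePrioritySupported", "true");
        options.put("jms.prefetchPolicy.queuePrefetch", "1"); // assumption: not in the original example
        System.out.println(withOptions("tcp://localhost:61616", options));
        // prints tcp://localhost:61616?jms.messagePrioritySupported=true&jms.prefetchPolicy.queuePrefetch=1
    }
}
```

The resulting string can be passed to the ActiveMQConnectionFactory constructor in the same way as the hard-coded URL in the Consumer class.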
Last is a JUnit test class in which a first testSend() test case sends three messages with default priority to a 'priority.q' queue. It then verifies that the messages are read in first in, first out (FIFO) order.
A second testSendWithPriority() test case sends three messages with custom priorities, where the last message gets the highest priority. It then verifies that the messages are read in order of descending priority, which here is the reverse of the order in which they were sent (last in, first out).
package com.codenotfound.jms.priority;
import static org.junit.Assert.assertEquals;
import javax.jms.JMSException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class ProducerTest {
private static Consumer consumer;
private static Producer producer;
private static final String QUEUE = "priority.q";
@BeforeClass
public static void setUpBeforeClass() throws JMSException {
producer = new Producer();
producer.openConnection();
producer.createProducer(QUEUE);
consumer = new Consumer();
consumer.openConnection();
consumer.createConsumer(QUEUE);
}
@AfterClass
public static void tearDownAfterClass() throws JMSException {
producer.closeConnection();
consumer.closeConnection();
}
@Test
public void testSend() throws JMSException, InterruptedException {
producer.send("message1");
producer.send("message2");
producer.send("message3");
Thread.sleep(10000);
// Messages should be received FIFO as priority is the same for all
assertEquals("message1", consumer.receive(5000));
assertEquals("message2", consumer.receive(5000));
assertEquals("message3", consumer.receive(5000));
}
@Test
public void testSendWithPriority()
throws JMSException, InterruptedException {
producer.send("message1", 1);
producer.send("message2", 2);
producer.send("message3", 3);
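// Wait so that all three messages are available before the first receive()
Thread.sleep(10000);
// Messages should be received in descending priority order (the
// reverse of the send order) as priority=3 is higher than priority=1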
assertEquals("message3", consumer.receive(5000));
assertEquals("message2", consumer.receive(5000));
assertEquals("message1", consumer.receive(5000));
}
}
Make sure an ActiveMQ message broker is up and running, open a command prompt and execute the following Maven command:
mvn test
This will trigger Maven to run the above test cases and results in the following log statements. Even though 'message1' was sent first in both test cases, in the first test it is received first whereas in the second test it is received last because of the different priorities assigned.
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.codenotfound.jms.priority.ProducerTest
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/code/local-repo/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/code/local-repo/org/apache/activemq/activemq-all/5.15.2/activemq-all-5.15.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
21:50:34.705 INFO [main][Producer] message1 sent with default priority(=4)
21:50:34.715 INFO [main][Producer] message2 sent with default priority(=4)
21:50:34.719 INFO [main][Producer] message3 sent with default priority(=4)
21:50:44.721 INFO [main][Consumer] message1 received
21:50:44.722 INFO [main][Consumer] message2 received
21:50:44.723 INFO [main][Consumer] message3 received
21:50:44.736 INFO [main][Producer] message1 sent with priority=1
21:50:44.742 INFO [main][Producer] message2 sent with priority=2
21:50:44.748 INFO [main][Producer] message3 sent with priority=3
21:50:54.749 INFO [main][Consumer] message3 received
21:50:54.750 INFO [main][Consumer] message2 received
21:50:54.750 INFO [main][Consumer] message1 received
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 20.622 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 22.719 s
[INFO] Finished at: 2018-01-04T21:50:54+01:00
[INFO] Final Memory: 19M/217M
[INFO] ------------------------------------------------------------------------
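The receive order seen in the log follows a simple rule: highest priority first, and send order among messages of equal priority. A minimal in-memory sketch of that rule is shown below. PriorityChannelModel is a hypothetical class built on java.util.PriorityQueue; it illustrates the ordering only and is not how ActiveMQ is implemented.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of a priority-aware channel (illustration only).
class PriorityChannelModel {
    private static final class Entry {
        final String text;
        final int priority;
        final long seq; // send order, keeps equal priorities FIFO

        Entry(String text, int priority, long seq) {
            this.text = text;
            this.priority = priority;
            this.seq = seq;
        }
    }

    // Higher priority first; for equal priority, the earlier message first.
    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
        Comparator.comparingInt((Entry e) -> e.priority).reversed()
            .thenComparingLong(e -> e.seq));
    private long nextSeq = 0;

    void enqueue(String text, int priority) {
        queue.add(new Entry(text, priority, nextSeq++));
    }

    // Remove and return all messages in delivery order.
    List<String> drain() {
        List<String> out = new ArrayList<>();
        Entry e;
        while ((e = queue.poll()) != null) {
            out.add(e.text);
        }
        return out;
    }
}
```

Enqueuing message1, message2 and message3 with priority 4 each drains as [message1, message2, message3], matching the first test. Enqueuing them with priorities 1, 2 and 3 drains as [message3, message2, message1], matching the second.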
By browsing the 'priority.q' queue using the ActiveMQ console, we can verify the JMSPriority header that was set on the messages sent by the above test cases. The testSend() test case creates three messages, each with priority set to 4.
The second testSendWithPriority() test case results in three messages, each with a different priority ranging from 1 to 3.
This concludes the example on how to use JMS message priority using ActiveMQ.