First steps with messaging in Java
In this article, I will introduce some messaging concepts and try to emphasize some insights which are important for beginners. At the end of this article I will show a short example with ActiveMQ.
Types of messaging
Communication among processes can be synchronous or asynchronous and persistent or transient. If we want to build a complex distributed system, we have to use various combinations of communication. The rest of this article is focused on JMS, which is an interface of Message-Oriented Middleware and can be typically used for asynchronous persistent communication. In this short article you can see basic information about MOM and differences among other types of communication.
Typically, a sender (producer) program creates a message and pushes it to a queue and a receiver (consumer) program gets the message from the queue and processes it. Producer and consumer don’t know about their existence. Message queues are typically provided by Message Brokers. A Message Broker is a standalone application (service) that other applications connect to and send/receive messages. A Message Broker is responsible for storing messages until a receiver receives them. A Message Broker can route messages across machines to deliver a message to the destination application and can try delivering the message until the receiver correctly handles it. [1] Message broker as a message transfer agent is an extra component in the architecture and is the primary disadvantage of many message-oriented middleware systems [2].
ActiveMQ, HornetQ, RabbitMQ, ZeroMQ or blablaMQ?
We need to build asynchronous communication between two programs. Which library and implementation of Message Broker should we use? Do we really need a message broker or JMS compliant solution? ActiveMQ, HornetQ and RabbitMQ are message brokers and offer support for enterprise integration patterns. Basically all brokers are highly scalable, robust and reliable with some limitations. ZeroMq is a library to create distributed and concurrent applications. The ZeroMQ API is similar to the low level Socket API for communication over networks and also suitable for in-process asynchronous programming. Over the internet you can find some articles dealing with comparison of message brokers like this or Kuntal Ganguly’s comparison. I like the most Thomas Bayer’s comparison, where everything important is highlighted. Before you choose your …MQ solution, I recommend to you read this article.
Point-to-point vs Publish/Subscribe model
When we choose our messaging library and message broker, we should decide what kind of JMS destination is suitable. JMS destination is an object that represents the target of messages that the client produces and the source of messages that the client consumes. Queue represents a point-to-point model, where a single message is received by exactly one consumer and messages have to be delivered in the order sent. Topic represents a publish and subscribe model, where zero to multiple clients can subscribe to the message and there is no guarantee for the order of delivery from the messages sent. [3]
Messaging anti-patterns
We chose a library, message broker, destination and now we can build a message system, right? No, we can’t. If we don’t have any experience with messaging, we should know about common messaging anti-patterns. We don’t find them in documentations and should keep them in mind. Dejan Bosanac wrote great articles about messaging anti-patterns. I think the first is the most important, where you can find out how you shouldn’t use destinations. The second part is about managing connections and the third about message types.
Sources
[1] DotNetMQ: A Complete Message Queue System for .NET
[2] Message-oriented middleware
[3] JMS Queue – Difference between a Queue and a Topic
Example
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory | |
# use the following property to configure the default connector | |
java.naming.provider.url = tcp://localhost:61616 | |
# register some topics in JNDI using the form | |
# topic.[jndiName] = [physicalName] | |
topic.MyTopic = example.MyTopic |
package activemq.test; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.Map; | |
import org.apache.activemq.api.core.TransportConfiguration; | |
import org.apache.activemq.core.config.Configuration; | |
import org.apache.activemq.core.config.impl.ConfigurationImpl; | |
import org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory; | |
import org.apache.activemq.core.server.ActiveMQServer; | |
import org.apache.activemq.core.server.ActiveMQServers; | |
public class Server { | |
public static void main(final String arg[]) throws Exception { | |
try { | |
// Step 1. Create the Configuration, and set the properties | |
// accordingly | |
Configuration configuration = new ConfigurationImpl(); | |
// we only need this for the server lock file | |
configuration.setJournalDirectory("target/data/journal"); | |
configuration.setPersistenceEnabled(false); // http://activemq.apache.org/what-is-the-difference-between-persistent-and-non-persistent-delivery.html | |
configuration.setSecurityEnabled(false); // http://activemq.apache.org/security.html | |
/** | |
* this map with configuration values is not necessary (it | |
* configures the default values). If you want to modify it to run | |
* the example in two different hosts, remember to also modify the | |
* client's Connector at {@link EmbeddedRemoteExample}. | |
*/ | |
Map<String, Object> map = new HashMap<String, Object>(); | |
map.put("host", "localhost"); | |
map.put("port", 61616); | |
// https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/ch14s04.html | |
TransportConfiguration transpConf = new TransportConfiguration( | |
NettyAcceptorFactory.class.getName(), map); | |
HashSet<TransportConfiguration> setTransp = new HashSet<TransportConfiguration>(); | |
setTransp.add(transpConf); | |
configuration.setAcceptorConfigurations(setTransp); // https://github.com/apache/activemq-6/blob/master/activemq-server/src/main/java/org/apache/activemq/spi/core/remoting/Acceptor.java | |
// Step 2. Create and start the server | |
ActiveMQServer server = ActiveMQServers | |
.newActiveMQServer(configuration); | |
server.start(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
throw e; | |
} | |
} | |
} |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import org.apache.activemq.ActiveMQConnectionFactory; | |
import javax.jms.Connection; | |
import javax.jms.DeliveryMode; | |
import javax.jms.JMSException; | |
import javax.jms.Message; | |
import javax.jms.MessageListener; | |
import javax.jms.Session; | |
import javax.jms.Destination; | |
import javax.jms.TextMessage; | |
public class App { | |
public static void main(final String arg[]) throws Exception { | |
try { | |
javax.naming.Context context = new javax.naming.InitialContext(); | |
// Create a ConnectionFactory | |
ActiveMQConnectionFactory connectionFactory = | |
(org.apache.activemq.ActiveMQConnectionFactory) context.lookup("ConnectionFactory"); | |
// Create a Connection | |
Connection connection = connectionFactory.createConnection(); | |
connection.start(); | |
// Create a session | |
Session session = connection.createSession(false, | |
Session.AUTO_ACKNOWLEDGE); | |
// Create a destination | |
Destination topic = (javax.jms.Topic) context.lookup("MyTopic"); | |
// Create first consumer | |
javax.jms.MessageConsumer consumer1 = session.createConsumer(topic); | |
consumer1.setMessageListener(new MessageListener() { | |
public void onMessage(Message message) { | |
try { | |
if (message instanceof TextMessage) { | |
TextMessage textMessage = (TextMessage) message; | |
System.out.println("Consumer 1 received message " | |
+ textMessage.getText()); | |
} | |
} catch (JMSException e) { | |
System.out.println("Caught:" + e); | |
e.printStackTrace(); | |
} | |
} | |
}); | |
// Create second consumer | |
javax.jms.MessageConsumer consumer2 = session.createConsumer(topic); | |
consumer2.setMessageListener(new MessageListener() { | |
public void onMessage(Message message) { | |
try { | |
if (message instanceof TextMessage) { | |
TextMessage textMessage = (TextMessage) message; | |
System.out.println("Consumer 2 received message " | |
+ textMessage.getText()); | |
} | |
} catch (JMSException e) { | |
System.out.println("Caught:" + e); | |
e.printStackTrace(); | |
} | |
} | |
}); | |
// Create a producer | |
javax.jms.MessageProducer producer = session.createProducer(topic); | |
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); | |
// Send some messages | |
TextMessage message = session.createTextMessage(""); | |
for (int i = 0; i < 10; i++) { | |
try { | |
Date date = new Date(); | |
String formattedDate = new SimpleDateFormat( | |
"MM/dd/yyyy HH:mm:ss").format(date); | |
message = session.createTextMessage(formattedDate | |
+ " number of iteration " + i); | |
producer.send(message); | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
connection.close(); | |
context.close(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |