Web technologies -- Laboratory 15 -- 2009-2010 -- info.uvt.ro

Message Queue Systems

  • Message queues are components used for inter-process/thread communication.
  • They rely on queues to deliver/process messages in a FIFO manner.
  • The communication is usually asynchronous.
  • They rely on point-to-point (the sender needs to know the destination) or publish/subscribe (the sender does not need to know the destination) models to handle messages.
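The two delivery models can be illustrated with a minimal in-memory sketch. It is not a real broker: the class MessagingModels, the method sendTo and the nested class Topic are hypothetical names chosen for this illustration, and plain java.util.concurrent queues stand in for the messaging system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model, not a real message broker
class MessagingModels {

	// Point-to-point: the sender puts the message into one known queue
	static void sendTo(BlockingQueue<String> destination, String message) {
		destination.add(message);
	}

	// Publish/subscribe: the sender only knows the topic; every subscriber
	// queue registered on the topic gets its own copy of the message
	static class Topic {
		private final List<BlockingQueue<String>> subscribers = new ArrayList<>();

		BlockingQueue<String> subscribe() {
			BlockingQueue<String> queue = new LinkedBlockingQueue<>();
			subscribers.add(queue);
			return queue;
		}

		void publish(String message) {
			for (BlockingQueue<String> queue : subscribers) {
				queue.add(message);
			}
		}
	}

	public static void main(String[] args) {
		BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
		sendTo(inbox, "first");
		sendTo(inbox, "second");
		// FIFO: messages leave the queue in the order they were sent
		System.out.println(inbox.poll() + ", " + inbox.poll());

		Topic news = new Topic();
		BlockingQueue<String> alice = news.subscribe();
		BlockingQueue<String> bob = news.subscribe();
		news.publish("hello");
		// Both subscribers receive the same message
		System.out.println(alice.poll() + " / " + bob.poll());
	}
}
```

In the point-to-point case each message is consumed once, from the queue the sender chose; in the publish/subscribe case the sender never sees the subscriber queues.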

Java Message Service (JMS)

JMS is a Java middleware API for sending messages between two or more clients. The current version of the specification is JMS 1.1 (released in 2002).

ActiveMQ

Apache's ActiveMQ middleware is one JMS solution. It has support for Java, .NET, C++, Python, PHP, Ruby, etc. The current version is 5.4.2 (since December 2010). It can be downloaded from here.

Installing ActiveMQ is straightforward:

  • unzip the archive into the desired directory;
  • open a console;
  • cd into that directory;
  • you should see several subdirectories including one called bin;
  • cd into bin;
  • type activemq into the console;
  • the server should start and some information should be displayed on the console;
  • test your server by opening http://localhost:8161/admin in a browser.
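Besides the browser test, the last step can be checked from code. The sketch below only tests whether something accepts TCP connections on a port; it does not verify that the listener is really ActiveMQ. The class BrokerPortCheck and the method isPortOpen are hypothetical names; port 8161 comes from the admin URL above, and port 61616 is an assumption taken from the connector address used in the example further down.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortCheck {

	// Returns true if a TCP connection to host:port succeeds within timeoutMillis
	static boolean isPortOpen(String host, int port, int timeoutMillis) {
		try (Socket socket = new Socket()) {
			socket.connect(new InetSocketAddress(host, port), timeoutMillis);
			return true;
		} catch (IOException ex) {
			// Connection refused, timeout or unknown host: treat the port as closed
			return false;
		}
	}

	public static void main(String[] args) {
		System.out.println("Admin console (8161): " + isPortOpen("localhost", 8161, 1000));
		System.out.println("Broker connector (61616): " + isPortOpen("localhost", 61616, 1000));
	}
}
```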

A simple example showing how to create a producer and a consumer using ActiveMQ is shown below:

NOTE: Remember to add the libraries found in the lib directory of the ActiveMQ distribution to the Java project.


// Classes required by ActiveMQ JMS
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;

public class SimpleActiveMQExample {

	private static ActiveMQConnectionFactory connectionFactory;
	private static Connection connection;
	private static Session session;
	private static Destination destination;
	private static boolean transacted = false;

	public static void main(String[] args) throws Exception {
		// Create the service broker
		BrokerService broker = new BrokerService();
		broker.setUseJmx(true);
		// Add a connector to the port where the messages will be sent
		broker.addConnector("tcp://localhost:61616");
		// Start the broker
		broker.start();

		// Initialise the system
		setUp();
		// Initialise the producer and send a message
		createProducerAndSendAMessage();
		
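		// Sleep a little...
		Thread.sleep(4000);
		// Initialise the consumer and grab the message
		createConsumerAndReceiveAMessage();
		// Delivery is asynchronous, so give the listener time to receive the message
		Thread.sleep(1000);

		// Release the client connection, then stop the broker
		connection.close();
		broker.stop();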
	}
	
	private static void setUp() throws JMSException {
		// We need a connection factory. It produces connections
		connectionFactory = new ActiveMQConnectionFactory(
			ActiveMQConnection.DEFAULT_USER,
			ActiveMQConnection.DEFAULT_PASSWORD,
			ActiveMQConnection.DEFAULT_BROKER_URL);
		// Create a connection to the broker
		connection = connectionFactory.createConnection();
		// Activate it
		connection.start();
		// Attach a session to the connection
		session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
		// Create a queue attached to the session
		destination = session.createQueue("my first active mq queue");
	}

	private static void createProducerAndSendAMessage() throws JMSException {
		// Create a producer attached to the destination (queue)
		MessageProducer producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		
		// Create a simple text message. Other options are available. Even binary data can be sent
		TextMessage message = session.createTextMessage("Hello World!");
		
		// Send the message
		System.out.println("Sending message: " + message.getText());
		producer.send(message);
	}

	private static void createConsumerAndReceiveAMessage() throws JMSException {
		// The consumer uses the connection and session opened in setUp()
		// Create a consumer attached to the destination (queue)
		// NOTICE HOW THE DESTINATION IS THE SAME FOR BOTH CONSUMER AND PRODUCER
		MessageConsumer consumer = session.createConsumer(destination);
		// Create the actual consumer
		MyConsumer myConsumer = new MyConsumer();
		// Exceptions and messages are triggered by events asynchronously
		connection.setExceptionListener(myConsumer);
		consumer.setMessageListener(myConsumer);
	}

	/**
	* This is the consumer class. It can be in its own file but would need to duplicate the 
	* setUp() method for setting the system
	*/
	private static class MyConsumer implements MessageListener, ExceptionListener {
		synchronized public void onException(JMSException ex) {
			System.out.println("JMS Exception occurred.  Shutting down client.");
			System.exit(1);
		}

		public void onMessage(Message message) {
			// Remember that the message we've sent is of type Text. If other types
			// are used simply add more ifs
			if (message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				try {
					System.out.println("Received message: " + textMessage.getText());
				} catch (JMSException ex) {
					System.out.println("Error reading message: " + ex);
				}
			} else  {
				System.out.println("Received: " + message);
			}
		}
	}
}

Advanced Message Queuing Protocol (AMQP)

AMQP is an open application-layer protocol for message-oriented middleware. It offers:

  • message orientation;
  • queuing;
  • routing (including point-to-point and publish-and-subscribe);
  • reliability;
  • security.

Unlike JMS, which is an API, AMQP is a wire-level protocol: it defines the format of the data sent over the network. This opens the way for interoperability, as any software adhering to this format can attach itself to the global system, regardless of the language or platform it is written for.
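To make "wire-level" concrete, the sketch below builds the bytes of one frame by hand. It assumes the general frame layout of AMQP 0-9-1 (one type octet, a two-octet channel number, a four-octet payload size, the payload, and a closing 0xCE octet, all in network byte order); other protocol versions use a different framing. The class AmqpFrameSketch and its methods are hypothetical names, and a real client library does this work internally.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the assumed AMQP 0-9-1 frame layout
class AmqpFrameSketch {
	static final byte FRAME_BODY = 3;          // frame type used for message content
	static final byte FRAME_END = (byte) 0xCE; // every frame ends with this octet

	static byte[] encodeFrame(byte type, int channel, byte[] payload) {
		// 7 header octets + payload + 1 end octet; ByteBuffer is big-endian by default
		ByteBuffer buffer = ByteBuffer.allocate(7 + payload.length + 1);
		buffer.put(type);
		buffer.putShort((short) channel);
		buffer.putInt(payload.length);
		buffer.put(payload);
		buffer.put(FRAME_END);
		return buffer.array();
	}

	static String toHex(byte[] bytes) {
		StringBuilder hex = new StringBuilder();
		for (byte b : bytes) {
			hex.append(String.format("%02X ", b));
		}
		return hex.toString().trim();
	}

	public static void main(String[] args) {
		byte[] frame = encodeFrame(FRAME_BODY, 1, "Hi".getBytes(StandardCharsets.US_ASCII));
		System.out.println(toHex(frame)); // prints: 03 00 01 00 00 00 02 48 69 CE
	}
}
```

Because the byte layout is fixed by the protocol, a client written in any language can produce or parse the same frame.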

It relies on:

  • exchanges: entities to which the messages are sent. They can be of type direct, fanout, topic or headers;
  • queues: entities which store the messages until consumers read them. They are bound to exchanges;
  • messages: the communication atoms. They are sent to exchanges, and consumers read them from the queues bound to those exchanges.
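The way an exchange picks the queues that receive a message can be sketched with a small in-memory model. This is not the RabbitMQ API: the class ExchangeSketch and its methods are hypothetical. The sketch assumes the usual routing rules: a direct exchange requires the routing key to equal the binding key, a fanout exchange ignores the key, and a topic exchange matches dot-separated words where * stands for exactly one word and # for zero or more. The headers type is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of exchange routing, not a client library
class ExchangeSketch {
	enum Type { DIRECT, FANOUT, TOPIC }

	private final Type type;
	// Each binding is a pair {queueName, bindingKey}
	private final List<String[]> bindings = new ArrayList<>();

	ExchangeSketch(Type type) {
		this.type = type;
	}

	void bind(String queueName, String bindingKey) {
		bindings.add(new String[] { queueName, bindingKey });
	}

	// Returns the names of the queues that would receive the message
	List<String> route(String routingKey) {
		List<String> targets = new ArrayList<>();
		for (String[] binding : bindings) {
			boolean matches;
			switch (type) {
				case DIRECT: matches = binding[1].equals(routingKey); break;
				case FANOUT: matches = true; break;
				default: matches = topicMatches(binding[1], routingKey); break;
			}
			if (matches) {
				targets.add(binding[0]);
			}
		}
		return targets;
	}

	static boolean topicMatches(String pattern, String routingKey) {
		return matchWords(pattern.split("\\."), 0, routingKey.split("\\."), 0);
	}

	private static boolean matchWords(String[] p, int i, String[] k, int j) {
		if (i == p.length) {
			return j == k.length;
		}
		if (p[i].equals("#")) {
			// '#' may absorb zero or more words of the routing key
			for (int next = j; next <= k.length; next++) {
				if (matchWords(p, i + 1, k, next)) {
					return true;
				}
			}
			return false;
		}
		if (j == k.length) {
			return false;
		}
		// '*' matches exactly one word; anything else must match literally
		return (p[i].equals("*") || p[i].equals(k[j])) && matchWords(p, i + 1, k, j + 1);
	}

	public static void main(String[] args) {
		ExchangeSketch direct = new ExchangeSketch(Type.DIRECT);
		direct.bind("myQueue", "testRoute");
		System.out.println(direct.route("testRoute")); // prints: [myQueue]
		System.out.println(topicMatches("stock.*", "stock.nyse.ibm")); // prints: false
	}
}
```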

The latest version is 1.0 (May 2010).

RabbitMQ

RabbitMQ is one of several AMQP-based solutions. It can be downloaded from here.

Installation steps are explained here. If you plan on testing it on your machine, install both the client and the server.

NOTE: Erlang also needs to be installed on your machine for RabbitMQ to work.

The following example shows how a producer and consumer can be created using RabbitMQ:

Producer:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.*;

public class RabbitMQProducer {
	public static void main(String []args) throws Exception {
		// Create the connection factory. It produces connections
		ConnectionFactory factory = new ConnectionFactory();
		// Set up the credentials
		factory.setUsername("guest");
		factory.setPassword("guest");
		factory.setVirtualHost("/");
		factory.setHost("127.0.0.1");
		factory.setPort(5672);

		// Create a connection
		Connection conn = factory.newConnection();
		// Create the communication channel attached to this connection
		Channel channel = conn.createChannel();
		String exchangeName = "myExchange";
		String routingKey = "testRoute";

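		// Make sure the exchange exists (same declaration as in the consumer);
		// publishing to a missing exchange closes the channel with an error
		channel.exchangeDeclare(exchangeName, "direct", true);

		// Create the message
		byte[] messageBodyBytes = "Hello, world!".getBytes();
		// Publish the message to an exchange using a routing key. The exchange uses the
		// routing key to decide which of its bound queues receive the message.
		// If no queue is bound yet (consumer never started), the message is discarded
		channel.basicPublish(
				exchangeName,
				routingKey,
				MessageProperties.PERSISTENT_TEXT_PLAIN,
				messageBodyBytes);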

		// Close the channel and the connection
		channel.close();
		conn.close();
	}
}

Consumer:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.*;
public class RabbitMQConsumer {
	public static void main(String []args) throws Exception {
		// Create the connection factory. It produces connections
		ConnectionFactory factory = new ConnectionFactory();
		// Set up the credentials
		factory.setUsername("guest");
		factory.setPassword("guest");
		factory.setVirtualHost("/");
		factory.setHost("127.0.0.1");
		factory.setPort(5672);

		// Create a connection
		Connection conn = factory.newConnection();
		// Create a communication channel attached to the connection
		Channel channel = conn.createChannel();
		String exchangeName = "myExchange";
		String queueName = "myQueue";
		String routingKey = "testRoute";
		boolean durable = true;

		// Create an exchange to which messages are to be sent
		channel.exchangeDeclare(exchangeName, "direct", durable);
		// Create a queue to consume messages (not exclusive, not auto-deleted, no extra arguments)
		channel.queueDeclare(queueName, durable, false, false, null);
		// Bind the queue to the exchange
		channel.queueBind(queueName, exchangeName, routingKey);
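		// false means the consumer must acknowledge each message explicitly
		boolean noAck = false;
		
		// Create the queue consumer
		// (QueueingConsumer exists only in older client versions; it was deprecated in 4.x and removed in 5.0)
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, noAck, consumer);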
		boolean runInfinite = true;
		while (runInfinite) {
			// In an infinite loop grab next message
			QueueingConsumer.Delivery delivery;
			try {
				// If nextDelivery receives no argument it will wait indefinitely for a message. This blocks the rest of the thread. 
				// One solution is to start this in a separate thread 
				delivery = consumer.nextDelivery();
			} catch (InterruptedException ie) {
				continue;
			}
			System.out.println("Message received: " + new String(delivery.getBody()));
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}

		// Close the channel and the connection
		channel.close();
		conn.close();
	}
}

Comparison between various Message Queuing Systems

Exercises
