Amazon SQS – Listening to amazon SQS queue using Apache Camel

In my previous post, Working with Amazon Simple Queue Service using java, I discussed how to post and retrieve messages from Amazon SQS queue using amazon SDK. There was one major problem in the code. Instead of listening to the queue we used a simple thread which polled on the queue and retrieved the message. This takes out the fun in using message oriented middle-ware. So let us take this one step further and create a listener which will be listing to the SQS and retrieve the message as soon as it is posted to the queue. We will be using Apache Camel for this purpose.

What is Apache Camel? For the readers not familiar with Apache Camel, it is a open source java framework based on Enterprise Integration Patterns. Camel is a rule based routing and mediation engine written in java and can be used across various transports and messaging models like HTTP, JMS Queues, Web Services etc. Camel allows you to declare endpoints using URIs and facilitates the delivery of the messages between the endpoints. For more knowledge you can visit following links

  1. http://architects.dzone.com/articles/apache-camel-integration
  2. http://camel.apache.org/
  3. http://stackoverflow.com/questions/8845186/what-exactly-is-apache-camel
  4. http://camel.apache.org/tutorials.html

You can also buy a book Camel in Action 

Setting up SQS? Please revisit my previous post on SQS to get you started with SQS. Working with Amazon Simple Queue Service using java

So once you are through with the post you should have following

  1. Live SQS queue called PhotoQueue.
  2. A working AWSSimpleQueueServiceUtil.java file which contains utility methods to connect and perform send, receive and delete operation on queue.
  3. SQSPhotoManager.java file which makes actual connection and send photo processing messages to SQS queue. This class also defines and starts a thread which retrieves messages from SQS via polling.

Purpose of this tutorial is to replace the SQSPhotoManager with a CamelPhotoManager which will leverage Apache Camel to act as mediator and router of photo processing messages.  CamelPhotoManager  will be responsible for posting message to SQS and then use Camel framework to retrieve and process the message.

Getting started part 2

1. Download Camel – It is always good to have maven manage our dependencies so you can add following to you pom.xml. Your version should be higher that 2.6 else it will not work.

 <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>${camel-version}</version>
 </dependency>

 <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-spring</artifactId>
      <version>${camel-version}</version>
 </dependency>

<dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-aws</artifactId>
       <version>${camel-version}</version>
</dependency>

2. Creating CamelContext  - CamelContext is a container which provides camel run time system. Most important thing to know about camel context is that they have a hold on your routes. Routes are  from and to endpoints of the message exchange. There are two ways of creating a camel context. One is registering them in spring and other one is to instantiate in java. I will be using java for this post. 

public void asyncProcess() {
    try {
    // create CamelContext
        SimpleRegistry registry = new SimpleRegistry();
        AWSSimpleQueueServiceUtil awssqsUtil =   AWSSimpleQueueServiceUtil.getInstance();
        AmazonSQS sqsClient =  awssqsUtil.getAWSSQSClient();
        registry.put("amazonSQSClient" , sqsClient);
        CamelContext context = new DefaultCamelContext(registry);

        // add our route to the CamelContext
        context.addRoutes(new MySQSRouterBuilder());

        context.start();
        Thread.sleep(100000);
        context.stop();

    } catch ( Exception e ) {
        System.out.println(e);
    }

    }
  • Here as a first step we have created a simple registry where we have stored the Amazon SQS client instance. You can get the code for AWSSimpleQueueService from my previous post Working with Amazon Simple Queue Service using java
  • Then we create a CamelContext and pass the registry to it.                      CamelContext context = new DefaultCamelContext(registry);
  • Then we register our route with the CamelContext.
  • Finally we start the CamelContext using context.start()

3. Creating the Route – RouteBuilder is the base class which implements the routing rules using DSL. We have to extend RouteBuilder and add its instance to camelContext. The complete discussion of camel routes is beyond the scope of this post. We can create the Route using Spring DSL but for simplicity we will be using java DSL.

package com.aranin.adconnect.util.aws;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

import java.util.StringTokenizer;

/**
 * Created by IntelliJ IDEA.
 * User: Niraj Singh
 * Date: 4/12/13
 * Time: 12:42 PM
 * To change this template use File | Settings | File Templates.
 */
public class MySQSRouterBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        try{
            //Properties properties = new Properties();
            //properties.load(new FileInputStream("D:/samayik/adkonnection/src/main/resources/AwsCredentials.properties"));
            String sqs = "aws-sqs://PhotoQueue?amazonSQSClient=#amazonSQSClient";
            from( sqs).process(new Processor() {
                public void process(Exchange exchange)
                        throws Exception {

                    String messagestring = exchange.getIn().toString();
                    System.out.println("messagestring : " + messagestring);
                    StringTokenizer photoTokenizer = new StringTokenizer(messagestring, ",");
                    String source = null;
                    String target = null;
                    String path = null;

                    source = photoTokenizer.nextToken();
                    source = source.substring("Message: ".length());
                    System.out.println("source : " + source);
                    target = photoTokenizer.nextToken();
                    path = photoTokenizer.nextToken();
                    System.out.println("source : " + source);
                    System.out.println("target : " + target);
                    System.out.println("path : " + path);
                    /**
                     * generate thumbmail within 150*150 container
                     */
                    PhotoProcessor.generateImage(path, source, target, 150);

                }
            });

        }catch(Exception e){

        }
    }

}

There are three things to be highlighted over here.

  1. The class extends RouterBuilder and overriders configure method.
  2. The from route contains the uri for our sqs queue which is                                                aws-sqs://PhotoQueue?amazonSQSClient=#amazonSQSClient
  3. The call of process method where we pass an instance of org.apache.camel.Processor class. Here we are using a anonymous class but you can as well extend the Processor class and implement the process method.

The uri for sqs queue is

aws-sqs://queue-name[?options]

Options are parameter we want to pass to the SQS for making connection. For this we can either pass on the access and secret keys or pass in an instance of SQSClient which is present in jndi repository. For complete reference please visit http://camel.apache.org/aws-sqs.html

Tying things up. 

We saw the asynchProcess method which instantiates and starts the camel context. So here is the full CamelPhotoManager  class with main method which will send the message to the SQS queue and retrieve it via camel.

<strong>CamelPhotoManager  class</strong>
package com.aranin.adconnect.util.aws;

import com.amazonaws.services.sqs.AmazonSQS;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;

/**
 * Created by IntelliJ IDEA.
 * User: Niraj Singh
 * Date: 4/12/13
 * Time: 3:27 PM
 * To change this template use File | Settings | File Templates.
 */
public class CamelPhotoManager {
    public void asyncProcess() {
    try {
    // create CamelContext
        SimpleRegistry registry = new SimpleRegistry();
        AWSSimpleQueueServiceUtil awssqsUtil =   AWSSimpleQueueServiceUtil.getInstance();
        AmazonSQS sqsClient =  awssqsUtil.getAWSSQSClient();
        registry.put("amazonSQSClient" , sqsClient);
        CamelContext context = new DefaultCamelContext(registry);

        // add our route to the CamelContext
        context.addRoutes(new MySQSRouterBuilder());

        context.start();
        Thread.sleep(10000);
        context.stop();

    } catch ( Exception e ) {
        System.out.println(e);
    }

    }

    public void sendMessage(){
        AWSSimpleQueueServiceUtil awssqsUtil =   AWSSimpleQueueServiceUtil.getInstance();
        /**
         * 1. get the url for your photo queue
         */
        String queueUrl  = awssqsUtil.getQueueUrl(awssqsUtil.getQueueName());
        System.out.println("queueUrl : " + queueUrl);

        /**
         * 2. Add a photo to the queue to be processed
         */

        PhotoFile photo = new PhotoFile();
        photo.setImagePath("d:/vids");
        photo.setOrigName("Dock.jpg");
        photo.setTargetName("dock_thumb.jpg");

        /**
         * 3. set the photofile in queue for processing
         */

         awssqsUtil.sendMessageToQueue(queueUrl, photo.toString());
    }

    public static void main(String[] args){
        CamelPhotoManager camelPhotoManager = new CamelPhotoManager();

         /**
         * send a message
         */

        camelPhotoManager.sendMessage();

         /**
         * start camel as standalone and keep on receiving and processing messages asynchrounously
         */

         camelPhotoManager.asyncProcess();

    }

}

Run this class and see the camel in action. One point to note is Camel is still polling to retrieve the message but it can be changed to asynchronous receiver. For that you need to change your routebuilder is following way

from(SqsEndpoint(String uri, SqsComponent component, SqsConfiguration configuration)).createConsumer(Processor)

I am still researching on that and if any of the readers can crack it then please let me know. I would love to see that solution.

So here it is, we have created our own SQS receiver powered by Apache Camel, but we have only scratched the surface, there is so much more we can do with Camel and SQS. I have really struggled to get a working tutorial in place. First things I wanted to use was spring DSL, then I wanted to use SQSConsumer which comes with camel sqs api. But I could not make them work fast enough(Simple POCs should not take more than one day in my opinion). So I would love to hear what breakthrough my dear young readers come up with. But with this tutorial I hope that atleast I can get some of you started.

Please share your comments for this post. I would love to hear from you.

Regards

Niraj

Print Friendly

About Niraj Singh

I am CEO and CoFounder of a startup "Aranin Software Private Limited, Bangalore. I completed my graduation in 2002 as an Aerospace Engineer from IIT Kharagpur. I love working on new ideas and projects and recently released my first open source project JaiomServer "http://jaiomserver.org". I have 9 years of experience in IT industries most of which I have spent in developing community applications for various clients using java. Some of the sites in which I have actively involved with are hgtv.com, food.com, foodnetwork.com, pickle.com, diynetwork.com etc.
This entry was posted in Amazon Web Services, Cloud Computing, SQS and tagged , , , . Bookmark the permalink.

2 Responses to Amazon SQS – Listening to amazon SQS queue using Apache Camel

  1. Jenefar says:

    Thanks Niraj,
    I googled a lot for implementing amazon SQS, i couldn’t find any other site better than this.
    Thanks for this wonderful article.
    Keep up your good work…. looking forward for your further articles.

  2. Pingback: Amazon SQS – Listening To SQS Using Apache Camel The Spring DSL Way | Java Logs

Leave a Reply

Your email address will not be published. Required fields are marked *

Connect with Facebook

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>

CommentLuv badge