Amazon SQS – Listening To SQS Using Apache Camel The Spring DSL Way

In my previous post, Amazon SQS – Listening to amazon SQS queue using Apache Camel, we saw how to use Apache Camel to listen to an Amazon SQS queue. That example was deliberately simple: we used the Java DSL to route messages from SQS and processed them in an anonymous Processor class. It worked, but two drawbacks were immediately visible.

  1. Manually starting the CamelContext – The CamelContext was initialized directly in code. There is nothing wrong with that, but in a production system we would like it to happen automatically. Spring can help here: we can register the CamelContext as a bean and load the beans at server startup using a Spring context listener.
  2. Tight coupling – We consumed the messages directly inside our RouteBuilder class by creating a Processor instance there. Ideally, a RouteBuilder should not process messages. It should only register endpoints, apply filters and chain endpoints; all processing should be delegated to a separate class so that the code stays clean and loosely coupled.

In today’s post we will see how we can use bean binding to address both.

In today’s example we will produce a message, then use Spring and the Spring DSL to route and consume it from SQS. To set the agenda, we will perform the following steps to get a working Spring DSL listener.

  1. Send a message to SQS. We won’t use Camel for this, but it is a fairly simple task.
  2. Register our CamelContext in a Spring config XML file.
  3. Create a POJO to consume the SQS message and register it as a Spring bean.
  4. Define a route with SQS and bean endpoints inside the CamelContext in the Spring config file.
  5. Start the CamelContext and have fun.

Download the code for this tutorial from the following SVN location:

https://www.assembla.com/code/weblog4j/subversion/nodes/19/awsdemo/trunk

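1. Maven dependencies. The following dependencies are required. The ${spring.version} and ${camel-version} placeholders are Maven properties that must be defined in your pom.xml.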

     <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.5.6</version>
     </dependency>
     <!-- concrete Log4J Implementation for SLF4J API-->
     <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.5.6</version>
     </dependency>

     <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-core</artifactId>
          <version>${spring.version}</version>
      </dependency>

      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-web</artifactId>
          <version>${spring.version}</version>
      </dependency>

      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-beans</artifactId>
          <version>${spring.version}</version>
      </dependency>

      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>${spring.version}</version>
      </dependency>

      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jdbc</artifactId>
          <version>${spring.version}</version>
      </dependency>

      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-orm</artifactId>
          <version>${spring.version}</version>
      </dependency>

       <dependency>
            <groupId>org.imgscalr</groupId>
            <artifactId>imgscalr-lib</artifactId>
            <version>4.2</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.3.33</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camel-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring</artifactId>
            <version>${camel-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-aws</artifactId>
            <version>${camel-version}</version>
        </dependency>


2. Create a Spring config XML file with a basic camelContext element in it. Let us name it camelconfig.xml. We will keep modifying this file until we have a working example.

<camelContext id="sqsContext" xmlns="http://camel.apache.org/schema/spring">
.....
</camelContext>

3. Create a bean processor class, com.aranin.aws.sqs.BeanProcessor, which will receive and process messages from the SQS queue. This bean has a single method:

public void processSQSMessage(Exchange exchange)

An Exchange is a container for a message. Camel creates it when a consumer receives a message during routing and passes it to the bound bean, which is BeanProcessor in this case. Here is the complete class.

package com.aranin.aws.sqs;

import com.aranin.aws.s3.PhotoProcessor;
import org.apache.camel.Exchange;

import java.util.StringTokenizer;

/**
 * Created by IntelliJ IDEA.
 * User: Niraj Singh
 * Date: 5/14/13
 * Time: 11:21 AM
 * To change this template use File | Settings | File Templates.
 */
public class BeanProcessor {

    public void processSQSMessage(Exchange exchange){
        System.out.println("processSQSMessage");
        // toString() on the message yields the body prefixed with "Message: "
        // (see the sample output), so the prefix is stripped from the first token below
        String messagestring = exchange.getIn().toString();
        System.out.println("messagestring : " + messagestring);
        // the body is expected to have the form "source,target,path"
        StringTokenizer photoTokenizer = new StringTokenizer(messagestring, ",");

        String source = photoTokenizer.nextToken();
        source = source.substring("Message: ".length());
        System.out.println("source : " + source);
        String target = photoTokenizer.nextToken();
        String path = photoTokenizer.nextToken();
        System.out.println("source : " + source);
        System.out.println("target : " + target);
        System.out.println("path : " + path);
        /**
         * generate thumbnail within a 150*150 container
         */
        PhotoProcessor.generateImage(path, source, target, 150);
    }
}
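The parsing in BeanProcessor assumes a well-formed body: StringTokenizer.nextToken() throws NoSuchElementException when a field is missing. As a minimal sketch of a stricter alternative, the plain-Java class below parses the same "source,target,path" format and rejects malformed input with a descriptive error. PhotoMessage is a hypothetical helper that is not part of the original project, and the optional "Message: " prefix is handled only because it appears in the sample output of this post.

```java
import java.util.Objects;

/** Hypothetical value class for this sketch; not part of the original project. */
final class PhotoMessage {
    final String source;
    final String target;
    final String path;

    private PhotoMessage(String source, String target, String path) {
        this.source = source;
        this.target = target;
        this.path = path;
    }

    /**
     * Parses a body of the form "source,target,path".
     * A leading "Message: " prefix is tolerated and removed.
     */
    static PhotoMessage parse(String body) {
        Objects.requireNonNull(body, "body");
        String prefix = "Message: ";
        String text = body.startsWith(prefix) ? body.substring(prefix.length()) : body;

        // limit -1 keeps trailing empty fields so they are rejected, not silently dropped
        String[] parts = text.split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 comma-separated fields but got " + parts.length + ": " + body);
        }
        for (String part : parts) {
            if (part.trim().isEmpty()) {
                throw new IllegalArgumentException("Empty field in message: " + body);
            }
        }
        return new PhotoMessage(parts[0].trim(), parts[1].trim(), parts[2].trim());
    }
}
```

Inside processSQSMessage, such a parser could be fed with exchange.getIn().getBody(String.class), which returns the body without the "Message: " prefix.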

4. Register BeanProcessor in camelconfig.xml. You will have an entry like this:

<bean id="sqsBeanProcessor" class="com.aranin.aws.sqs.BeanProcessor"/>

Please note that the id of the bean is “sqsBeanProcessor”.

5. Spring DSL – Register your route in the camelContext. Once this is done, your camelContext will look like this:

<camelContext id="sqsContext" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="aws-sqs://PhotoQueue?accessKey=abcd&amp;secretKey=abcd&amp;amazonSQSEndpoint=https://sqs.ap-southeast-1.amazonaws.com"/>
<to uri="bean:sqsBeanProcessor?method=processSQSMessage"/>
</route>
</camelContext>
  1. Here aws-sqs://PhotoQueue is the URI of the queue.
  2. aws-sqs tells the CamelContext to use the aws-sqs component to create an SQS endpoint.
  3. PhotoQueue is the name of the queue we are operating on.
  4. accessKey and secretKey are your Amazon API keys, which you should not share with anyone.
  5. amazonSQSEndpoint is the SQS endpoint URL of the region in which your queue lives.
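To make the structure of the endpoint URI explicit, here is a small plain-Java sketch that assembles it from the parts listed above. Note that the separator is a plain & in a Java string, while in the Spring XML file it has to be escaped as &amp;amp;. SqsEndpointUri is a hypothetical helper, and reading the keys from the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables is an assumption of this sketch, not something the original project does.

```java
/** Hypothetical helper for this sketch; not part of the original project. */
final class SqsEndpointUri {

    private SqsEndpointUri() {
    }

    /** Builds an aws-sqs endpoint URI with the same options used in this post. */
    static String build(String queueName, String accessKey, String secretKey, String endpoint) {
        return "aws-sqs://" + queueName
                + "?accessKey=" + accessKey
                + "&secretKey=" + secretKey
                + "&amazonSQSEndpoint=" + endpoint;
    }

    /**
     * Same as build(), but reads the keys from environment variables
     * (the variable names are an assumption of this sketch) so that
     * they do not have to be hardcoded.
     */
    static String buildFromEnvironment(String queueName, String endpoint) {
        String accessKey = System.getenv("AWS_ACCESS_KEY_ID");
        String secretKey = System.getenv("AWS_SECRET_ACCESS_KEY");
        if (accessKey == null || secretKey == null) {
            throw new IllegalStateException(
                    "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set");
        }
        return build(queueName, accessKey, secretKey, endpoint);
    }
}
```

Calling build("PhotoQueue", "abcd", "abcd", "https://sqs.ap-southeast-1.amazonaws.com") yields the same URI as the from element above, with & in place of &amp;amp;.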

The to URI, bean:sqsBeanProcessor?method=processSQSMessage, is especially interesting.

  1. This tells the CamelContext that we have a bean named sqsBeanProcessor which will act as the consumer of the incoming message.
  2. This bean has a method named processSQSMessage which will consume the message. Camel is responsible for binding the method parameters: processSQSMessage takes an Exchange as its parameter, and Camel supplies it.
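As a mental model only, the two points above can be sketched in plain Java: look up a bean by id in a registry, find a method by name, and invoke it with the incoming message. This is not Camel's implementation; the real bean component does much more (type conversion, annotations, Exchange binding). MiniBeanDispatcher and EchoProcessor are hypothetical names invented for this illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Conceptual stand-in for a bean registry plus "bean:id?method=name" dispatch. Not Camel code. */
final class MiniBeanDispatcher {
    private final Map<String, Object> registry = new HashMap<>();

    void register(String id, Object bean) {
        registry.put(id, bean);
    }

    /** Finds the bean by id, then a public one-argument method by name, and invokes it. */
    Object dispatch(String beanId, String methodName, Object message) {
        Object bean = registry.get(beanId);
        if (bean == null) {
            throw new IllegalArgumentException("No bean registered with id " + beanId);
        }
        for (Method method : bean.getClass().getMethods()) {
            boolean matches = method.getName().equals(methodName)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(message);
            if (matches) {
                try {
                    return method.invoke(bean, message);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException("Could not invoke " + methodName, e);
                }
            }
        }
        throw new IllegalArgumentException(
                "No method " + methodName + " on bean " + beanId + " accepts the message");
    }
}

/** Hypothetical bean used only to exercise the dispatcher. */
final class EchoProcessor {
    public String processSQSMessage(String body) {
        return "processed " + body;
    }
}
```

With an EchoProcessor registered under the id sqsBeanProcessor, dispatch("sqsBeanProcessor", "processSQSMessage", body) plays the role of the to element in the route: the route names the bean and the method, and the bean itself knows nothing about routing.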

6. Your whole camelconfig.xml will now look like this:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <camelContext id="sqsContext" xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="aws-sqs://PhotoQueue?accessKey=abcd&amp;secretKey=abcd&amp;amazonSQSEndpoint=https://sqs.ap-southeast-1.amazonaws.com"/>
            <to uri="bean:sqsBeanProcessor?method=processSQSMessage"/>
        </route>
    </camelContext>

    <bean id="sqsRouter" class="com.aranin.aws.sqs.SQSBeanRouterBuilder"/>

    <bean id="sqsBeanProcessor" class="com.aranin.aws.sqs.BeanProcessor"/>

</beans>

7. Now we create a manager class named SpringCamelPhotoManager. It will help us load the beans and start the CamelContext.

package com.aranin.aws.sqs;

import com.aranin.aws.s3.PhotoFile;
import org.apache.camel.CamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * Created by IntelliJ IDEA.
 * User: Niraj Singh
 * Date: 5/14/13
 * Time: 11:30 AM
 * To change this template use File | Settings | File Templates.
 */

public class SpringCamelPhotoManager {
    public void startCamelServer() {
        try {
            // adjust this path to the location of camelconfig.xml on your machine
            ApplicationContext springcontext = new FileSystemXmlApplicationContext("D:/samayik/awsdemo/src/main/resources/camelconfig.xml");
            CamelContext context = springcontext.getBean("sqsContext", CamelContext.class);
            context.start();
            // keep the JVM alive for 10 seconds so the route can poll SQS, then shut down
            Thread.sleep(10000);
            context.stop();
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    public void sendMessage(){
        AWSSimpleQueueServiceUtil awssqsUtil =   AWSSimpleQueueServiceUtil.getInstance();
        /**
         * 1. get the url for your photo queue
         */
        String queueUrl  = awssqsUtil.getQueueUrl(awssqsUtil.getQueueName());
        System.out.println("queueUrl : " + queueUrl);

        /**
         * 2. Add a photo to the queue to be processed
         */

        PhotoFile photo = new PhotoFile();
        photo.setImagePath("d:/vids");
        photo.setOrigName("Dock.jpg");
        photo.setTargetName("dock_thumb.jpg");

        /**
         * 3. set the photofile in queue for processing
         */

         awssqsUtil.sendMessageToQueue(queueUrl, photo.toString());
    }

    public static void main(String[] args){
        SpringCamelPhotoManager springCamelPhotoManager = new SpringCamelPhotoManager();

         /**
         * send a message
         */

        springCamelPhotoManager.sendMessage();

         /**
         * start camel as standalone and keep on receiving and processing messages asynchronously
         */

         springCamelPhotoManager.startCamelServer();

    }

}

If you look at the main method, you will see that we make two calls:

  • springCamelPhotoManager.sendMessage(); – This sends a message to the SQS queue.
  • springCamelPhotoManager.startCamelServer(); – This starts the CamelContext, which in turn starts listening to SQS and consumes the messages in the background.
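startCamelServer keeps the listener alive with a fixed Thread.sleep(10000) and then stops it. A possible alternative, sketched below in plain Java, is to wait on a stop signal with a timeout, so the listener can be stopped early from another thread and the stop action always runs. ListenerLifecycle is a hypothetical class that is not part of the original project. The start and stop actions are passed in as Runnable, so calls such as context.start() and context.stop() would have to be wrapped if they declare checked exceptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical lifecycle helper for this sketch; not part of the original project. */
final class ListenerLifecycle {
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    private final Runnable startAction;
    private final Runnable stopAction;

    ListenerLifecycle(Runnable startAction, Runnable stopAction) {
        this.startAction = startAction;
        this.stopAction = stopAction;
    }

    /** May be called from any thread to end the wait in runUntilStopped. */
    void requestStop() {
        stopSignal.countDown();
    }

    /**
     * Runs the start action, waits for a stop request or the timeout,
     * then always runs the stop action.
     *
     * @return true if a stop was requested, false on timeout or interruption
     */
    boolean runUntilStopped(long timeout, TimeUnit unit) {
        startAction.run();
        try {
            return stopSignal.await(timeout, unit);
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            return false;
        } finally {
            stopAction.run();
        }
    }
}
```

With a 10 second timeout and no stop request, this behaves like the sleep-based version above: start, wait 10 seconds, stop.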

8. Now you are ready. Supply your access and secret keys in the route, adjust the file paths in SpringCamelPhotoManager and run the class. You should see output like this:

com.aranin.aws.sqs.SpringCamelPhotoManager
log4j:WARN No appenders could be found for logger (com.amazonaws.auth.AWS4Signer).
log4j:WARN Please initialize the log4j system properly.
queueUrl : https://sqs.ap-southeast-1.amazonaws.com/282733326245/PhotoQueue
{MD5OfMessageBody: 2cf93320514509eb56e960f999b5cd1f, MessageId: ff683b8c-03bb-45ba-8c60-4937116593cf, }
processSQSMessage
messagestring : Message: Dock.jpg,dock_thumb.jpg,d:/vids
source : Dock.jpg
source : Dock.jpg
target : dock_thumb.jpg
path : d:/vids
Process finished with exit code 0

I hope you enjoyed this post. If it helped you in any way, please leave a comment to encourage me to write more. Till then, goodbye.

Warm Regards

Niraj Singh

 

 


About Niraj Singh

I am the CEO and co-founder of a startup, Aranin Software Private Limited, Bangalore. I graduated in 2002 as an Aerospace Engineer from IIT Kharagpur. I love working on new ideas and projects and recently released my first open source project, JaiomServer (http://jaiomserver.org). I have 9 years of experience in the IT industry, most of which I have spent developing community applications in Java for various clients. Some of the sites I have been actively involved with are hgtv.com, food.com, foodnetwork.com, pickle.com, diynetwork.com etc.