Community

The Anypoint™ Connector for Amazon SQS provides an easy way to interface with the Amazon Simple Queue Service API, allowing Mule ESB users to manage SQS queueing services without having to deal with the API directly.

Introduction

Amazon Simple Queue Service (Amazon SQS) offers a reliable, highly scalable hosted queue for storing messages as they travel between computers. By using Amazon SQS, developers can simply move data between distributed application components performing different tasks, without losing messages or requiring each component to be always available. Amazon SQS makes it easy to build an automated workflow, working in close conjunction with the Amazon Elastic Compute Cloud (Amazon EC2) and the other AWS infrastructure web services.

The AWS SDK for Java provides a Java API for AWS infrastructure services. The Amazon SQS connector is built using the SDK for Java. For the complete list of operations supported by the connector, see Apidocs and Samples.

Although it is called a queue, Amazon SQS does not guarantee FIFO access to messages in Amazon SQS queues, mainly because of its distributed nature. If you require specific message ordering, you need to design your application to handle it.

Prerequisites

This document assumes you are familiar with Mule, Anypoint Connectors, and Anypoint Studio Essentials. To increase your familiarity with Studio, consider completing one or more Anypoint Studio Tutorials. Further, this page assumes that you have a basic understanding of Mule flows and Mule Global Elements.

This document describes implementation examples within the context of Anypoint Studio, Mule ESB’s graphical user interface, and, in parallel, includes configuration details for doing the same in the XML Editor.

Dependencies

Because the connector comes pre-bundled with Anypoint Studio, you only need to:

  • Install Anypoint Studio.

  • Sign up for Amazon Web Services. To access AWS with the connector, you need credentials in the form of an IAM access key and secret key.

Compatibility Matrix

Application/Service | Version
Mule Runtime | 3.5.0 and above
AWS SDK for Java | 1.9.39

Installing and Configuring

Installing

You can install a connector in Anypoint Studio using the instructions in Installing a Connector from Anypoint Exchange.

Updating From an Older Version

Every time an updated version of a connector is released, Anypoint Studio displays a popup in the bottom right corner of your screen with the following message: Updates Available. To upgrade to the newer version of the Amazon SQS connector:

  1. Click the popup and check for the available updates.

  2. Select the Amazon SQS connector version 3.0.x check-box and click Next.

  3. Follow the instructions provided by the user interface.

  4. Restart Studio when prompted. After restarting, if several versions of the connector are installed, Mule asks which version you would like to use.

Creating a New Project

To use the Amazon SQS connector in a Mule application project:

  1. In Studio, select File > New > Mule Project.

    File Menu
  2. Enter a name for your new project and leave the remaining options with their default values.

    New Project
  3. If you plan to use Git, select Create a .gitignore file for the project with default ignores for Studio Projects, and then click Next.

  4. Click Finish to create the project.

Configuring the Amazon SQS Connector Global Element

To use the Amazon SQS connector in your Mule application, you must configure a global element that can be used by all the Amazon SQS connectors in the application (read more about global elements).

To create a global Amazon SQS connector configuration:

  1. Click the Global Elements tab at the base of the canvas.

  2. On the Global Mule Configuration Elements screen, click Create.

  3. In the Choose Global Type wizard, expand Connector Configuration, and then select Amazon SQS: Configuration.

    Connector Configuration
  4. Click OK.

  5. Enter the global element properties:

    Global Configuration
    Parameter | Description | Example
    Access Key | Alphanumeric text string that uniquely identifies the user who owns the account | AKIAIA6DCDAES37G62OA
    Secret Key | Key that plays the role of a password | MMXcMDzAZ8M2234dogcwuXvWy0+cYuetl-4wAKFJB
    Queue Name | The default queue name; if it doesn’t exist, Mule automatically creates it | testQueue
    Queue URL | The URL of the Amazon SQS queue to act upon | https://sqs.us-east-1.amazonaws.com/0955506219O07/testQueue
    Region Endpoint | The regional endpoint to process your requests | USWEST2

    When a Queue Name is provided in the global element, the connector automatically creates the queue and sets the URL of this queue as the Queue URL. All the Amazon SQS message processors that reference the global element perform operations using this Queue URL.

    If you have to reference a different Queue URL for a particular message processor in the flow, you can perform the operation using the Queue URL attribute provided by the Message Processor.

  6. Keep the Pooling Profile and the Reconnection tabs with their default entries.

  7. Click Test Connection to confirm that the parameters of your global configuration are accurate, and that Mule is able to successfully connect to your instance of Amazon SQS. Read more about Testing Connections.

  8. Click OK to save the global connector configurations.
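
In the XML Editor, the steps above correspond roughly to the following global element. This is a sketch rather than a definitive configuration: the sqs:config attributes mirror the ones used in the Example Code section of this document, the ${sqs.*} placeholders are assumed to be defined in src/main/app/mule-app.properties, and the queueUrl attribute on the message processor, together with the sqs.otherQueueUrl property, is an assumed name for the processor-level Queue URL field described above.

```xml
<!-- Global element: one configuration shared by every Amazon SQS processor.
     The ${...} placeholders are assumed to be defined in
     src/main/app/mule-app.properties, for example:
       sqs.accessKey=(your access key)
       sqs.secretKey=(your secret key)
       sqs.queueName=testQueue
       sqs.region=USWEST2 -->
<sqs:config name="Amazon_SQS_Configuration"
        accessKey="${sqs.accessKey}" secretKey="${sqs.secretKey}"
        defaultQueueName="${sqs.queueName}" region="${sqs.region}"
        doc:name="Amazon SQS: Configuration" />

<!-- Place inside a flow. This processor overrides the default queue.
     The queueUrl attribute name and the sqs.otherQueueUrl property are
     assumptions made for illustration; check the connector's Apidocs. -->
<sqs:send-message config-ref="Amazon_SQS_Configuration"
        queueUrl="${sqs.otherQueueUrl}"
        message="#[payload]" doc:name="Send Message to Another Queue" />
```

Keeping the credentials in property placeholders rather than in the XML itself makes it easier to keep the access key and secret key out of version control.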

Using the Connector

The Amazon SQS connector is an operation-based connector, which means that when you add the connector to your flow, you need to configure a specific operation for the connector to perform.

For the operations below to work, the SQS policy must grant the AWS account access to the corresponding subset of Amazon SQS actions on the queue.

The Amazon SQS connector supports the following operations:

  • Add Permission

  • Change message visibility

  • Change message visibility batch

  • Create queue

  • Delete message

  • Delete message batch

  • Delete queue

  • Get approximate number of messages

  • Get queue attributes

  • Get queue url

  • List dead letter source queues

  • List queues

  • Purge queue

  • Receive Messages

  • Remove permission

  • Send message batch

  • Send message

  • Set Queue Attributes

Adding the Amazon SQS Connector to a Flow

  1. Create a new Mule project in Anypoint Studio.

  2. Drag the Amazon SQS connector onto the canvas, then select it to open the properties editor.

  3. Configure the connector’s parameters:

    Connector Parameters
    Field | Description | Example
    Display Name | Enter a unique label for the connector in your application. | Amazon SQS (Streaming) Receive Messages
    Connector Configuration | Select a global Amazon SQS connector element from the drop-down. | Amazon_SQS_Configuration
    Operation | Select an operation for the connector to perform. | Receive Messages
    Queue URL | Select a parameter for the operation. | #[payload]

  4. Click the blank space on the canvas to save your connector configurations.

Example Use Case

Send a message along with metadata to an Amazon SQS queue and then receive it from the queue. This can be split into the following two flows:

  1. Send a message along with metadata, and then get the count of the messages in the queue to validate that the message has been sent.

  2. Receive the message, log the message body, and delete the message from the queue.

Demo Flow

Begin the flow by sending a message to the queue.

  1. Create a new Mule project in Anypoint Studio.

  2. Drag an HTTP connector into the canvas, then select it to open the properties editor console.

  3. Add a new HTTP Listener Configuration global element:

    1. In General Settings, click the + button:

      HTTP Listener
    2. Configure the following HTTP parameters, while retaining the default values for the other fields:

      HTTP Parameters
      Field | Value
      Name | HTTP_Listener_Configuration
      Port | 8081

    3. Reference the HTTP Listener Configuration global element and set the path to '/sendmessage':

      HTTP Connector
  4. Add a Groovy component to attach the metadata and configure it as follows:

    Groovy Transformer
    Field | Value
    Name | Groovy

    Script Text:

      import org.mule.modules.sqs.model.MessageAttributeValue;
      Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>();
      messageAttributes.put("AccountId", new MessageAttributeValue().withDataType("String.AccountId").withStringValue("000123456"));
      messageAttributes.put("NumberId", new MessageAttributeValue().withDataType("Number").withStringValue("230.000000000000000001"));
      return messageAttributes;

  5. Drag an Amazon SQS connector into the flow, and double-click the connector to open its Properties Editor.

  6. If you do not have an existing Amazon SQS connector global element to choose, click the plus sign next to Connector Configuration.

    Amazon SQS Global Element
  7. Configure the global element properties, then click OK.

  8. Configure the remaining parameters of the connector:

    Amazon SQS Connector Parameters
    Field | Value
    Display Name | Enter a name for the connector instance.
    Connector Configuration | Select the global configuration you created.
    Operation | Send Message
    Message | #[message.inboundProperties.'http.query.params'.msg]
    Message Attributes | #[payload] (previously set in the payload by the Groovy component)

  9. Add an Object to JSON transformer to convert the response from the connector into JSON.

  10. Add a Logger to print the response in the Mule Console:

    Logger
    Field | Value
    Display Name | Enter a name of your choice.
    Message | Sent Message : #[payload]
    Level | INFO (Default)

  11. Add another Amazon SQS connector to get the count of the messages in the queue:

    Get Message Count
    Field | Value
    Display Name | Enter a name for the connector instance.
    Connector Configuration | Select the global configuration you created.
    Operation | Get approximate number of messages

  12. Add a Logger to print the number in the Mule Console.

    Logger

This completes the first part of the use case. Now proceed to the next flow.

  1. Drag an Amazon SQS connector and configure it as an inbound endpoint:

    Receive Messages
    Field | Value
    Display Name | Describe the operation intended by the message processor.
    Connector Configuration | Select the global configuration you created.
    Operation | Receive messages
    Number of Messages | 1
    Visibility Timeout | 30

    The message processor’s Queue URL attribute takes precedence over the Queue URL in the Global Element Properties.

    If neither Queue Name nor Queue URL is set in the Global Element Properties, and the message processor’s Queue URL is not set either, the connector throws an exception.

  2. Add a Logger to print the message in the Mule Console:

    Field | Value
    Display Name | Enter a name of your choice.
    Message | Received Message: #[payload]
    Level | INFO (Default)

  3. Add another Logger to print the message handle in the console. The Delete Message processor requires the message handle to delete the message from the queue:

    Display Message Handle
    Field | Value
    Display Name | Enter a name of your choice.
    Message | Deleting message with handle: #[header:inbound:sqs.message.receipt.handle]
    Level | INFO (Default)

  4. Now configure an Amazon SQS connector to delete the message from the queue.

    Delete Message
    Field | Value
    Display Name | Enter a name for the connector instance.
    Connector Configuration | Select the global configuration you created.
    Operation | Delete Message

  5. Add a Logger to print the status in the Mule Console after the message is deleted.
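
As a rough guide, the five steps of this second flow could look like the following in the XML Editor. Treat it as a sketch under stated assumptions: the element names, config-ref, and logger expressions follow the Example Code section of this document, while numberOfMessages and visibilityTimeout are assumed attribute names for the Number of Messages and Visibility Timeout fields and should be checked against the connector's Apidocs.

```xml
<flow name="sqs-receive-delete-message-operations-demo-flow">
    <!-- Inbound endpoint: receives messages from the queue set in the global element.
         numberOfMessages and visibilityTimeout are assumed attribute names. -->
    <sqs:receive-messages config-ref="Amazon_SQS_Connection_Management"
            numberOfMessages="1" visibilityTimeout="30"
            doc:name="Amazon SQS (Streaming) Receive Messages" />
    <!-- Log the message body. -->
    <logger message="Received Message : #[payload]" level="INFO"
            doc:name="Display Message" />
    <!-- Log the receipt handle that the delete operation relies on. -->
    <logger message="Deleting message with handle : #[header:inbound:sqs.message.receipt.handle]"
            level="INFO" doc:name="Display Message Handle" />
    <!-- Delete the received message from the queue. -->
    <sqs:delete-message config-ref="Amazon_SQS_Connection_Management"
            doc:name="Delete Message" />
    <logger message="Message deleted successfully from queue." level="INFO"
            doc:name="Logger" />
</flow>
```

With a visibility timeout of 30, a received message stays hidden from other consumers for 30 seconds, so the delete step needs to complete within that window or the message may be delivered again.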

Example Code

For this code to work in Anypoint Studio, you must provide Amazon Web Services credentials. You can either replace the variables with their values in the code, or you can provide the values for each variable in the src/main/app/mule-app.properties file.
<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
        xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:http="http://www.mulesoft.org/schema/mule/http"
        xmlns:sqs="http://www.mulesoft.org/schema/mule/sqs" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
        xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
        xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.6.1"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/sqs http://www.mulesoft.org/schema/mule/sqs/current/mule-sqs.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd">
        <http:listener-config name="HTTP_Listener_Configuration"
                host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration" />
        <sqs:config name="Amazon_SQS_Connection_Management"
                accessKey="${sqs.accessKey}" secretKey="${sqs.secretKey}" defaultQueueName="${sqs.queueName}"
                region="${sqs.region}" doc:name="Amazon SQS: Connection Management" />

        <flow name="sqs-send-message-operation-demo-flow">
                <http:listener config-ref="HTTP_Listener_Configuration"
                        path="/sendmessage" doc:name="HTTP" />
                <scripting:transformer doc:name="Groovy" encoding="ISO-8859-2">
                        <scripting:script engine="Groovy">
                        <![CDATA[
                        import org.mule.modules.sqs.model.MessageAttributeValue;
                        Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>();
                        messageAttributes.put("AccountId", new MessageAttributeValue().withDataType("String.AccountId").withStringValue("000123456"));
                        messageAttributes.put("NumberId", new MessageAttributeValue().withDataType("Number").withStringValue("230.000000000000000001"));
                        return messageAttributes;
                        ]]></scripting:script>
                </scripting:transformer>
                <sqs:send-message config-ref="Amazon_SQS_Connection_Management" message="#[message.inboundProperties.'http.query.params'.msg]" doc:name="Send Message">
                        <sqs:message-attributes ref="#[payload]"/>
                </sqs:send-message>

                <json:object-to-json-transformer doc:name="Object to JSON"/>
                <logger message="Sent Message : #[payload]" level="INFO" doc:name="Display Sent Message"/>
                <sqs:get-approximate-number-of-messages config-ref="Amazon_SQS_Connection_Management" doc:name="Get Count of Messages in queue"/>

                <logger message="Approx. messages in queue : #[payload]" level="INFO" doc:name="Count Messages in Queue"/>
                <set-payload value="Operations successful, Please check the log console for output."
                        doc:name="Display Message Count" />
        </flow>
        <flow name="sqs-receive-delete-message-operations-demo-flow">
                <sqs:receive-messages config-ref="Amazon_SQS_Connection_Management"
                        doc:name="Amazon SQS (Streaming) Receive Messages" />
                <logger message="Received Message : #[payload]" level="INFO"
                        doc:name="Display Message" />
                <logger message="Deleting message with handle : #[header:inbound:sqs.message.receipt.handle]" level="INFO" doc:name="Display Message Handle"/>
                <sqs:delete-message config-ref="Amazon_SQS_Connection_Management" doc:name="Delete Message"/>
                <logger message="Message deleted successfully from queue." level="INFO" doc:name="Logger"/>

        </flow>
</mule>

Test the flows

After the flows are ready and the Mule app is running, visit http://localhost:8081/sendmessage?msg=<message-body> to test the flow.

As the Mule Console logs the output, notice that the inbound message processor, Receive Messages, retrieves the message that was sent.

See Also