Amazon SQS Polling Consumer using WSO2 MI 4.2.0
Introduction
WSO2 MI lets you poll an Amazon SQS queue through an inbound endpoint, which connects to Amazon and consumes messages from the queue.
Amazon SQS offers reliable and scalable hosted queues for storing messages as they travel between computers. By using Amazon SQS, you can move data between distributed components of your applications that perform different tasks without losing messages or requiring each component to be always available.
In this blog I’m going to discuss how the AmazonSQS inbound endpoint works as a message consumer using a simple example.
Prerequisites
As a prerequisite, you need an AWS account. Log into your AWS account and follow the steps to get an AWS access key for the AWS root account or an IAM user account. Once you generate an access key, keep the Access Key ID and Secret Access Key; they are used as the ‘accessKey’ and ‘secretKey’ properties of the inboundEndpoint definition.
Next, create a queue in Amazon Simple Queue Service (SQS). In the AWS search bar, search for SQS and select ‘Simple Queue Service’. Then use the ‘Create queue’ option to create a new queue.
Here I have used the FIFO queue option and provided a name for the queue. In this scenario I have left the rest of the queue configuration at its defaults.
Once you have created the queue, you can find the queue details as below.
Note the ‘URL’ shown in the queue details; we will use it as the ‘destination’ parameter of the inbound endpoint configuration while developing the mediation.
Now that we have completed the prerequisites, we are going to develop the integration that keeps listening for messages from that queue.
Develop the Integration Project and deploy the CApp
Let’s see how we can configure an inbound endpoint using WSO2 Integration Studio.
Step 01: First download the Integration Studio from here.
Step 02: Create a new Integration Project.
Step 03: Provide the Integration Project name and click Finish. Here I have used ‘AmazonSQS’ as the project name.
After you create the project you will be able to see the project as below.
Step 04: Create a New Inbound Endpoint using the below option.
Here I have provided the inbound endpoint name as ‘AmazonSQSInboundEndpoint’.
Navigate to the created inbound endpoint using the left side panel and use the ‘Source’ view option to modify the content as below.
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="AmazonSQSInboundEndpoint"
                 onError="ErrorSequence"
                 sequence="ProcessSequence"
                 suspend="false"
                 class="org.wso2.carbon.inbound.amazonsqs.AmazonSQSPollingConsumer"
                 xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="sequential">true</parameter>
        <parameter name="interval">2000</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="waitTime">1</parameter>
        <parameter name="maxNoOfMessage">10</parameter>
        <parameter name="destination">https://sqs.us-east-1.amazonaws.com/324037293231/MyQueue.fifo</parameter>
        <parameter name="accessKey">xxxxxxxxxxxxxxxxxxxx</parameter>
        <parameter name="secretKey">xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="class">org.wso2.carbon.inbound.amazonsqs.AmazonSQSPollingConsumer</parameter>
        <parameter name="inbound.behavior">polling</parameter>
    </parameters>
</inboundEndpoint>
You can refer to the documentation to understand more about the defined properties.
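As a rough guide to the polling-related parameters, the fragment below is a sketch of how they might be tuned for long polling. It assumes that ‘interval’ is given in milliseconds and that ‘waitTime’ is handed to SQS as the receive wait time in seconds; please confirm both against the documentation before relying on them. The values are illustrative only.

```xml
<!-- Fragment only: these entries would replace the matching ones in the inbound endpoint above. -->
<parameters>
    <!-- Assumption: pause between polling cycles, in milliseconds. -->
    <parameter name="interval">2000</parameter>
    <!-- Assumption: seconds to wait for messages on each receive call.
         SQS itself accepts a wait time of 0 to 20 seconds; a higher value means fewer empty responses. -->
    <parameter name="waitTime">20</parameter>
    <!-- SQS returns at most 10 messages per receive call, so 10 is the upper bound here. -->
    <parameter name="maxNoOfMessage">10</parameter>
</parameters>
```

A longer wait time usually reduces the number of empty receive calls, at the cost of each polling cycle taking longer when the queue is idle.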
Step 05: Create the sequence that the inbound endpoint above points to. Since we set the sequence name to ‘ProcessSequence’ there, I will name the sequence ‘ProcessSequence’.
You can use the ‘Source’ view option to modify the sequence mediation logic. Here I have added only a log mediator to print the message information.
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="ProcessSequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <log level="full"/>
</sequence>
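If the full log is too verbose, a custom log can print just the payload instead. The following is a minimal sketch, assuming the queue messages are JSON (matching the ‘contentType’ set in the inbound endpoint); the property name ‘sqsMessage’ is only illustrative and not part of the original project.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="ProcessSequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <!-- level="custom" logs only the properties listed here, not the whole message context. -->
    <log level="custom">
        <!-- 'sqsMessage' is a hypothetical label; json-eval($) evaluates to the whole JSON payload. -->
        <property name="sqsMessage" expression="json-eval($)"/>
    </log>
</sequence>
```

Use either this version or the full-log version, since both define a sequence with the same name.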
Step 06: Define the error sequence that the inbound endpoint points to. Since we set the error sequence name to ‘ErrorSequence’ above, I will name the sequence ‘ErrorSequence’. Modify this error sequence to handle your error scenarios.
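One possible starting point for the error sequence is sketched below. It is not the exact sequence from this project; it only logs the standard Synapse error properties ‘ERROR_CODE’ and ‘ERROR_MESSAGE’, and any real handling (retries, alerts, storing the failed message) is left to you.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="ErrorSequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Log the error details that Synapse sets on the message context when a fault occurs. -->
    <log level="custom">
        <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
        <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
    </log>
</sequence>
```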
Step 07: Use the below option to export the project as a deployable CApp (Carbon Application) file.
While exporting, you can deploy the CApp directly to the MI server’s <MI-HOME>/repository/deployment/server/carbonapps folder.
Deploy the Amazon SQS Connector
You can browse the WSO2 connector store to download the required connectors for your project. For this project, download the AmazonSQS connector from here.
Download the org.apache.synapse.amazonsqs.poll.class-<VERSION>.jar and add it to the <MI-HOME>/dropins directory.
Starting the MI server
Now that we have deployed the AmazonSQS connector artifacts and the CApp to the MI server, we can start the server.
In the logs you can observe the below for the AmazonSQS and Carbon application deployment.
Testing the scenario
First, navigate to the queue you created in AWS as a prerequisite. You can list the available queues by navigating to ‘Amazon SQS’ -> Queues.
Click on the queue name and select the ‘Send and receive messages’ option.
In the ‘Send and receive messages’ section, you can send messages to the queue.
When you send a message to the queue, the inbound endpoint, which keeps polling the queue, picks it up and passes it to ‘ProcessSequence’. Since we have defined a log mediator in that sequence, the below will be printed in the logs.
So now you have successfully created an AmazonSQS inbound endpoint polling scenario and deployed it using the WSO2 MI server.
If you need further information regarding this, please refer to the official documentation from here.