Process High-Volume Messages from Amazon SQS (Express Workflows)
This sample project demonstrates how to use an Amazon Step Functions Express Workflow to process messages or data from a high-volume event source, such as Amazon Simple Queue Service (Amazon SQS). Because Express Workflows can be started at a very high rate, they are ideal for high-volume event processing or streaming data workloads.
Here are two commonly used methods to execute your state machine from an event source:
-
Configure an Amazon CloudWatch Events rule to start a state machine execution whenever the event source emits an event. For more information, see Creating a CloudWatch Events Rule That Triggers on an Event.
-
Map the event source to a Lambda function, and write function code to execute your state machine. The Amazon Lambda function is invoked each time your event source emits an event, in turn starting a state machine execution. For more information see Using Amazon Lambda with Amazon SQS.
This sample project uses the second method to start an execution each time the Amazon SQS queue sends a message. You can use a similar configuration to trigger Express Workflows execution from other event sources, such as Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB, and Amazon Kinesis.
For more information about Express Workflows and Step Functions service integrations, see the following:
Create the State Machine and Provision Resources
-
Open the Step Functions console
and choose Create a state machine. -
Choose Run a sample project, and then choose Process high-volume messages from Amazon SQS.
The state machine Code and Visual Workflow are displayed.
-
Choose Next.
The Deploy resources page is displayed, listing the resources that will be created. For this sample project, the resources include:
-
A Step Functions state machine
-
An Amazon SQS queue
-
A Lambda function
-
-
Choose Deploy Resources.
Note
It can take up to 10 minutes for these resources and related IAM permissions to be created. While the Deploy resources page is displayed, you can open the Stack ID link to see which resources are being provisioned.
Trigger Execution
-
Open the Amazon SQS console
. -
Select the queue that was created by the sample project.
The name will be similar to Example-SQSQueue-wJalrXUtnFEMI.
-
In the Queue Actions list, select Send a Message.
-
Use the copy button to copy the following message, and on the Send a Message window, enter it, and choose Send Message.
Note
In this sample message, the
input:
line has been formatted with line breaks to fit the page. Use the copy button or otherwise ensure that it is entered as a single line with no breaks.{ "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW
-
Choose Close.
-
Open the Step Functions console.
-
Go to your Amazon CloudWatch Logs log group
and inspect the logs. The name of the log group will look like example-ExpressLogGroup-wJalrXUtnFEMI.
Example Lambda Function Code
The following is Lambda function code that shows how the initiating Lambda function starts a state machine execution using the Amazon SDK.
import boto3
def lambda_handler(event, context):
message_body = event['Records'][0]['body']
client = boto3.client('stepfunctions')
response = client.start_execution(
stateMachineArn='${ExpressStateMachineArn}',
input=message_body
)
Example State Machine Code
The Express Workflow in this sample project consists of a set of Lambda functions for text processing.
For more information about how Amazon Step Functions can control other Amazon services, see Using Amazon Step Functions with other services.
{
"Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
"StartAt": "Decode base64 string",
"States": {
"Decode base64 string": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Generate statistics"
},
"Generate statistics": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Remove special characters"
},
"Remove special characters": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Tokenize and count"
},
"Tokenize and count": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"End": true
}
}
}
IAM Example
This example Amazon Identity and Access Management (IAM) policy generated by the sample project includes the least privilege necessary to execute the state machine and related resources. We recommend that you include only those permissions that are necessary in your IAM policies.
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws-cn:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI",
"arn:aws-cn:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF",
"arn:aws-cn:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI",
"arn:aws-cn:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF"
],
"Effect": "Allow"
}
]
}
The folowing policy ensures that there are sufficient permissions for CloudWatch Logs.
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
],
"Resource": [
"*"
],
"Effect": "Allow"
}
]
}
For information about how to configure IAM when using Step Functions with other Amazon services, see IAM Policies for integrated services.