Process High-Volume Messages from Amazon SQS (Express Workflows)
This sample project demonstrates how to use an Amazon Step Functions Express Workflow to process messages or data from a high-volume event source, such as Amazon Simple Queue Service (Amazon SQS). Because Express Workflows can be started at a very high rate, they are ideal for high-volume event processing or streaming data workloads.
Here are two commonly used methods to execute your state machine from an event source:
- Configure an Amazon CloudWatch Events rule to start a state machine execution whenever the event source emits an event. For more information, see Creating a CloudWatch Events Rule That Triggers on an Event.
- Map the event source to a Lambda function, and write function code to execute your state machine. The Amazon Lambda function is invoked each time your event source emits an event, in turn starting a state machine execution. For more information, see Using Amazon Lambda with Amazon SQS.
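As a rough sketch of the first method, the following Python (Boto3) code creates a CloudWatch Events rule (the same API now backs Amazon EventBridge) and points it at a state machine. The rule name, event pattern, and ARNs are hypothetical placeholders. The role passed as RoleArn is assumed to allow states:StartExecution on the state machine. The client can be injected so the request shapes can be checked without calling Amazon.

```python
import json


def build_rule_requests(rule_name, event_pattern, state_machine_arn, role_arn):
    """Return (put_rule kwargs, put_targets kwargs) for a rule that starts a state machine."""
    rule = {
        "Name": rule_name,
        # CloudWatch Events expects the event pattern as a JSON string.
        "EventPattern": json.dumps(event_pattern),
        "State": "ENABLED",
    }
    targets = {
        "Rule": rule_name,
        "Targets": [
            {
                "Id": "start-state-machine",
                "Arn": state_machine_arn,
                # Role that CloudWatch Events assumes to call states:StartExecution.
                "RoleArn": role_arn,
            }
        ],
    }
    return rule, targets


def create_rule(rule_name, event_pattern, state_machine_arn, role_arn, events_client=None):
    """Create the rule and its state machine target (a sketch)."""
    if events_client is None:
        import boto3  # Imported lazily so the request builder works without Boto3.
        events_client = boto3.client("events")
    rule, targets = build_rule_requests(rule_name, event_pattern, state_machine_arn, role_arn)
    events_client.put_rule(**rule)
    events_client.put_targets(**targets)
    return rule, targets
```

Every event that matches the pattern then starts one execution, with the event itself as the execution input.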
This sample project uses the second method to start an execution each time a message is sent to the Amazon SQS queue. You can use a similar configuration to trigger Express Workflow executions from other event sources, such as Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB, and Amazon Kinesis.
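When the same kind of Lambda function is mapped to other event sources, the main difference is where each record keeps its payload. The following Python sketch shows one possible way to turn records from Amazon SQS, Kinesis, Amazon S3, and DynamoDB Streams into execution input strings. The function name and the input shapes chosen for the non-SQS sources are hypothetical; only the SQS branch mirrors what the sample function does.

```python
import base64
import json


def execution_inputs(event):
    """Yield one execution input string per record in a Lambda event (a sketch)."""
    for record in event.get("Records", []):
        source = record.get("eventSource")
        if source == "aws:sqs":
            # SQS: pass the message body through unchanged, as the sample's function does.
            yield record["body"]
        elif source == "aws:kinesis":
            # Kinesis: record data arrives base64-encoded; assume producers write JSON text.
            yield base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        elif source == "aws:s3":
            # S3: forward only the bucket name and the (URL-encoded) object key.
            s3 = record["s3"]
            yield json.dumps({"bucket": s3["bucket"]["name"], "key": s3["object"]["key"]})
        elif source == "aws:dynamodb":
            # DynamoDB Streams: forward the new item image, still in DynamoDB JSON format.
            yield json.dumps(record["dynamodb"].get("NewImage", {}))
        else:
            raise ValueError(f"Unsupported event source: {source}")
```

Each yielded string could then be passed as the input argument of start_execution, exactly as the SQS message body is in this sample.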
For more information about Express Workflows and Step Functions service integrations, see the following:
Step 1: Create the state machine and provision resources
- Open the Step Functions console and choose Create state machine.
- Type Process high-volume messages from SQS in the search box, and then choose Process high-volume messages from SQS from the search results that are returned.
- Choose Next to continue.
- Step Functions lists the Amazon services used in the sample project you selected. It also shows a workflow graph for the sample project. Deploy this project to your Amazon Web Services account or use it as a starting point for building your own projects. Based on how you want to proceed, choose Run a demo or Build on it.
This sample project deploys the following resources:
- Four Lambda functions
- An Amazon SQS queue
- An Amazon Step Functions state machine
- Related Amazon Identity and Access Management (IAM) roles
The following image shows the workflow graph for the Process high-volume messages from SQS sample project:
[Image: workflow graph for the Process high-volume messages from SQS sample project]
- Choose Use template to continue with your selection.
- Do one of the following:
- If you selected Build on it, Step Functions creates the workflow prototype for the sample project you selected. Step Functions doesn't deploy the resources listed in the workflow definition.
In Workflow Studio's Design mode, drag and drop states from the States browser to continue building your workflow prototype. Or switch to Code mode, which provides an integrated code editor similar to VS Code, to update the Amazon States Language (ASL) definition of your state machine within the Step Functions console. For more information about using Workflow Studio to build your state machines, see Using Workflow Studio.
Important
Remember to update the placeholder Amazon Resource Name (ARN) for the resources used in the sample project before you run your workflow.
- If you selected Run a demo, Step Functions creates a read-only sample project that uses an Amazon CloudFormation template to deploy the Amazon resources listed in that template to your Amazon Web Services account.
Tip
To view the state machine definition of the sample project, choose Code.
When you're ready, choose Deploy and run to deploy the sample project and create the resources.
It can take up to 10 minutes for these resources and related IAM permissions to be created. While your resources are being deployed, you can open the CloudFormation Stack ID link to see which resources are being provisioned.
After all the resources in the sample project are created, you can see the new sample project listed on the State machines page.
Important
Standard charges may apply for each service used in the CloudFormation template.
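After deployment, one way to look up the sample's Express state machine outside the console is a short Python (Boto3) sketch like the following. It relies on ListStateMachines reporting each state machine's type (STANDARD or EXPRESS). The name filter is a hypothetical convenience, because the generated name of the sample's state machine is not stated here, and real calls assume Boto3 and configured credentials.

```python
def express_state_machines(sfn_client=None, name_contains=""):
    """Return (name, ARN) pairs for Express state machines, optionally filtered by name."""
    if sfn_client is None:
        import boto3  # Imported lazily so a stub client can be used in tests.
        sfn_client = boto3.client("stepfunctions")
    found = []
    # The paginator follows nextToken so accounts with many state machines are covered.
    paginator = sfn_client.get_paginator("list_state_machines")
    for page in paginator.paginate():
        for machine in page["stateMachines"]:
            if machine["type"] == "EXPRESS" and name_contains in machine["name"]:
                found.append((machine["name"], machine["stateMachineArn"]))
    return found
```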
Step 2: Trigger the state machine execution
- Open the Amazon SQS console.
- Select the queue that was created by the sample project.
The name will be similar to Example-SQSQueue-wJalrXUtnFEMI.
- In the Queue Actions list, select Send a Message.
- Copy the following message, enter it in the Send a Message window, and then choose Send Message.
Note
In this sample message, the "input" line has been formatted with line breaks to fit the page. Make sure that it is entered as a single line with no breaks.
{ "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW
- Choose Close.
- Open the Step Functions console.
- Go to your Amazon CloudWatch Logs log group and inspect the logs. The name of the log group will look like example-ExpressLogGroup-wJalrXUtnFEMI.
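The console steps above can also be scripted. The following Python (Boto3) sketch builds a message in the same {"input": "<base64 text>"} shape as the sample message, sends it to the queue, and reads back log events from the log group. Queue and log group names are passed in because the generated names differ per deployment. The helper names are hypothetical, and real calls assume Boto3 and configured credentials.

```python
import base64
import json


def build_message_body(text):
    """Wrap plain text as {"input": "<base64>"}, the shape of the sample message."""
    encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
    # json.dumps emits the whole body on a single line, with no line breaks.
    return json.dumps({"input": encoded})


def send_sample_message(queue_name, text, sqs_client=None):
    """Look up the queue URL by name and send one message to it."""
    if sqs_client is None:
        import boto3
        sqs_client = boto3.client("sqs")
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    return sqs_client.send_message(QueueUrl=queue_url, MessageBody=build_message_body(text))


def log_messages(log_group_name, logs_client=None, limit=20):
    """Return the messages from the first page of events in the log group."""
    if logs_client is None:
        import boto3
        logs_client = boto3.client("logs")
    response = logs_client.filter_log_events(logGroupName=log_group_name, limit=limit)
    return [event["message"] for event in response["events"]]
```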
Example Lambda Function Code
The following Lambda function code shows how the initiating Lambda function starts a state machine execution using the Amazon SDK for Python (Boto3).
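import boto3

# Create the client outside the handler so warm invocations reuse it.
client = boto3.client('stepfunctions')

def lambda_handler(event, context):
    # An SQS event can carry a batch of messages; start one execution per message.
    for record in event['Records']:
        client.start_execution(
            # Placeholder substituted with the state machine ARN when the template is deployed.
            stateMachineArn='${ExpressStateMachineArn}',
            # The raw message body becomes the execution input.
            input=record['body']
        )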
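If the event source mapping delivers batches of more than one message, a single failed start_execution call should not force the whole batch to be retried. The following sketch assumes the mapping has ReportBatchItemFailures enabled, in which case Lambda reads the batchItemFailures list from the response and returns only those messages to the queue. STATE_MACHINE_ARN is a hypothetical environment variable used instead of the template placeholder.

```python
import os

# Hypothetical environment variable holding the Express state machine ARN.
STATE_MACHINE_ARN = os.environ.get("STATE_MACHINE_ARN", "")


def handle_batch(event, sfn_client, state_machine_arn):
    """Start one execution per message and report the messages that failed."""
    failures = []
    for record in event["Records"]:
        try:
            sfn_client.start_execution(stateMachineArn=state_machine_arn, input=record["body"])
        except Exception:
            # Deliberately broad for the sketch: any failure marks only this message,
            # so the successfully processed messages are deleted from the queue.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def lambda_handler(event, context):
    import boto3
    return handle_batch(event, boto3.client("stepfunctions"), STATE_MACHINE_ARN)
```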
Example State Machine Code
The Express Workflow in this sample project consists of a set of Lambda functions for text processing.
For more information about how Amazon Step Functions can control other Amazon services, see Using Amazon Step Functions with other services.
{
  "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
  "StartAt": "Decode base64 string",
  "States": {
    "Decode base64 string": {
      "Type": "Task",
      "Resource": "arn:<PARTITION>:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>",
        "Payload.$": "$"
      },
      "Next": "Generate statistics"
    },
    "Generate statistics": {
      "Type": "Task",
      "Resource": "arn:<PARTITION>:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>",
        "Payload.$": "$"
      },
      "Next": "Remove special characters"
    },
    "Remove special characters": {
      "Type": "Task",
      "Resource": "arn:<PARTITION>:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>",
        "Payload.$": "$"
      },
      "Next": "Tokenize and count"
    },
    "Tokenize and count": {
      "Type": "Task",
      "Resource": "arn:<PARTITION>:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>",
        "Payload.$": "$"
      },
      "End": true
    }
  }
}
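The four Task states form a linear pipeline: each lambda:invoke integration returns a result object whose Payload field holds the function's response, and "OutputPath": "$.Payload" passes only that payload to the next state. The following local Python simulation illustrates that data flow. The four step functions are hypothetical stand-ins, because the sample's actual Lambda code is not shown here, and the payload shapes are assumptions.

```python
import base64
import re
from collections import Counter


def decode_base64(event):
    # Assumes the execution input is {"input": "<base64 text>"}, like the sample message.
    return {"text": base64.b64decode(event["input"]).decode("utf-8")}


def generate_statistics(event):
    text = event["text"]
    return {"text": text, "stats": {"characters": len(text), "lines": len(text.splitlines())}}


def remove_special_characters(event):
    # Keep letters, digits, and whitespace; drop everything else.
    return {**event, "text": re.sub(r"[^A-Za-z0-9\s]", "", event["text"])}


def tokenize_and_count(event):
    counts = Counter(event["text"].lower().split())
    return {**event, "word_counts": dict(counts)}


def invoke(function, payload):
    # Simplified lambda:invoke result; the real result also carries other metadata.
    return {"StatusCode": 200, "Payload": function(payload)}


def run_pipeline(execution_input):
    state = execution_input
    for step in (decode_base64, generate_statistics, remove_special_characters, tokenize_and_count):
        # "OutputPath": "$.Payload" keeps only the function's response for the next state.
        state = invoke(step, state)["Payload"]
    return state
```

Because each state forwards only its Payload, every function must return everything the following functions need, which is why the sketch carries "text" and "stats" forward.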
IAM Example
This example Amazon Identity and Access Management (IAM) policy, generated by the sample project, grants only the least-privilege permissions necessary to execute the state machine and related resources. We recommend that you include only those permissions that are necessary in your IAM policies.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "arn:aws-cn:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI",
        "arn:aws-cn:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF",
        "arn:aws-cn:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI",
        "arn:aws-cn:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF"
      ],
      "Effect": "Allow"
    }
  ]
}
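A policy like this can be generated from the list of function ARNs rather than written by hand. The following Python sketch builds the same kind of lambda:InvokeFunction statement and attaches it as an inline role policy with the IAM PutRolePolicy API. The role name and policy name in any call are hypothetical; the sample project creates its own role through CloudFormation.

```python
import json


def lambda_invoke_policy(function_arns):
    """Build a policy that allows lambda:InvokeFunction on exactly the given functions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": ["lambda:InvokeFunction"],
                # Explicit, de-duplicated ARNs instead of "*" keep the policy least-privilege.
                "Resource": sorted(set(function_arns)),
                "Effect": "Allow",
            }
        ],
    }


def attach_inline_policy(role_name, policy_name, policy, iam_client=None):
    """Attach the policy to an existing role as an inline policy (a sketch)."""
    if iam_client is None:
        import boto3
        iam_client = boto3.client("iam")
    # put_role_policy expects the policy document as a JSON string.
    iam_client.put_role_policy(
        RoleName=role_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy)
    )
```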
The following policy ensures that there are sufficient permissions for CloudWatch Logs.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogDelivery",
        "logs:GetLogDelivery",
        "logs:UpdateLogDelivery",
        "logs:DeleteLogDelivery",
        "logs:ListLogDeliveries",
        "logs:PutResourcePolicy",
        "logs:DescribeResourcePolicies",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "*"
      ],
      "Effect": "Allow"
    }
  ]
}
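These log delivery permissions are what allow Step Functions to write an Express Workflow's execution details to a CloudWatch Logs log group. As a hedged sketch, the following Python (Boto3) code creates an Express state machine with a logging configuration. The state machine name, role ARN, and log group ARN are hypothetical inputs; the log group ARN is expected in the form arn:<partition>:logs:<region>:<account>:log-group:<name>:* (ending in :*), and the role is assumed to carry the policy above.

```python
import json


def express_logging_configuration(log_group_arn, level="ALL", include_execution_data=True):
    """Build the loggingConfiguration argument for CreateStateMachine."""
    return {
        # Valid levels are ALL, ERROR, FATAL, and OFF.
        "level": level,
        # Include execution input and output in the log events.
        "includeExecutionData": include_execution_data,
        "destinations": [{"cloudWatchLogsLogGroup": {"logGroupArn": log_group_arn}}],
    }


def create_express_state_machine(name, definition, role_arn, log_group_arn, sfn_client=None):
    """Create an Express state machine that logs to the given log group (a sketch)."""
    if sfn_client is None:
        import boto3
        sfn_client = boto3.client("stepfunctions")
    return sfn_client.create_state_machine(
        name=name,
        # The ASL definition is passed as a JSON string.
        definition=json.dumps(definition),
        roleArn=role_arn,
        type="EXPRESS",
        loggingConfiguration=express_logging_configuration(log_group_arn),
    )
```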
For information about how to configure IAM when using Step Functions with other Amazon services, see IAM Policies for integrated services.