Process High-Volume Messages from Amazon SQS (Express Workflows) - Amazon Step Functions

Process High-Volume Messages from Amazon SQS (Express Workflows)

This sample project demonstrates how to use an Amazon Step Functions Express Workflow to process messages or data from a high-volume event source, such as Amazon Simple Queue Service (Amazon SQS). Because Express Workflows can be started at a very high rate, they are ideal for high-volume event processing or streaming data workloads.

Here are two commonly used methods to execute your state machine from an event source:

  • Configure an Amazon CloudWatch Events rule to start a state machine execution whenever the event source emits an event. For more information, see Creating a CloudWatch Events Rule That Triggers on an Event.

  • Map the event source to a Lambda function, and write function code to execute your state machine. The Amazon Lambda function is invoked each time your event source emits an event, which in turn starts a state machine execution. For more information, see Using Amazon Lambda with Amazon SQS.

This sample project uses the second method to start an execution each time a message is sent to the Amazon SQS queue. You can use a similar configuration to trigger Express Workflow executions from other event sources, such as Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB, and Amazon Kinesis.

For more information about Express Workflows and Step Functions service integrations, see the following:

Step 1: Create the state machine and provision resources

  1. Open the Step Functions console and choose Create state machine.

  2. Type Process high-volume messages from SQS in the search box, and then choose Process high-volume messages from SQS from the search results that are returned.

  3. Choose Next to continue.

  4. Step Functions lists the Amazon Web Services services used in the sample project you selected and shows a workflow graph for the project. You can deploy this project to your Amazon Web Services account or use it as a starting point for building your own projects. Based on how you want to proceed, choose Run a demo or Build on it.

    This sample project deploys the following resources:

    • Four Lambda functions

    • An Amazon SQS queue

    • An Amazon Step Functions state machine

    • Related Amazon Identity and Access Management (IAM) roles

    The following image shows the workflow graph for the Process high-volume messages from SQS sample project:

    
    [Figure: Workflow graph of the Process high-volume messages from SQS sample project.]

  5. Choose Use template to continue with your selection.

  6. Do one of the following:

    • If you selected Build on it, Step Functions creates the workflow prototype for the sample project you selected. Step Functions doesn't deploy the resources listed in the workflow definition.

      In Workflow Studio's Design mode, drag and drop states from the States browser to continue building your workflow prototype. Or switch to Code mode, which provides an integrated code editor similar to VS Code for updating the Amazon States Language (ASL) definition of your state machine within the Step Functions console. For more information about using Workflow Studio to build your state machines, see Using Workflow Studio.

      Important

      Remember to update the placeholder Amazon Resource Name (ARN) for the resources used in the sample project before you run your workflow.

    • If you selected Run a demo, Step Functions creates a read-only sample project that uses an Amazon CloudFormation template to deploy the resources listed in that template to your Amazon Web Services account.

      Tip

      To view the state machine definition of the sample project, choose Code.

      When you're ready, choose Deploy and run to deploy the sample project and create the resources.

      It can take up to 10 minutes for these resources and related IAM permissions to be created. While your resources are being deployed, you can open the CloudFormation Stack ID link to see which resources are being provisioned.

      After all the resources in the sample project are created, you can see the new sample project listed on the State machines page.

      Important

      Standard charges may apply for each service used in the CloudFormation template.

Step 2: Trigger the state machine execution

  1. Open the Amazon SQS console.

  2. Select the queue that was created by the sample project.

    The name will be similar to Example-SQSQueue-wJalrXUtnFEMI.

  3. In the Queue Actions list, select Send a Message.

  4. Use the copy button to copy the following message, paste it into the Send a Message window, and choose Send Message.

    Note

    Enter the message as a single line. The Base64 string in the input field must not contain line breaks or spaces.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5oZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcmFjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3VuZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91YmxlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwgQW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }

  5. Choose Close.

  6. Open the Step Functions console.

  7. Go to your Amazon CloudWatch Logs log group and inspect the logs. The name of the log group will look like example-ExpressLogGroup-wJalrXUtnFEMI.
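
The sample message in step 4 is a JSON object whose input field holds Base64-encoded text. The following is a minimal sketch of how you might build and send your own test message from a script instead of the console. The helper names build_message and send_message, and the queue_url argument, are assumptions made for illustration and are not part of the sample project.

```python
import base64
import json


def build_message(text):
    """Return a single-line JSON message body with Base64-encoded text."""
    encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
    return json.dumps({"input": encoded})


def send_message(queue_url, text):
    """Send the encoded text to the queue (hypothetical helper).

    queue_url is the URL of the queue created by the sample project.
    boto3 is imported here so that build_message can be used without it.
    """
    import boto3

    sqs = boto3.client("sqs")
    return sqs.send_message(QueueUrl=queue_url, MessageBody=build_message(text))
```

Because json.dumps emits the body on one line, the message needs no manual cleanup before it is sent.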

Example Lambda Function Code

The following is Lambda function code that shows how the initiating Lambda function starts a state machine execution using the Amazon SDK.

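import boto3

def lambda_handler(event, context):
    # The body of the first SQS record becomes the execution input.
    message_body = event['Records'][0]['body']
    client = boto3.client('stepfunctions')
    # ${ExpressStateMachineArn} is a placeholder for the ARN of the
    # Express state machine.
    response = client.start_execution(
        stateMachineArn='${ExpressStateMachineArn}',
        input=message_body
    )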
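
The function above reads only the first record of the event. An Amazon SQS event can carry several records when the event source mapping uses a batch size greater than 1, so the following sketch shows one possible variant that starts an execution for every record. The start_executions helper is a hypothetical name introduced here, and the batch size configured by the sample project is not stated in this topic.

```python
STATE_MACHINE_ARN = "${ExpressStateMachineArn}"  # placeholder, as in the sample


def start_executions(event, client, state_machine_arn=STATE_MACHINE_ARN):
    """Start one execution per SQS record and return the API responses."""
    responses = []
    for record in event.get("Records", []):
        responses.append(
            client.start_execution(
                stateMachineArn=state_machine_arn,
                input=record["body"],
            )
        )
    return responses


def lambda_handler(event, context):
    # boto3 is imported here so start_executions can be tested with a stub.
    import boto3

    client = boto3.client("stepfunctions")
    return len(start_executions(event, client))
```

Passing the client as an argument keeps the loop testable without calling Amazon Web Services.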

Example State Machine Code

The Express Workflow in this sample project consists of a set of Lambda functions for text processing.

For more information about how Amazon Step Functions can control other Amazon services, see Using Amazon Step Functions with other services.

{
  "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
  "StartAt": "Decode base64 string",
  "States": {
    "Decode base64 string": {
      "Type": "Task",
      "Resource": "arn:<PARTITION>:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>",
        "Payload.$": "$"
      },
      "Next": "Generate statistics"
    },
    "Generate statistics": {
      "Type": "Task",
      "Resource": "arn:<PARTITION>:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>",
        "Payload.$": "$"
      },
      "Next": "Remove special characters"
    },
    "Remove special characters": {
      "Type": "Task",
      "Resource": "arn:<PARTITION>:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>",
        "Payload.$": "$"
      },
      "Next": "Tokenize and count"
    },
    "Tokenize and count": {
      "Type": "Task",
      "Resource": "arn:<PARTITION>:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>",
        "Payload.$": "$"
      },
      "End": true
    }
  }
}
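
To illustrate how data moves through this definition, the following sketch simulates the four Task states locally. Each state passes its whole input to a function ("Payload.$": "$") and keeps only the function's return value ("OutputPath": "$.Payload"). The four handler functions and the field names text, stats, and word_counts are hypothetical stand-ins, because the code of the real Lambda functions is not shown in this topic.

```python
import base64
import re
from collections import Counter


# Hypothetical stand-ins for the sample project's four Lambda functions.
def decode_base64(event):
    return {"text": base64.b64decode(event["input"]).decode("utf-8")}


def generate_statistics(event):
    text = event["text"]
    stats = {"characters": len(text), "words": len(text.split())}
    return {"text": text, "stats": stats}


def remove_special_characters(event):
    # Keep only ASCII letters, digits, and whitespace.
    return {**event, "text": re.sub(r"[^A-Za-z0-9\s]", "", event["text"])}


def tokenize_and_count(event):
    counts = Counter(event["text"].lower().split())
    return {**event, "word_counts": dict(counts)}


# Same order as the StartAt and Next fields in the definition.
PIPELINE = [
    decode_base64,
    generate_statistics,
    remove_special_characters,
    tokenize_and_count,
]


def run_task_state(handler, state_input):
    # The lambda:invoke integration wraps the function result in "Payload".
    invoke_result = {"StatusCode": 200, "Payload": handler(state_input)}
    # "OutputPath": "$.Payload" discards everything except that result.
    return invoke_result["Payload"]


def run_pipeline(execution_input):
    data = execution_input
    for handler in PIPELINE:
        data = run_task_state(handler, data)
    return data
```

Without the OutputPath filter, each function would receive the previous state's full invocation response rather than only the previous function's output.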

IAM Example

This example Amazon Identity and Access Management (IAM) policy generated by the sample project grants the least privilege necessary to run the state machine and invoke its related resources. We recommend that you include only the necessary permissions in your IAM policies.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "arn:aws-cn:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI",
        "arn:aws-cn:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF",
        "arn:aws-cn:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI",
        "arn:aws-cn:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF"
      ],
      "Effect": "Allow"
    }
  ]
}
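
If you adapt this project, a policy of the same shape can be generated from the list of function ARNs that your state machine invokes. The following is a small sketch under that assumption. The build_invoke_policy helper is a hypothetical name, and its wildcard check is one possible way to keep the policy scoped to explicit functions.

```python
import json


def build_invoke_policy(function_arns):
    """Return a policy document allowing lambda:InvokeFunction on the given ARNs."""
    arns = list(function_arns)
    for arn in arns:
        # Reject wildcards so every invocable function is listed explicitly.
        if "*" in arn:
            raise ValueError(f"wildcard not allowed in function ARN: {arn}")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["lambda:InvokeFunction"],
                "Resource": arns,
            }
        ],
    }


def to_json(policy):
    """Serialize the policy document for use in a template or API call."""
    return json.dumps(policy, indent=2)
```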

The following policy ensures that there are sufficient permissions for CloudWatch Logs.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogDelivery",
        "logs:GetLogDelivery",
        "logs:UpdateLogDelivery",
        "logs:DeleteLogDelivery",
        "logs:ListLogDeliveries",
        "logs:PutResourcePolicy",
        "logs:DescribeResourcePolicies",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "*"
      ],
      "Effect": "Allow"
    }
  ]
}

For information about how to configure IAM when using Step Functions with other Amazon services, see IAM Policies for integrated services.