本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
处理来自 Amazon SQS 的大批量消息(快速工作流)
此示例项目演示如何使用Amazon Step Functions快速工作流处理来自大批量事件源(如 Amazon SQS)的消息或数据。由于快速工作流可以非常高的速率启动,因此它们非常适合大批量事件处理或流数据工作负载。
以下是从事件源执行状态机的两种常用方法:
-
配置 Amazon CloudWatch Events 规则,以便在事件源发出事件时启动状态机执行。有关更多信息,请参阅创建触发事件的 CloudWatch Events 规则。
-
将事件源映射到 Lambda 函数,并编写函数代码来执行状态机。每次事件源发出事件时都会调用 Amazon Lambda 函数,从而启动状态机执行。有关更多信息,请参阅使用Amazon Lambda使用 Amazon SQS.
此示例项目使用第二种方法在 Amazon SQS 队列发送消息时启动执行。您可以使用类似的配置从其它事件源(如 Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB 和 Amazon Kinesis)触发快速工作流执行。
有关快速工作流和 Step Functions 服务集成的更多信息,请参阅以下内容:
创建状态机并预置资源
-
打开Step Functions 控制台
然后选择创建状态机. -
选择运行示例项目,然后选择处理来自 Amazon SQS 的大批量消息.
此时将显示状态机 Code (代码) 和 Visual Workflow (可视工作流程)。
-
选择下一步。
此时将显示 Deploy resources (部署资源) 页面,其中列出了将创建的资源。对于本示例项目,资源包括:
-
Step Functions 状态机
-
Amazon SQS 队列
-
Lambda 函数
-
-
选择 Deploy Resources (部署资源)。
注意 创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。当显示 Deploy resources (部署资源) 页面时,您可打开 Stack ID (堆栈 ID) 链接以查看正在预置的资源。
触发执行
-
打开 Amazon SQS 控制台
。 -
选择示例项目创建的队列。
该名称将类似于 Example-SQSQueue-wJalrXUtnFEMI。
-
在 Queue Actions (队列操作) 列表中,选择 Send a Message (发送消息)。
-
使用复制按钮复制以下消息,然后在 Send a Message (发送邮件) 窗口中输入该消息,然后选择 Send Message (发送邮件)。
注意 在此示例消息中,已使用换行符对
input:
进行格式化以适应页面。使用复制按钮或以其他方式确保它作为一行输入,没有中断。{ "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW
-
选择关闭。
-
转到您的Amazon CloudWatch Logs 日志组
然后检查日志。日志组的名称将类似于 example-ExpressLogGroup-wJalrXUtnFEMI。
示例 Lambda 函数代码
以下 Lambda 函数代码显示启动的 Lambda 函数如何使用AmazonSDK。
import boto3
def lambda_handler(event, context):
message_body = event['Records'][0]['body']
client = boto3.client('stepfunctions')
response = client.start_execution(
stateMachineArn='${ExpressStateMachineArn}',
input=message_body
)
示例状态机代码
此示例项目中的快速工作流包含一组用于文本处理的 Lambda 函数。
有关 Amazon Step Functions 如何控制其他 Amazon 服务的更多信息,请参阅将 Amazon Step Functions 与其他服务一起使用。
{
"Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
"StartAt": "Decode base64 string",
"States": {
"Decode base64 string": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Generate statistics"
},
"Generate statistics": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Remove special characters"
},
"Remove special characters": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Tokenize and count"
},
"Tokenize and count": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"End": true
}
}
}
IAM 示例
此示例Amazon Identity and Access Management示例项目生成的 (IAM) 策略包括执行状态机和相关资源所需的最小权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI",
"arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF",
"arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI",
"arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF"
],
"Effect": "Allow"
}
]
}
以下策略确保 CloudWatch Logs 有足够的权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
],
"Resource": [
"*"
],
"Effect": "Allow"
}
]
}
有关在将 Step Functions 与其它配置时如何配置 IAM 的信息Amazon服务,请参阅集成服务的 IAM 政策.