本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Map 状态动态处理数据
该示例项目演示使用 Map
状态的动态并行性。此示例项目创建以下内容:
-
两个 Amazon Lambda 函数
-
Amazon Simple Queue Service (Amazon SQS) 队列
-
Amazon Simple Notification Service (Amazon SNS) 主题
-
Amazon DynamoDB 表
-
Amazon Step Functions 状态机
在本项目中,Step Functions 使用Amazon Lambda函数将消息从 Amazon SQS 队列中拉出,并将这些消息的 JSON 数组传递给Map
状态。对于队列中的每个消息,状态机将消息写入 DynamoDB,调用其他 Lambda 函数以从 Amazon SQS 中删除消息,然后将消息发布到 Amazon SNS 主题。
有关Map
State 和 Step Functions 服务集成,请参阅以下内容:
创建状态机并预配置资源
-
打开Step Functions 操作
然后选择创建状态机. -
选择示例项目,然后选择使用 Map 状态动态处理数据.
此时将显示状态机 Code (代码) 和 Visual Workflow (可视工作流程)。
-
请选择 Next (下一步)。
此时将显示 Deploy resources (部署资源) 页面,其中列出了将创建的资源。对于本示例项目,资源包括:
-
Amazon SQS 队列
-
一个 Amazon SNS 主题
-
DynamoDB 表
-
两个 Lambda 函数
-
Step Functions 状态机
-
-
选择 Deploy Resources (部署资源)。
注意 创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。当显示 Deploy resources (部署资源) 页面时,您可打开 Stack ID (堆栈 ID) 链接以查看正在预置的资源。
部署了示例项目的资源后,您需要在开始执行状态机之前,将项添加到 Amazon SQS 队列并订阅 Amazon SNS 主题。
订阅 Amazon SNS 主题
-
打开 Amazon SNS 控制台
。 -
选择 Topics (主题),然后选择
Map
状态示例项目创建的主题。该名称将类似于MapSampleProj-SNSTOPIC-1CQO4HQ3IR1KN.
-
在 Subscriptions (订阅) 下,选择 Create subscription (创建订阅)。
将显示创建订阅页面,其中列出该主题的主题 ARN。
-
对于 Protocol (协议),选择 Email (电子邮件)。
-
在 Endpoint (终端节点) 下,输入一个电子邮件地址以订阅该主题。
-
选择 Create subscription(创建订阅)。
注意 您必须先在电子邮件中确认订阅,然后才能激活该订阅。
-
在相关账户中打开 Subscription Confirmation (订阅确认) 电子邮件,然后打开 Confirm subscription (确认订阅) URL。
Subscription confirmed! (订阅确认!) 页面随即显示出来。
将消息添加到 Amazon SQS 队列
-
打开 Amazon SQS 控制台
。 -
选择
Map
状态示例项目创建的队列。该名称将类似于MapSampleProj-SQSQUeue-1UDIC9VZDORN7.
-
在 Queue Actions (队列操作) 列表中,选择 Send a Message (发送消息)。
-
在 Send a Message (发送消息) 窗口中,输入消息并选择 Send Message (发送消息)。
-
选择 Send Another Message (发送另一条消息)。
继续输入消息,直到 Amazon SQS 队列中有几条消息为止。
-
选择关闭。
启动新的执行
Amazon SNS 中的队列具有最终一致性。为了获得最佳结果,请在填充队列与运行状态机执行之间等待几分钟。
-
在存储库的状态机页面上,选择MapStateStateMachine示例项目创建的状态机,然后选择启动执行.
-
在 New execution 页面上,输入执行名称 (可选),然后选择 Start Execution (开始执行)。
(可选)要识别您的执行,您可以在名称。默认情况下,Step Functions 会自动生成唯一的执行名称。
注意 Step Functions 允许您创建包含非 ASCII 字符的状态机、执行和活动名称。这些非 ASCII 名称不适用于亚马逊CloudWatch. 为了确保你可以跟踪CloudWatch指标,请选择仅使用 ASCII 字符的名称。
-
(可选)在 Step Functions 上转到新创建的状态机控制面板,然后选择新的执行.
-
执行完成后,您可以在 Visual workflow (可视工作流) 上选择状态,并浏览 Step details (步骤详细信息) 下的 Input (输入) 和 Output (输出)。
示例状态机代码
此示例项目中的状态机通过将参数直接传递给 Amazon SQS、Amazon SNS 和 Lambda 来与这些资源进行集成。
浏览此示例状态机 Lambda 了解 Step Functions 如何通过连接到Resource
字段,然后通过Parameters
转到服务 API。
有关 Amazon Step Functions 如何控制其他 Amazon 服务的更多信息,请参阅将 Amazon Step Functions 与其他服务一起使用。
{
"Comment": "An example of the Amazon States Language for reading messages from an SQS queue and iteratively processing each message.",
"StartAt": "Read messages from SQS Queue",
"States": {
"Read messages from SQS Queue": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "MapSampleProj-ReadFromSQSQueueLambda-1MY3M63RMJVA9"
},
"Next": "Are there messages to process?"
},
"Are there messages to process?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$",
"StringEquals": "No messages",
"Next": "Finish"
}
],
"Default": "Process messages"
},
"Process messages": {
"Type": "Map",
"Next": "Finish",
"ItemsPath": "$",
"Parameters": {
"MessageNumber.$": "$$.Map.Item.Index",
"MessageDetails.$": "$$.Map.Item.Value"
},
"Iterator": {
"StartAt": "Write message to DynamoDB",
"States": {
"Write message to DynamoDB": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"ResultPath": null,
"Parameters": {
"TableName": "MapSampleProj-DDBTable-YJDJ1MKIN6C5",
"ReturnConsumedCapacity": "TOTAL",
"Item": {
"MessageId": {
"S.$": "$.MessageDetails.MessageId"
},
"Body": {
"S.$": "$.MessageDetails.Body"
}
}
},
"Next": "Remove message from SQS queue"
},
"Remove message from SQS queue": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"InputPath": "$.MessageDetails",
"ResultPath": null,
"Parameters": {
"FunctionName": "MapSampleProj-DeleteFromSQSQueueLambda-198J2839ZO5K2",
"Payload": {
"ReceiptHandle.$": "$.ReceiptHandle"
}
},
"Next": "Publish message to SNS topic"
},
"Publish message to SNS topic": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"InputPath": "$.MessageDetails",
"Parameters": {
"Subject": "Message from Step Functions!",
"Message.$": "$.Body",
"TopicArn": "arn:aws:sns:us-east-1:012345678910:MapSampleProj-SNSTopic-1CQO4HQ3IR1KN"
},
"End": true
}
}
}
},
"Finish": {
"Type": "Succeed"
}
}
}
IAM 示例
此示例Amazon Identity and Access Management示例项目生成的 (IAM) 策略包括执行状态机和相关资源所需的最小权限。我们建议在 IAM 策略中仅包含这些必需的权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:012345678901:function:MapSampleProj-ReadFromSQSQueueLambda-1MY3M63RMJVA9",
"arn:aws:lambda:us-east-1:012345678901:function:MapSampleProj-DeleteFromSQSQueueLambda-198J2839ZO5K2"
],
"Effect": "Allow"
},
{
"Action": [
"dynamodb:PutItem"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:012345678901:table/MapSampleProj-DDBTable-YJDJ1MKIN6C5"
],
"Effect": "Allow"
},
{
"Action": [
"sns:Publish"
],
"Resource": [
"arn:aws:sns:us-east-1:012345678901:MapSampleProj-SNSTopic-1CQO4HQ3IR1KN"
],
"Effect": "Allow"
}
]
}
有关在将 Step Functions 与其他配置时如何配置 IAM 的信息。Amazon服务,请参阅集成服务的 IAM 政策.