使用映射状态动态处理数据 - AWS Step Functions
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

使用映射状态动态处理数据

该示例项目演示使用 Map 状态的动态并行性。此示例项目创建以下内容:

  • 两个 AWS Lambda 函数

  • Amazon Simple Queue Service (Amazon SQS) 队列

  • Amazon Simple Notification Service (Amazon SNS) 主题

  • 一个 Amazon DynamoDB 表

  • AWS Step Functions 状态机

在该项目中,Step Functions 使用一个 AWS Lambda 函数将消息从 Amazon SQS 队列中拉出,并将这些消息的 JSON 数组传递给 Map 状态。对于队列中的每个消息,状态机将消息写入 DynamoDB,调用其他 Lambda 函数以从 Amazon SQS 中删除消息,然后将消息发布到 Amazon SNS 主题。

有关 Map 状态和 Step Functions 服务集成的更多信息,请参阅以下内容。

创建状态机并预置资源

  1. 打开 Step Functions 控制台,然后选择 Create a state machine (创建状态机)

  2. 选择 Sample Projects (示例项目),然后选择 Iterate steps with a Map state (具有映射状态的迭代步骤)

    此时将显示状态机 Code (代码)Visual Workflow (可视工作流程)

    
          映射状态工作流程。
  3. 选择 Next

    此时将显示 Deploy resources (部署资源) 页面,其中列出了将创建的资源。对于本示例项目,资源包括:

    • Amazon SQS 队列

    • Amazon SNS 主题

    • 一个 DynamoDB 表

    • 两个 Lambda 函数

    • Step Functions 状态机

  4. 选择 Deploy Resources (部署资源)

    注意

    创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。当显示 Deploy resources (部署资源) 页面时,您可打开 Stack ID (堆栈 ID) 链接以查看正在预置的资源。

部署了示例项目的资源后,您需要在开始执行状态机之前,将各个项添加到 Amazon SQS 队列并订阅 Amazon SNS 主题。

订阅 Amazon SNS 主题

  1. 打开 Amazon SNS 控制台

  2. 选择 Topics (主题),然后选择 Map 状态示例项目创建的主题。

    该名称将类似于 MapSampleProj-SNSTopic-1CQO4HQ3IR1KN

  3. Subscriptions (订阅) 下,选择 Create subscription (创建订阅)

    将显示创建订阅页面,其中列出该主题的主题 ARN

  4. 对于 Protocol (协议),选择 Email (电子邮件)

  5. Endpoint (终端节点) 下,输入一个电子邮件地址以订阅该主题。

  6. 选择 Create subscription (创建订阅)

    注意

    您必须先在电子邮件中确认订阅,然后才能激活该订阅。

  7. 在相关账户中打开 Subscription Confirmation (订阅确认) 电子邮件,然后打开 Confirm subscription (确认订阅) URL。

    Subscription confirmed! (订阅确认!) 页面随即显示出来。

将消息添加到 Amazon SQS 队列

  1. 打开 Amazon SQS 控制台

  2. 选择 Map 状态示例项目创建的队列。

    该名称将类似于 MapSampleProj-SQSQueue-1UDIC9VZDORN7

  3. Queue Actions (队列操作) 列表中,选择 Send a Message (发送消息)

  4. Send a Message (发送消息) 窗口中,输入消息并选择 Send Message (发送消息)

  5. 选择 Send Another Message (发送另一条消息)

    继续输入消息,直到 Amazon SQS 队列中有几条消息为止。

  6. 选择 Close

启动新的执行

注意

Amazon SNS 中的队列具有最终一致性。为了获得最佳结果,请在填充队列与运行状态机执行之间等待几分钟。

  1. 打开 Step Functions 控制台

  2. State machines (状态机) 页面上,选择示例项目创建的 MapStateStateMachine 状态机,然后选择 Start execution (开始执行)

  3. New execution 页面上,输入执行名称 (可选),然后选择 Start Execution (开始执行)

  4. (可选)为帮助您标识执行,您可以在 Enter an execution name (输入执行名称) 框中为执行指定一个 ID。如果未输入 ID,Step Functions 将自动生成一个唯一 ID。

    注意

    Step Functions 允许您创建包含非 ASCII 字符的状态机、执行和活动名称。这些非 ASCII 名称不适用于 Amazon CloudWatch。为确保您可以跟踪 CloudWatch 指标,请选择一个只使用 ASCII 字符的名称。

  5. (可选)转到 Step Functions Dashboard (控制面板) 上新创建的状态机,然后选择 New execution (新执行)

  6. 执行完成后,您可以在 Visual workflow (可视工作流) 上选择状态,并浏览 Step details (步骤详细信息) 下的 Input (输入)Output (输出)

示例状态机代码

此示例项目中的状态机通过将参数直接传递给这些资源来与 Amazon SQS、Amazon SNS 和 Lambda 集成。

浏览此示例状态机,通过连接到 Resource 字段中的 Amazon 资源名称 (ARN),并将 Parameters 传递给服务 API,查看 Step Functions 如何控制 Lambda、DynamoDB 和 Amazon SNS。

有关 AWS Step Functions 如何控制其他 AWS 服务的更多信息,请参阅 服务与 AWS Step Functions 集成

{ "Comment": "An example of the Amazon States Language for reading messages from an SQS queue and iteratively processing each message.", "StartAt": "Read messages from SQS Queue", "States": { "Read Messages from SQS Queue": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "MapSampleProj-ReadFromSQSQueueLambda-1MY3M63RMJVA9" }, "Next": "Are there messages to process?" }, "Are there messages to process?": { "Type": "Choice", "Choices": [ { "Variable": "$", "StringEquals": "No messages", "Next": "Finish" } ], "Default": "Process messages" }, "Process messages": { "Type": "Map", "Next": "Finish", "ItemsPath": "$", "Parameters": { "MessageNumber.$": "$$.Map.Item.Index", "MessageDetails.$": "$$.Map.Item.Value" }, "Iterator": { "StartAt": "Write message to DynamoDB", "States": { "Write message to DynamoDB": { "Type": "Task", "Resource": "arn:aws:states:::dynamodb:putItem", "ResultPath": null, "Parameters": { "TableName": "MapSampleProj-DDBTable-YJDJ1MKIN6C5", "ReturnConsumedCapacity": "TOTAL", "Item": { "MessageId": { "S.$": "$.MessageDetails.MessageId" }, "Body": { "S.$": "$.MessageDetails.Body" } } }, "Next": "Remove message from SQS queue" }, "Remove message from SQS queue": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "InputPath": "$.MessageDetails", "ResultPath": null, "Parameters": { "FunctionName": "MapSampleProj-DeleteFromSQSQueueLambda-198J2839ZO5K2", "Payload": { "ReceiptHandle.$": "$.ReceiptHandle" } }, "Next": "Publish message to SNS topic" }, "Publish message to SNS topic": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "InputPath": "$.MessageDetails", "Parameters": { "Subject": "Message from Step Functions!", "Message.$": "$.Body", "TopicArn": "arn:aws:sns:us-east-1:012345678910:MapSampleProj-SNSTopic-1CQO4HQ3IR1KN" }, "End": true } } } }, "Finish": { "Type": "Succeed" } } }

IAM 示例

示例项目生成的此示例 AWS Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最小权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:012345678901:function:MapSampleProj-ReadFromSQSQueueLambda-1MY3M63RMJVA9", "arn:aws:lambda:us-east-1:012345678901:function:MapSampleProj-DeleteFromSQSQueueLambda-198J2839ZO5K2" ], "Effect": "Allow" }, { "Action": [ "dynamodb:PutItem" ], "Resource": [ "arn:aws:dynamodb:us-east-1:012345678901:table/MapSampleProj-DDBTable-YJDJ1MKIN6C5" ], "Effect": "Allow" }, { "Action": [ "sns:Publish" ], "Resource": [ "arn:aws:sns:us-east-1:012345678901:MapSampleProj-SNSTopic-1CQO4HQ3IR1KN" ], "Effect": "Allow" } ] }

有关在将 Step Functions 与其他 AWS 服务一起使用时如何配置 IAM 的信息,请参阅 集成服务的 IAM 策略