处理来自 Amazon SQS(快速工作流)的大批量消息 - Amazon Step Functions
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

处理来自 Amazon SQS(快速工作流)的大批量消息

此示例项目演示了如何使用 Amazon Step Functions Express Workflow 来处理来自高容量事件源(例如亚马逊简单队列服务 (Amazon SQS))的消息或数据。由于快速工作流可以非常高的速率启动,因此它们非常适合大批量事件处理或流数据工作负载。

以下是从事件源执行状态机的两种常用方法:

此示例项目使用第二种方法在 Amazon SQS 队列每次发送消息时启动执行。您可以使用类似的配置,触发来自其他事件源的快速工作流执行,比如 Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB 和 Amazon Kinesis。

有关快速工作流和 Step Functions 服务集成的更多信息,请参阅以下内容:

第 1 步:创建状态机并预置资源

  1. 打开 Step Functions 控制台,然后选择创建状态机

  2. 在搜索框中键入 Process high-volume messages from SQS,然后从返回的搜索结果中选择处理来自 SQS 的大批量消息

  3. 选择下一步以继续。

  4. Step Functions 列出了您选择的示例项目中 Amazon Web Services 使用的。它还显示了示例项目的工作流图。将此项目部署到您的, Amazon Web Services 账户 或者将其用作构建您自己的项目的起点。根据您想继续的方式,选择运行演示构建依据

    该示例项目部署了以下资源:

    • 四个 Lambda 函数

    • 一个 Amazon SQS 队列

    • Amazon Step Functions 状态机

    • 相关 Amazon Identity and Access Management (IAM) 角色

    下图显示了处理来自 SQS 的大批量消息示例项目的工作流图:

    处理来自 SQS 的大批量消息示例项目的工作流图。
  5. 选择使用模板继续进行选择。

  6. 请执行以下操作之一:

    • 如果您选择构建依据,Step Functions 将为您选择的示例项目创建工作流原型。Step Functions 不会部署工作流定义中列出的资源。

      在 Workflow Studio 的设计模式下,从状态浏览器中拖放状态,继续构建工作流原型。或者切换到代码模式,该模式提供了一个类似于 VS Code 的集成代码编辑器,用于在 Step Functions 控制台中更新状态机的 Amazon States Language (ASL) 定义。有关使用 Workflow Studio 构建状态机的更多信息,请参阅使用 Workflow Studio

      重要

      请记住,在运行工作流之前,为示例项目中使用的资源更新占位符 Amazon 资源名称 (ARN)。

    • 如果您选择了 “运行演示”,Step Functions 将创建一个只读示例项目,该项目使用 Amazon CloudFormation 模板将该模板中列出的 Amazon 资源部署到您的 Amazon Web Services 账户。

      提示

      要查看示例项目的状态机定义,请选择代码

      准备就绪后,选择部署并运行以部署示例项目并创建资源。

      创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。在部署资源时,您可以打开 CloudFormation 堆栈 ID 链接以查看正在配置哪些资源。

      创建示例项目中的所有资源后,您可以在状态机页面上看到新的示例项目。

      重要

      CloudFormation 模板中使用的每项服务都可能收取标准费用。

第 2 步:触发状态机执行

  1. 打开 Amazon SQS 控制台

  2. 选择示例项目创建的队列。

    该名称将类似于 Example-SQSQueue-wJalrXUtnFEMI

  3. Queue Actions (队列操作) 列表中,选择 Send a Message (发送消息)

  4. 使用复制按钮复制以下消息,然后在 Send a Message (发送邮件) 窗口中输入该消息,然后选择 Send Message (发送邮件)

    注意

    在此示例消息中,已使用换行符对 input: 进行格式化以适应页面。使用复制按钮或以其他方式确保它作为一行输入,没有中断。

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. 选择关闭

  6. 打开 Step Functions 控制台

  7. 前往您的 Amazon CloudWatch 日志组并检查日志。日志组的名称将类似于示例 ExpressLogGroup-wj alrxutnFemi。

示例 Lambda 函数代码

以下是 Lambda 函数代码,它显示了启动的 Lambda 函数如何使用软件开发工具包启动状态机执行。 Amazon

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

示例状态机代码

此示例项目中的快速工作流包含一组用于文本处理的 Lambda 函数。

有关 Amazon Step Functions 如何控制其他 Amazon 服务的更多信息,请参阅与其他服务 Amazon Step Functions 一起使用

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM 示例

示例项目生成的此示例 Amazon Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最低权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

以下策略可确保有足够的 CloudWatch 日志权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

有关在将 Step Functions 与其他 Amazon 服务一起使用时如何配置 IAM 的信息,请参阅集成服务的 IAM 策略