处理来自 Amazon SQS(快速工作流)的大批量消息 - AWS Step Functions
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

处理来自 Amazon SQS(快速工作流)的大批量消息

此示例项目演示如何使用 AWS Step Functions 快速工作流处理来自大批量事件源(如 Amazon Simple Queue Service (Amazon SQS))的消息或数据。由于快速工作流可以非常高的速率启动,因此它们非常适合大批量事件处理或流数据工作负载。

以下是从事件源执行状态机的两种常用方法:

  • 配置 Amazon CloudWatch Events 事件规则,以便在事件源发出事件时启动状态机执行。 有关更多信息,请参阅创建触发事件的 CloudWatch 事件规则

  • 将事件源映射到 Lambda 函数,并编写函数代码来执行状态机。 每次事件源发出事件时都会调用 AWS Lambda 函数,从而启动状态机执行。有关更多信息,请参阅将 AWS Lambda 与 Amazon SQS 结合使用

此示例项目使用第二种方法在 Amazon SQS 队列每次发送消息时启动执行。您可以使用类似的配置从其他事件源(如 Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB 和 Amazon Kinesis)触发快速工作流执行。

有关快速工作流和 Step Functions 服务集成的更多信息,请参阅以下内容:

创建状态机并预置资源

  1. 打开 Step Functions 控制台,然后选择 Create a state machine (创建状态机)

  2. 选择 Run a sample project (运行示例项目),然后选择 Process high-volume messages from Amazon SQS (处理来自 SQS 的大批量消息)

    此时将显示状态机 Code (代码)Visual Workflow (可视工作流程)

    
          表达大批量 SQS 工作流。
  3. 选择 Next (下一步)

    此时将显示 Deploy resources (部署资源) 页面,其中列出了将创建的资源。对于本示例项目,资源包括:

    • Step Functions 状态机

    • Amazon SQS 队列

    • Lambda 函数

  4. 选择 Deploy Resources (部署资源)

    注意

    创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。当显示 Deploy resources (部署资源) 页面时,您可打开 Stack ID (堆栈 ID) 链接以查看正在预置的资源。

触发执行

  1. 打开 Amazon SQS 控制台

  2. 选择示例项目创建的队列。

    该名称将类似于 Example-SQSQueue-wJalrXUtnFEMI

  3. Queue Actions (队列操作) 列表中,选择 Send a Message (发送消息)

  4. 使用复制按钮复制以下消息,然后在 Send a Message (发送邮件) 窗口中输入该消息,然后选择 Send Message (发送邮件)

    注意

    在此示例消息中,已使用换行符对 input: 进行格式化以适应页面。使用复制按钮或以其他方式确保它作为一行输入,没有中断。

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. 选择 Close

  6. 打开 Step Functions 控制台

  7. 转到您的 Amazon CloudWatch Logs 日志组并检查日志。日志组的名称将类似于 example-ExpressLogGroup-wJalrXUtnFEMI

示例 Lambda 函数代码

以下 Lambda 函数代码显示启动的 Lambda 函数如何使用 AWS 开发工具包启动状态机执行。

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

示例状态机代码

此示例项目中的快速工作流包含一组用于文本处理的 Lambda 函数。

有关 AWS Step Functions 如何控制其他 AWS 服务的更多信息,请参阅 服务与 AWS Step Functions 集成

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM 示例

示例项目生成的此示例 AWS Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最小权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws-cn:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws-cn:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws-cn:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws-cn:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

以下策略确保 CloudWatch Logs 有足够的权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

有关在将 Step Functions 与其他 AWS 服务一起使用时如何配置 IAM 的信息,请参阅 集成服务的 IAM 策略