教程:将 Lambda 与 Amazon SQS 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

教程:将 Lambda 与 Amazon SQS 结合使用

在此教程中,您将创建一个会使用来自某个 Amazon Simple Queue Service(Amazon SQS)队列的消息的 Lambda 函数。只要有新消息添加到队列,Lambda 函数就会运行。函数会将消息写入 Amazon CloudWatch Logs 日志流。下图显示了您用于完成教程的 Amazon 资源。


        显示 Amazon SQS 消息、Lambda 函数和 CloudWatch Logs 日志流的示意图

要完成本教程,请执行以下步骤:

  1. 创建将消息写入 CloudWatch Logs 的 Lambda 函数。

  2. 创建 Amazon SQS 队列。

  3. 创建 Lambda 事件源映射。事件源映射会读取 Amazon SQS 队列并在添加新消息时调用 Lambda 函数。

  4. 将消息添加到队列并在 CloudWatch Logs 中监控结果来测试设置。

先决条件

如果您还没有 Amazon Web Services 账户,请完成以下步骤来创建一个。

注册 Amazon Web Services 账户
  1. 打开 https://portal.aws.amazon.com/billing/signup

  2. 按照屏幕上的说明进行操作。

    在注册时,将接到一通电话,要求使用电话键盘输入一个验证码。

    当您注册 Amazon Web Services 账户时,系统将会创建一个 Amazon Web Services 账户根用户。根用户有权访问该账户中的所有 Amazon Web Services 和资源。作为安全最佳实践,请 为管理用户分配管理访问权限,并且只使用根用户执行需要根用户访问权限的任务

注册过程完成后,Amazon 会向您发送一封确认电子邮件。在任何时候,您都可以通过转至 https://aws.amazon.com/ 并选择我的账户来查看当前的账户活动并管理您的账户。

注册 Amazon Web Services 账户 后,启用多重身份验证 (MFA) 来保护您的管理用户。有关说明,请参阅 IAM 用户指南中的 为 IAM 用户启用虚拟 MFA 设备(控制台)

要授予其他用户访问您的 Amazon Web Services 账户资源的权限,请创建 IAM 用户。为了保护您的 IAM 用户,请启用 MFA 并仅向 IAM 用户授予执行任务所需的权限。

有关创建和保护 IAM 用户的更多信息,请参阅《IAM 用户指南》中的以下主题:

如果您尚未安装 Amazon Command Line Interface,请按照安装或更新最新版本的 Amazon CLI 中的步骤进行安装。

本教程需要命令行终端或 Shell 来运行命令。在 Linux 和 macOS 中,可使用您首选的 Shell 和程序包管理器。

注意

在 Windows 中,操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令(例如 zip)。安装 Windows Subsystem for Linux,获取 Ubuntu 和 Bash 与 Windows 集成的版本。

创建执行角色


        步骤 1 创建执行角色

执行角色是一个 Amazon Identity and Access Management(IAM)角色,用于向 Lambda 函数授予访问 Amazon 服务和资源的权限。要允许函数从 Amazon SQS 中读取项目,请附加 AWSLambdaSQSQueueExecutionRole 权限策略。

创建执行角色并附加 Amazon SQS 权限策略
  1. 打开 IAM 控制台的角色页面

  2. 选择创建角色

  3. 可信实体类型中选择 Amazon 服务

  4. 使用案例中选择 Lambda

  5. 选择下一步

  6. 权限策略搜索框中输入 AWSLambdaSQSQueueExecutionRole

  7. 选择 AWSLambdaSQSQueueExecutionRole 策略,然后选择下一步

  8. 角色详细信息下,为角色名称输入 lambda-sqs-role,然后选择创建角色

角色创建后,记下执行角色的 Amazon 资源名称(ARN)。您将在后面的步骤中用到它。

创建函数


        步骤 2 创建 Lambda 函数

创建一个处理您的 Amazon SQS 消息的 Lambda 函数。函数代码将 Amazon SQS 消息的正文记录到 CloudWatch Logs 中。

本教程使用 Node.js 18.x 运行时系统,但我们还提供了其他运行时系统语言的示例代码。您可以选择以下框中的选项卡,查看适用于您感兴趣的运行时系统的代码。您将在此步骤中使用的 JavaScript 代码是 JavaScript 选项卡中显示的第一个示例。

.NET
Amazon SDK for .NET
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 .NET 将 SQS 事件与 Lambda 结合使用。

using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace SqsIntegrationSampleCode { public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context) { foreach (var message in evnt.Records) { await ProcessMessageAsync(message, context); } context.Logger.LogInformation("done"); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { try { context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } catch (Exception e) { //You can use Dead Letter Queue to handle failures. By configuring a Lambda DLQ. context.Logger.LogError($"An error occurred"); throw; } } }
Go
适用于 Go V2 的 SDK
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Go 将 SQS 事件与 Lambda 结合使用。

package integration_sqs_to_lambda import ( "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(event events.SQSEvent) error { for _, record := range event.Records { err := processMessage(record) if err != nil { return err } } fmt.Println("done") return nil } func processMessage(record events.SQSMessage) error { fmt.Printf("Processed message %s\n", record.Body) // TODO: Do interesting work based on the new message return nil } func main() { lambda.Start(handler) }
Java
SDK for Java 2.x
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 Java 将 SQS 事件与 Lambda 结合使用。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; public class Function implements RequestHandler<SQSEvent, Void> { @Override public Void handleRequest(SQSEvent sqsEvent, Context context) { for (SQSMessage msg : sqsEvent.getRecords()) { processMessage(msg, context); } context.getLogger().log("done"); return null; } private void processMessage(SQSMessage msg, Context context) { try { context.getLogger().log("Processed message " + msg.getBody()); // TODO: Do interesting work based on the new message } catch (Exception e) { context.getLogger().log("An error occurred"); throw e; } } }
JavaScript
SDK for JavaScript (v2)
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 JavaScript 将 SQS 事件与 Lambda 结合使用。

exports.handler = async (event, context) => { for (const message of event.Records) { await processMessageAsync(message); } console.info("done"); }; async function processMessageAsync(message) { try { console.log(`Processed message ${message.body}`); // TODO: Do interesting work based on the new message await Promise.resolve(1); //Placeholder for actual async work } catch (err) { console.error("An error occurred"); throw err; } }

通过 TypeScript 将 SQS 事件与 Lambda 结合使用。

import { SQSEvent, Context, SQSHandler, SQSRecord } from "aws-lambda"; export const functionHandler: SQSHandler = async ( event: SQSEvent, context: Context ): Promise<void> => { for (const message of event.Records) { await processMessageAsync(message); } console.info("done"); }; async function processMessageAsync(message: SQSRecord): Promise<any> { try { console.log(`Processed message ${message.body}`); // TODO: Do interesting work based on the new message await Promise.resolve(1); //Placeholder for actual async work } catch (err) { console.error("An error occurred"); throw err; } }
PHP
适用于 PHP 的 SDK
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 PHP 将 SQS 事件与 Lambda 结合使用。

<?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\InvalidLambdaEvent; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Sqs\SqsHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends SqsHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @param SqsEvent $event * @param Context $context * @return void * @throws InvalidLambdaEvent */ public function handleSqs(SqsEvent $event, Context $context): void { try { foreach ($event->getRecords() as $record) { $body = $record->getBody(); // TODO: Do interesting work based on the new message } } catch (InvalidLambdaEvent $e) { $this->logger->error($e->getMessage()); throw $e; } } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python(Boto3)
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 将 SQS 事件与 Lambda 结合使用。

def lambda_handler(event, context): for message in event['Records']: process_message(message) print("done") def process_message(message): try: print(f"Processed message {message['body']}") # TODO: Do interesting work based on the new message except Exception as err: print("An error occurred") raise err
Ruby
适用于 Ruby 的 SDK
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Ruby 将 SQS 事件与 Lambda 结合使用。

def lambda_handler(event:, context:) event['Records'].each do |message| process_message(message) end puts "done" end def process_message(message) begin puts "Processed message #{message['body']}" # TODO: Do interesting work based on the new message rescue StandardError => err puts "An error occurred" raise err end end
Rust
适用于 Rust 的 SDK
注意

在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 Rust 将 SQS 事件与 Lambda 结合使用。

use aws_lambda_events::event::sqs::SqsEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> { event.payload.records.iter().for_each(|record| { // process the record tracing::info!("Message body: {}", record.body.as_deref().unwrap_or_default()) }); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
创建 Node.js Lambda 函数
  1. 为项目创建一个目录,然后切换到该目录。

    mkdir sqs-tutorial cd sqs-tutorial
  2. 将示例 JavaScript 代码复制到名为 index.js 的新文件中。

  3. 使用以下 zip 命令创建部署包。

    zip function.zip index.js
  4. 使用 create-function Amazon CLI 命令创建 Lambda 函数。对于 role 参数,请输入您之前创建的执行角色的 ARN。

    aws lambda create-function --function-name ProcessSQSRecord \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-sqs-role

测试此函数


        步骤 3 测试 Lambda 函数

使用 invoke Amazon CLI 命令和一个示例 Amazon SQS 事件手动调用您的 Lambda 函数。

使用示例事件调用 Lambda 函数
  1. 将下列 JSON保存为名为 input.json 的文件。此 JSON 模拟 Amazon SQS 可能发送到 Lambda 函数的事件,其中 "body" 包含来自该队列的实际消息。在本示例中,消息为 "test"

    例 Amazon SQS 事件

    此为测试事件,无需您更改消息或账号。

    { "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "test", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "098f6bcd4621d373cade4e832627b4f6", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-1:111122223333:my-queue", "awsRegion": "us-east-1" } ] }
  2. 运行以下调用 Amazon CLI 命令。此命令将在响应中返回 CloudWatch 日志。有关检索日志的更多信息,请参阅使用 Amazon CLI 访问日志

    aws lambda invoke --function-name ProcessSQSRecord --payload file://input.json out --log-type Tail \ --query 'LogResult' --output text --cli-binary-format raw-in-base64-out | base64 --decode

    如果使用 cli-binary-format 版本 2,则 Amazon CLI 选项是必需的。要将其设为默认设置,请运行 aws configure set cli-binary-format raw-in-base64-out。有关更多信息,请参阅版本 2 的 Amazon Command Line Interface 用户指南中的 Amazon CLI 支持的全局命令行选项

  3. 在响应中查找 INFO 日志。Lambda 函数会在此处记录消息正文。应看到类似如下内容的日志:

    2023-09-11T22:45:04.271Z 348529ce-2211-4222-9099-59d07d837b60 INFO Processed message test 2023-09-11T22:45:04.288Z 348529ce-2211-4222-9099-59d07d837b60 INFO done

创建 Amazon SQS 队列


        步骤 4 创建 Amazon SQS 队列

创建一个可由 Lambda 函数用作事件源的 Amazon SQS 队列。

创建队列
  1. 打开 Amazon SQS 控制台

  2. 选择创建队列

  3. 输入队列名称。将所有其他选项保留为默认设置。

  4. 选择创建队列

在创建队列后,记下其 ARN。在下一步中将该队列与您的 Lambda 函数关联时,您将需要此类信息。

配置事件源


        步骤 5 配置事件源映射

创建事件源映射,将 Amazon SQS 队列连接到 Lambda 函数。事件源映射会读取 Amazon SQS 队列并在添加新消息时调用 Lambda 函数。

要在 Amazon SQS 队列与 Lambda 函数之间创建映射,请使用以下 create-event-source-mapping Amazon CLI 命令。例如:

aws lambda create-event-source-mapping --function-name ProcessSQSRecord --batch-size 10 \ --event-source-arn arn:aws:sqs:us-east-1:111122223333:my-queue

要获取事件源映射列表,请使用 list-event-source-mappings 命令。例如:

aws lambda list-event-source-mappings --function-name ProcessSQSRecord

发送测试消息


        步骤 6 发送测试消息
将 Amazon SQS 消息发送到 Lambda 函数
  1. 打开 Amazon SQS 控制台

  2. 选择您之前创建的队列。

  3. 选择发送和接收消息

  4. 消息正文下输入测试消息,例如“这是一条测试消息”。

  5. 选择 Send message(发送消息)。

Lambda 轮询队列以获取更新。当有新消息时,Lambda 会使用该新的事件数据从队列中调用您的函数。如果该函数处理程序正常返回并且没有异常,则 Lambda 认为该消息得到成功处理并开始读取队列中的新消息。成功处理消息后,Lambda 从队列中将其自动删除。如果该处理程序引发异常,则 Lambda 认为消息的批量处理并未成功进行,并且 Lambda 会用相同的批量消息调用该函数。

查看 CloudWatch 日志


        步骤 6 发送测试消息
确认函数已处理消息
  1. 打开 Lamba 控制台的函数页面

  2. 选择 ProcessSQSRecord 函数。

  3. 选择 Monitor (监控)

  4. 选择查看 CloudWatch 日志

  5. 在 CloudWatch 控制台中,为该函数选择日志流

  6. 查找 INFO 日志。Lambda 函数会在此处记录消息正文。您应该能看到从 Amazon SQS 队列发送的消息。例如:

    2023-09-11T22:49:12.730Z b0c41e9c-0556-5a8b-af83-43e59efeec71 INFO Processed message this is a test message.

清除资源

除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除不再使用的Amazon资源,可防止您的Amazon账户产生不必要的费用。

删除执行角色
  1. 打开 IAM 控制台的角色页面

  2. 选择您创建的执行角色。

  3. 选择删除

  4. 在文本输入字段中输入角色名称,然后选择 Delete(删除)。

删除 Lambda 函数
  1. 打开 Lamba 控制台的 Functions(函数)页面

  2. 选择您创建的函数。

  3. 依次选择操作删除

  4. 在文本输入字段中键入 delete,然后选择 Delete(删除)。

删除 Amazon SQS 队列
  1. 登录到 Amazon Web Services Management Console 并打开 Amazon SQS 控制台,网址:https://console.aws.amazon.com/vpc/

  2. 选择创建的队列。

  3. 选择 删除

  4. 在文本输入字段中输入 confirm

  5. 选择 Delete(删除)。