使用适用于 JavaScript (v3) 的软件开发工具包的亚马逊 SQS 示例 - Amazon SDK for JavaScript
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon SDK for JavaScript V3 API 参考指南详细描述了 Amazon SDK for JavaScript 版本 3 (V3) 的所有 API 操作。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用适用于 JavaScript (v3) 的软件开发工具包的亚马逊 SQS 示例

以下代码示例向您展示如何使用带有 Amazon SQS 的 Amazon SDK for JavaScript (v3) 来执行操作和实现常见场景。

操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景和跨服务示例的上下文查看操作。

场景 是展示如何通过在同一服务中调用多个函数来完成特定任务的代码示例。

每个示例都包含一个指向的链接 GitHub,您可以在其中找到有关如何在上下文中设置和运行代码的说明。

开始使用

以下代码示例展示了如何开始使用 Amazon SQS。

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

初始化 Amazon SQS 客户端并列出队列。

import { SQSClient, paginateListQueues } from "@aws-sdk/client-sqs"; export const helloSqs = async () => { // The configuration object (`{}`) is required. If the region and credentials // are omitted, the SDK uses your local configuration if it exists. const client = new SQSClient({}); // You can also use `ListQueuesCommand`, but to use that command you must // handle the pagination yourself. You can do that by sending the `ListQueuesCommand` // with the `NextToken` parameter from the previous request. const paginatedQueues = paginateListQueues({ client }, {}); const queues = []; for await (const page of paginatedQueues) { if (page.QueueUrls?.length) { queues.push(...page.QueueUrls); } } const suffix = queues.length === 1 ? "" : "s"; console.log( `Hello, Amazon SQS! You have ${queues.length} queue${suffix} in your account.`, ); console.log(queues.map((t) => ` * ${t}`).join("\n")); };
  • 有关 API 的详细信息,请参阅 Amazon SDK for JavaScript API 参考ListQueues中的。

操作

以下代码示例演示了如何使用 ChangeMessageVisibility

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

接收 Amazon SQS 消息并更改其超时可见性。

import { ReceiveMessageCommand, ChangeMessageVisibilityCommand, SQSClient, } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const receiveMessage = (queueUrl) => client.send( new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 1, MessageAttributeNames: ["All"], QueueUrl: queueUrl, WaitTimeSeconds: 1, }), ); export const main = async (queueUrl = SQS_QUEUE_URL) => { const { Messages } = await receiveMessage(queueUrl); const response = await client.send( new ChangeMessageVisibilityCommand({ QueueUrl: queueUrl, ReceiptHandle: Messages[0].ReceiptHandle, VisibilityTimeout: 20, }), ); console.log(response); return response; };
适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

接收 Amazon SQS 消息并更改其超时可见性。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region to us-west-2 AWS.config.update({ region: "us-west-2" }); // Create the SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var queueURL = "https://sqs.REGION.amazonaws.com/ACCOUNT-ID/QUEUE-NAME"; var params = { AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 1, MessageAttributeNames: ["All"], QueueUrl: queueURL, }; sqs.receiveMessage(params, function (err, data) { if (err) { console.log("Receive Error", err); } else { // Make sure we have a message if (data.Messages != null) { var visibilityParams = { QueueUrl: queueURL, ReceiptHandle: data.Messages[0].ReceiptHandle, VisibilityTimeout: 20, // 20 second timeout }; sqs.changeMessageVisibility(visibilityParams, function (err, data) { if (err) { console.log("Delete Error", err); } else { console.log("Timeout Changed", data); } }); } else { console.log("No messages to change"); } } });

以下代码示例演示了如何使用 CreateQueue

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

创建 Amazon SQS 标准队列。

import { CreateQueueCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_NAME = "test-queue"; export const main = async (sqsQueueName = SQS_QUEUE_NAME) => { const command = new CreateQueueCommand({ QueueName: sqsQueueName, Attributes: { DelaySeconds: "60", MessageRetentionPeriod: "86400", }, }); const response = await client.send(command); console.log(response); return response; };

使用长轮询创建 Amazon SQS 队列。

import { CreateQueueCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_NAME = "queue_name"; export const main = async (queueName = SQS_QUEUE_NAME) => { const response = await client.send( new CreateQueueCommand({ QueueName: queueName, Attributes: { // When the wait time for the ReceiveMessage API action is greater than 0, // long polling is in effect. The maximum long polling wait time is 20 // seconds. Long polling helps reduce the cost of using Amazon SQS by, // eliminating the number of empty responses and false empty responses. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html ReceiveMessageWaitTimeSeconds: "20", }, }), ); console.log(response); return response; };
适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

创建 Amazon SQS 标准队列。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region AWS.config.update({ region: "REGION" }); // Create an SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var params = { QueueName: "SQS_QUEUE_NAME", Attributes: { DelaySeconds: "60", MessageRetentionPeriod: "86400", }, }; sqs.createQueue(params, function (err, data) { if (err) { console.log("Error", err); } else { console.log("Success", data.QueueUrl); } });

创建等待消息到达的 Amazon SQS 队列。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region AWS.config.update({ region: "REGION" }); // Create the SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var params = { QueueName: "SQS_QUEUE_NAME", Attributes: { ReceiveMessageWaitTimeSeconds: "20", }, }; sqs.createQueue(params, function (err, data) { if (err) { console.log("Error", err); } else { console.log("Success", data.QueueUrl); } });

以下代码示例演示了如何使用 DeleteMessage

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

接收和删除 Amazon SQS 消息。

import { ReceiveMessageCommand, DeleteMessageCommand, SQSClient, DeleteMessageBatchCommand, } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const receiveMessage = (queueUrl) => client.send( new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 10, MessageAttributeNames: ["All"], QueueUrl: queueUrl, WaitTimeSeconds: 20, VisibilityTimeout: 20, }), ); export const main = async (queueUrl = SQS_QUEUE_URL) => { const { Messages } = await receiveMessage(queueUrl); if (!Messages) { return; } if (Messages.length === 1) { console.log(Messages[0].Body); await client.send( new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: Messages[0].ReceiptHandle, }), ); } else { await client.send( new DeleteMessageBatchCommand({ QueueUrl: queueUrl, Entries: Messages.map((message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), }), ); } };
  • 有关 API 的详细信息,请参阅 Amazon SDK for JavaScript API 参考DeleteMessage中的。

适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

接收和删除 Amazon SQS 消息。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region AWS.config.update({ region: "REGION" }); // Create an SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var queueURL = "SQS_QUEUE_URL"; var params = { AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 10, MessageAttributeNames: ["All"], QueueUrl: queueURL, VisibilityTimeout: 20, WaitTimeSeconds: 0, }; sqs.receiveMessage(params, function (err, data) { if (err) { console.log("Receive Error", err); } else if (data.Messages) { var deleteParams = { QueueUrl: queueURL, ReceiptHandle: data.Messages[0].ReceiptHandle, }; sqs.deleteMessage(deleteParams, function (err, data) { if (err) { console.log("Delete Error", err); } else { console.log("Message Deleted", data); } }); } });

以下代码示例演示了如何使用 DeleteMessageBatch

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

import { ReceiveMessageCommand, DeleteMessageCommand, SQSClient, DeleteMessageBatchCommand, } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const receiveMessage = (queueUrl) => client.send( new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 10, MessageAttributeNames: ["All"], QueueUrl: queueUrl, WaitTimeSeconds: 20, VisibilityTimeout: 20, }), ); export const main = async (queueUrl = SQS_QUEUE_URL) => { const { Messages } = await receiveMessage(queueUrl); if (!Messages) { return; } if (Messages.length === 1) { console.log(Messages[0].Body); await client.send( new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: Messages[0].ReceiptHandle, }), ); } else { await client.send( new DeleteMessageBatchCommand({ QueueUrl: queueUrl, Entries: Messages.map((message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), }), ); } };
  • 有关 API 的详细信息,请参阅《Amazon SDK for JavaScript API 参考》中的 DeleteMessageBatch

以下代码示例演示了如何使用 DeleteQueue

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

删除 Amazon SQS 队列。

import { DeleteQueueCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "test-queue-url"; export const main = async (queueUrl = SQS_QUEUE_URL) => { const command = new DeleteQueueCommand({ QueueUrl: queueUrl }); const response = await client.send(command); console.log(response); return response; };
适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

删除 Amazon SQS 队列。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region AWS.config.update({ region: "REGION" }); // Create an SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var params = { QueueUrl: "SQS_QUEUE_URL", }; sqs.deleteQueue(params, function (err, data) { if (err) { console.log("Error", err); } else { console.log("Success", data); } });

以下代码示例演示了如何使用 GetQueueAttributes

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

import { GetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue-url"; export const getQueueAttributes = async (queueUrl = SQS_QUEUE_URL) => { const command = new GetQueueAttributesCommand({ QueueUrl: queueUrl, AttributeNames: ["DelaySeconds"], }); const response = await client.send(command); console.log(response); // { // '$metadata': { // httpStatusCode: 200, // requestId: '747a1192-c334-5682-a508-4cd5e8dc4e79', // extendedRequestId: undefined, // cfId: undefined, // attempts: 1, // totalRetryDelay: 0 // }, // Attributes: { DelaySeconds: '1' } // } return response; };
  • 有关 API 的详细信息,请参阅 Amazon SDK for JavaScript API 参考中的GetQueue属性

以下代码示例演示了如何使用 GetQueueUrl

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

获取 Amazon SQS 队列的 URL。

import { GetQueueUrlCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_NAME = "test-queue"; export const main = async (queueName = SQS_QUEUE_NAME) => { const command = new GetQueueUrlCommand({ QueueName: queueName }); const response = await client.send(command); console.log(response); return response; };
适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

获取 Amazon SQS 队列的 URL。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region AWS.config.update({ region: "REGION" }); // Create an SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var params = { QueueName: "SQS_QUEUE_NAME", }; sqs.getQueueUrl(params, function (err, data) { if (err) { console.log("Error", err); } else { console.log("Success", data.QueueUrl); } });

以下代码示例演示了如何使用 ListQueues

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

列出 Amazon SQS 队列。

import { paginateListQueues, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); export const main = async () => { const paginatedListQueues = paginateListQueues({ client }, {}); /** @type {string[]} */ const urls = []; for await (const page of paginatedListQueues) { const nextUrls = page.QueueUrls?.filter((qurl) => !!qurl) || []; urls.push(...nextUrls); urls.forEach((url) => console.log(url)); } return urls; };
适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

列出 Amazon SQS 队列。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region AWS.config.update({ region: "REGION" }); // Create an SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var params = {}; sqs.listQueues(params, function (err, data) { if (err) { console.log("Error", err); } else { console.log("Success", data.QueueUrls); } });

以下代码示例演示了如何使用 ReceiveMessage

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

接收来自 Amazon SQS 队列的消息。

import { ReceiveMessageCommand, DeleteMessageCommand, SQSClient, DeleteMessageBatchCommand, } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const receiveMessage = (queueUrl) => client.send( new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 10, MessageAttributeNames: ["All"], QueueUrl: queueUrl, WaitTimeSeconds: 20, VisibilityTimeout: 20, }), ); export const main = async (queueUrl = SQS_QUEUE_URL) => { const { Messages } = await receiveMessage(queueUrl); if (!Messages) { return; } if (Messages.length === 1) { console.log(Messages[0].Body); await client.send( new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: Messages[0].ReceiptHandle, }), ); } else { await client.send( new DeleteMessageBatchCommand({ QueueUrl: queueUrl, Entries: Messages.map((message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), }), ); } };

使用长轮询支持接收来自 Amazon SQS 队列的消息。

import { ReceiveMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue-url"; export const main = async (queueUrl = SQS_QUEUE_URL) => { const command = new ReceiveMessageCommand({ AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 1, MessageAttributeNames: ["All"], QueueUrl: queueUrl, // The duration (in seconds) for which the call waits for a message // to arrive in the queue before returning. If a message is available, // the call returns sooner than WaitTimeSeconds. If no messages are // available and the wait time expires, the call returns successfully // with an empty list of messages. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestSyntax WaitTimeSeconds: 20, }); const response = await client.send(command); console.log(response); return response; };
  • 有关 API 的详细信息,请参阅 Amazon SDK for JavaScript API 参考ReceiveMessage中的。

适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

使用长轮询接收来自 Amazon SQS 队列的消息。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region AWS.config.update({ region: "REGION" }); // Create the SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var queueURL = "SQS_QUEUE_URL"; var params = { AttributeNames: ["SentTimestamp"], MaxNumberOfMessages: 1, MessageAttributeNames: ["All"], QueueUrl: queueURL, WaitTimeSeconds: 20, }; sqs.receiveMessage(params, function (err, data) { if (err) { console.log("Error", err); } else { console.log("Success", data); } });

以下代码示例演示了如何使用 SendMessage

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

向 Amazon SQS 队列发送消息。

import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; export const main = async (sqsQueueUrl = SQS_QUEUE_URL) => { const command = new SendMessageCommand({ QueueUrl: sqsQueueUrl, DelaySeconds: 10, MessageAttributes: { Title: { DataType: "String", StringValue: "The Whistler", }, Author: { DataType: "String", StringValue: "John Grisham", }, WeeksOn: { DataType: "Number", StringValue: "6", }, }, MessageBody: "Information about current NY Times fiction bestseller for week of 12/11/2016.", }); const response = await client.send(command); console.log(response); return response; };
适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

向 Amazon SQS 队列发送消息。

// Load the AWS SDK for Node.js var AWS = require("aws-sdk"); // Set the region AWS.config.update({ region: "REGION" }); // Create an SQS service object var sqs = new AWS.SQS({ apiVersion: "2012-11-05" }); var params = { // Remove DelaySeconds parameter and value for FIFO queues DelaySeconds: 10, MessageAttributes: { Title: { DataType: "String", StringValue: "The Whistler", }, Author: { DataType: "String", StringValue: "John Grisham", }, WeeksOn: { DataType: "Number", StringValue: "6", }, }, MessageBody: "Information about current NY Times fiction bestseller for week of 12/11/2016.", // MessageDeduplicationId: "TheWhistler", // Required for FIFO queues // MessageGroupId: "Group1", // Required for FIFO queues QueueUrl: "SQS_QUEUE_URL", }; sqs.sendMessage(params, function (err, data) { if (err) { console.log("Error", err); } else { console.log("Success", data.MessageId); } });

以下代码示例演示了如何使用 SetQueueAttributes

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

import { SetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue-url"; export const main = async (queueUrl = SQS_QUEUE_URL) => { const command = new SetQueueAttributesCommand({ QueueUrl: queueUrl, Attributes: { DelaySeconds: "1", }, }); const response = await client.send(command); console.log(response); return response; };

将 Amazon SQS 队列配置为使用长轮询。

import { SetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; export const main = async (queueUrl = SQS_QUEUE_URL) => { const command = new SetQueueAttributesCommand({ Attributes: { ReceiveMessageWaitTimeSeconds: "20", }, QueueUrl: queueUrl, }); const response = await client.send(command); console.log(response); return response; };

配置死信队列。

import { SetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs"; const client = new SQSClient({}); const SQS_QUEUE_URL = "queue_url"; const DEAD_LETTER_QUEUE_ARN = "dead_letter_queue_arn"; export const main = async ( queueUrl = SQS_QUEUE_URL, deadLetterQueueArn = DEAD_LETTER_QUEUE_ARN, ) => { const command = new SetQueueAttributesCommand({ Attributes: { RedrivePolicy: JSON.stringify({ // Amazon SQS supports dead-letter queues (DLQ), which other // queues (source queues) can target for messages that can't // be processed (consumed) successfully. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html deadLetterTargetArn: deadLetterQueueArn, maxReceiveCount: "10", }), }, QueueUrl: queueUrl, }); const response = await client.send(command); console.log(response); return response; };
  • 有关 API 的详细信息,请参阅 Amazon SDK for JavaScript API 参考中的SetQueue属性

场景

以下代码示例演示了操作流程:

  • 创建主题(FIFO 或非 FIFO)。

  • 针对主题订阅多个队列,并提供应用筛选条件的选项。

  • 将消息发布到主题。

  • 轮询队列中是否有收到的消息。

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

这是此工作流程的入口点。

import { SNSClient } from "@aws-sdk/client-sns"; import { SQSClient } from "@aws-sdk/client-sqs"; import { TopicsQueuesWkflw } from "./TopicsQueuesWkflw.js"; import { Prompter } from "@aws-doc-sdk-examples/lib/prompter.js"; import { SlowLogger } from "@aws-doc-sdk-examples/lib/slow-logger.js"; export const startSnsWorkflow = () => { const noLoggerDelay = process.argv.find((arg) => arg === "--no-logger-delay"); const snsClient = new SNSClient({}); const sqsClient = new SQSClient({}); const prompter = new Prompter(); const logger = noLoggerDelay ? console : new SlowLogger(25); const wkflw = new TopicsQueuesWkflw(snsClient, sqsClient, prompter, logger); wkflw.start(); };

前面的代码提供必要的依赖关系并启动工作流程。下一节包含示例的大部分内容。

const toneChoices = [ { name: "cheerful", value: "cheerful" }, { name: "funny", value: "funny" }, { name: "serious", value: "serious" }, { name: "sincere", value: "sincere" }, ]; export class TopicsQueuesWkflw { // SNS topic is configured as First-In-First-Out isFifo = true; // Automatic content-based deduplication is enabled. autoDedup = false; snsClient; sqsClient; topicName; topicArn; subscriptionArns = []; /** * @type {{ queueName: string, queueArn: string, queueUrl: string, policy?: string }[]} */ queues = []; prompter; /** * @param {import('@aws-sdk/client-sns').SNSClient} snsClient * @param {import('@aws-sdk/client-sqs').SQSClient} sqsClient * @param {import('../../libs/prompter.js').Prompter} prompter * @param {import('../../libs/logger.js').Logger} logger */ constructor(snsClient, sqsClient, prompter, logger) { this.snsClient = snsClient; this.sqsClient = sqsClient; this.prompter = prompter; this.logger = logger; } async welcome() { await this.logger.log(MESSAGES.description); } async confirmFifo() { await this.logger.log(MESSAGES.snsFifoDescription); this.isFifo = await this.prompter.confirm({ message: MESSAGES.snsFifoPrompt, }); if (this.isFifo) { this.logger.logSeparator(MESSAGES.headerDedup); await this.logger.log(MESSAGES.deduplicationNotice); await this.logger.log(MESSAGES.deduplicationDescription); this.autoDedup = await this.prompter.confirm({ message: MESSAGES.deduplicationPrompt, }); } } async createTopic() { await this.logger.log(MESSAGES.creatingTopics); this.topicName = await this.prompter.input({ message: MESSAGES.topicNamePrompt, }); if (this.isFifo) { this.topicName += ".fifo"; this.logger.logSeparator(MESSAGES.headerFifoNaming); await this.logger.log(MESSAGES.appendFifoNotice); } const response = await this.snsClient.send( new CreateTopicCommand({ Name: this.topicName, Attributes: { FifoTopic: this.isFifo ? "true" : "false", ...(this.autoDedup ? { ContentBasedDeduplication: "true" } : {}), }, }), ); this.topicArn = response.TopicArn; await this.logger.log( MESSAGES.topicCreatedNotice .replace("${TOPIC_NAME}", this.topicName) .replace("${TOPIC_ARN}", this.topicArn), ); } async createQueues() { await this.logger.log(MESSAGES.createQueuesNotice); // Increase this number to add more queues. let maxQueues = 2; for (let i = 0; i < maxQueues; i++) { await this.logger.log(MESSAGES.queueCount.replace("${COUNT}", i + 1)); let queueName = await this.prompter.input({ message: MESSAGES.queueNamePrompt.replace( "${EXAMPLE_NAME}", i === 0 ? "good-news" : "bad-news", ), }); if (this.isFifo) { queueName += ".fifo"; await this.logger.log(MESSAGES.appendFifoNotice); } const response = await this.sqsClient.send( new CreateQueueCommand({ QueueName: queueName, Attributes: { ...(this.isFifo ? { FifoQueue: "true" } : {}) }, }), ); const { Attributes } = await this.sqsClient.send( new GetQueueAttributesCommand({ QueueUrl: response.QueueUrl, AttributeNames: ["QueueArn"], }), ); this.queues.push({ queueName, queueArn: Attributes.QueueArn, queueUrl: response.QueueUrl, }); await this.logger.log( MESSAGES.queueCreatedNotice .replace("${QUEUE_NAME}", queueName) .replace("${QUEUE_URL}", response.QueueUrl) .replace("${QUEUE_ARN}", Attributes.QueueArn), ); } } async attachQueueIamPolicies() { for (const [index, queue] of this.queues.entries()) { const policy = JSON.stringify( { Statement: [ { Effect: "Allow", Principal: { Service: "sns.amazonaws.com", }, Action: "sqs:SendMessage", Resource: queue.queueArn, Condition: { ArnEquals: { "aws:SourceArn": this.topicArn, }, }, }, ], }, null, 2, ); if (index !== 0) { this.logger.logSeparator(); } await this.logger.log(MESSAGES.attachPolicyNotice); console.log(policy); const addPolicy = await this.prompter.confirm({ message: MESSAGES.addPolicyConfirmation.replace( "${QUEUE_NAME}", queue.queueName, ), }); if (addPolicy) { await this.sqsClient.send( new SetQueueAttributesCommand({ QueueUrl: queue.queueUrl, Attributes: { Policy: policy, }, }), ); queue.policy = policy; } else { await this.logger.log( MESSAGES.policyNotAttachedNotice.replace( "${QUEUE_NAME}", queue.queueName, ), ); } } } async subscribeQueuesToTopic() { for (const [index, queue] of this.queues.entries()) { /** * @type {import('@aws-sdk/client-sns').SubscribeCommandInput} */ const subscribeParams = { TopicArn: this.topicArn, Protocol: "sqs", Endpoint: queue.queueArn, }; let tones = []; if (this.isFifo) { if (index === 0) { await this.logger.log(MESSAGES.fifoFilterNotice); } tones = await this.prompter.checkbox({ message: MESSAGES.fifoFilterSelect.replace( "${QUEUE_NAME}", queue.queueName, ), choices: toneChoices, }); if (tones.length) { subscribeParams.Attributes = { FilterPolicyScope: "MessageAttributes", FilterPolicy: JSON.stringify({ tone: tones, }), }; } } const { SubscriptionArn } = await this.snsClient.send( new SubscribeCommand(subscribeParams), ); this.subscriptionArns.push(SubscriptionArn); await this.logger.log( MESSAGES.queueSubscribedNotice .replace("${QUEUE_NAME}", queue.queueName) .replace("${TOPIC_NAME}", this.topicName) .replace("${TONES}", tones.length ? tones.join(", ") : "none"), ); } } async publishMessages() { const message = await this.prompter.input({ message: MESSAGES.publishMessagePrompt, }); let groupId, deduplicationId, choices; if (this.isFifo) { await this.logger.log(MESSAGES.groupIdNotice); groupId = await this.prompter.input({ message: MESSAGES.groupIdPrompt, }); if (this.autoDedup === false) { await this.logger.log(MESSAGES.deduplicationIdNotice); deduplicationId = await this.prompter.input({ message: MESSAGES.deduplicationIdPrompt, }); } choices = await this.prompter.checkbox({ message: MESSAGES.messageAttributesPrompt, choices: toneChoices, }); } await this.snsClient.send( new PublishCommand({ TopicArn: this.topicArn, Message: message, ...(groupId ? { MessageGroupId: groupId, } : {}), ...(deduplicationId ? { MessageDeduplicationId: deduplicationId, } : {}), ...(choices ? { MessageAttributes: { tone: { DataType: "String.Array", StringValue: JSON.stringify(choices), }, }, } : {}), }), ); const publishAnother = await this.prompter.confirm({ message: MESSAGES.publishAnother, }); if (publishAnother) { await this.publishMessages(); } } async receiveAndDeleteMessages() { for (const queue of this.queues) { const { Messages } = await this.sqsClient.send( new ReceiveMessageCommand({ QueueUrl: queue.queueUrl, }), ); if (Messages) { await this.logger.log( MESSAGES.messagesReceivedNotice.replace( "${QUEUE_NAME}", queue.queueName, ), ); console.log(Messages); await this.sqsClient.send( new DeleteMessageBatchCommand({ QueueUrl: queue.queueUrl, Entries: Messages.map((message) => ({ Id: message.MessageId, ReceiptHandle: message.ReceiptHandle, })), }), ); } else { await this.logger.log( MESSAGES.noMessagesReceivedNotice.replace( "${QUEUE_NAME}", queue.queueName, ), ); } } const deleteAndPoll = await this.prompter.confirm({ message: MESSAGES.deleteAndPollConfirmation, }); if (deleteAndPoll) { await this.receiveAndDeleteMessages(); } } async destroyResources() { for (const subscriptionArn of this.subscriptionArns) { await this.snsClient.send( new UnsubscribeCommand({ SubscriptionArn: subscriptionArn }), ); } for (const queue of this.queues) { await this.sqsClient.send( new DeleteQueueCommand({ QueueUrl: queue.queueUrl }), ); } if (this.topicArn) { await this.snsClient.send( new DeleteTopicCommand({ TopicArn: this.topicArn }), ); } } async start() { console.clear(); try { this.logger.logSeparator(MESSAGES.headerWelcome); await this.welcome(); this.logger.logSeparator(MESSAGES.headerFifo); await this.confirmFifo(); this.logger.logSeparator(MESSAGES.headerCreateTopic); await this.createTopic(); this.logger.logSeparator(MESSAGES.headerCreateQueues); await this.createQueues(); this.logger.logSeparator(MESSAGES.headerAttachPolicy); await this.attachQueueIamPolicies(); this.logger.logSeparator(MESSAGES.headerSubscribeQueues); await this.subscribeQueuesToTopic(); this.logger.logSeparator(MESSAGES.headerPublishMessage); await this.publishMessages(); this.logger.logSeparator(MESSAGES.headerReceiveMessages); await this.receiveAndDeleteMessages(); } catch (err) { console.error(err); } finally { await this.destroyResources(); } } }

无服务器示例

以下代码示例展示了如何实现一个 Lambda 函数,该函数接收因接收来自 SNS 队列的消息而触发的事件。该函数从事件参数检索消息并记录每条消息的内容。

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Lambd JavaScript a 使用 SQS 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const message of event.Records) { await processMessageAsync(message); } console.info("done"); }; async function processMessageAsync(message) { try { console.log(`Processed message ${message.body}`); // TODO: Do interesting work based on the new message await Promise.resolve(1); //Placeholder for actual async work } catch (err) { console.error("An error occurred"); throw err; } }

使用 Lambd TypeScript a 使用 SQS 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, Context, SQSHandler, SQSRecord } from "aws-lambda"; export const functionHandler: SQSHandler = async ( event: SQSEvent, context: Context ): Promise<void> => { for (const message of event.Records) { await processMessageAsync(message); } console.info("done"); }; async function processMessageAsync(message: SQSRecord): Promise<any> { try { console.log(`Processed message ${message.body}`); // TODO: Do interesting work based on the new message await Promise.resolve(1); //Placeholder for actual async work } catch (err) { console.error("An error occurred"); throw err; } }

以下代码示例展示了如何为接收来自 SQS 队列的事件的 Lambda 函数实现部分批处理响应。该函数在响应中报告批处理项目失败,并指示 Lambda 稍后重试这些消息。

适用于 JavaScript (v3) 的软件开发工具包
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用报告 Lambda JavaScript 的 SQS 批处理项目失败。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

使用报告 Lambda TypeScript 的 SQS 批处理项目失败。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }