FIFO 主题的代码示例 - Amazon Simple Notification Service
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

FIFO 主题的代码示例

您可以使用以下代码示例将 auto parts 价格管理示例使用案例与 SNS FIFO 主题和 SQS FIFO 队列集成。

使用 适用于 Java 的 AWS 开发工具包 2.x

使用 适用于 Java 的 AWS 开发工具包 2.x,您可以通过将其 Amazon SNS 属性设置为 true 来创建 FifoTopic FIFO 主题。您可以通过将其 Amazon SQS 属性设置为 true 来创建 FifoQueue FIFO 队列。此外,您必须将 .fifo 后缀添加到每个 FIFO 资源的名称。

注意

创建主题或队列资源作为 FIFO 后,您无法将其转换为标准主题或队列。

要运行这些示例,请按照 https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/getting-started.html适用于 Java 的 AWS 开发工具包 2.0 入门适用于 Java 的 AWS 开发工具包 2.x 开发人员指南中的说明操作。

创建 FIFO 主题 (适用于 Java 的开发工具包)

首先,创建以下 FIFO 资源:

  • 用于分发价格更新的 SNS FIFO 主题

  • SQS FIFO 队列,为两个应用程序(整个和零售)提供这些更新

  • 将两个队列连接到主题的 SNS FIFO 订阅

注意

如果您通过向主题发布消息来测试此代码示例,请确保您使用 business 属性发布消息。为属性值指定 retailwholesale。否则,消息将被筛选掉,并且不会传输到订阅的队列。

// Create API clients AWSCredentialsProvider credentials = getCredentials(); AmazonSNS sns = new AmazonSNSClient(credentials); AmazonSQS sqs = new AmazonSQSClient(credentials); // Create FIFO topic Map<String, String> topicAttributes = new HashMap<String, String>(); topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "false"); String topicArn = sns.createTopic( new CreateTopicRequest() .withName("PriceUpdatesTopic.fifo") .withAttributes(topicAttributes) ).getTopicArn(); // Create FIFO queues Map<String, String> queueAttributes = new HashMap<String, String>(); queueAttributes.put("FifoQueue", "true"); queueAttributes.put("ContentBasedDeduplication", "false"); String wholesaleQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("WholesaleQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); String retailQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("RetailQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); // Subscribe FIFO queues to FIFO topic, setting required permissions String wholesaleSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, wholesaleQueueUrl); String retailSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, retailQueueUrl);

请注意,已禁用基于内容的重复数据删除,因为发布具有相同正文的消息可能具有不同的属性,这些属性必须单独处理。价格管理系统使用消息属性来定义给定的价格更新是适用于批发应用程序、零售应用程序还是这两者。

有关消息重复数据删除的更多信息,请参阅FIFO 主题的消息重复数据删除

为 FIFO 订阅设置筛选策略 (适用于 Java 的开发工具包)

创建 SNS FIFO 订阅后,您可以设置其筛选策略,以便每个订阅者应用程序仅接收所需的价格更新。以下代码示例设置这些策略,其中,属性表示企业的类型:批发或零售。您可以在 SNSMessageFilterPolicy 中找到 订阅筛选策略作为 Java 集合 类的 Java 源代码。有关更多信息,请参阅FIFO 主题的消息筛选

// Set the Amazon SNS subscription filter polcies SNSMessageFilterPolicy wholesalePolicy = new SNSMessageFilterPolicy(); wholesalePolicy.addAttribute("business", "wholesale"); wholesalePolicy.apply(sns, wholesaleSubscriptionArn); SNSMessageFilterPolicy retailPolicy = new SNSMessageFilterPolicy(); retailPolicy.addAttribute("business", "retail"); retailPolicy.apply(sns, retailSubscriptionArn);

将消息发布到 FIFO 主题 (适用于 Java 的开发工具包)

现在,您已准备就绪,可将消息发布到 Amazon SNS FIFO 主题。以下代码示例编写并发布批发价格更新消息。

// Publish message to FIFO topic String subject = "Price Update"; String payload = "{\"product\": 214, \"price\": 79.99}"; String groupId = "PID-214"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put( attributeName, new MessageAttributeValue() .withDataType("String") .withStringValue(attributeValue)); sns.publish( new PublishRequest() .withTopicArn(topicArn) .withSubject(subject) .withMessage(payload) .withMessageGroupId(groupId); .withMessageDeduplicationId(dedupId) .withMessageAttributes(attributes);

从 FIFO 订阅接收消息

您现在可以在批发和零售应用程序中接收价格更新。如 FIFO 主题示例使用案例 中所示,每个使用者应用程序的入口点均为 SQS FIFO 队列,其相应的 AWS Lambda 函数可以自动轮询该队列。当 SQS FIFO 队列是 Lambda 函数的事件源时,Lambda 根据需要扩展其轮询器队列以有效使用消息。

有关更多信息,请参阅 AWS LambdaAmazon SQS 中的 与 AWS Lambda Developer Guide 一起使用。有关编写您自己的队列轮询器的信息,请参阅 Amazon SQS 中的 标准和 FIFO 队列的建议以及 Amazon Simple Queue Service 开发人员指南 中的 ReceiveMessage。Amazon Simple Queue Service API Reference

使用AWS CloudFormation

利用 AWS CloudFormation,您可以使用模板文件,创建并配置 AWS 资源作为单一单元。本节有一个示例模板,用于创建以下内容:

  • 用于分发价格更新的 SNS FIFO 主题

  • SQS FIFO 队列,为两个应用程序(整个和零售)提供这些更新

  • 将两个队列连接到主题的 SNS FIFO 订阅

  • 一个筛选策略,指定订阅者应用程序仅接收所需的价格更新

注意

如果您通过向主题发布消息来测试此代码示例,请确保您使用 business 属性发布消息。为属性值指定 retailwholesale。否则,消息将被筛选掉,并且不会传输到订阅的队列。

{ "AWSTemplateFormatVersion": "2010-09-09", "Resources": { "PriceUpdatesTopic": { "Type": "AWS::SNS::Topic", "Properties": { "TopicName": "PriceUpdatesTopic.fifo", "FifoTopic": true, "ContentBasedDeduplication": false } }, "WholesaleQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "WholesaleQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "RetailQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "RetailQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "WholesaleSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "WholesaleQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicy": { "business": [ "wholesale" ] } } }, "RetailSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "RetailQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicy": { "business": [ "retail" ] } } }, "SalesQueuesPolicy": { "Type": "AWS::SQS::QueuePolicy", "Properties": { "PolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": [ "sqs:SendMessage" ], "Resource": "*", "Condition": { "ArnEquals": { "aws:SourceArn": { "Ref": "PriceUpdatesTopic" } } } } ] }, "Queues": [ { "Ref": "WholesaleQueue" }, { "Ref": "RetailQueue" } ] } } } }

有关使用 AWS CloudFormation 模板部署 AWS 资源的更多信息,请参阅 用户指南 中的入门AWS CloudFormation。