FIFO 主题的代码示例 - Amazon Simple Notification Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

FIFO 主题的代码示例

您可以使用以下代码示例来将汽车零部件价格管理示例使用案例与 SNS FIFO 主题和 SQS FIFO 队列。

使用 Amazon SDK for Java 2.x

使用Amazon SDK for Java 2.x创建 Amazon SNS FIFO 主题,方法是设置FifoTopic属性为 true。您可以创建一个 Amazon SQS FIFO 队列,方法是设置FifoQueue属性为 true。此外,您必须将.fifo后缀为每个 FIFO 资源的名称。

注意

将主题或队列资源创建为 FIFO 后,无法将其转换为标准主题或队列。

若要运行以下示例,请参阅以下说明:入门AmazonSDK for Java 2.0中的Amazon SDK for Java 2.x开发人员指南

创建 FIFO 主题(SDK for Java)

首先,创建以下 FIFO 资源:

  • 分发价格更新的 SNS FIFO 主题

  • 为两个应用程序(批发和零售)提供这些更新的 SQS FIFO 队列

  • 将两个队列连接到主题的 SNS FIFO 订阅

注意

如果通过向主题发布消息来测试此代码示例,请确保使用business中庭 指定retail或者wholesale作为属性值。否则,消息将被过滤掉,而不会传递到订阅的队列。

// Create API clients AWSCredentialsProvider credentials = getCredentials(); AmazonSNS sns = new AmazonSNSClient(credentials); AmazonSQS sqs = new AmazonSQSClient(credentials); // Create FIFO topic Map<String, String> topicAttributes = new HashMap<String, String>(); topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "false"); String topicArn = sns.createTopic( new CreateTopicRequest() .withName("PriceUpdatesTopic.fifo") .withAttributes(topicAttributes) ).getTopicArn(); // Create FIFO queues Map<String, String> queueAttributes = new HashMap<String, String>(); queueAttributes.put("FifoQueue", "true"); queueAttributes.put("ContentBasedDeduplication", "false"); String wholesaleQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("WholesaleQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); String retailQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("RetailQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); // Subscribe FIFO queues to FIFO topic, setting required permissions String wholesaleSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, wholesaleQueueUrl); String retailSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, retailQueueUrl);

请注意,已禁用基于内容的重复数据消除,因为使用相同正文发布的邮件可能具有不同的属性,必须独立处理。价格管理系统使用消息属性来定义给定价格更新是适用于批发应用程序、零售应用程序还是应用于这两者。

有关消息重复数据消除的更多信息,请参阅FIFO 主题的消息重复数据消除

为 FIFO 订阅设置筛选器策略(SDK for Java)

创建 SNS FIFO 订阅后,您可以将其筛选策略,以便每个订阅者应用程序只接收它需要的价格更新。以下代码示例设置这些策略,其中属性表示业务类型(批发或零售)。您可以找到适用于SNSMessageFilterPolicy订阅筛选策略作为 Java 集合。有关更多信息,请参阅FIFO 主题的消息过滤

// Set the Amazon SNS subscription filter polcies SNSMessageFilterPolicy wholesalePolicy = new SNSMessageFilterPolicy(); wholesalePolicy.addAttribute("business", "wholesale"); wholesalePolicy.apply(sns, wholesaleSubscriptionArn); SNSMessageFilterPolicy retailPolicy = new SNSMessageFilterPolicy(); retailPolicy.addAttribute("business", "retail"); retailPolicy.apply(sns, retailSubscriptionArn);

将消息发布到 FIFO 主题(SDK for Java)

现在,您已准备就绪,可以向 Amazon SNS FIFO 主题发布消息。以下代码示例合成并发布批发价格更新消息。

// Publish message to FIFO topic String subject = "Price Update"; String payload = "{\"product\": 214, \"price\": 79.99}"; String groupId = "PID-214"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put( attributeName, new MessageAttributeValue() .withDataType("String") .withStringValue(attributeValue)); sns.publish( new PublishRequest() .withTopicArn(topicArn) .withSubject(subject) .withMessage(payload) .withMessageGroupId(groupId); .withMessageDeduplicationId(dedupId) .withMessageAttributes(attributes);

从 FIFO 订阅接收消息

您现在可以在批发和零售应用程序中接收价格更新。如以下所示:FIFO 主题示例用例,每个消费者应用程序的入口点是 SQS FIFO 队列,其相应的Amazon Lambda函数可以自动轮询。当 SQS FIFO 队列是 Lambda 函数的事件源时,Lambda 会根据需要扩展其轮询器队列,以有效地使用消息。

有关更多信息,请参阅 。使用Amazon LambdaAmazon SQS中的Amazon Lambda开发人员指南。有关编写自己的队列轮询程序的信息,请参阅针对 Amazon SQS 标准和 FIFO 队列的建议中的Amazon Simple Queue Service 开发指南ReceiveMessage中的Amazon Simple Queue Service API 参考

使用 Amazon CloudFormation

Amazon CloudFormation利用,您可以使用模板文件创建并配置Amazon资源作为单一单元。通过本部分提供的模板示例,您可以创建以下内容:

  • 分发价格更新的 SNS FIFO 主题

  • 为两个应用程序(批发和零售)提供这些更新的 SQS FIFO 队列

  • 将两个队列连接到主题的 SNS FIFO 订阅

  • A筛选策略,指定订阅者应用程序只接收他们需要的价格更新

注意

如果通过向主题发布消息来测试此代码示例,请确保使用business中庭 指定retail或者wholesale作为属性值。否则,消息将被过滤掉,而不会传递到订阅的队列。

{ "AWSTemplateFormatVersion": "2010-09-09", "Resources": { "PriceUpdatesTopic": { "Type": "AWS::SNS::Topic", "Properties": { "TopicName": "PriceUpdatesTopic.fifo", "FifoTopic": true, "ContentBasedDeduplication": false } }, "WholesaleQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "WholesaleQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "RetailQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "RetailQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "WholesaleSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "WholesaleQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicy": { "business": [ "wholesale" ] } } }, "RetailSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "RetailQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicy": { "business": [ "retail" ] } } }, "SalesQueuesPolicy": { "Type": "AWS::SQS::QueuePolicy", "Properties": { "PolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": [ "sqs:SendMessage" ], "Resource": "*", "Condition": { "ArnEquals": { "aws:SourceArn": { "Ref": "PriceUpdatesTopic" } } } } ] }, "Queues": [ { "Ref": "WholesaleQueue" }, { "Ref": "RetailQueue" } ] } } } }

有关部署Amazon资源,使用Amazon CloudFormation模板,请参阅开始使用中的Amazon CloudFormation用户指南