FIFO 主题的代码示例 - Amazon Simple Notification Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

FIFO 主题的代码示例

您可以使用以下代码示例来将汽车零部件价格管理示例使用案例与 SNS FIFO 主题和 SQS FIFO 队列集成。

使用 Amazon 软件开发工具包

使用 Amazon 开发工具包,您可以通过将 Amazon SNS FIFO 主题的 FifoTopic 属性设置为 true 来创建该主题。您可以通过将 Amazon SQS FIFO 队列的 FifoQueue 属性设置为 true 来创建该队列。此外,您必须将 .fifo 后缀添加到每个 FIFO 资源的名称。创建 FIFO 主题或队列后,无法将其转换为标准主题或队列。

以下代码示例创建这些 FIFO 资源:

  • 分发价格更新的 SNS FIFO 主题

  • 为两个应用程序(批发和零售)提供这些更新的 SQS FIFO 队列

  • 将两个队列连接到主题的 SNS FIFO 订阅

本示例将设置订阅上的筛选条件策略。如果通过向主题发布消息来测试示例,请确保您发布的是带 business 属性的消息。为属性值指定 retailwholesale。否则,消息将被筛选掉,且不会传递到订阅的队列中。有关更多信息,请参阅 FIFO 主题的消息筛选

Java
SDK for Java 1.x

创建 FIFO 主题和 FIFO 队列。为队列订阅主题。

// Create API clients AWSCredentialsProvider credentials = getCredentials(); AmazonSNS sns = new AmazonSNSClient(credentials); AmazonSQS sqs = new AmazonSQSClient(credentials); // Create FIFO topic Map<String, String> topicAttributes = new HashMap<String, String>(); topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "false"); String topicArn = sns.createTopic( new CreateTopicRequest() .withName("PriceUpdatesTopic.fifo") .withAttributes(topicAttributes) ).getTopicArn(); // Create FIFO queues Map<String, String> queueAttributes = new HashMap<String, String>(); queueAttributes.put("FifoQueue", "true"); // Disable content-based deduplication because messages published with the same body // might carry different attributes that must be processed independently. // The price management system uses the message attributes to define whether a given // price update applies to the wholesale application or to the retail application. queueAttributes.put("ContentBasedDeduplication", "false"); String wholesaleQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("WholesaleQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); String retailQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("RetailQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); // Subscribe FIFO queues to FIFO topic, setting required permissions String wholesaleSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, wholesaleQueueUrl); String retailSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, retailQueueUrl);

为每个订阅设置筛选条件策略,以便每个订阅者应用程序仅接收它需要的价格更新。

// Set the Amazon SNS subscription filter policies SNSMessageFilterPolicy wholesalePolicy = new SNSMessageFilterPolicy(); wholesalePolicy.addAttribute("business", "wholesale"); wholesalePolicy.apply(sns, wholesaleSubscriptionArn); SNSMessageFilterPolicy retailPolicy = new SNSMessageFilterPolicy(); retailPolicy.addAttribute("business", "retail"); retailPolicy.apply(sns, retailSubscriptionArn);

编写并发布更新批发价格的消息。

// Publish message to FIFO topic String subject = "Price Update"; String payload = "{\"product\": 214, \"price\": 79.99}"; String groupId = "PID-214"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put( attributeName, new MessageAttributeValue() .withDataType("String") .withStringValue(attributeValue)); sns.publish( new PublishRequest() .withTopicArn(topicArn) .withSubject(subject) .withMessage(payload) .withMessageGroupId(groupId); .withMessageDeduplicationId(dedupId) .withMessageAttributes(attributes);
  • GitHub 中查找说明和更多代码。

接收来自 FIFO 订阅的消息

您现在可以在批发和零售应用程序中接收价格更新。如 FIFO 主题示例使用案例 中所示,每个使用者应用程序的入口点是 SQS FIFO 队列,其相应的 Amazon Lambda 函数可以自动轮询。当 SQS FIFO 队列是 Lambda 函数的事件源时,Lambda 会根据需要扩展其轮询器队列,以高效地使用消息。

有关更多信息,请参阅 Amazon Lambda 开发人员指南中的将 Amazon Lambda 与 Amazon SQS 结合使用。有关编写自己的队列轮询器的信息,请参阅 Amazon Simple Queue Service 开发人员指南中的 Amazon SQS 标准和 FIFO 队列的建议Amazon Simple Queue Service API 参考 中的 ReceiveMessage

使用 Amazon CloudFormation

利用 Amazon CloudFormation,您可以使用模板文件,创建并配置 Amazon 资源的集合作为单一单元。本部分提供的模板示例,用于创建以下内容:

  • 分发价格更新的 SNS FIFO 主题

  • 为两个应用程序(批发和零售)提供这些更新的 SQS FIFO 队列

  • 将两个队列连接到主题的 SNS FIFO 订阅

  • 指定订阅者应用程序的筛选策略只接收他们需要的价格更新

注意

如果通过向主题发布消息来测试此代码示例,请确保您发布的是带 business 属性的消息。为属性值指定 retailwholesale。否则,消息将被筛选掉,且不会传递到订阅的队列中。

{ "AWSTemplateFormatVersion": "2010-09-09", "Resources": { "PriceUpdatesTopic": { "Type": "AWS::SNS::Topic", "Properties": { "TopicName": "PriceUpdatesTopic.fifo", "FifoTopic": true, "ContentBasedDeduplication": false } }, "WholesaleQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "WholesaleQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "RetailQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "RetailQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "WholesaleSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "WholesaleQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicy": { "business": [ "wholesale" ] } } }, "RetailSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "RetailQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicy": { "business": [ "retail" ] } } }, "SalesQueuesPolicy": { "Type": "AWS::SQS::QueuePolicy", "Properties": { "PolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": [ "sqs:SendMessage" ], "Resource": "*", "Condition": { "ArnEquals": { "aws:SourceArn": { "Ref": "PriceUpdatesTopic" } } } } ] }, "Queues": [ { "Ref": "WholesaleQueue" }, { "Ref": "RetailQueue" } ] } } } }

有关使用 Amazon CloudFormation 模板部署 Amazon 资源的更多信息,请参阅 Amazon CloudFormation 用户指南中的入门