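Code examples for FIFO topics
You can use the following code examples to integrate the auto parts price management example use case, which uses an Amazon SNS FIFO topic, with Amazon SQS FIFO queues or standard queues.
Using an AWS SDK
When you use an AWS SDK, you create an Amazon SNS FIFO topic by setting its FifoTopic attribute to true. You create an Amazon SQS FIFO queue by setting its FifoQueue attribute to true. You must also add the .fifo suffix to the name of each FIFO resource. After you create a FIFO topic or queue, you can't convert it to a standard topic or queue.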
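As a minimal sketch of the two rules above (the FIFO attribute and the .fifo suffix), the following Python helpers build request parameters for a FIFO topic and a FIFO queue. The helper names `fifo_topic_params` and `fifo_queue_params` are hypothetical and not part of any SDK; the dictionaries they return are intended to be passed as keyword arguments to the Boto3 `create_topic` and `create_queue` calls.

```python
def _with_fifo_suffix(name: str) -> str:
    """Return the name with the mandatory .fifo suffix, adding it if missing."""
    return name if name.endswith(".fifo") else name + ".fifo"


def fifo_topic_params(name: str) -> dict:
    """Build keyword arguments for creating an Amazon SNS FIFO topic."""
    return {
        "Name": _with_fifo_suffix(name),
        # Attribute values are passed as strings.
        "Attributes": {"FifoTopic": "true"},
    }


def fifo_queue_params(name: str) -> dict:
    """Build keyword arguments for creating an Amazon SQS FIFO queue."""
    return {
        "QueueName": _with_fifo_suffix(name),
        "Attributes": {"FifoQueue": "true"},
    }


if __name__ == "__main__":
    print(fifo_topic_params("price-updates"))
    print(fifo_queue_params("wholesale.fifo"))
```

With credentials configured, a call such as `boto3.client("sns").create_topic(**fifo_topic_params("price-updates"))` would then create the topic. Because a FIFO resource can't be converted later, it may be worth centralizing the naming rule in one place like this.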
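The following code examples create these FIFO and standard queue resources:
- An Amazon SNS FIFO topic that publishes price updates
- Amazon SQS FIFO queues that deliver these updates to the wholesale and retail applications
- An Amazon SQS standard queue for the analytics application, which stores records that can be queried for business intelligence (BI)
- Amazon SNS FIFO subscriptions that connect the three queues to the topic
This example sets filter policies in the subscriptions. If you test the example by publishing a message to the topic, make sure that you publish the message with the business attribute. Specify either retail or wholesale for the attribute value. Otherwise, the message is filtered out and isn't delivered to the subscribed queues. For more information, see Message filtering for FIFO topics.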
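To make the filtering rule concrete, here is a simplified Python model of how an exact-match filter policy on the business attribute decides whether a subscription receives a message. It is only a sketch: `matches_filter_policy` is a hypothetical function, it handles string equality only, and the actual Amazon SNS filter policy language supports more operators than shown here.

```python
def matches_filter_policy(policy: dict, message_attributes: dict) -> bool:
    """Return True if every policy key has an attribute whose string value is allowed.

    Simplified model: only exact string matching is handled.
    """
    for name, allowed_values in policy.items():
        attribute = message_attributes.get(name)
        if attribute is None or attribute.get("StringValue") not in allowed_values:
            return False
    return True


# Illustrative policies for the wholesale and retail subscriptions.
wholesale_policy = {"business": ["wholesale"]}
retail_policy = {"business": ["retail"]}

# Message attributes in the shape used by the publish call.
wholesale_update = {"business": {"DataType": "String", "StringValue": "wholesale"}}
untagged_update = {}  # published without the business attribute

if __name__ == "__main__":
    print(matches_filter_policy(wholesale_policy, wholesale_update))
    print(matches_filter_policy(retail_policy, wholesale_update))
    print(matches_filter_policy(wholesale_policy, untagged_update))
```

In this model, a message without the business attribute matches neither the wholesale nor the retail policy, which is why an untagged test message never reaches those queues.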
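Java
SDK for Java 2.x
This example creates the queues, creates an Amazon SNS FIFO topic, subscribes the queues to the topic, and publishes a message to the topic. The test verifies that each queue receives the message. The complete example also shows how to add access policies, and it deletes the resources at the end.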
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
import software.amazon.awssdk.services.sns.model.CreateTopicResponse;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import software.amazon.awssdk.services.sns.model.SnsException;
import software.amazon.awssdk.services.sns.model.SubscribeRequest;
import software.amazon.awssdk.services.sns.model.SubscribeResponse;
import software.amazon.awssdk.services.sqs.SqsClient;

public class PriceUpdateExample {
    public final static SnsClient snsClient = SnsClient.create();
    public final static SqsClient sqsClient = SqsClient.create();

    public static void main(String[] args) {
        final String usage = "\n" +
                "Usage: " +
                "    <fifoTopicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" +
                "Where:\n" +
                "   fifoTopicName - The name of the FIFO topic that you want to create. \n\n" +
                "   wholesaleQueueFifoName - The name of an SQS FIFO queue that will be created for the wholesale consumer. \n\n" +
                "   retailQueueFifoName - The name of an SQS FIFO queue that will be created for the retail consumer. \n\n" +
                "   analyticsQueueName - The name of an SQS standard queue that will be created for the analytics consumer. \n\n";

        if (args.length != 4) {
            System.out.println(usage);
            System.exit(1);
        }

        final String fifoTopicName = args[0];
        final String wholeSaleQueueName = args[1];
        final String retailQueueName = args[2];
        final String analyticsQueueName = args[3];

        // For convenience, the QueueData class holds metadata about a queue: ARN, URL,
        // name and type.
        List<QueueData> queues = List.of(
                new QueueData(wholeSaleQueueName, QueueType.FIFO),
                new QueueData(retailQueueName, QueueType.FIFO),
                new QueueData(analyticsQueueName, QueueType.Standard));

        // Create queues.
        createQueues(queues);

        // Create a topic.
        String topicARN = createFIFOTopic(fifoTopicName);

        // Subscribe each queue to the topic.
        subscribeQueues(queues, topicARN);

        // Allow the newly created topic to send messages to the queues.
        addAccessPolicyToQueuesFINAL(queues, topicARN);

        // Publish a sample price update message with payload.
        publishPriceUpdate(topicARN, "{\"product\": 214, \"price\": 79.99}", "Consumables");

        // Clean up resources.
        deleteSubscriptions(queues);
        deleteQueues(queues);
        deleteTopic(topicARN);
    }

    public static String createFIFOTopic(String topicName) {
        try {
            // Create a FIFO topic by using the SNS service client.
            Map<String, String> topicAttributes = Map.of(
                    "FifoTopic", "true",
                    "ContentBasedDeduplication", "false");

            CreateTopicRequest topicRequest = CreateTopicRequest.builder()
                    .name(topicName)
                    .attributes(topicAttributes)
                    .build();

            CreateTopicResponse response = snsClient.createTopic(topicRequest);
            String topicArn = response.topicArn();
            System.out.println("The topic ARN is " + topicArn);

            return topicArn;

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
        return "";
    }

    public static void subscribeQueues(List<QueueData> queues, String topicARN) {
        queues.forEach(queue -> {
            SubscribeRequest subscribeRequest = SubscribeRequest.builder()
                    .topicArn(topicARN)
                    .endpoint(queue.queueARN)
                    .protocol("sqs")
                    .build();

            // Subscribe to the endpoint by using the SNS service client.
            // Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO
            // topic.
            SubscribeResponse subscribeResponse = snsClient.subscribe(subscribeRequest);
            System.out.println("The queue [" + queue.queueARN + "] subscribed to the topic [" + topicARN + "]");
            queue.subscriptionARN = subscribeResponse.subscriptionArn();
        });
    }

    public static void publishPriceUpdate(String topicArn, String payload, String groupId) {
        try {
            // Create and publish a message that updates the wholesale price.
            String subject = "Price Update";
            String dedupId = UUID.randomUUID().toString();
            String attributeName = "business";
            String attributeValue = "wholesale";

            MessageAttributeValue msgAttValue = MessageAttributeValue.builder()
                    .dataType("String")
                    .stringValue(attributeValue)
                    .build();

            Map<String, MessageAttributeValue> attributes = new HashMap<>();
            attributes.put(attributeName, msgAttValue);

            PublishRequest pubRequest = PublishRequest.builder()
                    .topicArn(topicArn)
                    .subject(subject)
                    .message(payload)
                    .messageGroupId(groupId)
                    .messageDeduplicationId(dedupId)
                    .messageAttributes(attributes)
                    .build();

            final PublishResponse response = snsClient.publish(pubRequest);
            System.out.println(response.messageId());
            System.out.println(response.sequenceNumber());
            System.out.println("Message was published to " + topicArn);

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    // The createQueues, addAccessPolicyToQueuesFINAL, deleteSubscriptions, deleteQueues,
    // and deleteTopic methods, along with the QueueData class, belong to the complete
    // example and are not shown in this excerpt.
}
Python
SDK for Python (Boto3)
Create an Amazon SNS FIFO topic, subscribe Amazon SQS FIFO and standard queues to the topic, and publish a message to the topic.
import json
import logging
import uuid

import boto3
from botocore.exceptions import ClientError

# SnsWrapper is defined elsewhere in the complete example; the module name
# used here is an assumption.
from sns_basics import SnsWrapper

logger = logging.getLogger(__name__)


def usage_demo():
    """Shows how to subscribe queues to a FIFO topic."""
    print("-" * 88)
    print("Welcome to the `Subscribe queues to a FIFO topic` demo!")
    print("-" * 88)

    sns = boto3.resource("sns")
    sqs = boto3.resource("sqs")
    fifo_topic_wrapper = FifoTopicWrapper(sns)
    sns_wrapper = SnsWrapper(sns)

    prefix = "sqs-subscribe-demo-"
    queues = set()
    subscriptions = set()

    wholesale_queue = sqs.create_queue(
        QueueName=prefix + "wholesale.fifo",
        Attributes={
            "MaximumMessageSize": str(4096),
            "ReceiveMessageWaitTimeSeconds": str(10),
            "VisibilityTimeout": str(300),
            "FifoQueue": str(True),
            "ContentBasedDeduplication": str(True),
        },
    )
    queues.add(wholesale_queue)
    print(f"Created FIFO queue with URL: {wholesale_queue.url}.")

    retail_queue = sqs.create_queue(
        QueueName=prefix + "retail.fifo",
        Attributes={
            "MaximumMessageSize": str(4096),
            "ReceiveMessageWaitTimeSeconds": str(10),
            "VisibilityTimeout": str(300),
            "FifoQueue": str(True),
            "ContentBasedDeduplication": str(True),
        },
    )
    queues.add(retail_queue)
    print(f"Created FIFO queue with URL: {retail_queue.url}.")

    analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={})
    queues.add(analytics_queue)
    print(f"Created standard queue with URL: {analytics_queue.url}.")

    topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo")
    print(f"Created FIFO topic: {topic.attributes['TopicArn']}.")

    # Allow the topic to send messages to each queue.
    for q in queues:
        fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"])

    print(f"Added access policies for topic: {topic.attributes['TopicArn']}.")

    for q in queues:
        sub = fifo_topic_wrapper.subscribe_queue_to_topic(
            topic, q.attributes["QueueArn"]
        )
        subscriptions.add(sub)

    print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.")

    input("Press Enter to publish a message to the topic.")

    message_id = fifo_topic_wrapper.publish_price_update(
        topic, '{"product": 214, "price": 79.99}', "Consumables"
    )

    print(f"Published price update with message ID: {message_id}.")

    # Clean up the subscriptions, queues, and topic.
    input("Press Enter to clean up resources.")
    for s in subscriptions:
        sns_wrapper.delete_subscription(s)

    sns_wrapper.delete_topic(topic)

    for q in queues:
        fifo_topic_wrapper.delete_queue(q)

    print("Deleted subscriptions, queues, and topic.")

    print("Thanks for watching!")
    print("-" * 88)


class FifoTopicWrapper:
    """Encapsulates Amazon SNS FIFO topic and subscription functions."""

    def __init__(self, sns_resource):
        """
        :param sns_resource: A Boto3 Amazon SNS resource.
        """
        self.sns_resource = sns_resource

    def create_fifo_topic(self, topic_name):
        """
        Create a FIFO topic.
        Topic names must be made up of only uppercase and lowercase ASCII letters,
        numbers, underscores, and hyphens, and must be between 1 and 256 characters long.
        For a FIFO topic, the name must end with the .fifo suffix.

        :param topic_name: The name for the topic.
        :return: The new topic.
        """
        try:
            topic = self.sns_resource.create_topic(
                Name=topic_name,
                Attributes={
                    "FifoTopic": str(True),
                    "ContentBasedDeduplication": str(False),
                },
            )
            logger.info("Created FIFO topic with name=%s.", topic_name)
            return topic
        except ClientError as error:
            logger.exception("Couldn't create topic with name=%s!", topic_name)
            raise error

    @staticmethod
    def add_access_policy(queue, topic_arn):
        """
        Add the necessary access policy to a queue, so
        it can receive messages from a topic.

        :param queue: The queue resource.
        :param topic_arn: The ARN of the topic.
        :return: None.
        """
        try:
            queue.set_attributes(
                Attributes={
                    "Policy": json.dumps(
                        {
                            "Version": "2012-10-17",
                            "Statement": [
                                {
                                    "Sid": "test-sid",
                                    "Effect": "Allow",
                                    "Principal": {"AWS": "*"},
                                    "Action": "SQS:SendMessage",
                                    "Resource": queue.attributes["QueueArn"],
                                    "Condition": {
                                        "ArnLike": {"aws:SourceArn": topic_arn}
                                    },
                                }
                            ],
                        }
                    )
                }
            )
            logger.info("Added trust policy to the queue.")
        except ClientError as error:
            logger.exception("Couldn't add trust policy to the queue!")
            raise error

    @staticmethod
    def subscribe_queue_to_topic(topic, queue_arn):
        """
        Subscribe a queue to a topic.

        :param topic: The topic resource.
        :param queue_arn: The ARN of the queue.
        :return: The subscription resource.
        """
        try:
            subscription = topic.subscribe(
                Protocol="sqs",
                Endpoint=queue_arn,
            )
            logger.info("The queue is subscribed to the topic.")
            return subscription
        except ClientError as error:
            logger.exception("Couldn't subscribe queue to topic!")
            raise error

    @staticmethod
    def publish_price_update(topic, payload, group_id):
        """
        Compose and publish a message that updates the wholesale price.

        :param topic: The topic to publish to.
        :param payload: The message to publish.
        :param group_id: The group ID for the message.
        :return: The ID of the message.
        """
        try:
            att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}}
            dedup_id = uuid.uuid4()
            response = topic.publish(
                Subject="Price Update",
                Message=payload,
                MessageAttributes=att_dict,
                MessageGroupId=group_id,
                MessageDeduplicationId=str(dedup_id),
            )
            message_id = response["MessageId"]
            logger.info("Published message to topic %s.", topic.arn)
        except ClientError as error:
            logger.exception("Couldn't publish message to topic %s.", topic.arn)
            raise error
        return message_id

    @staticmethod
    def delete_queue(queue):
        """
        Removes an SQS queue. When run against an AWS account, it can take up to
        60 seconds before the queue is actually deleted.

        :param queue: The queue to delete.
        :return: None
        """
        try:
            queue.delete()
            logger.info("Deleted queue with URL=%s.", queue.url)
        except ClientError as error:
            logger.exception("Couldn't delete queue with URL=%s!", queue.url)
            raise error
SAP ABAP
SDK for SAP ABAP
Create a FIFO topic, subscribe an Amazon SQS FIFO queue to the topic, and publish a message to an Amazon SNS topic.
" Creates a FIFO topic. "
DATA lt_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>tt_topicattributesmap.
DATA ls_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>ts_topicattributesmap_maprow.
ls_tpc_attributes-key = 'FifoTopic'.
ls_tpc_attributes-value = NEW /aws1/cl_snstopicattrsmap_w( iv_value = 'true' ).
INSERT ls_tpc_attributes INTO TABLE lt_tpc_attributes.

TRY.
    DATA(lo_create_result) = lo_sns->createtopic(
        iv_name = iv_topic_name
        it_attributes = lt_tpc_attributes
    ).
    DATA(lv_topic_arn) = lo_create_result->get_topicarn( ).
    ov_topic_arn = lv_topic_arn. " ov_topic_arn is returned for testing purposes. "
    MESSAGE 'FIFO topic created' TYPE 'I'.
  CATCH /aws1/cx_snstopiclimitexcdex.
    MESSAGE 'Unable to create more topics. You have reached the maximum number of topics allowed.' TYPE 'E'.
ENDTRY.

" Subscribes an endpoint to an Amazon Simple Notification Service (Amazon SNS) topic. "
" Only Amazon Simple Queue Service (Amazon SQS) queues, FIFO or standard, can be subscribed to an SNS FIFO topic. "
TRY.
    DATA(lo_subscribe_result) = lo_sns->subscribe(
        iv_topicarn = lv_topic_arn
        iv_protocol = 'sqs'
        iv_endpoint = iv_queue_arn
    ).
    DATA(lv_subscription_arn) = lo_subscribe_result->get_subscriptionarn( ).
    ov_subscription_arn = lv_subscription_arn. " ov_subscription_arn is returned for testing purposes. "
    MESSAGE 'SQS queue was subscribed to SNS topic.' TYPE 'I'.
  CATCH /aws1/cx_snsnotfoundexception.
    MESSAGE 'Topic does not exist.' TYPE 'E'.
  CATCH /aws1/cx_snssubscriptionlmte00.
    MESSAGE 'Unable to create subscriptions. You have reached the maximum number of subscriptions allowed.' TYPE 'E'.
ENDTRY.

" Publish message to SNS topic. "
TRY.
    DATA lt_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>tt_messageattributemap.
    DATA ls_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>ts_messageattributemap_maprow.
    ls_msg_attributes-key = 'Importance'.
    ls_msg_attributes-value = NEW /aws1/cl_snsmessageattrvalue( iv_datatype = 'String' iv_stringvalue = 'High' ).
    INSERT ls_msg_attributes INTO TABLE lt_msg_attributes.

    DATA(lo_result) = lo_sns->publish(
        iv_topicarn = lv_topic_arn
        iv_message = 'The price of your mobile plan has been increased from $19 to $23'
        iv_subject = 'Changes to mobile plan'
        iv_messagegroupid = 'Update-2'
        iv_messagededuplicationid = 'Update-2.1'
        it_messageattributes = lt_msg_attributes
    ).
    ov_message_id = lo_result->get_messageid( ). " ov_message_id is returned for testing purposes. "
    MESSAGE 'Message was published to SNS topic.' TYPE 'I'.
  CATCH /aws1/cx_snsnotfoundexception.
    MESSAGE 'Topic does not exist.' TYPE 'E'.
ENDTRY.
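Receiving messages from FIFO subscriptions
You can now receive price updates in the three subscribed applications. As shown in the FIFO topic example use case, the entry point for each consumer application is an Amazon SQS queue, which its corresponding AWS Lambda function can poll automatically. When an Amazon SQS queue is an event source for a Lambda function, Lambda scales its fleet of pollers as needed to consume messages efficiently.
For more information, see Using AWS Lambda with Amazon SQS in the AWS Lambda Developer Guide. For information about writing your own queue pollers, see Recommendations for Amazon SQS standard and FIFO queues in the Amazon Simple Queue Service Developer Guide and ReceiveMessage in the Amazon Simple Queue Service API Reference.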
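If you write your own poller instead of relying on Lambda, one polling pass might look like the following Python sketch. It assumes that raw message delivery is disabled, so each queue message body is an Amazon SNS JSON envelope with the published payload in its Message field, and that the payload itself is JSON. `poll_once` and `InMemoryQueue` are hypothetical names; `InMemoryQueue` only stands in for an SQS client so that the sketch runs without AWS access, mirroring the keyword arguments of the Boto3 `receive_message` and `delete_message` calls.

```python
import json


def poll_once(sqs_client, queue_url, handler, max_messages=10, wait_seconds=10):
    """Receive one batch, pass each price update to handler, then delete it."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=wait_seconds,  # long polling
    )
    handled = 0
    for message in response.get("Messages", []):
        # Assumption: the body is an SNS envelope; the payload is under "Message".
        envelope = json.loads(message["Body"])
        handler(json.loads(envelope["Message"]))
        # Delete only after the handler returns, so a failure leads to redelivery.
        sqs_client.delete_message(
            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
        )
        handled += 1
    return handled


class InMemoryQueue:
    """Hypothetical stand-in for an SQS client, used to run the sketch offline."""

    def __init__(self, payloads):
        self.messages = [
            {"ReceiptHandle": str(i), "Body": json.dumps({"Message": json.dumps(p)})}
            for i, p in enumerate(payloads)
        ]

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        return {"Messages": self.messages[:MaxNumberOfMessages]}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self.messages = [
            m for m in self.messages if m["ReceiptHandle"] != ReceiptHandle
        ]


if __name__ == "__main__":
    queue = InMemoryQueue([{"product": 214, "price": 79.99}])
    received = []
    poll_once(queue, "https://example.invalid/wholesale.fifo", received.append)
    print(received)
```

In a real consumer, the same function could be called in a loop with `boto3.client("sqs")` and the queue URL returned when the queue was created.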
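Using AWS CloudFormation
AWS CloudFormation lets you use a template file to create and configure a collection of AWS resources together as a single unit. This section provides an example template that creates the following:
- An Amazon SNS FIFO topic that publishes price updates
- Amazon SQS FIFO queues that deliver these updates to the wholesale and retail applications
- An Amazon SQS standard queue for the analytics application, which stores records that can be queried for business intelligence (BI)
- Amazon SNS FIFO subscriptions that connect the three queues to the topic
- A filter policy that specifies that subscriber applications receive only the price updates that they need
If you test this code example by publishing a message to the topic, make sure that you publish the message with the business attribute. Specify either retail or wholesale for the attribute value. Otherwise, the message is filtered out and isn't delivered to the subscribed queues.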
{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Resources": {
    "PriceUpdatesTopic": {
      "Type": "AWS::SNS::Topic",
      "Properties": {
        "TopicName": "PriceUpdatesTopic.fifo",
        "FifoTopic": true,
        "ContentBasedDeduplication": false,
        "ArchivePolicy": {
          "MessageRetentionPeriod": "30"
        }
      }
    },
    "WholesaleQueue": {
      "Type": "AWS::SQS::Queue",
      "Properties": {
        "QueueName": "WholesaleQueue.fifo",
        "FifoQueue": true,
        "ContentBasedDeduplication": false
      }
    },
    "RetailQueue": {
      "Type": "AWS::SQS::Queue",
      "Properties": {
        "QueueName": "RetailQueue.fifo",
        "FifoQueue": true,
        "ContentBasedDeduplication": false
      }
    },
    "AnalyticsQueue": {
      "Type": "AWS::SQS::Queue",
      "Properties": {
        "QueueName": "AnalyticsQueue"
      }
    },
    "WholesaleSubscription": {
      "Type": "AWS::SNS::Subscription",
      "Properties": {
        "TopicArn": {
          "Ref": "PriceUpdatesTopic"
        },
        "Endpoint": {
          "Fn::GetAtt": [
            "WholesaleQueue",
            "Arn"
          ]
        },
        "Protocol": "sqs",
        "RawMessageDelivery": "false",
        "FilterPolicyScope": "MessageBody",
        "FilterPolicy": {
          "business": [
            "wholesale"
          ]
        }
      }
    },
    "RetailSubscription": {
      "Type": "AWS::SNS::Subscription",
      "Properties": {
        "TopicArn": {
          "Ref": "PriceUpdatesTopic"
        },
        "Endpoint": {
          "Fn::GetAtt": [
            "RetailQueue",
            "Arn"
          ]
        },
        "Protocol": "sqs",
        "RawMessageDelivery": "false",
        "FilterPolicyScope": "MessageBody",
        "FilterPolicy": {
          "business": [
            "retail"
          ]
        }
      }
    },
    "AnalyticsSubscription": {
      "Type": "AWS::SNS::Subscription",
      "Properties": {
        "TopicArn": {
          "Ref": "PriceUpdatesTopic"
        },
        "Endpoint": {
          "Fn::GetAtt": [
            "AnalyticsQueue",
            "Arn"
          ]
        },
        "Protocol": "sqs",
        "RawMessageDelivery": "false"
      }
    },
    "SalesQueuesPolicy": {
      "Type": "AWS::SQS::QueuePolicy",
      "Properties": {
        "PolicyDocument": {
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "sns.amazonaws.com"
              },
              "Action": [
                "sqs:SendMessage"
              ],
              "Resource": "*",
              "Condition": {
                "ArnEquals": {
                  "aws:SourceArn": {
                    "Ref": "PriceUpdatesTopic"
                  }
                }
              }
            }
          ]
        },
        "Queues": [
          {
            "Ref": "WholesaleQueue"
          },
          {
            "Ref": "RetailQueue"
          },
          {
            "Ref": "AnalyticsQueue"
          }
        ]
      }
    }
  }
}
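One detail worth checking when you test this template: the wholesale and retail subscriptions set FilterPolicyScope to MessageBody. With that scope, Amazon SNS evaluates the filter policy against properties of the JSON message body instead of message attributes, so a test message for this template would likely need a top-level business property in its payload. The following Python sketch builds publish parameters of that shape. `build_price_update` is a hypothetical helper, and the product, price, and group ID values are only illustrations.

```python
import json
import uuid


def build_price_update(product: int, price: float, business: str, group_id: str) -> dict:
    """Build publish keyword arguments whose JSON body carries the business key."""
    body = {"product": product, "price": price, "business": business}
    return {
        "Message": json.dumps(body),
        "MessageGroupId": group_id,
        # The topic disables content-based deduplication, so each message
        # carries its own deduplication ID.
        "MessageDeduplicationId": str(uuid.uuid4()),
    }


if __name__ == "__main__":
    print(build_price_update(214, 79.99, "retail", "Consumables"))
```

With credentials configured, the result could be passed to Boto3 as `boto3.client("sns").publish(TopicArn=topic_arn, **build_price_update(214, 79.99, "retail", "Consumables"))`, where `topic_arn` is the ARN of the deployed topic.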
For more information about deploying AWS resources by using an AWS CloudFormation template, see Get Started in the AWS CloudFormation User Guide.