Create and publish to a FIFO Amazon SNS topic using an Amazon SDK
The following code examples show how to create and publish to a FIFO Amazon SNS topic.
- Java
-
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository
. This example
-
creates an Amazon SNS FIFO topic, two Amazon SQS FIFO queues, and one Standard queue.
-
subscribes the queues to the topic and publishes a message to the topic.
The test
verifies the receipt of the message to each queue. The complete example also shows the addition of access policies and deletes the resources at the end. public class PriceUpdateExample { public final static SnsClient snsClient = SnsClient.create(); public final static SqsClient sqsClient = SqsClient.create(); public static void main(String[] args) { final String usage = "\n" + "Usage: " + " <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" + "Where:\n" + " fifoTopicName - The name of the FIFO topic that you want to create. \n\n" + " wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n" + " retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n\n" + " analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n\n"; if (args.length != 4) { System.out.println(usage); System.exit(1); } final String fifoTopicName = args[0]; final String wholeSaleQueueName = args[1]; final String retailQueueName = args[2]; final String analyticsQueueName = args[3]; // For convenience, the QueueData class holds metadata about a queue: ARN, URL, // name and type. List<QueueData> queues = List.of( new QueueData(wholeSaleQueueName, QueueType.FIFO), new QueueData(retailQueueName, QueueType.FIFO), new QueueData(analyticsQueueName, QueueType.Standard)); // Create queues. createQueues(queues); // Create a topic. String topicARN = createFIFOTopic(fifoTopicName); // Subscribe each queue to the topic. subscribeQueues(queues, topicARN); // Allow the newly created topic to send messages to the queues. addAccessPolicyToQueuesFINAL(queues, topicARN); // Publish a sample price update message with payload. publishPriceUpdate(topicARN, "{\"product\": 214, \"price\": 79.99}", "Consumables"); // Clean up resources. deleteSubscriptions(queues); deleteQueues(queues); deleteTopic(topicARN); } public static String createFIFOTopic(String topicName) { try { // Create a FIFO topic by using the SNS service client. Map<String, String> topicAttributes = Map.of( "FifoTopic", "true", "ContentBasedDeduplication", "false"); CreateTopicRequest topicRequest = CreateTopicRequest.builder() .name(topicName) .attributes(topicAttributes) .build(); CreateTopicResponse response = snsClient.createTopic(topicRequest); String topicArn = response.topicArn(); System.out.println("The topic ARN is" + topicArn); return topicArn; } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static void subscribeQueues(List<QueueData> queues, String topicARN) { queues.forEach(queue -> { SubscribeRequest subscribeRequest = SubscribeRequest.builder() .topicArn(topicARN) .endpoint(queue.queueARN) .protocol("sqs") .build(); // Subscribe to the endpoint by using the SNS service client. // Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO // topic. SubscribeResponse subscribeResponse = snsClient.subscribe(subscribeRequest); System.out.println("The queue [" + queue.queueARN + "] subscribed to the topic [" + topicARN + "]"); queue.subscriptionARN = subscribeResponse.subscriptionArn(); }); } public static void publishPriceUpdate(String topicArn, String payload, String groupId) { try { // Create and publish a message that updates the wholesale price. String subject = "Price Update"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; MessageAttributeValue msgAttValue = MessageAttributeValue.builder() .dataType("String") .stringValue(attributeValue) .build(); Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put(attributeName, msgAttValue); PublishRequest pubRequest = PublishRequest.builder() .topicArn(topicArn) .subject(subject) .message(payload) .messageGroupId(groupId) .messageDeduplicationId(dedupId) .messageAttributes(attributes) .build(); final PublishResponse response = snsClient.publish(pubRequest); System.out.println(response.messageId()); System.out.println(response.sequenceNumber()); System.out.println("Message was published to " + topicArn); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } }
-
For API details, see the following topics in Amazon SDK for Java 2.x API Reference.
-
- Python
-
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository
. Create an Amazon SNS FIFO topic, subscribe Amazon SQS FIFO and standard queues to the topic, and publish a message to the topic.
def usage_demo(): """Shows how to subscribe queues to a FIFO topic.""" print("-" * 88) print("Welcome to the `Subscribe queues to a FIFO topic` demo!") print("-" * 88) sns = boto3.resource("sns") sqs = boto3.resource("sqs") fifo_topic_wrapper = FifoTopicWrapper(sns) sns_wrapper = SnsWrapper(sns) prefix = "sqs-subscribe-demo-" queues = set() subscriptions = set() wholesale_queue = sqs.create_queue( QueueName=prefix + "wholesale.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(wholesale_queue) print(f"Created FIFO queue with URL: {wholesale_queue.url}.") retail_queue = sqs.create_queue( QueueName=prefix + "retail.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(retail_queue) print(f"Created FIFO queue with URL: {retail_queue.url}.") analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={}) queues.add(analytics_queue) print(f"Created standard queue with URL: {analytics_queue.url}.") topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo") print(f"Created FIFO topic: {topic.attributes['TopicArn']}.") for q in queues: fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"]) print(f"Added access policies for topic: {topic.attributes['TopicArn']}.") for q in queues: sub = fifo_topic_wrapper.subscribe_queue_to_topic( topic, q.attributes["QueueArn"] ) subscriptions.add(sub) print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.") input("Press Enter to publish a message to the topic.") message_id = fifo_topic_wrapper.publish_price_update( topic, '{"product": 214, "price": 79.99}', "Consumables" ) print(f"Published price update with message ID: {message_id}.") # Clean up the subscriptions, queues, and topic. input("Press Enter to clean up resources.") for s in subscriptions: sns_wrapper.delete_subscription(s) sns_wrapper.delete_topic(topic) for q in queues: fifo_topic_wrapper.delete_queue(q) print(f"Deleted subscriptions, queues, and topic.") print("Thanks for watching!") print("-" * 88) class FifoTopicWrapper: """Encapsulates Amazon SNS FIFO topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_fifo_topic(self, topic_name): """ Create a FIFO topic. Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long. For a FIFO topic, the name must end with the .fifo suffix. :param topic_name: The name for the topic. :return: The new topic. """ try: topic = self.sns_resource.create_topic( Name=topic_name, Attributes={ "FifoTopic": str(True), "ContentBasedDeduplication": str(False), }, ) logger.info("Created FIFO topic with name=%s.", topic_name) return topic except ClientError as error: logger.exception("Couldn't create topic with name=%s!", topic_name) raise error @staticmethod def add_access_policy(queue, topic_arn): """ Add the necessary access policy to a queue, so it can receive messages from a topic. :param queue: The queue resource. :param topic_arn: The ARN of the topic. :return: None. """ try: queue.set_attributes( Attributes={ "Policy": json.dumps( { "Version": "2012-10-17", "Statement": [ { "Sid": "test-sid", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": queue.attributes["QueueArn"], "Condition": { "ArnLike": {"aws:SourceArn": topic_arn} }, } ], } ) } ) logger.info("Added trust policy to the queue.") except ClientError as error: logger.exception("Couldn't add trust policy to the queue!") raise error @staticmethod def subscribe_queue_to_topic(topic, queue_arn): """ Subscribe a queue to a topic. :param topic: The topic resource. :param queue_arn: The ARN of the queue. :return: The subscription resource. """ try: subscription = topic.subscribe( Protocol="sqs", Endpoint=queue_arn, ) logger.info("The queue is subscribed to the topic.") return subscription except ClientError as error: logger.exception("Couldn't subscribe queue to topic!") raise error @staticmethod def publish_price_update(topic, payload, group_id): """ Compose and publish a message that updates the wholesale price. :param topic: The topic to publish to. :param payload: The message to publish. :param group_id: The group ID for the message. :return: The ID of the message. """ try: att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}} dedup_id = uuid.uuid4() response = topic.publish( Subject="Price Update", Message=payload, MessageAttributes=att_dict, MessageGroupId=group_id, MessageDeduplicationId=str(dedup_id), ) message_id = response["MessageId"] logger.info("Published message to topic %s.", topic.arn) except ClientError as error: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise error return message_id @staticmethod def delete_queue(queue): """ Removes an SQS queue. When run against an AWS account, it can take up to 60 seconds before the queue is actually deleted. :param queue: The queue to delete. :return: None """ try: queue.delete() logger.info("Deleted queue with URL=%s.", queue.url) except ClientError as error: logger.exception("Couldn't delete queue with URL=%s!", queue.url) raise error
-
For API details, see the following topics in Amazon SDK for Python (Boto3) API Reference.
-
- SAP ABAP
-
- SDK for SAP ABAP
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository
. Create a FIFO topic, subscribe an Amazon SQS FIFO queue to the topic, and publish a message to an Amazon SNS topic.
" Creates a FIFO topic. " DATA lt_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>tt_topicattributesmap. DATA ls_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>ts_topicattributesmap_maprow. ls_tpc_attributes-key = 'FifoTopic'. ls_tpc_attributes-value = NEW /aws1/cl_snstopicattrsmap_w( iv_value = 'true' ). INSERT ls_tpc_attributes INTO TABLE lt_tpc_attributes. TRY. DATA(lo_create_result) = lo_sns->createtopic( iv_name = iv_topic_name it_attributes = lt_tpc_attributes ). DATA(lv_topic_arn) = lo_create_result->get_topicarn( ). ov_topic_arn = lv_topic_arn. " ov_topic_arn is returned for testing purposes. " MESSAGE 'FIFO topic created' TYPE 'I'. CATCH /aws1/cx_snstopiclimitexcdex. MESSAGE 'Unable to create more topics. You have reached the maximum number of topics allowed.' TYPE 'E'. ENDTRY. " Subscribes an endpoint to an Amazon Simple Notification Service (Amazon SNS) topic. " " Only Amazon Simple Queue Service (Amazon SQS) FIFO queues can be subscribed to an SNS FIFO topic. " TRY. DATA(lo_subscribe_result) = lo_sns->subscribe( iv_topicarn = lv_topic_arn iv_protocol = 'sqs' iv_endpoint = iv_queue_arn ). DATA(lv_subscription_arn) = lo_subscribe_result->get_subscriptionarn( ). ov_subscription_arn = lv_subscription_arn. " ov_subscription_arn is returned for testing purposes. " MESSAGE 'SQS queue was subscribed to SNS topic.' TYPE 'I'. CATCH /aws1/cx_snsnotfoundexception. MESSAGE 'Topic does not exist.' TYPE 'E'. CATCH /aws1/cx_snssubscriptionlmte00. MESSAGE 'Unable to create subscriptions. You have reached the maximum number of subscriptions allowed.' TYPE 'E'. ENDTRY. " Publish message to SNS topic. " TRY. DATA lt_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>tt_messageattributemap. DATA ls_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>ts_messageattributemap_maprow. ls_msg_attributes-key = 'Importance'. ls_msg_attributes-value = NEW /aws1/cl_snsmessageattrvalue( iv_datatype = 'String' iv_stringvalue = 'High' ). INSERT ls_msg_attributes INTO TABLE lt_msg_attributes. DATA(lo_result) = lo_sns->publish( iv_topicarn = lv_topic_arn iv_message = 'The price of your mobile plan has been increased from $19 to $23' iv_subject = 'Changes to mobile plan' iv_messagegroupid = 'Update-2' iv_messagededuplicationid = 'Update-2.1' it_messageattributes = lt_msg_attributes ). ov_message_id = lo_result->get_messageid( ). " ov_message_id is returned for testing purposes. " MESSAGE 'Message was published to SNS topic.' TYPE 'I'. CATCH /aws1/cx_snsnotfoundexception. MESSAGE 'Topic does not exist.' TYPE 'E'. ENDTRY.
-
For API details, see the following topics in Amazon SDK for SAP ABAP API reference.
-
For a complete list of Amazon SDK developer guides and code examples, see Using Amazon SNS with an Amazon SDK. This topic also includes information about getting started and details about previous SDK versions.