Consume messages with the Amazon Message Processing Framework for .NET - Amazon SDK for .NET

This is prerelease documentation for a feature in preview release. It is subject to change.

The Amazon Message Processing Framework for .NET allows you to consume messages that have been published by using the framework or one of the messaging services. The messages can be consumed in a variety of ways, some of which are described below.

Message Handlers

To consume messages, implement a message handler using the IMessageHandler interface for each message type you wish to process. The mapping between message types and message handlers is configured in the project startup.

    await Host.CreateDefaultBuilder(args)
        .ConfigureServices(services =>
        {
            // Register the AWS Message Processing Framework for .NET
            services.AddAWSMessageBus(builder =>
            {
                // Register an SQS Queue that the framework will poll for messages.
                // NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue.
                builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd");

                // Register all IMessageHandler implementations with the message type they should process.
                // Here messages that match our ChatMessage .NET type will be handled by our ChatMessageHandler
                builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
            });
        })
        .Build()
        .RunAsync();

The following code shows a sample message handler for a ChatMessage message.

    public class ChatMessageHandler : IMessageHandler<ChatMessage>
    {
        public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messageEnvelope, CancellationToken token = default)
        {
            // Add business and validation logic here.
            if (messageEnvelope == null)
            {
                return Task.FromResult(MessageProcessStatus.Failed());
            }

            if (messageEnvelope.Message == null)
            {
                return Task.FromResult(MessageProcessStatus.Failed());
            }

            ChatMessage message = messageEnvelope.Message;

            Console.WriteLine($"Message Description: {message.MessageDescription}");

            // Return success so the framework will delete the message from the queue.
            return Task.FromResult(MessageProcessStatus.Success());
        }
    }

The outer MessageEnvelope contains metadata used by the framework. Its Message property holds the message itself, typed as the message type (in this case ChatMessage).

Return MessageProcessStatus.Success() to indicate that the message was processed successfully; the framework then deletes the message from the Amazon SQS queue. If you return MessageProcessStatus.Failed(), the message remains in the queue, where it can be processed again or moved to a dead-letter queue, if one is configured.

Handling Messages in a Long-Running Process

You can call AddSQSPoller with an SQS queue URL to start a long-running BackgroundService that will continuously poll the queue and process messages.

    await Host.CreateDefaultBuilder(args)
        .ConfigureServices(services =>
        {
            // Register the AWS Message Processing Framework for .NET
            services.AddAWSMessageBus(builder =>
            {
                // Register an SQS Queue that the framework will poll for messages.
                // NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue.
                builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options =>
                {
                    // The maximum number of messages from this queue that the framework will process concurrently on this client.
                    options.MaxNumberOfConcurrentMessages = 10;

                    // The duration each call to SQS will wait for new messages.
                    options.WaitTimeSeconds = 20;
                });

                // Register all IMessageHandler implementations with the message type they should process.
                builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
            });
        })
        .Build()
        .RunAsync();

Configuring the SQS Message Poller

You can configure the SQS message poller through SQSMessagePollerOptions when calling AddSQSPoller.

  • MaxNumberOfConcurrentMessages - The maximum number of messages from the queue to process concurrently. The default value is 10.

  • WaitTimeSeconds - The duration (in seconds) for which the ReceiveMessage SQS call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than WaitTimeSeconds. The default value is 20.
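To make the effect of a concurrency cap such as MaxNumberOfConcurrentMessages more concrete, the following is a minimal, self-contained sketch. It is an illustrative model only and not the framework's implementation: the class ConcurrencyCapDemo, the method RunAsync, and the use of SemaphoreSlim are assumptions chosen for this example, and the 20 ms delay stands in for real handler work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model of a concurrency cap; not part of the framework.
public static class ConcurrencyCapDemo
{
    // Processes messageCount simulated messages with at most maxConcurrent
    // in flight at once, and returns the peak concurrency that was observed.
    public static async Task<int> RunAsync(int messageCount, int maxConcurrent)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);
        int inFlight = 0;
        int peak = 0;
        var peakLock = new object();

        async Task HandleAsync(int id)
        {
            // Wait for a free slot before starting work on this message.
            await gate.WaitAsync();
            try
            {
                int now = Interlocked.Increment(ref inFlight);
                lock (peakLock)
                {
                    peak = Math.Max(peak, now);
                }

                // Stand-in for the real message handler.
                await Task.Delay(20);
            }
            finally
            {
                Interlocked.Decrement(ref inFlight);
                gate.Release();
            }
        }

        await Task.WhenAll(Enumerable.Range(0, messageCount).Select(id => HandleAsync(id)));
        return peak;
    }
}
```

Calling ConcurrencyCapDemo.RunAsync(25, 10) handles 25 simulated messages while never exceeding 10 at a time. In this model, a lower cap reduces the load on downstream resources at the cost of throughput.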

Message Visibility Timeout Handling

SQS messages have a visibility timeout period. When one consumer begins handling a given message, it remains in the queue but is hidden from other consumers to avoid processing it more than once. If the message is not handled and deleted before becoming visible again, another consumer might attempt to handle the same message.

The framework will track and attempt to extend the visibility timeout for messages that it is currently handling. You can configure this behavior on the SQSMessagePollerOptions when calling AddSQSPoller.

  • VisibilityTimeout - The duration in seconds that received messages are hidden from subsequent retrieve requests. The default value is 30.

  • VisibilityTimeoutExtensionThreshold - When a message's visibility timeout is within this many seconds of expiring, the framework will extend the visibility timeout (by another VisibilityTimeout seconds). The default value is 5.

  • VisibilityTimeoutExtensionHeartbeatInterval - How often, in seconds, the framework checks for messages that are within VisibilityTimeoutExtensionThreshold seconds of expiring and extends their visibility timeout. The default value is 1.

In the following example, the framework will check every 1 second for messages that are still being handled. For those messages within 5 seconds of becoming visible again, the framework will automatically extend the visibility timeout of each message by another 30 seconds.

    // NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue.
    builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options =>
    {
        options.VisibilityTimeout = 30;
        options.VisibilityTimeoutExtensionThreshold = 5;
        options.VisibilityTimeoutExtensionHeartbeatInterval = 1;
    });
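The interaction of the three visibility timeout settings can be worked through with a small, self-contained model. This is a sketch under stated assumptions, not the framework's code: the class VisibilityTimeoutModel and the method ExtensionTimes are hypothetical names, "within the threshold" is taken to mean a remaining time less than or equal to the threshold, and each extension is assumed to restart the timeout at VisibilityTimeout seconds from the moment of the extension.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the heartbeat check; not part of the framework.
public static class VisibilityTimeoutModel
{
    // Returns the times (in seconds after the message was received) at which
    // the visibility timeout would be extended while a handler is still running.
    public static List<int> ExtensionTimes(
        int visibilityTimeout,
        int extensionThreshold,
        int heartbeatInterval,
        int handlerDuration)
    {
        var extensions = new List<int>();

        // The message first becomes visible again visibilityTimeout seconds after receipt.
        int expiresAt = visibilityTimeout;

        // One check per heartbeat for as long as the handler is still running.
        for (int t = heartbeatInterval; t < handlerDuration; t += heartbeatInterval)
        {
            // Assumption: "within the threshold" means remaining time <= threshold.
            if (expiresAt - t <= extensionThreshold)
            {
                // Assumption: the new timeout is counted from the time of the extension.
                expiresAt = t + visibilityTimeout;
                extensions.Add(t);
            }
        }

        return extensions;
    }
}
```

With the values from the example (VisibilityTimeout 30, threshold 5, heartbeat 1) and a handler that runs for 70 seconds, ExtensionTimes(30, 5, 1, 70) returns the list 25, 50. The timeout is extended 25 seconds after receipt, when 5 seconds remain, and again at 50 seconds, so the message stays hidden until the handler finishes.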

Handling Messages in Amazon Lambda Functions

You can use the Amazon Message Processing Framework for .NET with SQS's integration with Lambda. This is provided by the AWS.Messaging.Lambda package. Refer to its README to get started.