的版本 4 (V4) 适用于 .NET 的 Amazon SDK 已经发布!
要开始使用新版本的 SDK,请参阅 适用于 .NET 的 Amazon SDK (V4)开发人员指南,尤其是关于迁移到版本 4 的主题。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
借助 Amazon Message Processing Framework for .NET 来使用消息
Amazon Message Processing Framework for .NET 使您能够使用通过该框架或其中一个消息收发服务发布的消息。这些消息可以通过多种方式使用,其中一些方式如下所述。
消息处理程序
要使用消息,请使用 IMessageHandler 接口为要处理的每种消息类型实施消息处理程序。消息类型和消息处理程序之间的映射是在项目启动时配置的。
await Host.CreateDefaultBuilder(args) .ConfigureServices(services => { // Register the AWS Message Processing Framework for .NET services.AddAWSMessageBus(builder => { // Register an SQS Queue that the framework will poll for messages. // NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue. builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd"); // Register all IMessageHandler implementations with the message type they should process. // Here messages that match our ChatMessage .NET type will be handled by our ChatMessageHandler builder.AddMessageHandler<ChatMessageHandler, ChatMessage>(); }); }) .Build() .RunAsync();
以下代码显示了 ChatMessage 消息的示例消息处理程序。
public class ChatMessageHandler : IMessageHandler<ChatMessage> { public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messageEnvelope, CancellationToken token = default) { // Add business and validation logic here. if (messageEnvelope == null) { return Task.FromResult(MessageProcessStatus.Failed()); } if (messageEnvelope.Message == null) { return Task.FromResult(MessageProcessStatus.Failed()); } ChatMessage message = messageEnvelope.Message; Console.WriteLine($"Message Description: {message.MessageDescription}"); // Return success so the framework will delete the message from the queue. return Task.FromResult(MessageProcessStatus.Success()); } }
外部 MessageEnvelope 包含框架使用的元数据。它的 message 属性是消息类型(在本例中为 ChatMessage)。
您可以返回 MessageProcessStatus.Success() 以表明消息已成功处理,框架将从 Amazon SQS 队列中删除该消息。返回 MessageProcessStatus.Failed() 时,消息将保留在队列中,以便再次得到处理或移至死信队列(如果已配置)。
在长时间运行的进程中处理消息
您可以使用 SQS 队列 URL 调用 AddSQSPoller 以启动一个长时间运行的 BackgroundService
await Host.CreateDefaultBuilder(args) .ConfigureServices(services => { // Register the AWS Message Processing Framework for .NET services.AddAWSMessageBus(builder => { // Register an SQS Queue that the framework will poll for messages. // NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue. builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options => { // The maximum number of messages from this queue that the framework will process concurrently on this client. options.MaxNumberOfConcurrentMessages = 10; // The duration each call to SQS will wait for new messages. options.WaitTimeSeconds = 20; }); // Register all IMessageHandler implementations with the message type they should process. builder.AddMessageHandler<ChatMessageHandler, ChatMessage>(); }); }) .Build() .RunAsync();
配置 SQS 消息轮询器
SQS 消息轮询器可在调用 AddSQSPoller 时通过 SQSMessagePollerOptions 进行配置。
-
MaxNumberOfConcurrentMessages- 队列中可同时处理的最大消息数量。默认值是 10。 -
WaitTimeSeconds-ReceiveMessageSQS 调用在返回前等待队列中消息到达的时间(以秒为单位)。如果消息可用,则调用会早于WaitTimeSeconds返回。默认值为 20。
消息可见性超时处理
SQS 消息有可见性超时时段。当一个使用者开始处理给定消息时,该消息仍会保留在队列中,但对其他使用者不可见,以避免重复处理。如果消息在再次可见之前未被处理和删除,则其他使用者可能会尝试处理同一条消息。
该框架将跟踪并尝试延长其当前正在处理的消息的可见性超时时间。您可以在调用 AddSQSPoller 时在 SQSMessagePollerOptions 上配置此行为。
-
VisibilityTimeout- 接收到的消息对后续检索请求不可见的时长(以秒为单位)。默认值为 30。 -
VisibilityTimeoutExtensionThreshold- 当消息的可见性超时时间距离到期剩余此秒数时,框架将延长可见性超时时间(再增加VisibilityTimeout秒)。默认值是 5。 -
VisibilityTimeoutExtensionHeartbeatInterval- 框架检查距离过期时间不足VisibilityTimeoutExtensionThreshold秒的消息并延长其可见性超时时间的频率(以秒为单位)。默认值是 1。
在以下示例中,框架将每 1 秒检查一次仍在处理中的消息。对于在 5 秒内会再次可见的消息,框架会自动将每条消息的可见性超时时间再延长 30 秒。
// NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue. builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options => { options.VisibilityTimeout = 30; options.VisibilityTimeoutExtensionThreshold = 5; VisibilityTimeoutExtensionHeartbeatInterval = 1; });
在 Amazon Lambda 函数中处理消息
您可以将 Amazon Message Processing Framework for .NET 和 SQS 与 Lambda 的集成结合使用。这是由 AWS.Messaging.Lambda 软件包提供的。请参阅其自述文件