Amazon Simple Queue Service
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。点 击 Getting Started with Amazon AWS to see specific differences applicable to the China (Beijing) Region.

FIFO (先进先出)队列

在 美国东部(弗吉尼亚北部)、美国东部(俄亥俄州)、美国西部(俄勒冈) 和 欧洲(爱尔兰) 区域都提供 FIFO 队列。除了具有标准队列的所有功能之外,FIFO (先进先出) 队列还能在操作和事件的顺序很重要或不能容忍重复项的情况下增强应用程序之间的消息收发。FIFO 列队还提供了确切一次处理,但其上限是每个 API 操作 300 个事务/秒 (TPS)。

注意

  • 不使用批处理,FIFO 队列每秒最高支持 300 条消息 (每秒 300 个发送、接收或者删除操作)。

  • 如果您利用最大批处理能力 (每个操作 10 条消息),则 FIFO 队列每秒最高可以支持 3,000 条消息。

FIFO 队列能够在操作和事件的顺序很重要的情况下用来改善应用程序之间的消息传送,例如:

  • 确保按正确的顺序执行用户输入的命令。

  • 通过按正确的顺序发送价格修改来显示正确的产品价格。

  • 防止学员在注册账户之前参加课程。

注意

FIFO 队列的名称必须以 .fifo 后缀结尾。 该后缀也包含在 80 个字符的队列名称限制之内。要确定队列是否为 FIFO,您可以检查队列名称是否以该后缀结尾。

有关使用 FIFO 队列的最佳实践,请参阅适用于 FIFO (先进先出) 队列的建议 一般建议

有关客户端和服务与 FIFO 队列的兼容性的信息,请参阅兼容性

消息排序

FIFO 队列在标准队列的基础上改进和补充而来。此队列类型最重要的特性是 FIFO (先进先出)传送确切一次处理:发送和接收消息的顺序严格保持一致;一条消息传送一次后就保持可用,直到使用者处理并删除它为止;系统不会将重复消息引入队列。此外,FIFO 队列还支持消息组,该组允许一个队列中存在多个有序的消息组。

FIFO 队列逻辑

关键术语

以下关键术语可帮助您更好地了解 FIFO 队列的功能。有关详细描述,请参阅 Amazon Simple Queue Service API Reference

消息重复数据删除 ID

The 用于已发送消息的重复数据删除的令牌。如果成功发送了一条具有特定消息重复数据删除 ID 的消息,则在发送时使用了同一消息重复数据删除 ID 的任何消息都会被成功接受,但在 5 分钟重复数据删除时间间隔内不会发出。

注意

消息重复数据删除应用于整个队列,而不应用于单个消息组。

消息组 ID

指定某条消息属于特定消息组的标签。属于同一消息组的消息总是严格按照与消息组有关的某个顺序逐一进行处理(但是属于不同消息组的消息则不会进行有序处理)。

接收请求尝试 ID

用于 ReceiveMessage 调用的重复数据删除的令牌。

序列号

Amazon SQS 分配给每条消息的大型非连续编号。

发送消息

如果将多条消息相继发送到 FIFO 队列,并且每条消息具有不同的消息重复数据删除 ID,Amazon SQS 将存储这些消息并确认传输。然后,可按传输每条消息的确切顺序接收和处理消息。

在 FIFO 队列中,消息基于消息组 ID 进行排序。如果多台主机 (或同一主机上的不同线程) 将具有相同消息组 ID 的消息发送到 FIFO 队列,Amazon SQS 将按消息到达以供处理的顺序存储消息。要确保 Amazon SQS 保留发送和接收消息的顺序,请确保每位创建者均使用唯一的消息组 ID 来发送其所有消息。

FIFO 队列逻辑仅应用于每个消息组 ID。每个消息组 ID 表示 Amazon SQS 队列中的不同的有序的消息组。对于每个消息组 ID,所有消息均按严格的顺序发送和接收。但是,具有不同的消息组 ID 值的消息可能不会按顺序发送和接收。您必须将消息组 ID 与消息关联。如果您未提供消息组 ID,此操作将失败。如果您需要一组有序的消息,请为要将消息发送到的 FIFO 队列提供相同的消息组 ID。

接收消息

您无法请求接收具有特定消息组 ID 的消息。

当接收来自具有多个消息组 ID 的 FIFO 队列的消息时,Amazon SQS 首先会尝试尽可能多地返回具有同一消息组 ID 的消息。这时其他使用者能处理具有不同消息组 ID 的消息。

多次重试

FIFO 队列允许创建者或使用者进行多次重试:

  • 如果创建者检测到失败的 SendMessage 操作,它会使用同一个消息重复数据删除 ID 尽可能多地重新尝试发送。假设创建者在重复数据删除间隔过期前收到至少一次确认,则多次重试不会影响消息排序,也不会引入重复项。

  • 如果使用者检测到失败的 ReceiveMessage 操作,它可使用同一个接收请求尝试 ID 进行所需数量的重试。假设使用者在可见性超时过期前收到至少一次确认,则多次重试不会影响消息排序。

  • 在收到一个具有消息组 ID 的消息时,除非您删除该消息或该消息变为可见,否则不会返回同一消息组 ID 的更多消息。

确切一次处理

不同于标准队列,FIFO 队列不会引入重复消息。FIFO 队列可帮助您避免向一个队列发送重复消息。如果您在 5 分钟重复数据删除时间间隔内重试 SendMessage 操作,则 Amazon SQS 不会将任何重复消息引入队列。

要配置重复数据删除,您必须执行以下操作之一:

  • 启用基于内容的重复数据删除。这将指示 Amazon SQS 使用 SHA-256 哈希通过消息的正文 (而不是消息的属性) 生成消息重复数据删除 ID。有关更多信息,请参阅 Amazon Simple Queue Service API Reference 中有关 CreateQueueGetQueueAttributesSetQueueAttributes 操作的文档。

  • 为消息显式提供消息重复数据删除 ID (或查看序列号)。有关更多信息,请参阅 Amazon Simple Queue Service API Reference 中有关 SendMessageSendMessageBatchReceiveMessage 操作的文档。

FIFO 队列入门

以下示例 Java 代码将创建队列并发送、接收和删除消息。

Copy
/* * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ package sqs.fifo.samples; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.DeleteQueueRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; public class SQSFIFOJavaClientSample { public static void main(String[] args) throws Exception { /* * The ProfileCredentialsProvider returns your [default] * credential profile by reading from the credentials file located at * (~/.aws/credentials). */ AWSCredentials credentials = null; try { credentials = new ProfileCredentialsProvider().getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Can't load the credentials from the credential profiles file. " + "Please make sure that your credentials file is at the correct " + "location (~/.aws/credentials), and is a in valid format.", e); } AmazonSQSClient sqs = new AmazonSQSClient(credentials); sqs.setEndpoint("https://sqs.us-east-2.amazonaws.com"); System.out.println("======================================================="); System.out.println("Getting Started with Amazon SQS FIFO Queues"); System.out.println("=======================================================\n"); try { // Create a FIFO queue System.out.println("Creating a new Amazon SQS FIFO queue called MyFifoQueue.fifo.\n"); Map<String, String> attributes = new HashMap<String, String>(); // A FIFO queue must have the FifoQueue attribute set to True attributes.put("FifoQueue", "true"); // Generate a MessageDeduplicationId based on the content, if the user doesn't provide a MessageDeduplicationId attributes.put("ContentBasedDeduplication", "true"); // The FIFO queue name must end with the .fifo suffix CreateQueueRequest createQueueRequest = new CreateQueueRequest("MyFifoQueue.fifo").withAttributes(attributes); String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl(); // List queues System.out.println("Listing all queues in your account.\n"); for (String queueUrl : sqs.listQueues().getQueueUrls()) { System.out.println(" QueueUrl: " + queueUrl); } System.out.println(); // Send a message System.out.println("Sending a message to MyFifoQueue.fifo.\n"); SendMessageRequest sendMessageRequest = new SendMessageRequest(myQueueUrl, "This is my message text."); // You must provide a non-empty MessageGroupId when sending messages to a FIFO queue sendMessageRequest.setMessageGroupId("messageGroup1"); // Uncomment the following to provide the MessageDeduplicationId //sendMessageRequest.setMessageDeduplicationId("1"); SendMessageResult sendMessageResult = sqs.sendMessage(sendMessageRequest); String sequenceNumber = sendMessageResult.getSequenceNumber(); String messageId = sendMessageResult.getMessageId(); System.out.println("SendMessage succeed with messageId " + messageId + ", sequence number " + sequenceNumber + "\n"); // Receive messages System.out.println("Receiving messages from MyFifoQueue.fifo.\n"); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); // Uncomment the following to provide the ReceiveRequestDeduplicationId //receiveMessageRequest.setReceiveRequestAttemptId("1"); List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); for (Message message : messages) { System.out.println(" Message"); System.out.println(" MessageId: " + message.getMessageId()); System.out.println(" ReceiptHandle: " + message.getReceiptHandle()); System.out.println(" MD5OfBody: " + message.getMD5OfBody()); System.out.println(" Body: " + message.getBody()); for (Entry<String, String> entry : message.getAttributes().entrySet()) { System.out.println(" Attribute"); System.out.println(" Name: " + entry.getKey()); System.out.println(" Value: " + entry.getValue()); } } System.out.println(); // Delete the message System.out.println("Deleting the message.\n"); String messageReceiptHandle = messages.get(0).getReceiptHandle(); sqs.deleteMessage(new DeleteMessageRequest(myQueueUrl, messageReceiptHandle)); // Delete the queue System.out.println("Deleting the queue.\n"); sqs.deleteQueue(new DeleteQueueRequest(myQueueUrl)); } catch (AmazonServiceException ase) { System.out.println("Caught an AmazonServiceException, which means your request made it " + "to Amazon SQS, but was rejected with an error response for some reason."); System.out.println("Error Message: " + ase.getMessage()); System.out.println("HTTP Status Code: " + ase.getStatusCode()); System.out.println("AWS Error Code: " + ase.getErrorCode()); System.out.println("Error Type: " + ase.getErrorType()); System.out.println("Request ID: " + ase.getRequestId()); } catch (AmazonClientException ace) { System.out.println("Caught an AmazonClientException, which means the client encountered " + "a serious internal problem while trying to communicate with SQS, such as not " + "being able to access the network."); System.out.println("Error Message: " + ace.getMessage()); } } }

从标准队列移至 FIFO 队列

如果您有使用标准队列的现有应用程序并想要利用 FIFO 队列的排序或确切一次处理功能,则需要正确地配置队列和应用程序。

注意

您无法将现有标准队列转换为 FIFO 队列。要进行转移,您必须为应用程序创建新的 FIFO 队列或者删除现有标准队列并将其重新创建为 FIFO 队列。

移动核对清单

使用以下核对清单确保您的应用程序能够与 FIFO 队列一起正确地工作。

  • FIFO 队列的上限为 300 个事务/秒 (TPS)。如果您的应用程序产生了较高的消息吞吐量,请考虑改用标准队列。

  • FIFO 队列不支持每消息延迟,仅支持每队列延迟。如果您的应用程序在每条消息上设置相同的 DelaySeconds 参数值,您必须将应用程序修改为删除每消息延迟并改为在整个队列上设置 DelaySeconds

  • 发送到 FIFO 队列的每条消息均需要一个消息组 ID。如果您不需要多个有序的消息组,请为您的所有消息指定相同的消息组 ID。

  • 在将消息发送到 FIFO 队列之前,请确认以下内容:

    • 如果您的应用程序可发送具有相同的消息正文的消息,您可以将应用程序修改为针对每条已发送消息提供唯一的消息重复数据删除 ID。

    • 如果您的应用程序发送具有独特的消息正文的消息,您可以启用基于内容的重复数据删除。

  • 您不必对使用者进行任何代码更改。但是,如果处理消息需要较长时间且您的可见性超时已设置为一个较高值,请考虑向每个 ReceiveMessage 操作添加接收请求尝试 ID。这允许您在网络发生故障时重试接收尝试,并防止队列由于接收尝试失败而导致的暂停。

有关更多信息,请参阅 Amazon Simple Queue Service API Reference

兼容性

客户

Amazon SQS 缓冲的异步客户端 当前不支持 FIFO 队列。

服务

如果应用程序使用多个 AWS 服务或者混合使用 AWS 服务与外部服务,请务必了解哪些服务功能不支持 FIFO 队列。

一些向 Amazon SQS 发送通知的 AWS 或外部服务可能无法与 FIFO 队列兼容,即使允许您将 FIFO 队列设置为目标。

AWS 服务的以下功能当前与 FIFO 队列兼容:

有关其他服务与 FIFO 队列的兼容性的信息,请参阅您的服务文档。