使用 Kinesis Data Streams API 开发具有增强扇出功能的使用器 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis Data Streams API 开发具有增强扇出功能的使用器

增强型扇出是一种 Amazon Kinesis Data Streams 功能,支持使用器从数据流中接收记录,其中每分片每秒专用吞吐量高达 2MB 数据。使用增强型扇出功能的使用者不必与接收流中数据的其他使用者争夺。有关更多信息,请参阅开发具有专用吞吐量的自定义使用者(增强扇出功能)

可以使用 API 操作构建在 Kinesis Data Streams 中使用增强型扇出功能的使用器。

使用 Kinesis Data Streams API 注册采用增强型扇出功能的使用器
  1. 调用 RegisterStreamConsumer 将应用程序注册为使用增强型扇出功能的使用器。Kinesis Data Streams 为使用器生成一个 Amazon 资源名称(ARN)并在响应中返回此名称。

  2. 要开始侦听特定分片,请将调用中的使用器 ARN 传递给 SubscribeToShard。之后,Kinesis Data Streams 会通过 HTTP/2 连接将分片中的记录以 SubscribeToShardEvent 类型的事件形式推送给您。此连接将保持打开状态长达 5 分钟。如果要在通过调用 SubscribeToShard 返回的 future 正常或异常完成之后继续接收分片中的记录,请再次调用 SubscribeToShard

    注意

    到达当前分片的末尾时,SubscribeToShard API 还会返回当前分片的子分片列表。

  3. 要取消注册使用增强型扇出功能的使用者,请调用 DeregisterStreamConsumer

以下代码是一个示例,演示如何为使用者订阅分片、定期续订订阅以及处理事件。

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

如果 event.ContinuationSequenceNumber 返回 null,则表示发生了涉及此分片的分片拆分或合并。此分片现在处于 CLOSED 状态,并且您已从其中读取了所有可用的数据记录。在这种情况下,按照上文示例所述,您可以使用 event.childShards 来了解正在处理的分片中由拆分或合并创建的新子分片。有关更多信息,请参阅 ChildShard