使用 Kinesis Data Streams API 开发具有增强型扇出功能的使用者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis Data Streams API 开发具有增强型扇出功能的使用者

增强了扇出是一种 Amazon Kinesis Data Streams 功能,使用者利用此功能能够接收数据流(其中每分片每秒的专用吞吐量高达 2 MB 数据)中的记录。使用增强型扇出功能的使用者不必与接收流中数据的其他使用者争夺。有关更多信息,请参阅 开发具有专用吞吐量的自定义使用者(增强扇出功能)

可以使用 API 操作构建在 Kinesis Data Streams 中使用增强型扇出功能的使用者。

使用 Kinesis Data Streams API 注册使用增强型扇出功能的使用者
  1. CallRegisterStreamConsumer将您的应用程序注册为使用增强型扇出功能的使用者。Kinesis Data Streams 为使用者生成一个 Amazon 资源名称 (ARN) 并在响应中返回此名称。

  2. 要开始侦听特定分片,请将调用中的使用者 ARN 传递给SubscribeToShard. 之后,Kinesis Data Streams 会以类型事件的形式将记录从分片中的记录推送到您。SubscribeToShardEvent通过 HTTP/2 连接。此连接将保持打开状态长达 5 分钟。如果要在通过调用 SubscribeToShard 返回的 future 正常或异常完成之后继续接收分片中的记录,请再次调用 SubscribeToShard

    注意

    SubscribeToShard当前分片结束时,API 还会返回当前分片的子分片列表。

  3. 要取消注册使用增强型扇出功能的使用者,请调用 DeregisterStreamConsumer

以下代码是一个示例,演示如何为使用者订阅分片、定期续订订阅以及处理事件。

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

如果event.ContinuationSequenceNumber回报null,它表示发生了涉及该分片的分片拆分或合并。这个碎片现在在CLOSED状态,并且您已经从该分片中读取了所有可用的数据记录。在这种情况下,根据上面的例子,你可以使用event.childShards了解通过拆分或合并创建的正在处理的分片的新子分片。有关更多信息,请参阅 。ChildSharD.