使用 Kinesis Data Streams API 开发增强型扇出功能的使用者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis Data Streams API 开发增强型扇出功能的使用者

增强了扇出是一种 Amazon Kinesis Data Streams 功能,使用者利用此功能能够接收数据流(其中每分片每秒的专用吞吐量高达 2 MB 数据)中的记录。使用增强型扇出功能的使用者不必与接收流中数据的其他使用者争夺。有关更多信息,请参阅 开发具有专用吞吐量的自定义使用者(增强扇出功能)

可以使用 API 操作构建在 Kinesis Data Streams 中使用增强型扇出功能的使用者。

使用 Kinesis Data Streams API 注册使用增强型扇出功能的使用者

  1. CallRegisterStream使用者将应用程序注册为使用增强型扇出功能的使用者。Kinesis Data Streams 为使用者生成 Amazon 资源名称 (ARN) 并在响应中返回此名称。

  2. 要开始侦听特定分片,请将调用中的使用者 ARN 传递给SubscribeTo碎片. 之后,Kinesis Data Streams 会以类型事件的形式将分片中的记录推送给您。SubscribeToShardEvent通过 HTTP/2 连接。此连接将保持打开状态长达 5 分钟。CallSubscribeTo碎片再次如果要在之后继续接收分片中的记录future通过调用返回的信息返回了SubscribeTo碎片正常或特殊情况下完成。

    注意

    SubscribeToShard当前分片结束时,API 还会返回当前分片的子分片列表。

  3. 要取消注册使用增强型扇出功能的使用者,请调用DeregisterStream使用者.

以下代码是一个示例,演示如何为使用者订阅分片、定期续订订阅以及处理事件。

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

如果event.ContinuationSequenceNumber回报null,它表示发生了涉及该分片的分片拆分或合并。这个碎片现在在CLOSED状态,并且您已经从该分片中读取了所有可用的数据记录。在这种情况下,根据上面的例子,你可以使用event.childShards了解通过拆分或合并创建的分片的新子分片。有关更多信息,请参阅 。ChildShard.