Amazon Kinesis Data Streams
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

使用 Kinesis Data Streams API 开发使用增强型扇出功能的使用者

增强型扇出功能 是一种 Amazon Kinesis Data Streams 功能,使用者利用此功能能够接收数据流(其中每分片每秒的专用吞吐量高达 2 MiB 数据)中的记录。使用增强型扇出功能的使用者不必与接收流中数据的其他使用者争夺。有关更多信息,请参阅 利用使用增强型扇出功能的使用者

可以使用 API 操作构建在 Kinesis Data Streams 中使用增强型扇出功能的使用者。

使用 Kinesis Data Streams API 注册使用增强型扇出功能的使用者

  1. 调用 RegisterStreamConsumer 以将应用程序注册为使用增强型扇出功能的使用者。Kinesis Data Streams 为使用者生成一个 Amazon 资源名称 (ARN) 并在响应中返回此名称。

  2. 要开始侦听特定分片,请将调用中的使用者 ARN 传递给 SubscribeToShard。之后,Kinesis Data Streams 会通过 HTTP/2 连接将分片中的记录以 SubscribeToShardEvent 类型的事件形式推送给您。此连接将保持打开状态长达 5 分钟。如果要在通过调用 SubscribeToShard 返回的 future 正常或异常完成之后继续接收分片中的记录,请再次调用 SubscribeToShard

  3. 要取消注册使用增强型扇出功能的使用者,请调用 DeregisterStreamConsumer

以下代码是一个示例,演示如何为使用者订阅分片、定期续订订阅以及处理事件。

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }