列出分片 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

列出分片

一个数据流可以有一个或多个分片。列出或检索数据流中的分片的建议方法是使用 ListShards API。以下示例说明如何获取数据流中的分片列表。有关此示例中所有主要操作以及为操作设置的所有参数的完整说明,请参阅 ListShards

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }

要运行上一个代码示例,您可以使用类似于下文的 POM 文件。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>

借助 ListShards API,您可以使用 ShardFilter 参数来筛选出 API 的响应。一次只能指定一个筛选条件。

如果您在调用 ListShards API 时使用 ShardFilter 参数,则 Type 为必填属性并且必须指定。如果指定 AT_TRIM_HORIZONFROM_TRIM_HORIZONAT_LATEST 类型,则无需指定 ShardIdTimestamp 可选属性。

如果指定 AFTER_SHARD_ID 类型,则还必须提供可选 ShardId 属性的值。ShardId 属性的功能与 ListShards API 的 ExclusiveStartShardId 参数相同。指定 ShardId 属性后,响应将包括多个分片,其中开头的分片 ID 紧随您提供的 ShardId

如果指定 AT_TIMESTAMPFROM_TIMESTAMP_ID 类型,则还必须提供可选 Timestamp 属性的值。如果指定 AT_TIMESTAMP 类型,则返回在提供的时间戳上打开的所有分片。如果指定 FROM_TIMESTAMP 类型,则返回从提供的时间戳开始到 TIP 的所有分片。

重要

DescribeStreamSummaryListShard API 提供了一种更具可扩展性的方式来检索有关您的数据流的信息。更具体地说,DescribeStream API 的限额可能会导致节流。有关更多信息,请参阅 限额和限制。另请注意,与您 Amazon 账户中所有数据流交互的所有应用程序均共享 DescribeStream 限额。另一方面,ListShards API 的限额特定于单个数据流。因此,使用 ListShards API 不仅可以获得更高的 TPS,而且随着您创建更多的数据流,操作可以更好地扩展。

我们建议您迁移所有调用 DescribeStream API 的创建器和消费端,改为调用 DescribeStreamSummary 和 ListShard API。为确定这些创建器和消费端,我们建议使用 Athena 来解析 CloudTrail 日志,因为在 API 调用中会捕获 KPL 和 KCL 的用户代理。

SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100

我们还建议重新配置调用 DescribeStream API 的 Amazon Lambda 和 Amazon Firehose 与 Kinesis Data Streams 的集成,以便集成改为调用 DescribeStreamSummaryListShards。具体而言,对于 Amazon Lambda,您必须更新事件源映射。对于 Amazon Firehose,必须更新相应的 IAM 权限,使其包含 ListShards IAM 权限。