列表分片 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

列表分片

一个数据流可以有一个或多个分片。有两种方法可以从数据流中列出(或检索)分片。

ListShardsAPI-推荐使用

从数据流中列出或检索分片的推荐方法是使用ListShardsAPI。以下示例说明如何获取数据流中的分片列表。有关此示例中所有主要操作以及为操作设置的所有参数的完整说明,请参阅ListShards.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }

要运行上一个代码示例,您可以使用类似于下文的 POM 文件。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>

使用ListShardsAPI,您可以使用ShardFilter参数来过滤掉 API 的响应。一次只能指定一个筛选器。

如果您将ShardFilter调用时的参数ListShardsAPI,Type是必需的属性,必须指定。如果你指定AT_TRIM_HORIZONFROM_TRIM_HORIZON,或者AT_LATEST类型,您无需指定ShardId或者Timestamp可选属性。

如果你指定AFTER_SHARD_ID类型,您还必须为可选的提供值ShardId财产。这些区域有:ShardId属性在功能上与ExclusiveStartShardId的参数ListShardsAPI。何时ShardId属性被指定,响应包括以分片开头的分片,其 ID 紧跟ShardId你提供的。

如果你指定AT_TIMESTAMP要么FROM_TIMESTAMP_ID类型,您还必须为可选的提供值Timestamp财产。如果你指定AT_TIMESTAMPtype,然后返回在提供的时间戳处打开的所有分片。如果你指定FROM_TIMESTAMP类型,然后返回从提供的时间戳开始到 TIP 的所有分片。

重要

DescribeStreamSummaryListShardAPI 提供了一种更具可扩展性的方式来检索有关数据流的信息。更具体地说,的配额DescribeStreamAPI 可能会导致限制。有关更多信息,请参阅 配额和限制。另请注意,DescribeStream配额在与您的所有数据流交互的所有应用程序之间共享。Amazonaccount. 的配额ListShards另一方面,API 特定于单个数据流。所以你不仅可以通过使用更高的 TPSListShardsAPI,但是随着创建更多数据流,操作可以更好地扩展。

我们建议您迁移所有称为DescribeStreamAPI 改为调用DescribeStream摘要和ListShardAPI。为了识别这些生产商和消费者,我们建议使用 Athena 来解析CloudTrail作为 KPL 和 KCL 的用户代理的日志将在 API 调用中捕获。

SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100

我们还建议AmazonLambda 和 Kinesis 数 Kinesis Data Streams Firehose 与调用DescribeStreamAPI 已重新配置,以便集成代替调用DescribeStreamSummaryListShards. 具体来说,AmazonLambda,您必须更新事件源映射。对于 Kinesis Data Firehose,必须更新相应的 IAM 权限,以便它们包含ListShardsIAM 权限。

DescribeStreamAPI-已弃用

重要

下面的信息描述了当前已弃用的通过DescribeStreamAPI。目前强烈建议您使用ListShards用于检索构成数据流的分片的 API。

describeStream 方法返回的响应对象使您能够检索有关组成流的分片的信息。要检索分片,请对此对象调用 getShards 方法。此方法可能不会通过一次调用返回流中的所有分片。在以下代码中,我们查看 getHasMoreShards 上的 getStreamDescription 方法以了解是否有未返回的其他分片。如果有,也就是说,如果此方法返回 true,则我们将继续循环调用 getShards,以将返回的新的分片批次添加到我们的分片列表中。循环将在 getHasMoreShards 返回 false 时退出;即,所有分片均已返回。请注意,getShards 不会返回处于 EXPIRED 状态的分片。有关分片状态(包括 EXPIRED 状态)的更多信息,请参阅 分片之后的数据路由、数据保留和分片状态

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ); DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest ); shards.addAll( describeStreamResult.getStreamDescription().getShards() ); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while ( exclusiveStartShardId != null );