Kinesis Data Streams 消费端问题排查 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Data Streams 消费端问题排查

LeaseManagementConfig 构造函数出现编译错误

升级到 Kinesis 客户端库 (KCL) 版本 3.x 或更高版本时,可能会遇到与构造函数相关的编译错误。LeaseManagementConfig如果您直接创建LeaseManagementConfig对象来设置配置,而不是在 3.x 或更高KCL版本ConfigsBuilder中使用,则在编译KCL应用程序代码时可能会看到以下错误消息。

Cannot resolve constructor 'LeaseManagementConfig(String, DynamoDbAsyncClient, KinesisAsyncClient, String)'

KCL3.x 或更高版本要求您在参数后面再添加一个参数 applicationName (类型:String)。 tableName

  • 之前: leaseManagementConfig = 新增 LeaseManagementConfig (tableName,dynamoDBClient,,kinesisClient,streamName,workerIdentifier)

  • 之后: leaseManagementConfig = 新 LeaseManagementConfig (tableName,, applicationName,dynamoDBClient,kinesisClient,streamName,workerIdentifier)

我们建议不要直接创建 LeaseManagementConfig 对象,而是在 KCL 3.x及更高版本中使用ConfigsBuilder来设置配置。 ConfigsBuilder提供了一种更灵活、更易于维护的方式来配置您的KCL应用程序。

以下是使用ConfigsBuilder来设置KCL配置的示例。

ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig() .failoverTimeMillis(60000), // this is an example configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

使用 Kinesis Client Library 时会跳过某些 Kinesis Data Streams 记录

跳过记录的最常见原因是未处理从 processRecords 引发的异常。Kinesis 客户端库 (KCL) 依赖您的processRecords代码来处理数据记录时出现的任何异常。processRecords 引发的任何异常都会被 KCL 吸收。为避免因为反复出现的故障无休止地进行重试,KCL 不会重新发送在发生异常时处理的那批记录。然后,KCL 会在不重新启动记录处理器的情况下对下一批数据记录调用 processRecords。这有效地导致消费端应用程序观察到跳过的记录。要防止跳过记录,请适当处理 processRecords 中的所有异常。

属于同一分片的记录通过不同的记录处理器同时处理

对于任何正在运行的 Kinesis 客户端库 (KCL) 应用程序,一个分片只有一个所有者。但是,多个记录处理器可以临时处理同一分片。在工作程序实例丢失网络连接的情况下,KCL 会假定此无法联系的工作程序不再处理记录,并会在故障转移时间到期后指示其他工作程序来接管其工作。在一段很短的时间内,新的记录处理器和来自无法联系的工作程序的记录处理器可能都会处理来自同一个分片的数据。

您应该根据您的应用程序设置适当的故障转移时间。对于低延迟应用程序,默认值 10 秒可以代表您可以等待的最长时间。但是,在一些情况下,如您预计会有一些连接问题(如跨地理区域打电话)并且连接可能较频繁地中断时,这个数字设置可能就过低了。

您的应用程序应预料到并处理这种情况,尤其是因为网络连接通常会恢复到之前无法访问的工作程序。如果一个记录处理器让另一个记录处理器接手其分片,它必须处理以下两种情况才能顺利执行关闭:

  1. 当前对的调用完成后processRecords,将在记录处理器上KCL调用 shutdown 方法,关机原因为 “” ZOMBIE。您的记录处理器应视情况清除所有资源然后退出。

  2. 当您尝试从“zombie”工作程序执行检查点操作时,KCL 会引发 ShutdownException。收到此异常后,您的代码应完全退出当前方法。

有关更多信息,请参阅 处理重复记录

消费端应用程序的读取速率比预期的慢

读取吞吐量低于预期的最常见原因如下:

  1. 多个消费端应用程序的总读取量超过每个分片的限制。有关更多信息,请参阅 限额和限制。在此情况下,您可以增加 Kinesis 数据流中的分片数量。

  2. 指定每个调用的 GetRecords 最大数量的限制可能配置为较低值。如果您使用的是KCL,则可能已为工作器配置了较低的maxRecords属性值。一般来说,我们推荐对此属性使用系统默认值。

  3. 由于多种可能的原因,processRecords调用中的逻辑可能需要比预期更长的时间;逻辑可能CPU密集、I/O 阻塞或同步出现瓶颈。要测试是否如此,请对空的记录处理器进行测试运行并比较读取吞吐量。有关如何跟踪传入数据的信息,请参阅使用重新分片、扩展和并行处理更改分片数量

如果您只有一个消费端应用程序,那么读取速率比放入速率至少高两倍始终是可能的。这是因为每秒最多可以写入 1000 条记录,最大总数据写入速率为每秒 1MB(包括分区键)。每个开放分片每秒最多可读取 5 个事务,最大总数据读取速率为每秒 2 MB。请注意,每次读取(GetRecords 调用)都将获取一批记录。GetRecords 返回的数据的大小因分片的使用率而异。GetRecords 可返回的数据的最大大小为 10MB。如果某个调用返回了该限制,在接下来 5 分钟内进行的后续调用将引发 ProvisionedThroughputExceededException

GetRecords 即使流中有数据,也会返回一个空的记录数组

使用或获取记录是一种拉取模型。开发人员应GetRecords在没有退缩的情况下连续循环调用。每个 GetRecords 调用还将返回一个 ShardIterator 值,该值必须在循环的下一个迭代中使用。

GetRecords 操作不会阻止。相反,它将立即返回一些相关数据记录或一个空的 Records 元素。在两种情况下,将返回空的 Records 元素:

  1. 目前分片中没有更多数据。

  2. ShardIterator 指向的分片部分附近没有数据。

后一种情况很微妙,但却是避免在检索数据时搜寻时间无止境(延迟)的一种必要的设计折衷。因此,流使用应用程序应循环并调用 GetRecords,并且理所当然地处理空记录。

在生产场景中,仅当 NextShardIterator 值为 NULL 时,才应退出连续循环。当 NextShardIteratorNULL 时,这意味着当前分片已关闭,ShardIterator 值的指向应越过最后一条记录。如果使用应用程序从不调用 SplitShardMergeShards,分片将保持打开状态,并且对 GetRecords 的调用从不返回为 NextShardIteratorNULL 值。

如果您使用 Kinesis 客户端库 (KCL),则会为您抽象出上述消费模式。这包括自动处理一组动态变化的分片。在 KCL 中,开发人员只需提供处理传入记录的逻辑。这是可能的,因为该库会为您连续调用 GetRecords

分片迭代器意外过期

每个 GetRecords 请求返回新的分片迭代器(作为 NextShardIterator),然后您可以将其用于下一个 GetRecords 请求(作为 ShardIterator)。此分片迭代器一般来说不会在您使用前过期。不过,您可能会发现,由于您超过 5 分钟没有调用 GetRecords,或者您执行了消费端应用程序的重新启动操作,该分片迭代器会过期。

如果分片迭代器在您还没能使用之前很快过期,这可能表示 Kinesis 使用的 DynamoDB 表没有足够的容量存储租赁数据。如果您的分片数量很多,则很可能发生这种情况。要解决此问题,请增加分配给分片表的写入容量。有关更多信息,请参阅 使用租赁表来跟踪使用者应用程序处理的KCL分片

消费端记录处理滞后

对于大多数使用案例,消费端应用程序从流中读取最新数据。在特定情况下,消费端读取可能会滞后,而您可能并不希望出现这种情况。在确定您的消费端读取滞后多久后,请查看消费端滞后的最常见原因。

GetRecords.IteratorAgeMilliseconds 指标开始,该指标跟踪流中所有分片和消费端的读取位置。请注意,如果某个迭代器的寿命超过了保留期的 50%(默认值为 24 小时,最长可配置为 365 天),则存在记录过期造成数据丢失的风险。一种快速的权宜之计是增加保留期。这会在您进一步对问题进行故障排除时防止重要数据丢失。有关更多信息,请参阅 使用亚马逊监控亚马逊 Kinesis Data Streams 服务 CloudWatch。接下来,使用 Kinesis Client Library()发出的自定义 CloudWatch 指标,确定您的使用者应用程序从每个分片中读取数据落后了多远。KCL MillisBehindLatest有关更多信息,请参阅 使用亚马逊监控 Kinesis 客户端库 CloudWatch

下面是消费端滞后的最常见原因:

  • 突然大幅增加GetRecords.IteratorAgeMillisecondsMillisBehindLatest通常表示存在暂时性问题,例如下游应用程序的API操作失败。如果任何一个指标持续指示此行为,您应该调查这些突然增长的原因。

  • 这些指标的逐步增大表明,由于处理记录的速度不够快,消费端无法与流保持同步。此行为最常见的根本原因是没有足够的物理资源,或者记录处理逻辑没有随着流吞吐量的增大而进行扩展。您可以查看 KCL 发出的与 processTask 操作关联的其他自定义 CloudWatch 指标,包括 RecordProcessor.processRecords.TimeSuccessRecordsProcessed,来验证此行为。

    • 如果您发现与吞吐量上升相关的 processRecords.Time 指标发生增长,则应该分析记录处理逻辑,以确定为什么逻辑没有随吞吐量的增长而扩展。

    • 如果您发现与吞吐量上升无关的 processRecords.Time 值发生增长,请检查您是否在关键路径中执行了任何阻塞性调用,这通常会导致记录处理速度下降。替代方法是通过增加分片数来提高并行度。最后,确认在需求峰值期间,底层处理节点上有足够数量的物理资源(内存、CPU利用率等)。

未授权KMS的主密钥权限错误

当使用者应用程序在没有KMS主密钥权限的情况下从加密流中读取数据时,就会发生此错误。要向应用程序分配访问KMS密钥的权限,请参阅中的使用密钥策略 Amazon KMS使用IAM策略 Amazon KMS

排查消费端的其他常见问题