Kinesis Data Streams 消费端问题排查 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Data Streams 消费端问题排查

LeaseManagementConfig 构造函数出现编译错误

升级到 Kinesis 客户端库 (KCL) 版本 3.x 或更高版本时,可能会遇到与构造函数相关的编译错误。LeaseManagementConfig如果您直接创建LeaseManagementConfig对象来设置配置,而不是ConfigsBuilder在 KCL 版本 3.x 或更高版本中使用,则在编译 KCL 应用程序代码时可能会看到以下错误消息。

Cannot resolve constructor 'LeaseManagementConfig(String, DynamoDbAsyncClient, KinesisAsyncClient, String)'

3.x 或更高版本的 KCL 要求您在 tableName 参数之后再添加一个参数,即应用程序名称(类型:字符串)。

  • 之前: leaseManagementConfig = new LeaseManagementConfig(tableName、dynam DBClient o、KinesisClient、StreamName、workerIdentifier)

  • 之后: leaseManagementConfig = new LeaseManagementConfig(表名、应用程序名称、dynamo、KinesisClient、St reamName、workerI DBClient dentifier)

我们建议不要直接创建 LeaseManagementConfig 对象,而是使用ConfigsBuilder在 KCL 3.x 及更高版本中设置配置。 ConfigsBuilder提供了一种更灵活、更易于维护的方式来配置 KCL 应用程序。

以下是使用ConfigsBuilder设置 KCL 配置的示例。

ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig() .failoverTimeMillis(60000), // this is an example configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

使用 Kinesis Client Library 时会跳过某些 Kinesis Data Streams 记录

跳过记录的最常见原因是未处理从 processRecords 引发的异常。Kinesis Client Library(KCL)依靠 processRecords 代码来处理由处理数据记录引起的任何异常。processRecords 引发的任何异常都会被 KCL 吸收。为避免因为反复出现的故障无休止地进行重试,KCL 不会重新发送在发生异常时处理的那批记录。然后,KCL 会在不重新启动记录处理器的情况下对下一批数据记录调用 processRecords。这有效地导致消费端应用程序观察到跳过的记录。要防止跳过记录,请适当处理 processRecords 中的所有异常。

属于同一分片的记录通过不同的记录处理器同时处理

对于任何正在运行的 Kinesis Client Library(KCL)应用程序,一个分片只有一个所有者。但是,多个记录处理器可以临时处理同一分片。如果工作器实例失去网络连接,KCL 会假设无法访问的工作器在故障转移时间到期后不再处理记录,并指示其他工作实例接管。在一段很短的时间内,新的记录处理器和来自无法联系的工作程序的记录处理器可能都会处理来自同一个分片的数据。

设置适合您的应用程序的故障转移时间。对于低延迟应用程序,默认值 10 秒可以代表您可以等待的最长时间。但是,在一些情况下,如您预计会有一些连接问题(如跨地理区域打电话)并且连接可能较频繁地中断时,这个数字设置可能就过低了。

您的应用程序应预料到并处理这种情况,尤其是因为网络连接通常会恢复到之前无法访问的工作程序。如果一个记录处理器让另一个记录处理器接手其分片,它必须处理以下两种情况才能顺利执行关闭:

  1. 当前对的调用完成后,KCL 在记录处理器上调用关机方法,关闭原因为 “ZOMBIE”。processRecords您的记录处理器应视情况清除所有资源然后退出。

  2. 当您尝试从“zombie”工作程序执行检查点操作时,KCL 会引发 ShutdownException。收到此异常后,您的代码应完全退出当前方法。

有关更多信息,请参阅 处理重复记录

消费端应用程序的读取速率比预期的慢

读取吞吐量低于预期的最常见原因如下:

  1. 多个消费端应用程序的总读取量超过每个分片的限制。有关更多信息,请参阅 限额和限制。在此情况下,您可以增加 Kinesis 数据流中的分片数量。

  2. 指定每个调用的 GetRecords 最大数量的限制可能配置为较低值。如果您正在使用 KCL,您可能已经使用一个较低的 maxRecords 属性值配置了工作程序。一般来说,我们推荐对此属性使用系统默认值。

  3. 出于很多可能的原因,您的 processRecords 调用内的逻辑花费的时间可能超过预期;该逻辑可能是 CPU 使用率高、I/O 阻止或同步出现瓶颈。要测试是否如此,请对空的记录处理器进行测试运行并比较读取吞吐量。有关如何跟踪传入数据的信息,请参阅使用重新分片、扩展和并行处理更改分片数量

如果您只有一个消费端应用程序,那么读取速率比放入速率至少高两倍始终是可能的。这是因为每秒最多可以写入 1000 条记录,最大总数据写入速率为每秒 1MB(包括分区键)。每个开放分片每秒最多可读取 5 个事务,最大总数据读取速率为每秒 2 MB。请注意,每次读取(GetRecords 调用)都将获取一批记录。GetRecords 返回的数据的大小因分片的使用率而异。GetRecords 可返回的数据的最大大小为 10MB。如果呼叫返回该限制,则在接下来的 5 秒钟内进行的后续调用会抛出ProvisionedThroughputExceededException

GetRecords 即使流中有数据,也会返回一个空的记录数组

使用或获取记录是一种拉取模型。开发人员应GetRecords在没有退缩的情况下连续循环调用。每个 GetRecords 调用还将返回一个 ShardIterator 值,该值必须在循环的下一个迭代中使用。

GetRecords 操作不会阻止。相反,它将立即返回一些相关数据记录或一个空的 Records 元素。在两种情况下,将返回空的 Records 元素:

  1. 目前分片中没有更多数据。

  2. ShardIterator 指向的分片部分附近没有数据。

后一种情况很微妙,但却是避免在检索数据时搜寻时间无止境(延迟)的一种必要的设计折衷。因此,流使用应用程序应循环并调用 GetRecords,并且理所当然地处理空记录。

在生产场景中,仅当 NextShardIterator 值为 NULL 时,才应退出连续循环。当 NextShardIteratorNULL 时,这意味着当前分片已关闭,ShardIterator 值的指向应越过最后一条记录。如果使用应用程序从不调用 SplitShardMergeShards,分片将保持打开状态,并且对 GetRecords 的调用从不返回为 NextShardIteratorNULL 值。

如果您使用 Kinesis 客户端库 (KCL),则会为您抽象出前面的消费模式。这包括自动处理一组动态变化的分片。在 KCL 中,开发人员只需提供处理传入记录的逻辑。这是可能的,因为该库会为您连续调用 GetRecords

分片迭代器意外过期

每个 GetRecords 请求返回新的分片迭代器(作为 NextShardIterator),然后您可以将其用于下一个 GetRecords 请求(作为 ShardIterator)。此分片迭代器一般来说不会在您使用前过期。不过,您可能会发现,由于您超过 5 分钟没有调用 GetRecords,或者您执行了消费端应用程序的重新启动操作,该分片迭代器会过期。

如果分片迭代器在您使用之前立即过期,则可能表明 Kinesis 使用的 DynamoDB 表没有足够的容量来存储租用数据。如果您的分片数量很多,则很可能发生这种情况。要解决此问题,请增加分配给分片表的写入容量。有关更多信息,请参阅 使用租约表跟踪 KCL 消费端应用程序处理的分片

消费端记录处理滞后

对于大多数使用案例,消费端应用程序从流中读取最新数据。在特定情况下,消费端读取可能会滞后,而您可能并不希望出现这种情况。在确定您的消费端读取滞后多久后,请查看消费端滞后的最常见原因。

GetRecords.IteratorAgeMilliseconds 指标开始,该指标跟踪流中所有分片和消费端的读取位置。请注意,如果某个迭代器的寿命超过了保留期的 50%(默认值为 24 小时,最长可配置为 365 天),则存在记录过期造成数据丢失的风险。一种快速的权宜之计是增加保留期。这会在您进一步对问题进行故障排除时防止重要数据丢失。有关更多信息,请参阅 使用 Amazon Kinesis Data Streams 服务 CloudWatch。接下来,使用 Kinesis 客户端库 (KCL) 发出的自定义 CloudWatch 指标,确定您的使用者应用程序从每个分片中读取数据落后了多远。MillisBehindLatest有关更多信息,请参阅 使用 Amazon 监控 Kinesis Client Library CloudWatch

下面是消费端滞后的最常见原因:

  • GetRecords.IteratorAgeMillisecondsMillisBehindLatest 突然发生大幅提升,通常表明临时性问题,例如下游应用程序的 API 操作失败。如果其中一个指标始终显示这种行为,请调查这些突然增加。

  • 这些指标的逐步增大表明,由于处理记录的速度不够快,消费端无法与流保持同步。此行为最常见的根本原因是没有足够的物理资源,或者记录处理逻辑没有随着流吞吐量的增大而进行扩展。您可以通过查看 KCL 发出的与processTask操作相关的其他自定义 CloudWatch指标(包括RecordProcessor.processRecords.TimeSuccess、和)来验证此行为。RecordsProcessed

    • 如果您发现与吞吐量上升相关的 processRecords.Time 指标发生增长,则应该分析记录处理逻辑,以确定为什么逻辑没有随吞吐量的增长而扩展。

    • 如果您发现与吞吐量上升无关的 processRecords.Time 值发生增长,请检查您是否在关键路径中执行了任何阻塞性调用,这通常会导致记录处理速度下降。替代方法是通过增加分片数来提高并行度。最后,确认在需求峰值期间,底层处理节点上有足够数量的物理资源(内存、CPU 利用率等)。

未经授权的 KMS 密钥权限错误

当使用者应用程序在没有 Amazon KMS 密钥权限的情况下从加密流中读取数据时,就会发生此错误。要为应用程序分配权限以访问 KMS 密钥,请参阅 Using Key Policies in Amazon KMSUsing IAM Policies with Amazon KMS

DynamoDbException: 更新表达式中提供的文档路径无效,无法更新

在 2.27.19 至 2.27.23 适用于 Java 的 Amazon SDK 版本中使用 KCL 3.x 时,您可能会遇到以下 DynamoDB 异常:

“软件.amazon.awssdk.services.dynamodb.model。 DynamoDbException: 更新表达式中提供的文档路径无法更新(服务: DynamoDb,状态码:400,请求 ID:xxx)”

之所以出现此错误,是因为中存在影响由 KCL 3.x 管理 适用于 Java 的 Amazon SDK 的 DynamoDB 元数据表的已知问题。该问题是在2.27.19版本中引入的,影响了2.27.23之前的所有版本。该问题已在 2.27.24 适用于 Java 的 Amazon SDK 版本中得到解决。为了获得最佳性能和稳定性,我们建议升级到 2.28.0 或更高版本。

排查消费端的其他常见问题