排查 MSK 复制器问题 - Amazon Managed Streaming for Apache Kafka
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

排查 MSK 复制器问题

以下信息可帮助您排查 MSK 复制器可能存在的问题。有关其他 Amazon MSK 功能的问题解决信息,请参阅排查 Amazon MSK 集群的问题。您也可以将问题发布到 Amazon Web Services re:Post

MSK 复制器状态从 CREATING 变为 FAILED

以下是 MSK 复制器创建失败的一些常见原因。

  1. 请验证您在目标集群部分中为创建复制器提供的安全组具有出站规则,以允许流量进入目标集群的安全组。此外,请验证目标集群的安全组是否有入站规则,这些规则接受来自您在目标集群部分中为创建复制器提供的安全组的流量。请参阅选择目标集群

  2. 如果您正在为跨区域复制创建复制器,请确认您的源集群已为 IAM 访问控制身份验证方法开启了多 VPC 连接。请参阅单区域中的 Amazon MSK 多 VPC 私有连接。此外,请验证是否已在源集群上设置集群策略,以便 MSK 复制器可以连接到源集群。请参阅准备 Amazon MSK 源集群

  3. 验证您在创建 MSK 复制器期间提供的 IAM 角色是否具有读写源集群和目标集群所需的权限。此外,还要验证 IAM 角色是否具有写入主题的权限。请参阅 配置复制器设置和权限

  4. 确认您的网络 ACLs 没有阻止 MSK 复制器与您的源集群和目标集群之间的连接。

  5. 当 MSK 复制器尝试连接源集群或目标集群时,源集群或目标集群可能无法完全使用。这可能是由于过度负载、磁盘使用率或 CPU 使用率过高导致复制器无法连接到代理。修复代理的问题,然后重试创建复制器。

执行上述验证后,再次创建 MSK 复制器。

MSK 复制器似乎停留在 CREATING 状态

有时,MSK 复制器创建可能需要长达 30 分钟。请等待 30 分钟,然后再次检查集群的状态。

MSK 复制器没有复制数据或只复制部分数据

请按照以下步骤排查数据复制问题。

  1. 使用 Amazon 中 MSK 复制器提供的 AuthError 指标,验证复制器没有遇到任何身份验证错误。 CloudWatch如果此指标大于 0,请检查您为复制器提供的 IAM 角色的策略是否有效,并且没有为集群权限设置拒绝权限。根据 clusterAlias 维度,您可以确定源集群或目标集群是否遇到身份验证错误。

  2. 请验证您的源集群和目标集群没有遇到任何问题。复制器可能无法连接到您的源集群或目标集群。这可能是由于连接过多、磁盘容量已满或 CPU 使用率过高所致。

  3. 使用 Amazon 中的 KafkaClusterPingSuccessCount 指标,验证您的源集群和目标集群是否可以从 MSK 复制器访问。 CloudWatch根据 clusterAlias 维度,您可以确定源集群或目标集群是否遇到身份验证错误。如果该指标为 0 或没有数据点,则连接不正常。您应该检查 MSK 复制器用于连接到集群的网络和 IAM 角色权限。

  4. 使用 Amazon 中的指标,验证您的复制器是否没有因缺少主题级权限而出现故障。 ReplicatorFailure CloudWatch如果此指标大于 0,请检查您为主题级别权限提供的 IAM 角色。

  5. 验证您在创建复制器时在允许列表中提供的正则表达式是否与您要复制的主题的名称相匹配。此外,请确认主题没有因为拒绝列表中的正则表达式而被排除在复制之外。

  6. 请注意,复制器可能需要长达 30 秒的时间才能在目标集群上检测并创建新的主题或主题分区。如果复制器的起始位置是“最新”(默认),则在目标集群上创建主题之前向源主题生成的任何消息都不会被复制。或者,如果想要在目标集群上复制主题上的现有消息,则可以从源集群主题分区中最早的偏移量开始复制。请参阅配置复制器设置和权限

目标集群中的消息偏移量与源集群不同

作为复制数据的一部分,MSK 复制器使用来自源集群的消息并将其生成到目标集群。这可能导致消息在源集群和目标集群上具有不同的偏移量。但是,如果您在复制器创建期间启用了消费者组偏移量同步功能,MSK 复制器将在复制元数据时自动转换偏移量,以便在失效转移到目标集群后,您的消费者可以从源集群中断的地方附近恢复处理。

MSK 复制器未同步消费组偏移量或目标集群上不存在消费者组

请按照以下步骤排查元数据复制问题。

  1. 验证您的数据复制是否按预期运行。如果不是,请参阅MSK 复制器没有复制数据或只复制部分数据

  2. 验证您在创建复制器时在允许列表中提供的正则表达式是否与要复制的消费者组的名称相匹配。此外,请确认消费者组没有因为拒绝列表中的正则表达式而被排除在复制之外。

  3. 验证 MSK 复制器是否已在目标集群上创建了该主题。复制器可能需要长达 30 秒的时间才能在目标集群上检测并创建新的主题或主题分区。如果复制器的起始位置是最新(默认),则在目标集群上创建主题之前向源主题生成的任何消息都不会被复制。如果源集群上的消费者组仅使用了 MSK 复制器未复制的消息,则该消费者组不会被复制到目标集群。在目标集群上成功创建主题后,MSK 复制器将开始将源集群上新写入的消息复制到目标集群。一旦您的消费者组开始从源读取这些消息,MSK 复制器就会自动将该消费者组复制到目标集群。或者,如果想要在目标集群上复制主题上的现有消息,则可以从源集群主题分区中最早的偏移量开始复制。请参阅配置复制器设置和权限

注意

MSK 复制器为源集群上的消费者优化了消费者组偏移量同步,这些消费者正在从更接近主题分区末端的位置进行读取。如果您的消费者组在源集群上出现延迟,那么与源相比,您可能会看到目标上的消费者组的延迟更高。这意味着在失效转移到目标集群后,您的消费者将重新处理更多重复消息。为了减少此延迟后,源集群上的消费者需要赶上进度并从流末端(主题分区的末尾)开始消耗。当您的消费者赶上时,MSK 复制器将自动减少延迟。

复制延迟很高或持续增加

以下是复制延迟较高的一些常见原因。

  1. 验证源和目标 MSK 集群上的分区数量是否正确。分区过少或过多会影响性能。有关选择分区数量的指导,请参阅使用 MSK 复制器的最佳实践。下表显示要使用 MSK 复制器实现所需吞吐量的建议最小分区数。

    吞吐量和建议的最小分区数
    吞吐量(MB/s) 需要的最小分区数量
    50 167
    100 334
    250 833
    500 1666
    1000 3333
  2. 验证您的源和目标 MSK 集群中是否有足够的读取和写入容量来支持复制流量。MSK 复制器充当源集群(出口)的使用器,也充当目标集群(入口)的生成器。因此,除了集群上的其他流量外,您还应预置集群容量以支持复制流量。有关调整 MSK 集群大小的指导,请参阅使用 MSK 复制器的最佳实践

  3. 不同源和目标 Amazon 区域对中的 MSK 集群的复制延迟可能会有所不同,具体取决于集群在地理上相隔的距离。例如,与欧洲地区(爱尔兰)和亚太地区(悉尼)区域的集群之间的复制相比,在欧洲地区(爱尔兰)和欧洲地区(伦敦)区域的集群之间进行复制时,复制延迟通常较低。

  4. 验证复制器没有因为在源集群或目标集群上设置的限额过于激进而受到限制。您可以使用 Amazon 中 MSK 复制器提供的 ThrottleTime指标 CloudWatch 来查看源/目标集群上的代理对请求施加节流限制的平均时间(以毫秒为单位)。如果此指标大于 0,则应调整 Kafka 限额以减少节流,以便复制器能够赶上。有关管理复制器的 Kafka 限额的信息,请参阅使用 Kafka 限额管理 MSK 复制器吞吐量

  5. ReplicationLatency 当 Amazon 区域降级时, MessageLag 可能会增加。使用 Amazon 服务运行状况控制面板查看您的主 MSK 集群所在的区域中是否有 MSK 服务事件。如果有服务事件,可以临时将应用程序的读取和写入重定向到另一个区域。

使用指标排除 MSK 复制器故障 ReplicatorFailure

该 ReplicatorFailure 指标可帮助您监控和检测 MSK Replicator 中的复制问题。此指标的非零值通常表示复制失败问题,这可能是由以下因素造成的:

  • 消息大小限制

  • 时间戳范围违规问题

  • 记录批次大小

如果该 ReplicatorFailure 指标报告的值为非零,请按照以下步骤解决问题。

注意

有关该指标的更多信息,请参阅MSK 复制器指标

  1. 配置一个能够连接到目标 MSK 集群并设置 Apache Kafka CLI 工具的客户端。有关设置客户端和 Kafka CLI 工具的信息,请参阅连接到 Amazon MSK 集群

  2. https://console.aws.amazon.com/msk/家打开 Amazon MSK 控制台? 区域=us-east-1#/home/。

    然后执行以下操作:

    1. 获取 MSK 复制器和目标 MSK 集群的。 ARNs

    2. 获取目标 MSK 集群的代理终端节点。您将在后续步骤中使用这些终端节点。

  3. 运行以下命令导出您在上一步中获得的 MSK 复制器 ARN 和代理端点。

    确保将以下示例中使用的 < ReplicatorARN >、< BootstrapServerString > 和 < ConsumerConfigFile > 占位符值替换为实际值。

    export TARGET_CLUSTER_SERVER_STRING=<BootstrapServerString>
    export REPLICATOR_ARN=<ReplicatorARN>
    export CONSUMER_CONFIG_FILE=<ConsumerConfigFile>
  4. 在您的<path-to-your-kafka-installation>/bin目录中,执行以下操作:

    1. 保存以下脚本并将其命名query-replicator-failure-message.sh

      #!/bin/bash # Script: Query MSK Replicator Failure Message # Description: This script queries exceptions from Amazon MSK Replicator status topics # It takes a replicator ARN and bootstrap server as input and searches for replicator exceptions # in the replicator's status topic, formatting and displaying them in a readable manner # # Required Arguments: # --replicator-arn: The ARN of the AWS MSK Replicator # --bootstrap-server: The Kafka bootstrap server to connect to # --consumer.config: Consumer config properties file # Usage Example: # ./query-replicator-failure-message.sh ./query-replicator-failure-message.sh --replicator-arn <replicator-arn> --bootstrap-server <bootstrap-server> --consumer.config <consumer.config> print_usage() { echo "USAGE: $0 ./query-replicator-failure-message.sh --replicator-arn <replicator-arn> --bootstrap-server <bootstrap-server> --consumer.config <consumer.config>" echo "--replicator-arn <String: MSK Replicator ARN> REQUIRED: The ARN of AWS MSK Replicator." echo "--bootstrap-server <String: server to connect to> REQUIRED: The Kafka server to connect to." echo "--consumer.config <String: config file> REQUIRED: Consumer config properties file." exit 1 } # Initialize variables replicator_arn="" bootstrap_server="" consumer_config="" # Parse arguments while [[ $# -gt 0 ]]; do case "$1" in --replicator-arn) if [ -z "$2" ]; then echo "Error: --replicator-arn requires an argument." print_usage fi replicator_arn="$2"; shift 2 ;; --bootstrap-server) if [ -z "$2" ]; then echo "Error: --bootstrap-server requires an argument." print_usage fi bootstrap_server="$2"; shift 2 ;; --consumer.config) if [ -z "$2" ]; then echo "Error: --consumer.config requires an argument." print_usage fi consumer_config="$2"; shift 2 ;; *) echo "Unknown option: $1"; print_usage ;; esac done # Check for required arguments if [ -z "$replicator_arn" ] || [ -z "$bootstrap_server" ] || [ -z "$consumer_config" ]; then echo "Error: --replicator-arn, --bootstrap-server, and --consumer.config are required." print_usage fi # Extract replicator name and suffix from ARN replicator_arn_suffix=$(echo "$replicator_arn" | awk -F'/' '{print $NF}') replicator_name=$(echo "$replicator_arn" | awk -F'/' '{print $(NF-1)}') echo "Replicator name: $replicator_name" # List topics and find the status topic topics=$(./kafka-topics.sh --command-config client.properties --list --bootstrap-server "$bootstrap_server") status_topic_name="__amazon_msk_replicator_status_${replicator_name}_${replicator_arn_suffix}" # Check if the status topic exists if echo "$topics" | grep -Fq "$status_topic_name"; then echo "Found replicator status topic: '$status_topic_name'" ./kafka-console-consumer.sh --bootstrap-server "$bootstrap_server" --consumer.config "$consumer_config" --topic "$status_topic_name" --from-beginning | stdbuf -oL grep "Exception" | stdbuf -oL sed -n 's/.*Exception:\(.*\) Topic: \([^,]*\), Partition: \([^\]*\).*/ReplicatorException:\1 Topic: \2, Partition: \3/p' else echo "No topic matching the pattern '$status_topic_name' found." fi
    2. 运行此脚本查询 MSK Replicator 的故障消息。

      <path-to-your-kafka-installation>/bin/query-replicator-failure-message.sh --replicator-arn $REPLICATOR_ARN --bootstrap-server $TARGET_CLUSTER_SERVER_STRING --consumer.config $CONSUMER_CONFIG_FILE

      此脚本输出所有错误及其异常消息和受影响的主题分区。您可以使用此异常信息来缓解故障,如中所述常见的 MSK 复制器故障及其解决方案。由于该主题包含所有历史失败消息,因此请使用最后一条消息开始调查。下面是一个失败消息示例。

      ReplicatorException: The request included a message larger than the max message size the server will accept. Topic: test, Partition: 1

常见的 MSK 复制器故障及其解决方案

以下列表描述了您可能遇到的一些 MSK Replicator 故障以及如何缓解这些故障。

邮件大小大于 max.request.size
原因

当 MSK Replicator 由于单个邮件大小超过 10 MB 而无法复制数据时,就会发生此故障。默认情况下,MSK Replicator 会复制大小不超过 10 MB 的消息。

下面是此失败消息类型的示例。

ReplicatorException: The message is 20635370 bytes when serialized which is larger than 10485760, which is the value of the max.request.size configuration. Topic: test, Partition: 1
解决方案

缩小主题中单个消息的大小。如果您无法执行此操作,请按照以下说明申请提高限额

邮件大小大于服务器将接受的最大邮件大小
原因

当消息大小超过目标集群的最大消息大小时,就会发生此故障。

下面是此失败消息类型的示例。

ReplicatorException: The request included a message larger than the max message size the server will accept. Topic: test, Partition: 1
解决方案

增加目标集群或相应目标集群主题的max.message.bytes配置。将目标集群的max.message.bytes配置设置为与您最大的未压缩消息大小相匹配。有关执行此操作的信息,请参阅 max.message.bytes。

时间戳超出范围
原因

之所以出现此故障,是因为单个消息的时间戳超出了目标集群的允许范围。

下面是此失败消息类型的示例。

ReplicatorException: Timestamp 1730137653724 of message with offset 0 is out of range. The timestamp should be within [1730137892239, 1731347492239] Topic: test, Partition: 1
解决方案

更新目标集群的message.timestamp.before.max.ms配置,以允许发送带有较旧时间戳的消息。有关执行此操作的信息,请参阅 m essag e.timestamp.before.max.ms。

记录批次太大
原因

之所以出现此故障,是因为记录的批量大小超过了目标集群上为主题设置的区段大小。MSK 复制器支持的最大批次大小。

下面是此失败消息类型的示例。

ReplicatorException: The request included message batch larger than the configured segment size on the server. Topic: test, Partition: 1
解决方案

目标集群的 segment.bytes 配置必须至少等于批处理大小 (1 MB),Replicator 才能正常运行。将目标集群的 segment.bytes 更新为至少 1048576 (1 MB)。有关执行此操作的信息,请参阅 s egment .bytes。

注意

如果在应用这些解决方案后 ReplicatorFailure 指标继续发出非零值,请重复故障排除过程,直到该指标发出零值。