排查 MSK 复制器问题 - Amazon Managed Streaming for Apache Kafka
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

排查 MSK 复制器问题

以下信息可帮助您排查 MSK 复制器可能存在的问题。有关其他 Amazon MSK 功能的问题解决信息,请参阅排查 Amazon MSK 集群的问题。您也可以将问题发布到 Amazon Web Services re:Post

MSK 复制器状态从 CREATING 变为 FAILED

以下是 MSK 复制器创建失败的一些常见原因。

  1. 请验证您在目标集群部分中为创建复制器提供的安全组具有出站规则,以允许流量进入目标集群的安全组。此外,请验证目标集群的安全组是否有入站规则,这些规则接受来自您在目标集群部分中为创建复制器提供的安全组的流量。请参阅选择目标集群

  2. 如果您正在为跨区域复制创建复制器,请确认您的源集群已为 IAM 访问控制身份验证方法开启了多 VPC 连接。请参阅单区域中的 Amazon MSK 多 VPC 私有连接。此外,请验证是否已在源集群上设置集群策略,以便 MSK 复制器可以连接到源集群。请参阅准备 Amazon MSK 源集群

  3. 验证您在创建 MSK 复制器期间提供的 IAM 角色是否具有读写源集群和目标集群所需的权限。此外,还要验证 IAM 角色是否具有写入主题的权限。请参阅 配置复制器设置和权限

  4. 确认您的网络 ACL 没有阻止 MSK 复制器与您的源集群和目标集群之间的连接。

  5. 当 MSK 复制器尝试连接源集群或目标集群时,源集群或目标集群可能无法完全使用。这可能是由于过度负载、磁盘使用率或 CPU 使用率过高导致复制器无法连接到代理。修复代理的问题,然后重试创建复制器。

执行上述验证后,再次创建 MSK 复制器。

MSK 复制器似乎停留在 CREATING 状态

有时,MSK 复制器创建可能需要长达 30 分钟。请等待 30 分钟,然后再次检查集群的状态。

MSK 复制器没有复制数据或只复制部分数据

请按照以下步骤排查数据复制问题。

  1. 使用 Amazon CloudWatch 中 MSK 复制器提供的 AuthError 指标,验证复制器是否没有遇到任何身份验证错误。如果此指标大于 0,请检查您为复制器提供的 IAM 角色的策略是否有效,并且没有为集群权限设置拒绝权限。根据 clusterAlias 维度,您可以确定源集群或目标集群是否遇到身份验证错误。

  2. 请验证您的源集群和目标集群没有遇到任何问题。复制器可能无法连接到您的源集群或目标集群。这可能是由于连接过多、磁盘容量已满或 CPU 使用率过高所致。

  3. 使用 Amazon CloudWatch 中的 KafkaClusterPingSuccessCount 指标,验证您的源集群和目标集群是否可以从 MSK 复制器访问。根据 clusterAlias 维度,您可以确定源集群或目标集群是否遇到身份验证错误。如果该指标为 0 或没有数据点,则连接不正常。您应该检查 MSK 复制器用于连接到集群的网络和 IAM 角色权限。

  4. 使用 Amazon CloudWatch 中的 ReplicatorFailure 指标,验证您的复制器是否没有因缺少主题级权限而出现故障。如果此指标大于 0,请检查您为主题级别权限提供的 IAM 角色。

  5. 验证您在创建复制器时在允许列表中提供的正则表达式是否与您要复制的主题的名称相匹配。此外,请确认主题没有因为拒绝列表中的正则表达式而被排除在复制之外。

  6. 请注意,复制器可能需要长达 30 秒的时间才能在目标集群上检测并创建新的主题或主题分区。如果复制器的起始位置是“最新”(默认),则在目标集群上创建主题之前向源主题生成的任何消息都不会被复制。或者,如果想要在目标集群上复制主题上的现有消息,则可以从源集群主题分区中最早的偏移量开始复制。请参阅配置复制器设置和权限

目标集群中的消息偏移量与源集群不同

作为复制数据的一部分,MSK 复制器使用来自源集群的消息并将其生成到目标集群。这可能导致消息在源集群和目标集群上具有不同的偏移量。但是,如果您在复制器创建期间启用了消费者组偏移量同步功能,MSK 复制器将在复制元数据时自动转换偏移量,以便在失效转移到目标集群后,您的消费者可以从源集群中断的地方附近恢复处理。

MSK 复制器未同步消费组偏移量或目标集群上不存在消费者组

请按照以下步骤排查元数据复制问题。

  1. 验证您的数据复制是否按预期运行。如果不是,请参阅MSK 复制器没有复制数据或只复制部分数据

  2. 验证您在创建复制器时在允许列表中提供的正则表达式是否与要复制的消费者组的名称相匹配。此外,请确认消费者组没有因为拒绝列表中的正则表达式而被排除在复制之外。

  3. 验证 MSK 复制器是否已在目标集群上创建了该主题。复制器可能需要长达 30 秒的时间才能在目标集群上检测并创建新的主题或主题分区。如果复制器的起始位置是最新(默认),则在目标集群上创建主题之前向源主题生成的任何消息都不会被复制。如果源集群上的消费者组仅使用了 MSK 复制器未复制的消息,则该消费者组不会被复制到目标集群。在目标集群上成功创建主题后,MSK 复制器将开始将源集群上新写入的消息复制到目标集群。一旦您的消费者组开始从源读取这些消息,MSK 复制器就会自动将该消费者组复制到目标集群。或者,如果想要在目标集群上复制主题上的现有消息,则可以从源集群主题分区中最早的偏移量开始复制。请参阅配置复制器设置和权限

注意

MSK 复制器为源集群上的消费者优化了消费者组偏移量同步,这些消费者正在从更接近主题分区末端的位置进行读取。如果您的消费者组在源集群上出现延迟,那么与源相比,您可能会看到目标上的消费者组的延迟更高。这意味着在失效转移到目标集群后,您的消费者将重新处理更多重复消息。为了减少此延迟后,源集群上的消费者需要赶上进度并从流末端(主题分区的末尾)开始消耗。当您的消费者赶上时,MSK 复制器将自动减少延迟。

复制延迟很高或持续增加

以下是复制延迟较高的一些常见原因。

  1. 验证源和目标 MSK 集群上的分区数量是否正确。分区过少或过多会影响性能。有关选择分区数量的指导,请参阅使用 MSK 复制器的最佳实践。下表显示要使用 MSK 复制器实现所需吞吐量的建议最小分区数。

    吞吐量和建议的最小分区数
    吞吐量(MB/s) 需要的最小分区数量
    50 167
    100 334
    250 833
    500 1666
    1000 3333
  2. 验证您的源和目标 MSK 集群中是否有足够的读取和写入容量来支持复制流量。MSK 复制器充当源集群(出口)的使用器,也充当目标集群(入口)的生成器。因此,除了集群上的其他流量外,您还应预置集群容量以支持复制流量。有关调整 MSK 集群大小的指导,请参阅使用 MSK 复制器的最佳实践

  3. 不同源和目标 Amazon 区域对中的 MSK 集群的复制延迟可能会有所不同,具体取决于集群在地理上相隔的距离。例如,与欧洲地区(爱尔兰)和亚太地区(悉尼)区域的集群之间的复制相比,在欧洲地区(爱尔兰)和欧洲地区(伦敦)区域的集群之间进行复制时,复制延迟通常较低。

  4. 验证复制器没有因为在源集群或目标集群上设置的限额过于激进而受到限制。您可以使用 Amazon CloudWatch 中的 MSK 复制器提供的 ThrottleTime 指标来查看源/目标集群上的代理对请求施加节流限制的平均时间(以毫秒为单位)。如果此指标大于 0,则应调整 Kafka 限额以减少节流,以便复制器能够赶上。有关管理复制器的 Kafka 限额的信息,请参阅使用 Kafka 限额管理 MSK 复制器吞吐量

  5. 当 Amazon 区域降级时,ReplicationLatency 和 MessageLag 可能会增加。使用 Amazon 服务运行状况控制面板查看您的主 MSK 集群所在的区域中是否有 MSK 服务事件。如果有服务事件,可以临时将应用程序的读取和写入重定向到另一个区域。

使用 ReplicatorFailure 指标排除 MSK 复制器的故障

ReplicatorFailure 指标有助于监控和检测 MSK 复制器中的复制问题。此指标若为非零值,通常表示存在复制失败的问题,原因可能包括如下因素:

  • 消息大小限制

  • 时间戳超出范围

  • 记录批处理大小问题

如果 ReplicatorFailure 指标报告为非零值,请遵照以下步骤对问题进行故障排除。

注意

有关该指标的更多信息,请参阅MSK 复制器指标

  1. 配置一个能连接至目标 MSK 集群并设置了 Apache Kafka CLI 工具的客户端。有关设置客户端与 Kafka CLI 工具的信息,请参阅连接到预置 Amazon MSK 集群

  2. 打开 Amazon MSK 控制台,网址为:https://console.aws.amazon.com/msk/home?region=us-east-1#/home/

    然后执行以下操作:

    1. 获取 MSK 复制器及目标 MSK 集群的 ARN。

    2. 获取目标 MSK 集群的代理端点。这些端点将在后续步骤中使用。

  3. 运行以下命令,导出在上一步中获得的 MSK 复制器 ARN 与代理端点。

    确保将以下示例中使用的 <ReplicatorARN>、<BootstrapServerString> 和 <ConsumerConfigFile> 的占位符值替换为实际值。

    export TARGET_CLUSTER_SERVER_STRING=<BootstrapServerString>
    export REPLICATOR_ARN=<ReplicatorARN>
    export CONSUMER_CONFIG_FILE=<ConsumerConfigFile>
  4. <path-to-your-kafka-installation>/bin 目录中,执行以下操作:

    1. 保存以下脚本,并将其命名为 query-replicator-failure-message.sh

      #!/bin/bash # Script: Query MSK Replicator Failure Message # Description: This script queries exceptions from Amazon MSK Replicator status topics # It takes a replicator ARN and bootstrap server as input and searches for replicator exceptions # in the replicator's status topic, formatting and displaying them in a readable manner # # Required Arguments: # --replicator-arn: The ARN of the AWS MSK Replicator # --bootstrap-server: The Kafka bootstrap server to connect to # --consumer.config: Consumer config properties file # Usage Example: # ./query-replicator-failure-message.sh ./query-replicator-failure-message.sh --replicator-arn <replicator-arn> --bootstrap-server <bootstrap-server> --consumer.config <consumer.config> print_usage() { echo "USAGE: $0 ./query-replicator-failure-message.sh --replicator-arn <replicator-arn> --bootstrap-server <bootstrap-server> --consumer.config <consumer.config>" echo "--replicator-arn <String: MSK Replicator ARN> REQUIRED: The ARN of AWS MSK Replicator." echo "--bootstrap-server <String: server to connect to> REQUIRED: The Kafka server to connect to." echo "--consumer.config <String: config file> REQUIRED: Consumer config properties file." exit 1 } # Initialize variables replicator_arn="" bootstrap_server="" consumer_config="" # Parse arguments while [[ $# -gt 0 ]]; do case "$1" in --replicator-arn) if [ -z "$2" ]; then echo "Error: --replicator-arn requires an argument." print_usage fi replicator_arn="$2"; shift 2 ;; --bootstrap-server) if [ -z "$2" ]; then echo "Error: --bootstrap-server requires an argument." print_usage fi bootstrap_server="$2"; shift 2 ;; --consumer.config) if [ -z "$2" ]; then echo "Error: --consumer.config requires an argument." print_usage fi consumer_config="$2"; shift 2 ;; *) echo "Unknown option: $1"; print_usage ;; esac done # Check for required arguments if [ -z "$replicator_arn" ] || [ -z "$bootstrap_server" ] || [ -z "$consumer_config" ]; then echo "Error: --replicator-arn, --bootstrap-server, and --consumer.config are required." print_usage fi # Extract replicator name and suffix from ARN replicator_arn_suffix=$(echo "$replicator_arn" | awk -F'/' '{print $NF}') replicator_name=$(echo "$replicator_arn" | awk -F'/' '{print $(NF-1)}') echo "Replicator name: $replicator_name" # List topics and find the status topic topics=$(./kafka-topics.sh --command-config client.properties --list --bootstrap-server "$bootstrap_server") status_topic_name="__amazon_msk_replicator_status_${replicator_name}_${replicator_arn_suffix}" # Check if the status topic exists if echo "$topics" | grep -Fq "$status_topic_name"; then echo "Found replicator status topic: '$status_topic_name'" ./kafka-console-consumer.sh --bootstrap-server "$bootstrap_server" --consumer.config "$consumer_config" --topic "$status_topic_name" --from-beginning | stdbuf -oL grep "Exception" | stdbuf -oL sed -n 's/.*Exception:\(.*\) Topic: \([^,]*\), Partition: \([^\]*\).*/ReplicatorException:\1 Topic: \2, Partition: \3/p' else echo "No topic matching the pattern '$status_topic_name' found." fi
    2. 运行此脚本,查询 MSK 复制器失败消息。

      <path-to-your-kafka-installation>/bin/query-replicator-failure-message.sh --replicator-arn $REPLICATOR_ARN --bootstrap-server $TARGET_CLUSTER_SERVER_STRING --consumer.config $CONSUMER_CONFIG_FILE

      此脚本输出了所有错误及其异常消息,以及受影响的主题分区。可使用该异常信息来缓解故障,如 常见 MSK 复制器故障及其解决方案 中所述。由于该主题包含了所有历史失败消息,因此应使用最后一条消息开始调查。以下是失败消息的示例。

      ReplicatorException: The request included a message larger than the max message size the server will accept. Topic: test, Partition: 1

常见 MSK 复制器故障及其解决方案

以下列表描述了您可能遇到的 MSK 复制器故障,以及如何缓解这些故障。

消息大小大于 max.request.size
原因

当 MSK 复制器因为单个消息大小超过 10 MB 而无法复制数据时,就会发生这种故障。默认情况下,MSK 复制器会复制大小不超过 10 MB 的消息。

下面是此失败消息类型的示例。

ReplicatorException: The message is 20635370 bytes when serialized which is larger than 10485760, which is the value of the max.request.size configuration. Topic: test, Partition: 1
解决方案

减小主题中单个消息的大小。如果无法执行此操作,请按照以下说明申请提高限额

消息大小大于服务器能接受的最大消息大小
原因

当消息大小超过目标集群的最大消息大小时,就会发生这种故障。

下面是此失败消息类型的示例。

ReplicatorException: The request included a message larger than the max message size the server will accept. Topic: test, Partition: 1
解决方案

增大目标集群或相应目标集群主题的 max.message.bytes 配置。将目标集群的 max.message.bytes 配置设置为与最大未压缩消息大小一致。有关执行此操作的信息,请参阅 max.message.bytes

时间戳超出范围
原因

出现这种故障的原因是,单个消息的时间戳超出了目标集群的允许范围。

下面是此失败消息类型的示例。

ReplicatorException: Timestamp 1730137653724 of message with offset 0 is out of range. The timestamp should be within [1730137892239, 1731347492239] Topic: test, Partition: 1
解决方案

将目标集群的 message.timestamp.before.max.ms 配置更新为允许发送带较旧时间戳的消息。有关执行此操作的信息,请参阅 message.timestamp.before.max.ms

记录批处理过大
原因

出现这种故障的原因是,记录批处理大小超过了为目标集群上的主题设置的区段大小。MSK 复制器的最大批处理大小为 1MB。

下面是此失败消息类型的示例。

ReplicatorException: The request included message batch larger than the configured segment size on the server. Topic: test, Partition: 1
解决方案

目标集群的 segment.bytes 配置至少须与批处理大小 (1 MB) 一样大,复制器才能顺利进行操作。将目标集群的 segment.bytes 更新为至少 1048576 (1 MB)。有关此操作的信息,请参阅 segment.bytes

注意

如果 ReplicatorFailure 指标在应用上述解决方案后继续发出非零值,请重复进行故障排除过程,直至该指标发出一个零值。