

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 对 Amazon MSK 复制器进行故障排除
<a name="msk-replicator-troubleshooting"></a>

以下信息可以帮助您解决 MSK Replicator 的问题。[排查 Amazon MSK 集群的问题](troubleshooting.md)有关其他 Amazon MSK 功能，请参阅。您也可以将问题发布到 [Amazon Web Services re:Post](https://repost.aws/)。

## 复制器状态从 “创建” 变为 “失败”
<a name="msk-replicator-ts-failed-state"></a>

创建 MSK 复制器失败的常见原因：

1. 验证您为目标集群提供的安全组是否具有允许流量流向目标集群安全组的出站规则，以及目标集群的安全组是否具有接受来自 Replicator 安全组的流量的入站规则。

1. 对于跨区域复制，请确认您的源集群已启用多 VPC 连接以实现 IAM 访问控制，并且已在源集群上设置集群策略。

1. 验证创建期间提供的 IAM 角色是否具有读写源集群和目标集群所需的权限，包括写入主题的权限。

1. 确认您的网络 ACLs 没有阻止 MSK Replicator 与您的集群之间的连接。

1. 当 MSK Replicator 尝试连接时，源集群或目标集群可能无法完全使用。这可能是由于负载过大、磁盘使用率或 CPU 使用率所致。修复代理的问题，然后重试创建复制器。

执行上述验证后，再次创建 MSK 复制器。

## Replicator 显示停滞在 “创建” 状态
<a name="msk-replicator-ts-stuck-creating"></a>

创建 MSK Replicator 最多可能需要 30 分钟。请等待 30 分钟，然后再次检查集群的状态。

## Replicator 没有复制数据或只复制部分数据
<a name="msk-replicator-ts-not-replicating"></a>

1. 使用 Amazon CloudWatch 中的`AuthError`指标验证您的 Replicator 是否没有遇到身份验证错误。如果此指标大于 0，请检查 IAM 角色策略并确保没有为集群权限设置拒绝权限。

1. 确认您的源集群和目标集群没有遇到问题（连接过多、磁盘容量已满或 CPU 使用率过高）。

1. 使用该`KafkaClusterPingSuccessCount`指标验证您的集群是否可以访问。如果此指标为 0 或没有数据点，请检查网络和 IAM 角色权限。

1. 使用该`ReplicatorFailure`指标验证您的 Replicator 是否没有遇到故障。如果大于 0，请检查 IAM 角色的主题级权限。

1. 验证允许列表中的正则表达式是否与您要复制的主题的名称相匹配，并且拒绝列表没有排除主题。

1. Replicator 最多可能需要 30 秒才能检测和创建新主题。如果起始位置为最*新*（默认），则不会复制在目标集群上创建主题之前生成的消息。

## 目标集群中的消息偏移量与源集群不同
<a name="msk-replicator-ts-offset-mismatch"></a>

MSK Replicator 使用来自源集群的消息并将其生成到目标集群，这可能会导致不同的偏移。如果您已开启使用者组偏移同步，MSK Replicator 将自动转换偏移量，以便在故障转移后，您的使用者可以从上次停下来的位置附近恢复处理。

## Replicator 未同步使用者组偏移量
<a name="msk-replicator-ts-consumer-groups"></a>

1. 验证数据复制是否按预期运行。

1. 验证允许列表中的正则表达式是否与您要复制的使用者组匹配。

1. 验证 MSK 复制器是否已在目标集群上创建了该主题。如果源集群上的使用者组仅消费了未复制的消息，则该使用者组不会被复制到目标集群。一旦您的使用者组开始读取新复制的消息，MSK Replicator 将自动复制该消费者组。

**注意**  
MSK Replicator 优化了消费者组偏移量同步，便于消费者从主题分区快要结束时读取。如果您的消费组在源集群上滞后，则目标集群的延迟可能会更高。当您的消费者赶上时，MSK 复制器将自动减少延迟。

## 复制延迟很高或持续增加
<a name="msk-replicator-ts-high-latency"></a>

1. 验证您的分区数量是否正确。下表显示了所需吞吐量的建议的最小分区数。  
**吞吐量和建议的最小分区数**    
[See the AWS documentation website for more details](http://docs.amazonaws.cn/msk/latest/developerguide/msk-replicator-troubleshooting.html)

1. 验证您的集群中是否有足够的读取和写入容量。MSK 复制器充当源集群（出口）的使用器，也充当目标集群（入口）的生成器。配置集群容量以支持复制流量以及其他流量。

1. 复制延迟因区域对距离而异。

1. 使用该指标验证您的 Replicator 是否受到限制。`ThrottleTime`如果大于 0，请调整 Kafka 配额。请参阅[使用 Kafka 配额管理吞吐量](msk-replicator-bp-quotas.md)。

1. 查看 S [Amazon ervice Health Das](https://health.aws.amazon.com/health/status) hboard，了解您所在地区的 MSK 服务事件。

## 使用 ReplicatorFailure 指标进行故障排除
<a name="msk-replicator-ts-replicator-failure"></a>

该`ReplicatorFailure`指标可帮助您监控和检测复制问题。非零值通常表示由于邮件大小限制、时间戳范围违规或记录批量大小问题而导致的复制失败。如果您为复制器配置了日志传输，则可以使用传送的日志消息来识别特定的故障。有关更多详细信息，请参阅[MSK 复制器日志](msk-replicator-logs.md)。如果未配置日志传输，请按照以下步骤查询复制器的状态主题以获取错误消息。

如果该`ReplicatorFailure`指标报告的值为非零，请按照以下步骤进行故障排除：

1. 配置一个可以连接到目标 MSK 集群并设置 Apache Kafka CLI 工具的客户端。请参阅[连接到预置 Amazon MSK 集群](client-access.md)。

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.amazonaws.cn/msk/home?region=us-east-1#/home/)-east-1\#/home/。

   获取 MSK Replicator 和目标 MSK 集群的，并[获取目标 MSK 集群的代理终端节点](get-bootstrap-console.md)。 ARNs 

1. 导出 MSK Replicator ARN 和代理端点：

   ```
   export TARGET_CLUSTER_SERVER_STRING=<BootstrapServerString>
   export REPLICATOR_ARN=<ReplicatorARN>
   export CONSUMER_CONFIG_FILE=<ConsumerConfigFile>
   ```

1. 在您的`<path-to-your-kafka-installation>/bin`目录中，将以下脚本另存为**query-replicator-failure-message.sh**。

   ```
   #!/bin/bash
   
   # Script: Query MSK Replicator Failure Message
   # Description: This script queries exceptions from Amazon MSK Replicator status topics
   # It takes a replicator ARN and bootstrap server as input and searches for replicator exceptions
   # in the replicator's status topic, formatting and displaying them in a readable manner
   #
   # Required Arguments:
   #   --replicator-arn: The ARN of the AWS MSK Replicator
   #   --bootstrap-server: The Kafka bootstrap server to connect to
   #   --consumer.config: Consumer config properties file
   # Usage Example:
   #   ./query-replicator-failure-message.sh --replicator-arn <replicator-arn> --bootstrap-server <bootstrap-server> --consumer.config <consumer.config>
   
   print_usage() {
     echo "USAGE: $0 ./query-replicator-failure-message.sh --replicator-arn <replicator-arn> --bootstrap-server <bootstrap-server> --consumer.config <consumer.config>"
     echo "--replicator-arn <String: MSK Replicator ARN>      REQUIRED: The ARN of AWS MSK Replicator."
     echo "--bootstrap-server <String: server to connect to>  REQUIRED: The Kafka server to connect to."
     echo "--consumer.config <String: config file>            REQUIRED: Consumer config properties file."
     exit 1
   }
   
   # Initialize variables
   replicator_arn=""
   bootstrap_server=""
   consumer_config=""
   
   # Parse arguments
   while [[ $# -gt 0 ]]; do
     case "$1" in
       --replicator-arn)
         if [ -z "$2" ]; then
           echo "Error: --replicator-arn requires an argument."
           print_usage
         fi
         replicator_arn="$2"; shift 2 ;;
       --bootstrap-server)
         if [ -z "$2" ]; then
           echo "Error: --bootstrap-server requires an argument."
           print_usage
         fi
         bootstrap_server="$2"; shift 2 ;;
       --consumer.config)
         if [ -z "$2" ]; then
           echo "Error: --consumer.config requires an argument."
           print_usage
         fi
         consumer_config="$2"; shift 2 ;;
       *) echo "Unknown option: $1"; print_usage ;;
     esac
   done
   
   # Check for required arguments
   if [ -z "$replicator_arn" ] || [ -z "$bootstrap_server" ] || [ -z "$consumer_config" ]; then
     echo "Error: --replicator-arn, --bootstrap-server, and --consumer.config are required."
     print_usage
   fi
   
   # Extract replicator name and suffix from ARN
   replicator_arn_suffix=$(echo "$replicator_arn" | awk -F'/' '{print $NF}')
   replicator_name=$(echo "$replicator_arn" | awk -F'/' '{print $(NF-1)}')
   echo "Replicator name: $replicator_name"
   
   # List topics and find the status topic
   topics=$(./kafka-topics.sh --command-config client.properties --list --bootstrap-server "$bootstrap_server")
   status_topic_name="__amazon_msk_replicator_status_${replicator_name}_${replicator_arn_suffix}"
   
   # Check if the status topic exists
   if echo "$topics" | grep -Fq "$status_topic_name"; then
     echo "Found replicator status topic: '$status_topic_name'"
     ./kafka-console-consumer.sh --bootstrap-server "$bootstrap_server" --consumer.config "$consumer_config" --topic "$status_topic_name" --from-beginning | stdbuf -oL grep "Exception" | stdbuf -oL sed -n 's/.*Exception:\(.*\) Topic: \([^,]*\), Partition: \([^\]*\).*/ReplicatorException:\1 Topic: \2, Partition: \3/p'
   else
     echo "No topic matching the pattern '$status_topic_name' found."
   fi
   ```

   运行此脚本查询 MSK Replicator 的故障消息：

   ```
   <path-to-your-kafka-installation>/bin/query-replicator-failure-message.sh --replicator-arn $REPLICATOR_ARN --bootstrap-server $TARGET_CLUSTER_SERVER_STRING --consumer.config $CONSUMER_CONFIG_FILE
   ```

   此脚本输出了所有错误及其异常消息，以及受影响的主题分区。由于该主题包含了所有历史失败消息，因此应使用最后一条消息开始调查。以下是失败消息的示例：

   ```
   ReplicatorException: The request included a message larger than the max message size the server will accept. Topic: test, Partition: 1
   ```

**常见故障和解决方案**  
下面介绍了 MSK Replicator 的常见故障以及如何缓解这些故障。

**消息大小大于 max.request.size**  
*原因：*单封邮件大小超过 10 MB（默认最大值）。  
下面是此失败消息类型的示例。  

```
ReplicatorException: The message is 20635370 bytes when serialized which is larger than 10485760, which is the value of the max.request.size configuration. Topic: test, Partition: 1
```
*解决方案：*缩小主题中单个消息的大小。如果不能，请按照[要求提高限额的](limits.md#request-msk-quota-increase)说明进行操作。

**消息大小大于服务器能接受的最大消息大小**  
*原因：*消息大小超过目标集群的最大消息大小。  
下面是此失败消息类型的示例。  

```
ReplicatorException: The request included a message larger than the max message size the server will accept. Topic: test, Partition: 1
```
*解决方案：*增加目标集群或主题的`max.message.bytes`配置。参见 [max.message.bytes。](https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes)

**时间戳超出范围**  
*原因：*消息时间戳超出了目标集群的允许范围。  
下面是此失败消息类型的示例。  

```
ReplicatorException: Timestamp 1730137653724 of message with offset 0 is out of range. The timestamp should be within [1730137892239, 1731347492239] Topic: test, Partition: 1
```
*解决方案：*更新目标集群的`message.timestamp.before.max.ms`配置。参见 m [essag](https://kafka.apache.org/documentation/#topicconfigs_message.timestamp.before.max.ms) e.timestamp.before.max.ms。

**记录批处理过大**  
*原因：*记录批量大小超过了目标集群上为主题设置的区段大小。MSK 复制器的最大批处理大小为 1MB。  
下面是此失败消息类型的示例。  

```
ReplicatorException: The request included message batch larger than the configured segment size on the server. Topic: test, Partition: 1
```
*解决方案：*将目标集群更新`segment.bytes`为至少 1048576 (1 MB)。参见[分段.bytes。](https://kafka.apache.org/documentation/#topicconfigs_segment.bytes)

**注意**  
如果在应用这些解决方案后`ReplicatorFailure`指标继续发出非零值，请重复故障排除过程，直到该指标发出零值。

## 对来自自管 Kafka 集群的复制进行故障排除
<a name="msk-replicator-ts-external"></a>

### MSK Replicator 无法连接到自建的 Kafka 集群
<a name="msk-replicator-ts-external-connect"></a>

如果 MSK Replicator 无法连接到您的自管理 Kafka 集群，请执行以下检查：

1. 验证您的 VPN 或 Direct Connect 连接是否处于活动状态以及路由表是否正确。

1. 确认安全组允许 SASL\_SSL端口（通常为 9096）上来自 MSK Replicator 子网的入站流量。

1. 验证从 VPC 到自管理集群代理主机名的 DNS 解析。

1. 查看 Amazon 中的`KafkaClusterPingSuccessCount`指标 CloudWatch — 值为 0 表示连接失败。

### SASL/SCRAM 身份验证失败
<a name="msk-replicator-ts-external-sasl"></a>

如果`AuthError`指标不为零或 Replicator 日志显示 SASL/SCRAM 错误：

1. 验证存储在 Secrets Manager 中的 Amazon 凭据是否与自行管理集群上的 SCRAM 用户凭据相匹配。

1. 验证 SCRAM 用户是否具有所需的 ACL 权限（读取、描述主题；在使用者组上读取、描述；在集群上描述）。

1. 检查`AuthError`指标以确认身份验证错误，并使用该`ClusterAlias`维度确定源集群还是目标集群受到影响。

### SSL 证书问题
<a name="msk-replicator-ts-external-ssl"></a>

如果 Replicator 无法与自行管理的集群建立安全连接：

1. 验证 S Amazon ecrets Manager 中的`certificate`值是否包含 PEM 格式的完整 CA 证书链。

1. 验证是否已在所有自行管理的集群代理上配置了 SSL 侦听器。

1. 确认证书尚未过期，并且是由受信任的 CA 颁发的。

### 自建集群的消费组偏移量同步失败
<a name="msk-replicator-ts-external-offsets"></a>

如果消费组偏移量未正确同步：

1. 验证`ConsumerGroupOffsetSyncFailure`指标 — 它应该是 0。

1. 验证消费组是否在源集群上处于活跃状态（非活动消费组可能不同步）。

1. 对于双向复制，请确认在两个复制器`true`上都将其设置`synchroniseConsumerGroupOffsets`为。