监控消费者滞后 - Amazon Managed Streaming for Apache Kafka
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

监控消费者滞后

通过监控消费者滞后,您可以识别不能跟上主题中最新数据的缓慢或卡住的消费者。必要时,您可以采取补救措施,例如扩展或重新启动这些消费者。要监控消费者滞后,您可以使用 Amazon CloudWatch、使用 Prometheus 开放监控或 Burrow。

CloudWatch 和使用 Prometheus 进行开放监控的消费者滞后指标

消费者滞后指标量化写入主题的最新数据与应用程序读取的数据之间的差异。亚马逊 MSK 提供了以下消费者滞后指标,您可以通过 Amazon CloudWatch 或通过 Prometheus 开放监控获得这些指标:EstimatedMaxTimeLagEstimatedTimeLagMaxOffsetLagOffsetLag, 和SumOffsetLag. 有关这些指标的信息,请参阅。使用 CloudWatch 进行监控的亚马逊 MSK 指标.

使用 Burrow 监控消费者滞后

Burrow 是 Apache Kafka 的监控配套工具,可提供使用器滞后检查。Burrow 具有一个模块化设计,其中包括以下子系统:

  • 集群运行一个 Apache Kafka 客户端,该客户端会定期更新主题列表和每个分区的当前 HEAD 偏移量(最新的偏移量)。

  • 使用器从存储库中提取有关使用器组的信息。此存储库可以是 Apache Kafka 集群(使用 __consumer_offsets 主题)、ZooKeeper 或某个其他的存储库。

  • 存储子系统将所有此类信息存储在 Burrow 中。

  • 求值程序子系统从存储子系统中检索特定使用器组的信息,并计算该组的状态。这将遵循使用器滞后评估规则

  • 通知程序子系统根据配置的时间间隔请求使用器组的状态,并为满足配置的条件的组发出通知(通过电子邮件、HTTP 或其他方法)。

  • HTTP Server 子系统为 Burrow 提供一个 API 接口来提取有关集群和使用器的信息。

有关 Burrow 的更多信息,请参阅 Burrow - Kafka 使用器滞后检查

重要

确保 Burrow 与您用于 MSK 集群的 Apache Kafka 版本兼容。

将 Burrow 与 Amazon MSK 结合使用

如果您使用纯文本通信,请按照此步骤操作。对于 TLS,请参阅下个过程。

  1. 创建 MSK 集群并在集群所在的同一 VPC 中启动客户端计算机。例如,您可以按照开始使用 上的说明执行操作。

  2. 在充当客户端计算机的 EC2 实例上运行以下命令。

    sudo yum install go
  3. 在客户端计算机上运行以下命令以获取 Burrow 项目。

    go get github.com/linkedin/Burrow
  4. 运行以下命令以安装 dep。这会将 dep 安装到 /home/ec2-user/go/bin/dep 文件夹中。

    curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
  5. 转到 /home/ec2-user/go/src/github.com/linkedin/Burrow 文件夹并运行以下命令。

    /home/ec2-user/go/bin/dep ensure
  6. 在同一文件夹中运行以下命令。

    go install
  7. 打开 /home/ec2-user/go/src/github.com/linkedin/Burrow/config/burrow.toml 配置文件以进行编辑。在配置文件的以下部分中,将占位符替换为 MSK 集群的名称、ZooKeeper 服务器的 host: port 对以及引导代理。

    要获取您的 ZooKeeper host:port 对,请描述 MSK 集群并查找 ZookeeperConnectString 的值。请参阅 获取 Amazon MSK 集群的 Apache ZooKeeper 连接字符串

    要获得引导代理,请参阅为亚马逊 MSK 集群获取引导经纪商

    在编辑配置文件时,请遵循下面显示的格式。

    [zookeeper] servers=[ "ZooKeeper-host-port-pair-1", "ZooKeeper-host-port-pair-2", "ZooKeeper-host-port-pair-3" ] timeout=6 root-path="/burrow" [client-profile.test] client-id="burrow-test" kafka-version="0.10.0" [cluster.MSK-cluster-name] class-name="kafka" servers=[ "bootstrap-broker-host-port-pair-1", "bootstrap-broker-host-port-pair-2", "bootstrap-broker-host-port-pair-3" ] client-profile="test" topic-refresh=120 offset-refresh=30 [consumer.MSK-cluster-name] class-name="kafka" cluster="MSK-cluster-name" servers=[ "bootstrap-broker-host-port-pair-1", "bootstrap-broker-host-port-pair-2", "bootstrap-broker-host-port-pair-3" ] client-profile="test" group-denylist="^(console-consumer-|python-kafka-consumer-|quick-).*$" group-allowlist=""
  8. go/bin 文件夹中,运行以下命令。

    ./Burrow --config-dir /home/ec2-user/go/src/github.com/linkedin/Burrow/config
  9. 查看 bin/log/burrow.log 文件中的错误。

  10. 可以使用以下命令来测试您的设置。

    curl -XGET 'HTTP://your-localhost-ip:8000/v3/kafka'
  11. 有关所有受支持的 HTTP 请求和链接,请参阅 Burrow HTTP 终端节点

将 Burrow 与 TLS 结合使用

除上述过程外,如果您使用 TLS 通信,请参阅以下步骤。

  1. 运行以下 命令。

    sudo yum install java-1.8.0-openjdk-devel -y
  2. 根据需要调整路径后,运行以下命令。

    find /usr/lib/jvm/ -name "cacerts" -exec cp {} /tmp/kafka.client.truststore.jks \;
  3. 在下一步中,您可以使用 keytool 命令,该命令要求输入密码。默认密码为 changeit。我们建议您在继续执行下一步之前,运行以下命令来更改密码。

    keytool -keystore /tmp/kafka.client.truststore.jks -storepass changeit -storepasswd -new Password
  4. 运行以下 命令。

    keytool --list -rfc -keystore /tmp/kafka.client.truststore.jks >/tmp/truststore.pem

    您需要 burrow.toml 文件的 truststore.pem,本过程后文中有说明。

  5. 要生成 certfile 和 keyfile,请使用通过 Amazon MSK 管理双向身份验证的客户端证书中的代码。您需要 pem 标志。

  6. 设置 burrow.toml 文件,如下例所示。您可以拥有多个集群和使用者部分,以使用一个 burrow 集群监视多个 MSK 集群。您还可以在 client-profile 下调整 Apache Kafka 版本。它代表 Apache Kafka 支持的客户端版本。有关更多信息,请参阅 Burrow GitHub 上的客户端配置文件

    [general] pidfile="burrow.pid" stdout-logfile="burrow.out" [logging] filename="/tmp/burrow.log" level="info" maxsize=100 maxbackups=30 maxage=10 use-localtime=false use-compression=true [zookeeper] servers=[ "ZooKeeperConnectionString" ] timeout=6 root-path="/burrow" [client-profile.msk1-client] client-id="burrow-test" tls="msk-mTLS" kafka-version="2.0.0" [cluster.msk1] class-name="kafka" servers=[ "BootstrapBrokerString" ] client-profile="msk1-client" topic-refresh=120 offset-refresh=30 [consumer.msk1-cons] class-name="kafka" cluster="msk1" servers=[ "BootstrapBrokerString" ] client-profile="msk1-client" group-denylist="^(console-consumer-|python-kafka-consumer-|quick-).*$" group-allowlist="" [httpserver.default] address=":8000" [storage.default] class-name="inmemory" workers=20 intervals=15 expire-group=604800 min-distance=1 [tls.msk-mTLS] certfile="/tmp/client_cert.pem" keyfile="/tmp/private_key.pem" cafile="/tmp/truststore.pem" noverify=false