本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用者 - 标签监控
通过监控使用者滞后,您可以识别无法跟上主题中可用的最新数据的缓慢或卡住的使用者。如有必要,您可以采取补救措施,例如扩展或重启这些使用者。要监控使用者滞后,您可以使用 Amazon CloudWatch、Prometheus 或 Burrow 的开源监控系统。
用于 CloudWatch 和 Prometheus 开源监控系统的用户标签指标
使用者滞后指标量化写入主题的最新数据与应用程序读取的数据之间的差异。Amazon MSK 提供以下使用者滞后指标,您可以通过 Amazon CloudWatch 或通过
Prometheus 的开源监控获得这些指标:EstimatedMaxTimeLag
、EstimatedTimeLag
、MaxOffsetLag
、OffsetLag
和 SumOffsetLag
。 有关这些指标的信息,请参阅结合使用 Amazon MSK 和 CloudWatch 指标进行监控。
使用 Burrow 进行使用者标签监控
Burrow 是 Apache Kafka 的监控配套工具,可提供使用器滞后检查。Burrow 具有一个模块化设计,其中包括以下子系统:
-
集群运行一个 Apache Kafka 客户端,该客户端会定期更新主题列表和每个分区的当前 HEAD 偏移量(最新的偏移量)。
-
使用器从存储库中提取有关使用器组的信息。此存储库可以是 Apache Kafka 集群(使用
__consumer_offsets
主题)、ZooKeeper 或其他某个存储库。 -
存储子系统将所有此类信息存储在 Burrow 中。
-
求值程序子系统从存储子系统中检索特定使用器组的信息,并计算该组的状态。这将遵循使用器滞后评估规则
。 -
通知程序子系统根据配置的时间间隔请求使用器组的状态,并为满足配置的条件的组发出通知(通过电子邮件、HTTP 或其他方法)。
-
HTTP Server 子系统为 Burrow 提供一个 API 接口来提取有关集群和使用器的信息。
有关 Burrow 的更多信息,请参阅 Burrow - Kafka 使用器滞后检查
确保 Burrow 与您用于 MSK 集群的 Apache Kafka 版本兼容。
利用 Amazon MSK 设置和使用 Burrow
如果您使用纯文本通信,请按照此步骤操作。对于 TLS,请参阅下个过程。
-
在集群所在的同一 VPC 中创建 MSK 集群并启动客户端计算机。例如,您可以按照开始使用 Amazon MSK 上的说明执行操作。
-
在充当客户端计算机的 EC2 实例上运行以下命令。
sudo yum install go
-
在客户端计算机上运行以下命令以获取 Burrow 项目。
go get github.com/linkedin/Burrow
-
运行以下命令以安装 dep。这会将 dep 安装到
/home/ec2-user/go/bin/dep
文件夹中。curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
-
转到
/home/ec2-user/go/src/github.com/linkedin/Burrow
文件夹并运行以下命令。/home/ec2-user/go/bin/dep ensure
-
在同一文件夹中运行以下命令。
go install
-
打开
/home/ec2-user/go/src/github.com/linkedin/Burrow/config/burrow.toml
配置文件以进行编辑。在配置文件的以下部分中,将占位符替换为 MSK 集群的名称、ZooKeeper 服务器的 host:port 对以及引导代理。要获取 ZooKeeper host:port 对,请描述 MSK 集群并查找
ZookeeperConnectString
的值。 请参阅获取 ZooKeeper 的 Apache Amazon MSK 集群 连接字符串。要获得引导代理,请参阅获取 Amazon MSK 集群的引导代理。
在编辑配置文件时,请遵循下面显示的格式。
[zookeeper] servers=[ "
ZooKeeper-host-port-pair-1
", "ZooKeeper-host-port-pair-2
", "ZooKeeper-host-port-pair-3
" ] timeout=6 root-path="/burrow" [client-profile.test] client-id="burrow-test" kafka-version="0.10.0" [cluster.MSK-cluster-name
] class-name="kafka" servers=[ "bootstrap-broker-host-port-pair-1
", "bootstrap-broker-host-port-pair-2
", "bootstrap-broker-host-port-pair-3
" ] client-profile="test" topic-refresh=120 offset-refresh=30 [consumer.MSK-cluster-name
] class-name="kafka" cluster="MSK-cluster-name
" servers=[ "bootstrap-broker-host-port-pair-1
", "bootstrap-broker-host-port-pair-2
", "bootstrap-broker-host-port-pair-3
" ] client-profile="test" group-blacklist="^(console-consumer-|python-kafka-consumer-|quick-).*$" group-whitelist="" -
在
go/bin
文件夹中,运行以下命令。./Burrow --config-dir /home/ec2-user/go/src/github.com/linkedin/Burrow/config
-
查看
bin/log/burrow.log
文件中的错误。 -
可以使用以下命令来测试您的设置。
curl -XGET 'HTTP://
your-localhost-ip
:8000/v3/kafka' -
有关所有受支持的 HTTP 请求和链接,请参阅 Burrow HTTP 终端节点
。
将 Burrow 与 TLS 结合使用
除上述过程外,如果您使用 TLS 通信,请参阅以下步骤。
-
运行以下命令。
sudo yum install java-1.8.0-openjdk-devel -y
-
根据需要调整路径后,运行以下命令。
find /usr/lib/jvm/ -name "cacerts" -exec cp {} /tmp/kafka.client.truststore.jks \;
-
在下一步中,您可以使用
keytool
命令,该命令要求输入密码。默认密码为changeit
。 我们建议您在继续执行下一步之前,运行以下命令来更改密码。keytool -keystore /tmp/kafka.client.truststore.jks -storepass changeit -storepasswd -new Password
-
运行以下命令。
keytool --list -rfc -keystore /tmp/kafka.client.truststore.jks >/tmp/truststore.pem
您需要
burrow.toml
文件的truststore.pem
,本过程后文中有说明。 -
要生成 certfile 和 keyfile,请使用通过 Amazon MSK 管理双向身份验证的客户端证书
中的代码。您需要 pem
标志。 -
设置
burrow.toml
文件,如下例所示。您可以拥有多个集群和使用者部分,以使用一个 burrow 集群监视多个 MSK 集群。您还可以在client-profile
下调整 Apache Kafka 版本。 它代表 Apache Kafka 支持的客户端版本。有关更多信息,请参阅 Burrow 上的客户端配置文件GitHub。 [general] pidfile="burrow.pid" stdout-logfile="burrow.out" [logging] filename="/tmp/burrow.log" level="info" maxsize=100 maxbackups=30 maxage=10 use-localtime=false use-compression=true [zookeeper] servers=[ "
ZooKeeperConnectionString
" ] timeout=6 root-path="/burrow" [client-profile.msk1-client] client-id="burrow-test" tls="msk-mTLS" kafka-version="2.0.0" [cluster.msk1] class-name="kafka" servers=[ "BootstrapBrokerString
" ] client-profile="msk1-client" topic-refresh=120 offset-refresh=30 [consumer.msk1-cons] class-name="kafka" cluster="msk1" servers=[ "BootstrapBrokerString
" ] client-profile="msk1-client" group-blacklist="^(console-consumer-|python-kafka-consumer-|quick-).*$" group-whitelist="" [httpserver.default] address=":8000" [storage.default] class-name="inmemory" workers=20 intervals=15 expire-group=604800 min-distance=1 [tls.msk-mTLS] certfile="/tmp/client_cert.pem" keyfile="/tmp/private_key.pem" cafile="/tmp/truststore.pem" noverify=false