Amazon Managed Streaming for Apache Kafka 主题作为源 - Amazon EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Managed Streaming for Apache Kafka 主题作为源

你可以使用 Pipes 接收来自亚马逊托 EventBridge 管流媒体 Apache Kafka Kafka(亚马逊 MSK)主题的记录。您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标之一进行处理。在设置管道时,您可以选择特定于 Amazon MSK 的设置。 EventBridge 在将数据发送到目标时,Pipes 会维护消息代理中记录的顺序。

Amazon MSK 是一项完全托管的服务,可构建并运行应用程序,使用 Apache Kafka 处理流数据。Amazon MSK 简化了运行 Apache Kafka 的集群的设置、扩展和管理。借助 Amazon MSK,您可以为多个可用区配置应用程序,并通过 Amazon Identity and Access Management (IAM) 配置应用程序的安全性。Amazon MSK 支持多个开源版本的 Kafka。

亚马逊 MSK 作为来源的运作方式与使用亚马逊简单队列服务 (Amazon SQS) 或亚马逊 Kinesis 类似。 EventBridge在内部轮询来自源的新消息,然后同步调用目标。 EventBridge 批量读取消息,并将这些消息作为事件负载提供给您的函数。最大批处理大小可配置。(默认值为 100 个消息。)

对于基于 Apache Kafka 的源, EventBridge 支持处理控制参数,例如批处理窗口和批处理大小。

EventBridge 按顺序读取每个分区的消息。 EventBridge 处理完每个批次后,它会提交该批次中消息的偏移量。如果管道的目标对批处理中的任何消息返回错误,则 EventBridge 会重试整批消息,直到处理成功或消息过期。

EventBridge 当它调用目标时,会发送事件中的一批消息。事件负载包含一个消息数组。每个数组项目都包含 Amazon MSK 主题和分区标识符的详细信息,以及时间戳和 base64 编码的消息。

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 亚马逊 EventBridge 管道过滤

[ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": "0", "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]

轮询和流的起始位置

请注意,管道创建和更新期间的流源轮询最终将是一致的。

  • 在管道创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在管道更新源轮询配置期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

这意味着,如果您指定 LATEST 作为流的起始位置,在创建或更新管道期间,管道可能会错过发送的事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON

MSK 集群身份验证

EventBridge 需要权限才能访问 Amazon MSK 集群、检索记录和执行其他任务。Amazon MSK 支持通过多种选项来控制客户端对 MSK 集群的访问。有关在哪种情况下使用哪种身份验证方法的更多信息,请参阅 如何 EventBridge 选择引导代理

未经身份验证的访问

我们建议仅使用未经身份验证的访问权限进行开发。仅当集群禁用基于 IAM 角色的身份验证时,未经身份验证的访问权限才有效。

SASL/SCRAM 身份验证

Amazon MSK 支持使用传输层安全性协议(TLS)加密进行简单身份验证和安全层/加盐质疑应答身份验证机制(SASL/SCRAM)身份验证。 EventBridge 要连接到集群,您需要将身份验证凭据(登录凭据)存储在 Amazon Secrets Manager 密钥中。

有关使用 Secrets Manager 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的使用 Amazon Secrets Manager进行用户名和密码身份验证

Amazon MSK 不支持 SASL/PLAIN 身份验证。

基于 IAM 角色的身份验证

您可以使用 IAM 来验证连接到 MSK 集群的客户端的身份。如果您的 MSK 集群上已启用 IAM 身份验证,并且您没有提供身份验证密钥,则 EventBridge 会自动默认为使用 IAM 身份验证。要创建和部署基于 IAM 用户或角色的策略,请使用 IAM 控制台或 API。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 IAM 访问控制

要允许连接 EventBridge 到 MSK 集群、读取记录和执行其他必需的操作,请向管道的执行角色添加以下权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

您可以将这些权限范围限定为特定集群、主题和组。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Amazon MSK Kafka 操作

双向 TLS 身份验证

双向 TLS(mTLS)在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。

对于 Amazon MSK,则 EventBridge 充当客户。您可以将客户端证书(作为 Secrets Manager 中的密钥)配置为向 MSK 集群中的代理进行身份验证 EventBridge 。客户端证书必须由服务器信任存储中的证书颁发机构 (CA) 签名。MSK 集群向发送服务器证书,用于 EventBridge 对代理进行 EventBridge身份验证。服务器证书必须由 Amazon 信任存储区中的 CA 签名。

Amazon MSK 不支持自签名服务器证书,因为 Amazon MSK 中的所有经纪人都使用由亚马逊信任服务 CA 签署的公共证书,默认情况下该 EventBridge 证书是信任的。

有关适用于 Amazon MSK 的 mTLS 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的双向 TLS 身份验证

配置 mTLS 密钥

CLIENT_CERTIFICATE_TLS_AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。

注意

EventBridge 支持 PBES1(但不支持 PBES2)私钥加密算法。

证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。

私有密钥必须采用 PKCS #8 格式,并具有以下结构:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

对于加密的私有密钥,请使用以下结构:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下示例显示使用加密私有密钥进行 mTLS 身份验证的密钥内容。对于加密的私有密钥,您可以在密钥中包含私有密钥密码。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

如何 EventBridge 选择引导代理

EventBridge 根据集群上可用的身份验证方法以及您是否为身份验证提供密钥来选择引导代理。如果您为 mTLS 或 SASL/SCRAM 提供了密钥,则 EventBridge 会自动选择该身份验证方法。如果您不提供密钥,请 EventBridge选择集群上处于活动状态的最强身份验证方法。以下是 EventBridge 选择代理的优先顺序,从最强到最弱的身份验证:

  • mTLS(为 mTLS 提供的密钥)

  • SASL/SCRAM(为 SASL/SCRAM 提供的密钥)

  • SASL IAM(未提供密钥,IAM 身份验证处于活动状态)

  • 未经身份验证的 TLS(未提供密钥,IAM 身份验证未处于活动状态)

  • 纯文本(未提供密钥,IAM 身份验证和未经身份验证的 TLS 均未处于活动状态)

注意

如果 EventBridge 无法连接到最安全的代理类型,则不会尝试连接到其他(较弱的)代理类型。如果 EventBridge 要选择较弱的代理类型,请停用集群上所有更强的身份验证方法。

网络配置

EventBridge 必须有权访问与您的亚马逊 MSK 集群关联的亚马逊虚拟私有云(亚马逊 VPC)资源。

  • 要访问您的 Amazon MSK 集群的 VPC, EventBridge 可以使用源子网的出站互联网访问权限。对于公有子网,它必须是托管 NAT 网关。对于私有子网,它可以是 NAT 网关,也可以是您自己的 NAT。确保此 NAT 具有公共 IP 地址,可以连接到互联网。

  • EventBridge Pipes 还支持通过传送事件 Amazon PrivateLink,允许您在不通过公共互联网的情况下将事件从位于 Amazon Virtual Private Cloud (Amazon VPC) 的事件源发送到 Pipes 目标。您可以使用 Pipes 从 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、自我管理的 Apache Kafka 以及位于私有子网中的 Amazon MQ 源进行轮询,而无需部署互联网网关、配置防火墙规则或设置代理服务器。

    要设置 VPC 终端节点,请参阅Amazon PrivateLink 用户指南中的创建 VPC 终端节点。对于服务名称,请选择com.amazonaws.region.pipes-data

使用以下规则配置您的 Amazon VPC 安全组(至少):

  • 入站规则-允许 Amazon MSK 代理端口上为您的源指定的安全组的所有流量。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许 Amazon MSK 代理端口上为您的源指定的安全组的所有流量。

    代理端口包括:

    • 9092 表示纯文本

    • 9094 适用于 TLS

    • 9096 for SASL

    • 9098 for IAM

注意

可通过 Amazon MSK API 发现您的 Amazon VPC 配置。在设置过程中,您不需要对其进行配置。

可自定义的使用者组 ID

将 Apache Kafka 设置为事件源时,您可以指定使用者组 ID。此使用者组 ID 是您希望管道加入的 Apache Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Apache Kafka 记录处理设置从其他使用者迁移到。 EventBridge

如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,则 Apache Kafka 会向所有使用者分发消息。换句话说, EventBridge 不会收到 Apache Kafka 主题的所有消息。如果 EventBridge 要处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。

此外,如果您指定了使用者组 ID,而 Apache Kafka 找到了具有相同 ID 的有效现有使用者组,则 EventBridge 会忽略管道的StartingPosition参数。而是 EventBridge 开始根据使用者组的已提交偏移量处理记录。如果您指定了使用者组 ID,而 Apache Kafka 找不到现有的使用者组,则使用指定的来 EventBridge 配置您的来源。StartingPosition

在所有 Apache Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建管道后,无法更新此值。

Amazon MSK 源的自动扩缩

最初创建 Amazon MSK 源时, EventBridge 会分配一个使用者来处理 Apache Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,还可根据工作负载 EventBridge 自动增加或缩小使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。

每隔一分钟, EventBridge 评估主题中所有分区的消费偏移延迟。如果延迟太高,则表示分区接收消息的速度 EventBridge 快于处理消息的速度。如有必要,在主题中 EventBridge 添加或删除消费者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。

如果您的目标超负荷,则 EventBridge 减少消费者的数量。此操作通过减少使用者可以检索和发送到管道的消息数,来减少管道的工作负载。