本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
以自我管理的 Apache Kafka 直播作为来源
你可以使用 EventBridge 管道从自我管理的 Apache Kafka 接收记录。然后,您可以选择筛选或增强这些记录,然后将其发送到可用目的地进行处理。在设置管道时,可以选择特定于源的设置。 EventBridge 当集群将数据发送到目标时,管道会保持从集群收到的记录的顺序。
Apache Kafka 是一个开源事件流平台,支持数据管道和流分析等工作负载。您可以使用Amazon托管的 Apache Kafka 服务 Amazon Managed Streaming for Apache Kafka for Apache Kafka,Amazon MSK 集群。
本主题介绍了如何在自行托 EventBridge 管的 Apache Kafka 集群中使用。在Amazon术语中,自行管理的群集包括非Amazon托管 Apache Kafka 集群。例如,您可以使用云提供商(如)来托管 Apache Kafka 集群CloudKarafka
Apache Kafka 作为源代码的运行方式与使用 Amazon Simple Queue Service (Amazon SQS) 或 Amazon EventBridge 在内部轮询来自源的新消息,然后同步调用目标。 EventBridge 批量读取消息,并将这些消息作为事件有效负载提供给您的函数。最大批处理大小可配置。(默认值为 100 个消息。)
对于基于 Apache Kafka 的源代码, EventBridge 支持处理控制参数,例如批处理时段和批处理大小。
EventBridge 调用管道时会在事件参数中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Apache Kafka 主题和 Apache Kafka 分区标识符的详细信息,以及时间戳和 base64 编码的消息。
示例事件
以下示例事件显示管道接收到的信息。您可以使用此事件来创建和过滤事件模式,或定义输入转换。并非所有字段都可以过滤。更多有关您可以筛选哪些字段的信息,请参阅亚马逊 EventBridge 管道筛选。
[ { "eventSource": "SelfManagedKafka", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": "0", "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]
Apache Kafka 群身份验证
EventBridge 管道支持多种方法来使用自行管理的 Apache Kafka 集群进行身份验证的方法。请确保将 Apache Kafka 集配置为使用支持的下列身份验证方法之一:有关 Apache Kafka 安全的更多信息,请参阅 Apache Kafka 文档的安全
VPC 访问
如果仅 VPC 中的 Apache Kafka 用户访问 Apache Kafka 代理,则您必须配置 Apache Kafka 源以访问 Amazon Virtual Private Cloud pache Kafka 源。
SASL/SCRAM 身份验证
EventBridge Pipes 支持使用传输层安全性协议(TLS)加密进行简单身份验证和安全层/加盐质疑应答身份验证机制(SASL/SCRAM)身份验证方式。 EventBridge 管道发送已加密凭据以使用集群进行身份验证。有关 SASL/SCRAM 身份验证的更多信息,请参阅 RFC 5802
EventBridge 管道支持使用 TLS 加密进行 SASL/PLAIN 身份身份身份验证 使用 SASL/PLAIN 身份验证 EventBridge 时,Pipes 会将凭证以明文(未加密)形式发送到服务器。
为了 SASL 身份验证,需要将登录凭据作为密钥存储在中Amazon Secrets Manager。
双向 TLS 身份验证
双向 TLS(mTLS)在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。
在自行托管的 Apache Kafka 中, EventBridge Pipes 充当客户端。您可以配置客户端证书(作为 Secrets Manager 中的密钥),以使用 Apache Kafka 代理对 Pipets 进行身份验证 EventBridge 了。客户端证书必须由服务器信任存储中的证书颁发机构(CA)签名。
Apache Kafka 集群向 Pipes 发送服务器证书,以便 EventBridge 使用 Pipes 对 Apache Kafka 代理进行 EventBridge 身份验证。服务器证书可以是公有 CA 证书。也可以是私有 CA/自签名证书。公有 CA 证书必须由 Pipes 信任存储中的 EventBridge CA 签名。对于私有 CA /自签名证书,您可以配置服务器根 CA 证书(作为 Secrets Manager 中的密钥)。 EventBridge 管道使用根证书来验证 Apache Kafka 代理。
有关 mTLS 的更多信息,请参阅为作为源的 Amazon MSK 引入双向 TLS 身份验证
配置客户端证书密钥
CLIENT_CERTIFICATE_TLS_AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。
注意
EventBridge 管道支持 PBES1
证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:
-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----
Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。
私有密钥必须采用 PKCS #8
-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----
对于加密的私有密钥,请使用以下结构:
-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----
以下示例显示使用加密私有密钥进行 mTLS 身份验证的密钥内容。对于加密的私有密钥,可以在密钥中包含私有密钥密码。
{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }
配置服务器根 CA 证书密钥
如果您的 Apache Kafka 代理使用 TLS 加密(具有由私有 CA 签名的证书),则创建此密钥。您可以将 TLS 加密用于 VPC、SASL/SCRAM、SASL/PLAIN 或 mTLS 身份验证。
服务器根 CA 证书密钥需要一个字段,其中包含 PEM 格式的 Apache Kafka 代理的根 CA 证书。以下示例显示密钥的结构。
{ "certificate": "-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----"
执行角色权限
设置管道时,您可以使用现有的执行角色,也可以使用所需权限为您 EventBridge 创建一个管道。对于自我管理的 Apache Kafka, EventBridge 需要以下权限才能管理与您自己管理的 Apache Kafka 直播相关的资源。如果您要设置自己的执行角色,则必须自己添加这些权限。
所需的 权限
要在 Amazon Logs 中创建日志并将 CloudWatch 日志存储到日志组,管道必须在它的执行角色中具有以下权限:
注意
如果您不确定访问源代码所需的确切范围内的权限,请使用 Pi EventBridge pes 控制台创建新角色,然后检查策略中列出的操作。
可选的权限
您的管道还可能需要权限来:
描述您的 Secrets Manager 密钥。
访问 Amazon Key Management Service(Amazon KMS)客户管理的密钥。
访问 Amazon VPC。
Secrets Manager 和 Amazon KMS 权限
根据您为 Apache Kafka 代理配置的访问控制类型,管道可能需要访问您的 Secrets Manager 密钥或解密Amazon KMS客户管理的密钥的权限。要连接到这些资源,函数的执行角色必须具有以下权限:
VPC 权限
如果只有 VPC 内的用户才能访问您的自行管理的 Apache Kafka 集群,则您的管道必须具有访问 Amazon VPC 资源的权限。这些资源包括您的 VPC、子网、安全组和网络接口。要连接到这些资源,管道的执行角色必须具有以下权限:
网络配置
EventBridge 必须具有对与 Apache Kafka 代理关联的 Amazon Virtual Private Cloud (Amazon VPC) 资源的 要访问 Apache Kafka 集群的 VPC, EventBridge 需要对源子网进行出站互联网访问。对于公有子网,这可以是互联网网关,对于私有子网,它需要是网络地址转换器 (NAT),例如 NAT 网关或您自己的 NAT。请确保 NAT 具有公共 IP 地址并且可以连接到互联网。
使用以下规则配置您的 Amazon VPC 安全组(至少):
-
入站规则 — 允许为源指定之安全组的 Apache Kafka 代理端口(9092 对应 SASL,9094 对应 SASL,9094 对应 IAM)上的所有流量。
-
出站规则 – 允许所有目标的端口 443 上的所有流量传输。对于为源指定之安全组,允许 Apache Kafka 代理端口(9092 对应 SASL,9094 对应 SASL,9094 对应 IAM)上的所有流量。
使用自管理 Apache Kafka; 作为源代码配置管道
添加自管理的 Apache Kafka 直播源
使用控制台添加自管理的 Apache Kafka 源代码
通过 https://console.aws.amazon.com/events/
打开亚马逊 EventBridge 控制台。 在导航窗格中,选择管道。
选择 “创建管道”。
输入管道的名称。
(可选)添加管道的描述。
在 “生成管道” 选项卡上,为 “源” 选择 “自管理 Apache Kafka”。
对于 Bootstrap 服务器,请输入您的经纪人的
host:port
配对地址。在 “主题名称” 中,输入管道将从中读取的主题名称。
(可选)对于 VPC,选择所需的 VPC。然后,对于 VPC 子网,选择所需的子网。对于 VPC 安全组,选择安全组。
(可选)对于 “身份验证-可选”,打开 “使用身份验证” 并执行以下操作:
对于身份验证方法,选择身份验证类型。
对于密钥,选择密钥。
(可选)对于 Ad dditions(可选),执行以下操作:
对于起始位置,请选择以下选项之一:
最@@ 新 — 使用分片中的最新记录开始读取直播。
修剪地平线 — 使用分片中最后一条未修剪的记录开始读取数据流。这是碎片中最古老的记录。
在 “Batch 大小-可选” 中,输入每个批次的最大记录数。默认值为 100。
对于 Batch 窗口-可选,输入在继续操作之前收集记录的最大秒数。
现在已经配置了源,您可以为管道添加可选的筛选、可选的扩展或目标。
(可选)配置筛选
您可以向管道添加筛选功能,这样您就只能将Amazon MQ队列中的一部分记录发送到目标。
使用控制台配置筛选
选择 “筛选”。
在 S ample event-可选下,您将看到一个示例 Amazon MQ 事件,您可以使用该事件来构建事件模式,也可以选择 Enter your Own 来进入自己的事件。
在 “事件模式” 下,输入要筛选记录的事件模式。更多有关构建事件模式的信息,请参阅Amazon EventBridge 事件模式。
以下是示例事件模式,该模式仅发送城市字段中值为 Seatt le 的事件。
{ "data": { "City": ["Seattle"] } }
现在已对事件进行过滤,您可以为管道添加可选的扩充和目标。
(可选)定义丰富
您可以将事件数据发送到 Lambda 函数、Amazon Step Functions状态机、Amazon API Gateway 或 API 目标进行扩展。
选择充实
选择 “丰富”。
在 “详细信息” 下的 “服务” 中,选择要用于丰富内容的服务和相关设置。
您也可以在发送数据进行增强之前对其进行转换。
(可选)定义输入变压器
选择增益输入变压器-可选。
对于示例事件/事件负载,选择示例事件类型。
对于 Tran
<$.detail.field>
sformer,输入转换器语法,例如,"Event happened at <$.detail.field>."
其中引用了示例事件中的字段。也可以双击示例事件中的字段,将其添加到转换器中。对于输出,验证输出是否与您想要的相似。
现在,数据已经过筛选和增强,您必须定义要将事件数据发送到的目标。
配置目标
配置目标
选择目标。
在 “详细信息” 下的 “目标服务” 中,选择所需的目标。显示的字段因您所选的目标而异。根据需要输入特定于此目标类型的信息。
您也可以在将数据发送到目标之前对其进行转换。
(可选)定义输入变压器
选择目标输入变压器-可选。
对于示例事件/事件负载,选择示例事件类型。
对于 Tran
<$.detail.field>
sformer,输入转换器语法,例如,"Event happened at <$.detail.field>."
其中引用了示例事件中的字段。也可以双击示例事件中的字段,将其添加到转换器中。对于输出,验证输出是否与您想要的相似。
现在管道已配置完毕,请确保其设置配置正确。
配置管道设置
默认情况下,管道处于活动状态,但您可以将其停用。您还可以指定管道的权限并添加标签。
配置管道设置
选择 “管道设置” 选项卡。
默认情况下,新创建的管道在创建后立即处于活动状态。如果要创建非活动管道,请在 “激活” 下的 “激活管道” 中关闭 “活动”。
在 “权限” 下,对于 “执行角色”,执行以下操作之一:
要为此管道 EventBridge 创建新的执行角色,请选择为此特定资源创建新角色。 在角色名称下,您可以选择编辑角色名称。
要使用现有执行角色,请选择使用现有角色。在 “角色名称” 下,选择角色。
(可选)在 Tags-可选中,选择添加新标签,然后为规则输入一个或多个标签。有关更多信息,请参阅亚马逊 EventBridge 标签:
选择 “创建管道”。
Apache Kafka 源代码的自动伸缩
当您最初创建 Apache Kafka 源时, EventBridge 会分配一个使用者来处理 Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,根据工作负载 EventBridge 自动增加或减少使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。
按一分钟的间隔时间来 EventBridge 评估主题中所有分区的使用者偏移滞后。如果延迟太高,则分区接收消息的速度比 EventBridge 处理消息的速度更快。如有必要,在主题中 EventBridge 添加或删除使用者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。
如果您的目标过载,则 EventBridge 减少使用者的数量。此操作通过减少使用者可以检索和发送到函数的消息数来减少函数的工作负载。