本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
AmazManaged Streaming for Apache Kafka 主题作为来源
您可以使用 EventBridge Pipes 接收来自 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 主题的记录。在将这些记录发送到可用目的地之一进行处理之前,您可以选择筛选或增强这些记录。在设置管道时,您可以选择特定于 Amazon MSK 的设置。 EventBridge 在将数据发送到目的地时,管道会保持消息代理的记录顺序。
Amazon MSK 是一项完全托管式服务,可用于构建和运行使用 Apache Kafka 来处理流式传输数据的应用程序。Amazon MSK 简化了运行 Apache Kafka 的集群的设置、扩展和管理。使用 Amazon MSK,您可以为多个可用区和保证Amazon Identity and Access Management (IAAAAAAAAAAAAAAAAAAAAAAAAAAAA Amazon MSK 支持多个开源版本的 Kafka。
Amazon MSK 作为来源,运行方式与使用 Amazon Simple Queueue Service (Amazon SQS) 或 Amazon EventBridge 在内部轮询来自源的新消息,然后同步调用目标。 EventBridge 批量读取消息,并将这些消息作为事件有效负载提供给您的函数。最大批处理大小可配置。(默认值为 100 个消息。)
对于基于 Apache Kafka 的源代码, EventBridge 支持处理控制参数,例如批处理时段和批处理大小。
EventBridge 按顺序读取各个分区的消息。 EventBridge 处理各个批次后,会提交该批次中消息的偏移量。如果管道的目标为批处理中的任何消息返回错误,将 EventBridge 重试整批消息,处理成功或消息过期为止。
EventBridge 在事件调用目标时会在事件中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Amazon MSK 主题和分区标识符的详细信息,以及时间戳和 base64 编码的消息。
示例事件
以下示例事件显示管道接收到的信息。您可以使用此事件来创建和过滤事件模式,也可以用于定义输入转换。并非所有字段都可以过滤。有关您可以筛选哪些字段的更多信息,请参阅亚马逊 EventBridge 管道筛选。
[ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": "0", "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]
MSK 集群身份验证
EventBridge 需要访问 Amazon MSK 集群、检索记录和执行其他任务的权限。Amazon MSK 支持通过多种选项来控制客户端对 MSK 集群的访问。有关何时使用哪种身份验证方法的更多信息,请参阅如何 EventBridge 选择引导代理。
未经身份验证的访问
我们建议仅使用未经身份验证的访问进行开发。只有在集群禁用基于 IAM 角色的身份验证时,未经身份验证的访问才有效。
SASL/SCRAM 身份验证
Amazon MSK 支持使用传输层安全性协议(TLS)加密进行简单身份验证和安全层/加盐质疑应答身份验证机制(SASL/SCRAM)身份验证。为 EventBridge 连接到集群,您可以将身份验证凭证(登录凭证)存储在Amazon Secrets Manager密钥中。
有关使用 Secrets Manager 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的使用 Amazon Secrets Manager 进行用户名和密码身份验证。
Amazon MSK 不支持 SASL/PLAIN 身份验证。
基于 IAM 角色的身份验证
您可以使用 IAM 来验证连接到 MSK 集群的客户端的身份。如果 IAM 身份验证在您的 MSK 集群上处于活动状态,并且您没有为身份验证提供密钥,则 IAM 身份验证在您的 MSK 集群上处于活动状态。 EventBridge 要创建和部署基于 IAM 用户或角色的策略,请使用 IAM 控制台或 API。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 IAM 访问控制。
要连接 EventBridge 到 MSK 集群、读取记录和执行其他所需操作,请将以下权限添加到管道的执行角色。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:
region
:account-id
:cluster/cluster-name
/cluster-uuid
", "arn:aws:kafka:region
:account-id
:topic/cluster-name
/cluster-uuid
/topic-name
", "arn:aws:kafka:region
:account-id
:group/cluster-name
/cluster-uuid
/consumer-group-id
" ] } ] }
您可以将这些权限范围限定为特定集群、主题和组。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Amazon MSK Kafka 操作。
双向 TLS 身份验证
双向 TLS(mTLS)在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。
Amazon MSK EventBridge 充当客户端。您可以配置客户端证书(作为 Secrets Manager 中的密钥),以 EventBridge 使用 MSK 集群中的代理进行身份验证。客户端证书必须由服务器信任存储中的证书颁发机构(CA)签名。MSK 集群会发送服务器证书, EventBridge 以使用代理进行身份验证 EventBridge。服务器证书必须由Amazon信任存储中的 CA 签名。
Amazon MSK 不支持自签名服务器证书,因为 Amazon MSK 中的所有代理都使用由 Amazon T rust Services CA
有关适用于 Amazon MSK 的 mTLS 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的双向 TLS 身份验证。
配置 mTLS 密钥
CLIENT_CERTIFICATE_TLS_AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。
注意
EventBridge 支持 PBES1
证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:
-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----
Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。
私有密钥必须采用 PKCS #8
-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----
对于加密的私有密钥,请使用以下结构:
-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----
以下示例显示使用加密私有密钥进行 MTL 身份验证的密钥内容。对于加密的私有密钥,您可以在密钥中包含私有密钥密码。
{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }
如何 EventBridge 选择引导代理
EventBridge 根据集群上可用的身份验证的密钥来选择引导代理。如果为 mTLS 或 SASL/SCRAM 提供了密钥, EventBridge 将自动选择该身份验证方法。如果不提供密钥,请 EventBridge选择在集群上处于活动状态的最强身份验证方法。下面是 EventBridge 选择代理的优先级顺序,从最强到最弱的身份验证:
mTLS(为 mTLS 提供的密钥)
SASL/SCRAM(为 SASL/SCRAM 提供的密钥)
SASL IAAAAAAAAAAAAAAAAAAAAAAAAA
未经身份验证的 TLS(未提供密钥,IAAAAAAAAAAAAAAAAA
纯文本(未提供密钥,IAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
注意
如果 EventBridge 无法连接到最安全的代理类型,则不会尝试连接到其他(较弱)代理类型。如果 EventBridge 要选择较弱的代理类型,请停用集群上所有更强的身份验证方法。
执行角色权限
设置管道时,您可以使用现有的执行角色,也可以使用所需权限为您 EventBridge 创建一个管道。对于亚马逊 MSK, EventBridge 需要以下权限才能管理与您的亚马逊 MSK 主题相关的资源。如果您要设置自己的执行角色,则必须自己添加这些权限。
注意
如果您使用基于 IAM 角色的身份验证,则 基于 IAM 角色的身份验证除了下面列出的权限外,您的执行角色还需要列出的权限。
注意
如果您不确定访问源代码所需的确切范围内的权限,请使用 Pi EventBridge pes 控制台创建新角色,然后检查策略中列出的操作。
网络配置
EventBridge 必须具有对与 Amazon Virtual Private Cloud MSK 集群关联的 Amazon VPC (Amazon VPC) 资源的访问权限。要访问您的 Amazon MSK 集群的 VPC, EventBridge 需要对源子网进行出站互联网访问。对于公共子网,这可以是互联网网关,对于私有子网,它需要是网络地址转换器 (NAT),例如 NA T 网关或您自己的 NAT。确保 NAT 具有公共 IP 地址并可以连接到互联网。
使用以下规则配置您的 Amazon VPC 安全组(至少):
-
入站规则 — 允许为源指定之安全组的 Amazon MSK 代理端口(9092 对应纯文本,9094 对应 TLS,9096 对应 SASL,9098 对应 IAM)上的所有流量。
-
出站规则 – 允许所有目标的端口 443 上的所有流量传输。对于为源指定的安全组,允许 Amazon MSK 代理端口(9092 对应纯文本,9094 对应 TLS,9096 对应 SASL,9098 对应 IAM)上的所有流量。
注意
可通过 Amazon MSK API 发现您的 Amazon VPC 配置。在安装过程中,您无需对其进行配置。
可自定义的使用者组 ID
将 Apache Kafka 设置为源时,您可以指定使用者组 ID。此使用者组 ID 是您希望管道加入的 Apache Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Apache Kafka 记录处理设置从其他使用者迁移到 EventBridge。
如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,Apache Kafka 会向所有使用者组分发消息。换句话说, EventBridge 不会收到 Apache Kafka 主题的所有消息。如果 EventBridge 要处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。
此外,如果指定了使用者组 ID,而 Apache Kafka 找到了具有相同 ID 的有效现有使用者组 ID 的有效现有使用者组 ID,则 Apache Kafka 找到了具有相同 ID 的有效现有使用 EventBridge 者组 ID 的StartingPosition
参数。相反, EventBridge 开始根据使用者组的已提交偏移量处理记录。如果指定了使用者组 ID,而 Apache Kafka 找不到现有使用者组 ID,则 Apache Kafka 找不到现有使用者组 ID,则 EventBridge Apache Kafka 找不到现有使用StartingPosition
在所有 Apache Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建管道后,无法更新此值。
配置以 Amazon MSK 作为源的管道
添加Amazon MSK 主题源
使用控制台添加 Amazon MSK 源
通过 https://console.aws.amazon.com/events/
打开亚马逊 EventBridge 控制台。 在导航窗格中,选择 Pipes (管道)。
选择创建管道。
输入管道的名称。
(可选)添加管道的描述。
在 “生成管道” 选项卡上,为 “来源” 选择 Amazon MSK。
对于 Amazon MSK 集群,选择要修改的集群。
在主题名称中,输入管道将从中读取的主题名称。
(可选)在 “使用者组 ID-可选” 中,输入您希望管道加入的使用者组的 ID。
(可选)对于 “身份验证-可选”,打开 “使用身份验证” 并执行以下操作:
对于身份验证方法,选择所需的类型。
对于密钥,选择密钥。
(可选)对于 Aditi onal setting (其他设置),执行以下操作:
在 “Batch 大小-可选” 中,输入每个批次的最大记录数。默认值为 100。
对于 Batch 窗口-可选,输入在继续操作之前收集记录的最大秒数。
对于起始位置,请选择以下选项之一:
最@@ 新 — 使用分片中的最新记录开始阅读主题。
修剪地平线 — 从分片中最后一条未修剪的记录开始阅读主题。这是碎片中最古老的记录。
注意
Trim Horizon 与 Apache Kafka 的最早版本相同。
现在已经配置了源,您可以为管道添加可选的筛选、可选的扩展或目标。
(可选)配置筛选
您可以向管道添加筛选功能,这样您就只能将一部分记录从 Amazon MSK 队列发送到目标。
使用控制台配置筛选
选择 “筛选”。
在 S ample event-可选下,您将看到一个示例 Amazon MSK 事件,您可以使用该事件来构建事件模式,也可以选择 Enter your Own 来进入自己的事件。
在 “事件模式” 下,输入要筛选记录的事件模式。有关构建事件模式的更多信息,请参阅Amazon EventBridge 事件模式。
以下是示例事件模式,该模式仅发送城市字段中值为 Seatt le 的事件。
{ "data": { "City": ["Seattle"] } }
现在已对事件进行过滤,您可以为管道添加可选的扩充和目标。
(可选)定义丰富
您可以将用于丰富的事件数据发送到 Lambda 函数、Amazon Step Functions状态机、Amazon API Gateway 或 API 目标。
选择充实
选择 “丰富”。
在 “详细信息” 下的 “服务” 中,选择要用于丰富内容的服务和相关设置。
您也可以在发送数据进行增强之前对其进行转换。
(可选)定义输入变压器
选择增益输入变压器-可选。
对于示例事件/事件负载,选择示例事件类型。
对于 Tran
<$.detail.field>
sformer,输入转换器语法,例如,"Event happened at <$.detail.field>."
其中引用了示例事件中的字段。也可以双击示例事件中的字段,将其添加到转换器中。对于输出,验证输出是否与您想要的相似。
现在,数据已经过筛选和增强,您必须定义要将事件数据发送到的目标。
配置目标
配置目标
选择目标。
在 “详细信息” 下,为目标服务选择一个目标。显示的字段因您所选的目标而异。根据需要输入特定于此目标类型的信息。
您也可以在将数据发送到目标之前对其进行转换。
(可选)定义输入变压器
选择目标输入变压器-可选。
对于示例事件/事件负载,选择示例事件类型。
对于 Tran
<$.detail.field>
sformer,输入转换器语法,例如,"Event happened at <$.detail.field>."
其中引用了示例事件中的字段。也可以双击示例事件中的字段,将其添加到转换器中。对于输出,验证输出是否与您想要的相似。
现在管道已配置完毕,请确保其设置配置正确。
配置管道设置
默认情况下,管道处于活动状态,但您可以将其停用。您还可以指定管道的权限并添加标签。
配置管道设置
选择管道设置选项卡。
默认情况下,新创建的管道在创建后立即处于活动状态。如果要创建非活动管道,请在 “激活” 下的 “激活管道” 中关闭 “活动”。
在 “权限” 下,对于 “执行角色”,执行以下操作之一:
若要为此管道 EventBridge 创建新的执行角色,请选择为此特定资源创建新角色。 在角色名称下,您可以选择编辑角色名称。
要使用现有执行角色,请选择使用现有角色。在角色名称下,选择角色。
(可选)在 Tags-可选下,选择添加新标签,然后为规则输入一个或多个标签。有关更多信息,请参阅亚马逊 EventBridge 标签:
选择创建管道。
Amazon MSK 源的自动伸缩
当你最初创建一个 Amazon MSK 源时, EventBridge 分配一个使用者来处理 Apache Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,还可以根据工作负载 EventBridge 自动伸缩或缩减使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。
按一分钟的间隔时间来 EventBridge 评估主题中所有分区的使用者偏移滞后。如果延迟太高,则分区接收消息的速度比 EventBridge 处理消息的速度更快。如有必要, EventBridge 会在主题中添加或删除使用者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。
如果您的目标过载, EventBridge 会减少使用者的数量。此操作通过减少使用者可以检索和发送到管道的消息数来减少管道上的工作负载。