将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka

你可以使用 Kafka 插件将来自亚马逊 Apache Managed Streaming For Apache KafkaMSK(亚马 OpenSearch 逊)的数据提取到你的摄取管道中。借助亚马逊MSK,您可以构建和运行使用 Apache Kafka 来处理流数据的应用程序。 OpenSearch Ingestion Amazon PrivateLink 用于连接亚马逊。MSK您可以从 Amazon MSK 和 Amazon MSK 无服务器集群中提取数据。这两个流程之间的唯一区别是在设置管道之前必须执行的先决条件步骤。

Amazon MSK 先决条件

在创建 OpenSearch 摄取管道之前,请执行以下步骤:

  1. 按照《适用于 A pache Kafka 的亚马逊托管流媒体 Kafka 开发者指南》中创建集群中的步骤创建亚马逊MSK预配置集群。对于 Broker 类型,请选择除t3类型之外的任何选项,因为 OpenSearch Ingestion 不支持这些类型。

  2. 集群处于 “活动” 状态后,请按照开启多VPC连接中的步骤进行操作。

  3. 按照将集群策略附加到集MSK群中的步骤来附加以下策略之一,具体取决于您的集群和管道是否相同 Amazon Web Services 账户。此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK 集群的 Amazon PrivateLink 连接并从 Kafka 主题中读取数据。请务必使用自己的版本resource进行更新ARN。

    当集群与管道位于同一 Amazon Web Services 账户时,适用以下策略:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    如果您的 Amazon MSK 集群所在的管道与您的管道 Amazon Web Services 账户 不同,请改为附加以下策略。请注意,只有预配置的 Amazon 集群才能进行跨账户访问,Amazon MSK Serverless MSK 集群无法进行跨账户访问。的 ARN fo Amazon principal r 应与您ARN为管道配置提供的相同管道角色相同:YAML

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id", "arn:aws:kafka:us-east-1:{msk-account-id}:topic/cluster-name/cluster-id/*", "arn:aws:kafka:us-east-1:{msk-account-id}:group/cluster-name/*" ] } ] }
  4. 按照创建主题中的步骤创建 Kafka 主题。确保BootstrapServerString这是私有端点(单VPC)引导程序URLs之一。的值--replication-factor应为23,具体取决于您的 Amazon MSK 集群拥有的区域数量。--partitions 的值至少应为 10

  5. 按照生成和使用数据中的步骤生成和使用数据。再说一遍,请确保BootstrapServerString这是您的私有端点(单个VPC)引导程序URLs之一。

Amazon MSK 无服务器先决条件

在创建 OpenSearch 摄取管道之前,请执行以下步骤:

  1. 按照《适用MSK于 Apache Managed Streaming 的亚马逊管理流媒体 Kafka 开发者指南》中创建MSK无服务器集群中的步骤创建亚马逊无服务器集群。

  2. 集群处于 “活动” 状态后,按照将群集策略附加到MSK集群中的步骤来附加以下策略。请务必使用自己的版本resource进行更新ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK Serverless 集群的 Amazon PrivateLink 连接并从 Kafka 主题中读取数据。当您的集群和管道处于相同状态时,此政策适用 Amazon Web Services 账户,因为Amazon MSK Serverless不支持跨账户访问,因此必须如此。

  3. 按照创建主题中的步骤创建 Kafka 主题。确保BootstrapServerString这是您的简单身份验证和安全层 (SASL) IAM 引导程序URLs之一。的值--replication-factor应为23,具体取决于您的 Amazon MSK Serverless 集群所拥有的区域数量。--partitions 的值至少应为 10

  4. 按照生成和使用数据中的步骤生成和使用数据。再说一遍,请确保BootstrapServerString这是您的简单身份验证和安全层 (SASL) IAM 引导程序URLs之一。

步骤 1:配置管道角色

设置 Amazon 预MSK配置集群或无服务器集群后,在管道角色中添加要在工作流配置中使用的以下 Kafka 权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:topic/cluster-name/cluster-id/topic-name" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:group/cluster-name/*" ] } ] }

步骤 2:创建管道

然后,你可以配置如下所示的 OpenSearch 摄取管道,将 Kafka 指定为来源:

version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "topic-name" group_id: "group-id" aws: msk: arn: "arn:aws:kafka:{region}:{account-id}:cluster/cluster-name/cluster-id" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index_name" aws_sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" aws_region: "us-east-1" aws_sigv4: true

您可以使用预先配置的 Amazon MSK 蓝图来创建此管道。有关更多信息,请参阅 使用蓝图创建管道

步骤 3:(可选)使用 Amazon Glue 架构注册表

当您将 OpenSearch Ingestion 与 Amazon 配合使用时MSK,您可以使用架构注册表中托管的架构AVRO的数据格式。 Amazon Glue 在 Amazon Glue 架构注册表中,您可以集中发现、控制和演变数据流架构。

要使用此选项,请在管道配置中启用架构 type

schema: type: "aws_glue"

您还必须在您的管道角色中提供 Amazon Glue 读取访问权限。您可以使用名为的 Amazon 托管策略AWSGlueSchemaRegistryReadonlyAccess。此外,您的注册表必须与您的 OpenSearch Ingestion 管道位于同一 Amazon Web Services 账户 区域中。

步骤 4:(可选)为 Amazon MSK 管道配置推荐的计算单位 (OCUs)

每个计算单位的每个主题有一个使用者。代理在给定主题的使用者之间均衡分配分区。但是,当分区的数量大于使用者的数量时,Amazon 会在每个使用者上MSK托管多个分区。 OpenSearch Ingestion 具有内置的 auto Scaling,可根据CPU使用情况或管道中待处理记录的数量向上或向下扩展。

为实现最佳性能,请将分区分布在多个计算单位中以便并行处理。如果主题有大量分区(例如,超过 96 个,这是OCUs每个管道的最大分区),我们建议您将管道配置为 1— OCUs 96。因为它将根据需要自动扩缩。如果主题包含的分区数量较少(例如,少于 96 个),则最大计算单位应与分区数量相同。

当管道包含多个主题时,请选择分区数最多的主题作为参考来配置最大计算单位。通过向同一主题和使用者组添加另一个具有新集的OCUs管道,您可以几乎线性地扩展吞吐量。