将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka

你可以使用 Kafka 插件将来自亚马逊 Apache Managed Streaming for Apache Kafka(亚马逊 MS OpenSearch K)的数据提取到你的摄取管道中。在 Amazon MSK 中,您可以构建并运行使用 Apache Kafka 的应用程序来处理流数据。 OpenSearch Ingestion Amazon PrivateLink 用于连接亚马逊 MSK。您可以从 Amazon MSK 和 Amazon MSK 无服务器集群中提取数据。这两个流程之间的唯一区别是在设置管道之前必须执行的先决步骤。

亚马逊 MSK 先决条件

在创建 OpenSearch 摄取管道之前,请执行以下步骤:

  1. 按照《适用于 Apache Managed Streaming 的亚马逊管理流媒体 Kafk a 开发者指南》中创建集群中的步骤创建 Amazon MSK 预配置集群。对于 Broker 类型,请选择除t3类型之外的任何选项,因为 OpenSearch Ingestion 不支持这些类型。

  2. 集群处于活动状态后,请按照开启多 VPC 连接中的步骤执行操作。

  3. 按照将集群策略附加到 MSK 集群的步骤附加以下策略之一,具体取决于集群与管道是否位于同一 Amazon Web Services 账户。此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK 集群的 Amazon PrivateLink 连接并从 Kafka 主题中读取数据。确保使用自身 ARN 更新 resource

    当集群与管道位于同一 Amazon Web Services 账户时,适用以下策略:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    如果您的 Amazon MSK 集群与您的管道 Amazon Web Services 账户 不同,请改为附加以下策略。请注意,只有预配置的 Amazon MSK 集群才能进行跨账户访问,Amazon MSK 无服务器集群无法进行跨账户访问。的 ARN Amazon principal 应该是您为管道 YAML 配置提供的相同管道角色的 ARN:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id", "arn:aws:kafka:us-east-1:{msk-account-id}:topic/cluster-name/cluster-id/*", "arn:aws:kafka:us-east-1:{msk-account-id}:group/cluster-name/*" ] } ] }
  4. 按照创建主题中的步骤创建 Kafka 主题。确保 BootstrapServerString 是私有端点 (single-VPC) 引导 URL 之一。的值--replication-factor应为23,具体取决于您的 Amazon MSK 集群拥有的区域数量。--partitions 的值至少应为 10

  5. 按照生成和使用数据中的步骤生成和使用数据。同样,确保 BootstrapServerString 是私有端点 (single-VPC) 引导 URL 之一。

Amazon MSK 无服务器先决条件

在创建 OpenSearch 摄取管道之前,请执行以下步骤:

  1. 按照 Apache Managed Streaming for Apache Kafka 开发者指南中创建 MSK 无服务器集群中的步骤创建亚马逊 MSK 无服务器集群。

  2. 集群处于 “活动” 状态后,按照将集群策略附加到 MSK 集群中的步骤来附加以下策略。确保使用自身 ARN 更新 resource

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK 无服务器集群的 Amazon PrivateLink 连接并从 Kafka 主题中读取数据。当您的集群和管道处于相同状态时,此政策适用 Amazon Web Services 账户,这必须是正确的,因为 Amazon MSK Serverless 不支持跨账户访问。

  3. 按照创建主题中的步骤创建 Kafka 主题。确保BootstrapServerString这是您的简单身份验证和安全层 (SASL) IAM 引导网址之一。的值--replication-factor应为23,具体取决于您的 Amazon MSK Serverless 集群拥有的区域数量。--partitions 的值至少应为 10

  4. 按照生成和使用数据中的步骤生成和使用数据。再说一遍,请确保BootstrapServerString这是您的简单身份验证和安全层 (SASL) IAM 引导网址之一。

步骤 1:配置管道角色

设置 Amazon MSK 预配置集群或无服务器集群后,在管道角色中添加要在管道配置中使用的以下 Kafka 权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:topic/cluster-name/cluster-id/topic-name" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:group/cluster-name/*" ] } ] }

步骤 2:创建管道

然后,你可以配置如下所示的 OpenSearch 摄取管道,将 Kafka 指定为来源:

version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "topic-name" group_id: "group-id" aws: msk: arn: "arn:aws:kafka:{region}:{account-id}:cluster/cluster-name/cluster-id" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index_name" aws_sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" aws_region: "us-east-1" aws_sigv4: true

您可以使用预配置的 Amazon MSK 蓝图来创建此管道。有关更多信息,请参阅 使用蓝图创建管道

步骤 3:(可选)使用 Amazon Glue 架构注册表

当您将 OpenSearch Ingestion 与 Amazon MSK 配合使用时,可以将 AVRO 数据格式用于架构注册表中托管的架构。 Amazon Glue 在 Amazon Glue 架构注册表中,您可以集中发现、控制和演变数据流架构。

要使用此选项,请在管道配置中启用架构 type

schema: type: "aws_glue"

您还必须在您的管道角色中提供 Amazon Glue 读取访问权限。您可以使用名为的 Amazon 托管策略AWSGlueSchemaRegistryReadonlyAccess。此外,您的注册表必须 Amazon Web Services 账户 与您的 OpenSearch 摄取管道位于同一区域中。

步骤 4:(可选)为 Amazon MSK 管道配置推荐的计算单位 (OCU)

每个计算单位的每个主题有一个使用者。代理在给定主题的使用者之间均衡分配分区。但是,当分区数量大于使用者数量时,Amazon MSK 将要求每个使用者托管多个分区。 OpenSearch Ingestion 具有内置的 auto Scaling,可以根据 CPU 使用率或管道中的待处理记录数量向上或向下扩展。

为实现最佳性能,请将分区分布在多个计算单位中以便并行处理。如果主题包含大量分区(例如,超过 96 个,即每个管道的最大 OCU),我们建议您将管道配置为 1–96 个 OCU。因为它将根据需要自动扩缩。如果主题包含的分区数量较少(例如,少于 96 个),则最大计算单位应与分区数量相同。

当管道包含多个主题时,请选择分区数最多的主题作为参考来配置最大计算单位。通过向同一个主题和使用者组添加另一个包含一组新 OCU 的管道,几乎可以线性扩展吞吐量。