将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka

你可以使用 Kafka 插件将来自亚马逊 Apache Managed Streaming for Apache Kafka(亚马逊 MS OpenSearch K)的数据提取到你的摄取管道中。在 Amazon MSK 中,您可以构建并运行使用 Apache Kafka 的应用程序来处理流数据。 OpenSearch Ingestion Amazon PrivateLink 用于连接亚马逊 MSK。

先决条件

在创建 OpenSearch Ingestion 管道之前,请执行以下步骤:

  1. 按照 Amazon Managed Streaming for Apache Kafka 开发人员指南中的创建集群的步骤创建 Amazon MSK 集群。

    • 对于集群类型,请选择已预配置。 OpenSearch 摄取不支持无服务器 MSK 集群。

  2. 集群处于活动状态后,请按照开启多 VPC 连接中的步骤执行操作。

  3. 按照将集群策略附加到 MSK 集群的步骤附加以下策略之一,具体取决于集群与管道是否位于同一 Amazon Web Services 账户。此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK 集群的 Amazon PrivateLink 连接并从 Kafka 主题中读取数据。确保使用自身 ARN 更新 resource

    当集群与管道位于同一 Amazon Web Services 账户时,适用以下策略:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeCluster", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeCluster", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    如果您的 MSK 集群与您的管道 Amazon Web Services 账户 不同,请改为附加以下策略。的 ARN Amazon principal 应该是您为管道 YAML 配置提供的相同管道角色的 ARN:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeCluster", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeCluster", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id", "arn:aws:kafka:us-east-1:{msk-account-id}:topic/cluster-name/cluster-id/*", "arn:aws:kafka:us-east-1:{msk-account-id}:group/cluster-name/*" ] } ] }
  4. 按照创建主题中的步骤创建 Kafka 主题。确保 BootstrapServerString 是私有端点 (single-VPC) 引导 URL 之一。--replication-factor 的值应为 23,具体取决于 MSK 集群包含的区域数量。--partitions 的值至少应为 10

  5. 按照生成和使用数据中的步骤生成和使用数据。同样,确保 BootstrapServerString 是私有端点 (single-VPC) 引导 URL 之一。

步骤 1:配置管道角色

设置 MSK 集群后,在管道角色中添加要在管道配置中使用的以下 Kafka 权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:topic/cluster-name/cluster-id/topic-name" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:group/cluster-name/*" ] } ] }

步骤 2:创建管道

然后,你可以配置如下所示的 OpenSearch 摄取管道,将 Kafka 指定为来源:

version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "topic-name" group_id: "group-id" serde_format: "json"/"plaintext" aws: msk: arn: "arn:aws:iam::{account-id}:role/cluster-role" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema: # Optional type: "aws_glue" processor: - grok: match: log: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index_name" aws_sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" aws_region: "us-east-1" aws_sigv4: true

您可以使用 AWS-MSKPipeline 蓝图创建此管道。有关更多信息,请参阅 使用蓝图创建管道

步骤 3:(可选)使用 Amazon Glue 架构注册表

当您将 OpenSearch Ingestion 与 Amazon MSK 配合使用时,可以将 AVRO 数据格式用于架构注册表中托管的架构。 Amazon Glue 在 Amazon Glue 架构注册表中,您可以集中发现、控制和演变数据流架构。

要使用此选项,请在管道配置中启用架构 type

schema: type: "aws_glue"

您还必须在您的管道角色中提供 Amazon Glue 读取访问权限。您可以使用名为的 Amazon 托管策略AWSGlueSchemaRegistryReadonlyAccess。此外,您的注册表必须与您的 OpenSearch Ingestion 管道位于同一 Amazon Web Services 账户 区域中。

步骤 4:(可选)为 Amazon MSK 管道配置推荐的计算单位 (OCU)

每个计算单位的每个主题有一个使用者。代理在给定主题的使用者之间均衡分配分区。但是,当分区数量大于使用者数量时,Amazon MSK 将要求每个使用者托管多个分区。 OpenSearch Ingestion 具有内置的 auto Scaling,可以根据 CPU 使用率或管道中的待处理记录数量向上或向下扩展。

为实现最佳性能,请将分区分布在多个计算单位中以便并行处理。如果主题包含大量分区(例如,超过 96 个,即每个管道的最大 OCU),我们建议您将管道配置为 1–96 个 OCU。因为它将根据需要自动扩缩。如果主题包含的分区数量较少(例如,少于 96 个),则最大计算单位应与分区数量相同。

当管道包含多个主题时,请选择分区数最多的主题作为参考来配置最大计算单位。通过向同一个主题和使用者组添加另一个包含一组新 OCU 的管道,几乎可以线性扩展吞吐量。