将 OpenSearch Ingestion 管道与 Amazon Managed Streaming for Apache Kafka 结合使用 - Amazon OpenSearch Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 OpenSearch Ingestion 管道与 Amazon Managed Streaming for Apache Kafka 结合使用

您可以使用 Kafka 插件,将来自 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 的数据摄取到 OpenSearch Ingestion 管道。在 Amazon MSK 中,您可以构建并运行使用 Apache Kafka 的应用程序来处理流数据。OpenSearch Ingestion 使用 Amazon PrivateLink 连接 Amazon MSK。您可以从 Amazon MSK 和 Amazon MSK 无服务器集群摄取数据。这两个流程之间的唯一区别是在设置管道之前必须执行的先决条件步骤。

预置的 Amazon MSK 先决条件

创建 OpenSearch Ingestion 管道之前,请执行以下步骤:

  1. 按照《Amazon Managed Streaming for Apache Kafka 开发人员指南》中 Creating a cluster 说明的步骤,创建一个由 Amazon MSK 预置的集群。对于代理类型,请选择除 t3 类型之外的任何选项,因为 OpenSearch Ingestion 不支持这些类型。

  2. 集群处于活动状态后,请按照开启多 VPC 连接中的步骤执行操作。

  3. 按照将集群策略附加到 MSK 集群的步骤附加以下策略之一,具体取决于集群与管道是否位于同一 Amazon Web Services 账户。此策略允许 OpenSearch Ingestion 创建指向 Amazon MSK 集群的 Amazon PrivateLink 连接,并从 Kafka 主题中读取数据。确保使用自身 ARN 更新 resource

    当集群与管道位于同一 Amazon Web Services 账户 时,适用以下策略:

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" } ] }

    如果 Amazon MSK 集群与管道位于不同的 Amazon Web Services 账户中,请改为附加以下策略。请注意,只有预置的 Amazon MSK 集群才能进行跨账户访问,Amazon MSK 无服务器集群不支持跨账户访问。Amazon principal 的 ARN 应是您为管道配置提供的同一管道角色的 ARN:

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::444455556666:role/pipeline-role" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id", "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/*", "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*" ] } ] }
  4. 按照创建主题中的步骤创建 Kafka 主题。确保 BootstrapServerString 是私有端点 (single-VPC) 引导 URL 之一。--replication-factor 的值应为 23,具体取决于 Amazon MSK 集群包含的可用区数量。--partitions 的值至少应为 10

  5. 按照生成和使用数据中的步骤生成和使用数据。同样,确保 BootstrapServerString 是私有端点 (single-VPC) 引导 URL 之一。

Amazon MSK 无服务器先决条件

创建 OpenSearch Ingestion 管道之前,请执行以下步骤:

  1. 按照《Amazon Managed Streaming for Apache Kafka 开发人员指南》中 Create an MSK Serverless cluster 说明的步骤,创建一个 Amazon MSK 无服务器集群。

  2. 集群处于活动状态后,按照 Attach a cluster policy to the MSK cluster 中的步骤附加以下策略。确保使用自身 ARN 更新 resource

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" } ] }

    此策略允许 OpenSearch Ingestion 创建指向 Amazon MSK 无服务器集群的 Amazon PrivateLink 连接,并读取 Kafka 主题中的数据。当集群和管道位于同一 Amazon Web Services 账户时将会执行此策略,必须满足这一条件,因为 Amazon MSK 无服务器不支持跨账户访问。

  3. 按照创建主题中的步骤创建 Kafka 主题。确保 BootstrapServerString 是简单身份验证和安全层(SASL)IAM 引导 URL 之一。--replication-factor 的值应为 23,具体取决于 Amazon MSK 无服务器集群包含的可用区数量。--partitions 的值至少应为 10

  4. 按照生成和使用数据中的步骤生成和使用数据。同样,确保 BootstrapServerString 是简单身份验证和安全层(SASL)IAM 引导 URL 之一。

步骤 1:配置管道角色

预置好 Amazon MSK 集群或设置好无服务器集群后,在管道角色中添加要在管道配置中使用的以下 Kafka 权限:

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/topic-name" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*" ] } ] }

步骤 2:创建管道

然后,您可以配置如下所示的 OpenSearch Ingestion 管道,将 Kafka 指定为源:

version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "topic-name" group_id: "grouplambd-id" aws: msk: arn: "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id" region: "us-west-2" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"] index: "index_name" aws_region: "region" aws_sigv4: true

您可以使用预先配置的 Amazon MSK 蓝图,以创建此管道。有关更多信息,请参阅 使用蓝图

步骤 3:(可选)使用 Amazon Glue 架构注册表

将 OpenSearch Ingestion 与 Amazon MSK 一起使用时,您可以对 Amazon Glue 架构注册表中托管的架构使用 AVRO 数据格式。在 Amazon Glue架构注册表中,您可以集中发现、控制和演变数据流架构。

要使用此选项,请在管道配置中启用架构 type

schema: type: "aws_glue"

您还必须在管道角色中为 Amazon Glue 提供读取访问权限。您可以使用名为 AWSGlueSchemaRegistryReadonlyAccess 的 Amazon 托管策略。此外,您的注册表必须与 OpenSearch Ingestion 管道位于同一 Amazon Web Services 账户 和区域中。

步骤 4:(可选)为 Amazon MSK 管道配置推荐的计算单位(OCU)

每个计算单位的每个主题有一个使用者。代理在给定主题的使用者之间均衡分配分区。但是,当分区数量大于使用者数量时,Amazon MSK 将要求每个使用者托管多个分区。OpenSearch Ingestion 具有内置自动扩缩功能,可以根据 CPU 使用率或管道的待处理记录数量进行纵向扩缩或缩减。

为实现最佳性能,请将分区分布在多个计算单位中以便并行处理。如果主题包含大量分区(例如,超过 96 个,即每个管道的最大 OCU),我们建议您将管道配置为 1–96 个 OCU。因为它将根据需要自动扩缩。如果主题包含的分区数量较少(例如,少于 96 个),则最大计算单位应与分区数量相同。

当管道包含多个主题时,请选择分区数最多的主题作为参考来配置最大计算单位。通过向同一个主题和使用者组添加另一个包含一组新 OCU 的管道,几乎可以线性扩展吞吐量。