将 OpenSearch Ingestion 管道与 Kafka 结合使用 - Amazon OpenSearch Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 OpenSearch Ingestion 管道与 Kafka 结合使用

您可以使用 Kafka 插件,将数据从自行管理的 Kafka 集群流式传输到 Amazon OpenSearch Service 域和 OpenSearch 无服务器集合。OpenSearch Ingestion 支持来自 Kafka 集群的连接,该集群配置有公共或私有(VPC)网络。本主题概述设置摄取管道的先决条件和步骤,包括配置网络设置和身份验证方法,例如双向TLS(mTLS)、SASL/SCRAM 或 IAM。

从公有 Kafka 集群迁移数据

您可以使用 OpenSearch Ingestion 管道,从自行管理的公共(即可以公开解析域 DNS 名称)Kafka 集群迁移数据。要执行此操作,请设置一个 OpenSearch Ingestion 管道,将自主管理型 Kafka 作为源,将 OpenSearch Service 或 OpenSearch 无服务器作为目标。这将处理从自主管理型源集群到 Amazon 托管式目标域或集合的流式数据传输。

先决条件

创建 OpenSearch Ingestion 管道之前,请执行以下步骤:

  1. 使用公有网络配置创建自主管理型 Kafka 集群。该集群应包含要摄入 OpenSearch Service 的数据。

  2. 创建要将数据迁移到的 OpenSearch Service 域或 OpenSearch 无服务器集合。有关更多信息,请参阅 创建 OpenSearch Service 域创建集合

  3. 使用 Amazon Secrets Manager 设置自主管理型集群上的身份验证。按照 Rotate Amazon Secrets Manager secrets 中的步骤启用密钥轮换。

  4. 基于资源的策略附加到域,或将数据访问策略附加到集合。这些访问策略允许 OpenSearch Ingestion 将数据从自主管理型集群写入您的域或集合。

    以下示例域访问策略允许您在下一步中创建的管道角色将数据写入域。确保使用自身 ARN 更新 resource

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::444455556666:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:us-east-1:111122223333:domain/domain-name" ] } ] }

    要创建具有向该集合或域写入数据的正确权限的 IAM 角色,请参阅 在 Amazon OpenSearch Ingestion 中设置角色和用户

步骤 1:配置管道角色

完成 Kafka 管道的先决条件设置后,配置管道角色以在管道配置中使用,添加写入 OpenSearch Service 域或 OpenSearch 无服务器集合的权限,以及从 Secrets Manager 读取密钥的权限。

步骤 2:创建管道

然后,您可以配置如下所示的 OpenSearch Ingestion 管道,将 Kafka 指定为源。

您可以将多个 OpenSearch Service 域指定为数据的目标。使用此功能可以对传入多个 OpenSearch Service 域的数据执行条件路由或复制。

您还可以将数据从源 Confluent Kafka 集群迁移到 OpenSearch 无服务器 VPC 集合中。务必要在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-east-1.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"] aws: region: "us-east-1" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-east-1" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-east-1"

您可以使用预先配置的蓝图,以创建此管道。有关更多信息,请参阅 使用蓝图

从 VPC 中的 Kafka 集群迁移数据

您还可以使用 OpenSearch Ingestion 管道从在 VPC 中运行的自主管理型 Kafka 集群迁移数据。要执行此操作,请设置一个 OpenSearch Ingestion 管道,将自主管理型 Kafka 作为源,将 OpenSearch Service 或 OpenSearch 无服务器作为目标。这将处理从自主管理型源集群到 Amazon 托管式目标域或集合的流式数据传输。

先决条件

创建 OpenSearch Ingestion 管道之前,请执行以下步骤:

  1. 使用 VPC 网络配置创建自主管理型 Kafka 集群,其中包含要摄入 OpenSearch Service 的数据。

  2. 创建要将数据迁移到的 OpenSearch Service 域或 OpenSearch 无服务器集合。有关更多信息,请参阅创建 OpenSearch Service 域创建集合

  3. 使用 Amazon Secrets Manager 设置自主管理型集群上的身份验证。按照 Rotate Amazon Secrets Manager secrets 中的步骤启用密钥轮换。

  4. 获取有权访问自主管理型 Kafka 的 VPC 的 ID。选择 OpenSearch Ingestion 要使用的 VPC CIDR。

    注意

    如果您使用 Amazon Web Services 管理控制台创建管道,则还必须将 OpenSearch Ingestion 管道附加到您的 VPC 才能使用自主管理型 Kafka。要执行此操作,请找到网络配置部分,选择连接到 VPC 复选框,然后从提供的任意一个默认选项中选择 CIDR,或者选择自己的 CIDR。您可以按照 RFC 1918 当前最佳实践中的定义,使用私有地址空间中的任何 CIDR。

    要提供自定义 CIDR,请从下拉菜单中选择其他。为避免 OpenSearch Ingestion 和自主管理型 OpenSearch 之间出现 IP 地址冲突,请确保自主管理型 OpenSearch VPC CIDR 与 OpenSearch Ingestion 的 CIDR 不同。

  5. 基于资源的策略附加到域,或将数据访问策略附加到集合。这些访问策略允许 OpenSearch Ingestion 将数据从自主管理型集群写入您的域或集合。

    以下示例域访问策略允许您在下一步中创建的管道角色将数据写入域。确保使用自身 ARN 更新 resource

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::444455556666:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:us-east-1:111122223333:domain/domain-name" ] } ] }

    要创建具有向该集合或域写入数据的正确权限的 IAM 角色,请参阅 在 Amazon OpenSearch Ingestion 中设置角色和用户

步骤 1:配置管道角色

完成管道先决条件设置后,配置管道角色以在管道配置中使用,并为该角色添加以下权限:

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"] }, { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:*:network-interface/*", "arn:aws:ec2:*:*:subnet/*", "arn:aws:ec2:*:*:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

必须为用于创建 OpenSearch Ingestion 管道的 IAM 角色提供上述 Amazon EC2 权限,因为该管道会使用这些权限在您的 VPC 中创建和删除网络接口。该管道只能通过此网络接口访问 Kafka 集群。

步骤 2:创建管道

然后,您可以配置如下所示的 OpenSearch Ingestion 管道,将 Kafka 指定为源。

您可以将多个 OpenSearch Service 域指定为数据的目标。使用此功能可以对传入多个 OpenSearch Service 域的数据执行条件路由或复制。

您还可以将数据从源 Confluent Kafka 集群迁移到 OpenSearch 无服务器 VPC 集合中。务必要在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-east-1.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"] aws: region: "us-east-1" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-east-1" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-east-1"

您可以使用预先配置的蓝图,以创建此管道。有关更多信息,请参阅 使用蓝图