将 OpenSearch Ingestion 管道与 Confluent Cloud Kafka 结合使用
您可以使用 OpenSearch Ingestion 管道将数据从 Confluent Cloud Kafka 集群流式传输到 Amazon OpenSearch Service 域和 OpenSearch 无服务器集合。OpenSearch Ingestion 同时支持公用和私有网络配置,可用于将数据从 Confluent Cloud Kafka 集群流式传输到 OpenSearch Service 或 OpenSearch 无服务器管理的域或集合。
连接到 Confluent Cloud 公有 Kafka 集群
您可以使用 OpenSearch Ingestion 管道从采用公有配置(即可以公开解析域 DNS 名称)的 Confluent Cloud Kafka 集群迁移数据。要执行此操作,请设置一个 OpenSearch Ingestion 管道,将 Confluent Cloud 公有 Kafka 集群作为源,将 OpenSearch Service 或 OpenSearch 无服务器作为目标。这将处理从自主管理型源集群到 Amazon 托管式目标域或集合的流式数据传输。
先决条件
创建 OpenSearch Ingestion 管道之前,请执行以下步骤:
-
创建充当源的 Confluent Cloud Kafka 集群。该集群应包含要摄入 OpenSearch Service 的数据。
-
创建要将数据迁移到的 OpenSearch Service 域或 OpenSearch 无服务器集合。有关更多信息,请参阅 创建 OpenSearch Service 域和创建集合。
-
使用 Amazon Secrets Manager 在 Confluent Cloud Kafka 集群上设置身份验证。按照 Rotate Amazon Secrets Manager secrets 中的步骤启用密钥轮换。
-
将基于资源的策略附加到域,或将数据访问策略附加到集合。这些访问策略允许 OpenSearch Ingestion 将数据从自主管理型集群写入您的域或集合。
以下示例域访问策略允许您在下一步中创建的管道角色将数据写入域。确保使用自身 ARN 更新
resource。要创建具有向该集合或域写入数据的正确权限的 IAM 角色,请参阅 在 Amazon OpenSearch Ingestion 中设置角色和用户。
步骤 1:配置管道角色
完成 Confluent Cloud Kafka 集群管道的先决条件设置后,配置管道角色以在管道配置中使用,添加写入 OpenSearch Service 域或 OpenSearch 无服务器集合的权限,以及从 Secrets Manager 读取密钥的权限。
管理网络接口需要以下权限:
从 Amazon Secrets Manager 服务中读取密钥需要以下权限:
写入 Amazon OpenSearch Service 域需要以下权限:
{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::account-id:role/pipeline-role" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:region:account-id:domain/domain-name/*" } ] }
步骤 2:创建管道
然后,您可以配置如下所示的 OpenSearch Ingestion 管道,将 Confluent Cloud Kafka 指定为源。
您可以将多个 OpenSearch Service 域指定为数据的目标。使用此功能可以对传入多个 OpenSearch Service 域的数据执行条件路由或复制。
您还可以将数据从源 Confluent Kafka 集群迁移到 OpenSearch 无服务器 VPC 集合中。务必要在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。
version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-east-1.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"] aws: region: "us-east-1" aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-east-1" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-east-1"
您可以使用预先配置的蓝图,以创建此管道。有关更多信息,请参阅 使用蓝图。
连接到 VPC 中的 Confluent Cloud Kafka 集群
您还可以使用 OpenSearch Ingestion 管道从在 VPC 中运行的 Confluent Cloud Kafka 集群迁移数据。要执行此操作,请设置一个 OpenSearch Ingestion 管道,将 Confluent Cloud Kafka 集群作为源,将 OpenSearch Service 或 OpenSearch 无服务器作为目标。这将处理从 Confluent Cloud Kafka 源集群到 Amazon 托管式目标域或集合的流式数据传输。
OpenSearch Ingestion 支持在 Confluent 支持的所有网络模式下配置的 Confluent Cloud Kafka 集群。OpenSearch Ingestion 支持将以下网络配置模式作为来源:
-
Amazon VPC 对等连接
-
适用于专用集群的 Amazon PrivateLink
-
适用于企业集群的 Amazon PrivateLink
-
Amazon Transit Gateway
先决条件
创建 OpenSearch Ingestion 管道之前,请执行以下步骤:
-
使用 VPC 网络配置创建 Confluent Cloud Kafka 集群,其中包含要摄入 OpenSearch Service 的数据。
-
创建要将数据迁移到的 OpenSearch Service 域或 OpenSearch 无服务器集合。有关更多信息,请参阅 创建 OpenSearch Service 域 和 创建集合。
-
使用 Amazon Secrets Manager 在 Confluent Cloud Kafka 集群上设置身份验证。按照 Rotate Amazon Secrets Manager secrets 中的步骤启用密钥轮换。
-
获取有权访问 Confluent Cloud Kafka 集群的 VPC 的 ID。选择 OpenSearch Ingestion 要使用的 VPC CIDR。
注意
如果您使用 Amazon Web Services 管理控制台 创建管道,则还必须将 OpenSearch Ingestion 管道附加到您的 VPC 才能使用 Confluent Cloud Kafka 集群。要执行此操作,请找到网络配置部分,选择连接到 VPC 复选框,然后从提供的任意一个默认选项中选择 CIDR,或者选择自己的 CIDR。您可以按照 RFC 1918 当前最佳实践
中的定义,使用私有地址空间中的任何 CIDR。 要提供自定义 CIDR,请从下拉菜单中选择其他。为避免 OpenSearch Ingestion 和自主管理型 OpenSearch 之间出现 IP 地址冲突,请确保自主管理型 OpenSearch VPC CIDR 与 OpenSearch Ingestion 的 CIDR 不同。
-
将基于资源的策略附加到域,或将数据访问策略附加到集合。这些访问策略允许 OpenSearch Ingestion 将数据从自主管理型集群写入您的域或集合。
注意
如果使用 Amazon PrivateLink 连接 Confluent Cloud Kafka,则需要配置 VPC DHCP 选项。应启用 DNS 主机名和 DNS 解析。
具体而言,使用以下选项集值:
企业集群:
domain-name: aws.private.confluent.cloud domain-name-servers: AmazonProvidedDNS专用集群:
domain-name: aws.confluent.cloud domain-name-servers: AmazonProvidedDNS此更改可确保 Confluent PrivateLink 端点的 DNS 解析在 VPC 内正常运行。
以下示例域访问策略允许您在下一步中创建的管道角色将数据写入域。确保使用自身 ARN 更新
resource。要创建具有向该集合或域写入数据的正确权限的 IAM 角色,请参阅 在 Amazon OpenSearch Ingestion 中设置角色和用户。
步骤 1:配置管道角色
完成管道先决条件设置后,配置管道角色以在管道配置中使用,并为该角色添加以下权限:
必须为用于创建 OpenSearch Ingestion 管道的 IAM 角色提供上述 Amazon EC2 权限,因为该管道会使用这些权限在您的 VPC 中创建和删除网络接口。该管道只能通过此网络接口访问 Kafka 集群。
步骤 2:创建管道
然后,您可以配置如下所示的 OpenSearch Ingestion 管道,将 Kafka 指定为源。
您可以将多个 OpenSearch Service 域指定为数据的目标。使用此功能可以对传入多个 OpenSearch Service 域的数据执行条件路由或复制。
您还可以将数据从源 Confluent Kafka 集群迁移到 OpenSearch 无服务器 VPC 集合中。务必要在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。
version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-east-1.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"] aws: region: "us-east-1" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-east-1" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-east-2"