本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Confluent Cl OpenSearch oud Kafka 中使用采集管道
您可以使用 OpenSearch 摄取管道将数据从 Confluent Cloud Kafka 集群流式传输到 OpenSearch 亚马逊服务域和无服务器集合。 OpenSearch OpenSearch Ingestion 支持公用和私有网络配置,用于将数据从 Confluent Cloud Kafka 集群流式传输到由服务或无服务器管理的域或集合。 OpenSearch OpenSearch
连接到 Confluent Cloud 公有 Kafka 集群
您可以使用 OpenSearch 摄取管道从具有公共配置的 Confluent Cloud Kafka 集群迁移数据,这意味着可以公开解析域 DNS 名称。为此,请设置一个 OpenSearch 采集管道,将 Confluent Cloud 公共 Kafka 集群作为源,将 OpenSearch 服务或 OpenSearch 无服务器作为目标。这将处理您从自行管理的源集群到托 Amazon管目标域或集合的流式数据。
先决条件
在创建 OpenSearch Ingestion 管道之前,请执行以下步骤:
-
创建充当源的 Confluent Cloud Kafka 集群。集群应包含您要提取到 S OpenSearch ervice 中的数据。
-
创建要将数据迁移到的 OpenSearch 服务域或 OpenSearch 无服务器集合。有关更多信息,请参阅 创建 OpenSearch 服务域和创建集合。
-
使用 Amazon Secrets Manager在 Confluent Cloud Kafka 集群上设置身份验证。按照 Rotate Amazon Secrets Manager secrets 中的步骤启用密钥轮换。
-
将基于资源的策略附加到域,或将数据访问策略附加到集合。这些访问策略允许 OpenSearch Ingestion 将数据从您的自管理集群写入您的域或集合。
以下示例域访问策略允许您在下一步中创建的管道角色将数据写入域。确保使用自身 ARN 更新
resource
。要创建具有正确权限的 IAM 角色来访问向集合或域写入数据,请参阅在 Amazon OpenSearch Ingestion 中设置角色和用户。
步骤 1:配置管道角色
设置 Confluent Cloud Kafka 集群管道先决条件后,配置要在管道配置中使用的管道角色,添加写入 OpenSearch 服务域或 OpenSearch 无服务器集合的权限,以及从 Secrets Manager 读取密钥的权限。
管理网络接口需要以下权限:
以下是从 Amazon Secrets Manager 服务中读取机密所需的权限:
写入亚马逊 OpenSearch 服务域需要以下权限:
{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
account-id
:role/pipeline-role
" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:region
:account-id
:domain/domain-name
/*" } ] }
步骤 2:创建管道
然后,你可以配置如下所示的 OpenSearch 摄取管道,它将你的 Confluent Cloud Kafka 指定为来源。
您可以将多个 OpenSearch 服务域指定为数据的目的地。此功能允许有条件地路由或将传入数据复制到多个 OpenSearch 服务域。
您还可以将数据从源 Confluent Kafka 集群迁移到无服务器 V OpenSearch PC 集合。务必要在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。
version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "
topic-name
" group_id: "group-id
" bootstrap_servers: - "bootstrap-server
.us-east-1
.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username
} password: ${aws_secrets:confluent-kafka-secret:password
} schema: type: confluent registry_url: https://my-registry
.us-east-1
.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key
}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret
}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com
"] aws: region: "us-east-1
" aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret
" region: "us-east-1
" schema-secret: secret_id: "my-self-managed-kafka-schema
" region: "us-east-1
"
您可以使用预先配置的蓝图来创建此管道。有关更多信息,请参阅 使用蓝图。
连接到 VPC 中的 Confluent Cloud Kafka 集群
您还可以使用 OpenSearch 摄取管道从 VPC 中运行的 Confluent Cloud Kafka 集群迁移数据。为此,请设置一个 OpenSearch 采集管道,将 Confluent Cloud Kafka 集群作为源,将 OpenSearch 服务或 OpenSearch 无服务器作为目标。这将处理从 Confluent Cloud Kafka 源集群到 Amazon托管式目标域或集合的流式数据传输。
OpenSearch Ingestion 支持在 Confluent 中以所有支持的网络模式配置的 Confluent Cloud Kafka 集群。 OpenSearch Ingestion 支持以下网络配置模式作为来源:
-
Amazon VPC 对等连接
-
Amazon PrivateLink 适用于专用集群
-
Amazon PrivateLink 适用于企业集群
-
Amazon Transit Gateway
先决条件
在创建 OpenSearch Ingestion 管道之前,请执行以下步骤:
-
使用 VPC 网络配置创建 Confluent Cloud Kafka 集群,其中包含您要提取到服务中的数据。 OpenSearch
-
创建要将数据迁移到的 OpenSearch 服务域或 OpenSearch 无服务器集合。有关更多信息,请参阅。有关更多信息,请参阅 创建 OpenSearch 服务域和创建集合。
-
使用 Amazon Secrets Manager在 Confluent Cloud Kafka 集群上设置身份验证。按照 Rotate Amazon Secrets Manager secrets 中的步骤启用密钥轮换。
-
获取有权访问 Confluent Cloud Kafka 集群的 VPC 的 ID。选择 OpenSearch Ingestion 要使用的 VPC CIDR。
注意
如果您使用创建管道,则还必须将您的 OpenSearch 摄取管道连接到您的 VPC 才能使用 Confluent Cloud Kafka 集群。 Amazon Web Services Management Console 要执行此操作,请找到网络配置部分,选择连接到 VPC 复选框,然后从提供的任意一个默认选项中选择 CIDR,或者选择自己的 CIDR。您可以按照 RFC 1918 当前最佳实践
中的定义,使用私有地址空间中的任何 CIDR。 要提供自定义 CIDR,请从下拉菜单中选择其他。为避免接入和自行管理之间的 IP 地址冲突 OpenSearch,请确保自管理 VP OpenSearch C CIDR 与用于 OpenSearch 摄取的 CIDR 不同。 OpenSearch
-
将基于资源的策略附加到域,或将数据访问策略附加到集合。这些访问策略允许 OpenSearch Ingestion 将数据从您的自管理集群写入您的域或集合。
注意
如果使用 Amazon PrivateLink 连接 Confluent Cloud Kafka,则需要配置 VPC DHCP 选项。应启用 DNS 主机名和 DNS 解析。
具体而言,请使用以下选项集值:
domain-name: aws.private.confluent.cloud domain-name-servers: AmazonProvidedDNS
此更改可确保 Confluent PrivateLink 终端节点的 DNS 解析在 VPC 内正常运行。
以下示例域访问策略允许您在下一步中创建的管道角色将数据写入域。确保使用自身 ARN 更新
resource
。要创建具有正确权限的 IAM 角色来访问向集合或域写入数据,请参阅在 Amazon OpenSearch Ingestion 中设置角色和用户。
步骤 1:配置管道角色
完成管道先决条件设置后,配置管道角色以在管道配置中使用,并为该角色添加以下权限:
您必须为用于创建 OpenSearch 摄取管道的 IAM 角色提供上述 Amazon EC2 权限,因为管道使用这些权限在您的 VPC 中创建和删除网络接口。该管道只能通过此网络接口访问 Kafka 集群。
步骤 2:创建管道
然后,您可以配置如下所示的 OpenSearch 采集管道,将 Kafka 指定为来源。
您可以将多个 OpenSearch 服务域指定为数据的目的地。此功能允许有条件地路由或将传入数据复制到多个 OpenSearch 服务域。
您还可以将数据从源 Confluent Kafka 集群迁移到无服务器 V OpenSearch PC 集合。务必要在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。
version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "
topic-name
" group_id: "group-id
" bootstrap_servers: - "bootstrap-server
.us-east-1
.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username
} password: ${aws_secrets:confluent-kafka-secret:password
} schema: type: confluent registry_url: https://my-registry
.us-east-1
.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key
}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret
}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com
"] aws: region: "us-east-1
" index: "confluent-index
" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret
" region: "us-east-1
" schema-secret: secret_id: "my-self-managed-kafka-schema
" region: "us-east-2
"