在 Confluent OpenSearch Kafka 云中使用采集管道 - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Confluent OpenSearch Kafka 云中使用采集管道

你可以使用 Confluent Kafka 作为 OpenSearch Ingestion 中的来源,将数据从 Confluent Kafka 集群流式传输到亚马逊 OpenSearch 服务域或亚马逊无服务器集合。 OpenSearch OpenSearch Ingestion 支持在公共和私有网络空间中处理来自自管 Kafka 的流数据。

连接到 Confluent 公共 Kafka 云

您可以使用 OpenSearch Ingestion 管道从具有公共配置的 Confluent Kafka 集群中流式传输数据(必须公开解析引导服务器 DNS 名称)。为此,你需要一个 OpenSearch 采集管道、一个融合的 Kafka 集群作为源,以及一个亚马逊服务 OpenSearch 域或一个亚马逊 OpenSearch 无服务器集合作为目标。

要迁移数据,必须具备以下条件:

  • 充当源的 Confluent Kafka 集群。集群应包含您要迁移的数据。

  • 作为目的地的亚马逊 OpenSearch 服务域名或亚马逊 OpenSearch 无服务器集合。

  • Kafka 集群应使用来自 Amazon Secrets Manager的凭据启用身份验证。

要求

要在自行管理的 OpenSearch 或 Elasticsearch 源集群上启用 Amazon Secrets Manager 基于身份验证的功能,您必须

  • 按照轮换密钥中的步骤在你的 Confluent Kafka 集群上设置身份验证。 Amazon Secrets ManagerAmazon Secrets Manager

  • 在 IAM 中创建具有写入亚马逊 OpenSearch 服务域或亚马逊 OpenSearch 无服务器集合权限的管道角色。您还必须指定读取证书的权限 Amazon Secrets Manager。要实现此目的,应按照以下步骤进行:

    • 基于资源的策略附加到您的 Amazon S OpenSearch ervice 域或将数据访问策略附加到您的馆藏。这些访问策略允许 OpenSearch Ingestion 将数据从您的自行管理 OpenSearch 或 Elasticsearch 源集群写入您的亚马逊服务 OpenSearch 域或您的亚马逊无服务器集合。 OpenSearch

  • 参考蓝 OpenSearch 图创建摄取管道。

    完成这些步骤后,您的管道将自动开始处理源集群中的数据,并将其提取到您的亚马逊 OpenSearch 服务域或亚马逊 OpenSearch 无服务器收集目标中。您可以在 Ingesti OpenSearch on 管道中使用各种处理器对摄取的数据执行任何转换。

IAM 角色和权限

以下示例域访问策略允许您在下一步中创建的管道角色向 Amazon S OpenSearch ervice 域写入数据。确保使用自己的 ARN 更新资源。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

管理网络接口需要以下权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

以下是从 Amazon Secrets Manager 服务中读取机密所需的权限:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": ["secretsmanager:GetSecretValue"], "Resource": ["arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>"] } ] }

写入亚马逊 OpenSearch 服务域需要以下权限:

{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{your-account-id}:role/{pipeline-role}" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:{region}:{your-account-id}:domain/{domain-name}/*" } ] }
创建管道

将策略附加到管道角色后,使用 Confluent Kafka 数据迁移管道蓝图创建管道。该蓝图包括在 Kafka 和目标之间迁移数据的默认配置。

  • 您可以指定多个 Amazon OpenSearch 服务域作为数据的目的地。此功能允许有条件地路由或将传入的数据复制到多个 Amazon OpenSearch Servicedomains。

  • 您可以将数据从源 Confluent Kafka 集群迁移到亚马逊无服务器 V OpenSearch PC 集合。确保在管道配置中提供网络访问策略。

  • 您可以使用融合架构注册表来定义和融合架构。

以下示例管道将数据从 Confluent Kafka 集群提取到亚马逊服务域: OpenSearch

version: "2" kafka-pipeline: source: kafka: # Encryption is always required encryption: type: "ssl" topics: - name: "topic_4" group_id: "demoGroup" bootstrap_servers: # TODO: for public confluent kafka use public booststrap server dns - "<<bootstrap-server>>.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: "${{aws_secrets:confluent-kafka-secret:username}}" password: "${{aws_secrets:confluent-kafka-secret:password}}" # Schema is optional schema: type: confluent registry_url: https://<<registry-url>>.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: [ "https://<<opensearchdomain>>.us-west-2.es.amazonaws.com" ] index: "enterprise-confluent-demo" aws: sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role" region: "<<aws-region>>" extension: aws: secrets: confluent-kafka-secret: secret_id: "enterprise-kafka-credentials" region: "<<aws-region>>" sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role" schema-secret: secret_id: "self-managed-kafka-schema" region: "<<aws-region>>" sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role"

在 VPC 中连接到 Confluent Kafka 云

您可以使用 OpenSearch 摄取管道从具有公共配置的 Confluent Kafka 集群中流式传输数据。为此,请设置一个以 Confluent Kafka 为来源、亚马逊服务 OpenSearch 域或亚马逊 OpenSearch 无服务器集合作为目标的 OpenSearch 摄取管道。管道处理来自你的 kafka 集群的所有流数据,并将数据提取到目标集群。

Confluent Kafka 网络配置

OpenSearch Ingestion 支持在 Confluent 中以所有支持的网络模式配置的 Confluent Kafka 集群。 OpenSearch Ingestion 支持以下网络配置模式作为来源。

  • Amazon VPC 对等连接

  • Amazon PrivateLink 适用于专用集群

  • Amazon PrivateLink 适用于企业集群

  • Amazon Transit Gateway

你可以使用 Confluent 托管 Kafka 作为从 Confluent 云中提取数据的来源。为实现这一目标,您需要设置一个管道,将 Kafka 配置为来源,将 Amazon Serv OpenSearch ice 域或 Amazon OpenSearch Serverless 集合配置为接收器。这便于将数据从 Kafka 迁移到指定目的地。迁移还支持使用 confluent 注册表或根本不使用注册表。

要执行数据迁移,您需要以下资源:

  • 充当源的 Confluent Kafka 集群,包含你打算迁移的数据。

  • 目标目的地,例如亚马逊 OpenSearch 服务域名或亚马逊 OpenSearch 无服务器集合作为接收器。

  • 有权访问 Confluent VPC 的亚马逊 VPC 的 VPC ID。

  • Kafka 集群应使用来自 Amazon Secrets Manager的凭据启用身份验证。

要求

要在 Kafka 集群上设置摄取,需要满足以下条件:

  • 您必须在 Kafka 集群上启用 Amazon Secrets Manager 基于身份验证的功能。

  • 您需要提供 VPC CIDR 以供 OpenSearch 摄取服务使用。

    • 如果您使用 Amazon 管理控制台创建管道,则还必须将 Amazon OpenSearch Ingestion 管道附加到您的 VPC,才能使用 Confluent Kafka 作为来源。为此,请找到 “网络配置” 部分,选中 “连接到 VPC” 复选框,然后选择您的 CIDR 或手动输入要由摄取使用的任何 /24 CIDR。 OpenSearch OpenSearch Ingestion 选择使用的 CIDR 应不同于运行 Confluent 托管 Kafka 的 VPC CIDR。有关需要避免使用的 Confluent Kafka CIDR 的更多信息,请点击此处。以下是 OpenSearch 接入服务可用于创建网络连接的默认 CIDR 选项。

      • 10.99.20.0/24

      • 192.168.36.0/24

      • 172.21.56.0/24

  • 您需要在 IAM 中创建一个管道角色,该角色具有访问亚马逊 OpenSearch 服务域或 Amazon OpenSearch Serverless 集合的权限以及从中 Amazon Secrets Manager读取密钥的权限。

    • 基于资源的策略附加到您的 Amazon Ser OpenSearch vicedomain,或者将亚马逊 OpenSearch 无服务器数据访问策略附加到您的馆藏。这些访问策略允许 OpenSearch Ingestion 将数据从你的 Kafka 写入你的亚马逊 OpenSearch 服务域或亚马逊 OpenSearch 无服务器集合。

IAM 角色和权限

以下示例域访问策略允许管道角色向 Amazon S OpenSearch ervice 域写入数据。

注意

您需要使用自己的 AR resource N 进行更新。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

以下示例提供了管理网络接口所需的权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ]

以下示例提供了从中读取密钥所需的权限 Amazon Secrets Manager:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": ["secretsmanager:GetSecretValue"], "Resource": ["arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>"] } ] }

以下示例提供了写入亚马逊 OpenSearch 服务域所需的权限:

{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{your-account-id}:role/{pipeline-role}" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:{region}:{your-account-id}:domain/{domain-name}/*" } ] }
创建管道

将策略附加到管道角色后,您可以使用 Confluent Kafka 数据迁移管道蓝图来创建您的管道。该蓝图包括在 Kafka 和目标之间迁移数据的默认配置。

  • 您可以指定多个 Amazon OpenSearch 服务域作为数据的目的地。此功能允许有条件地将传入的数据路由或复制到多个 Amazon OpenSearch 服务中。

  • 您可以将数据从源 Confluent Kafka 集群迁移到亚马逊无服务器 V OpenSearch PC 集合。确保在管道配置中提供网络访问策略。

  • 您可以使用 Confluent 架构注册表来定义和 Confluent 架构。

管道配置示例

version: "2" kafka-pipeline: source: kafka: # Encryption is always required encryption: type: "ssl" topics: - name: "topic_4" group_id: "demoGroup" bootstrap_servers: # TODO: for public confluent kafka use public booststrap server dns - "<<bootstrap-server>>.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: "${{aws_secrets:confluent-kafka-secret:username}}" password: "${{aws_secrets:confluent-kafka-secret:password}}" # Schema is optional schema: type: confluent registry_url: https://<<registry-url>>.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: [ "https://<<opensearchdomain>>.us-west-2.es.amazonaws.com" ] index: "enterprise-confluent-demo" aws: sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role" region: "<<aws-region>>" extension: aws: secrets: confluent-kafka-secret: secret_id: "enterprise-kafka-credentials" region: "<<aws-region>>" sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role" schema-secret: secret_id: "self-managed-kafka-schema" region: "<<aws-region>>" sts_role_arn: "arn:aws:iam::1234567890:role/os-os-test-role"