使用 Amazon Data Firehose 将数据流式传输到表 - Amazon Simple Storage Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Amazon Data Firehose 将数据流式传输到表

Amazon Data Firehose 是一项完全托管式服务,用于实时将流数据传输到目标,如 Amazon S3、Amazon Redshift、Amazon OpenSearch Service、Splunk、Apache Iceberg 表和自定义 HTTP 端点或由受支持的第三方服务提供商拥有的 HTTP 端点。在使用 Amazon Data Firehose 时,您无需编写应用程序或管理资源。您可以配置数据生成工具向 Firehose 发送数据,然后 Firehose 会将数据自动传输到您指定的目标。还可以配置 Firehose 在传输数据之前转换数据。要了解有关 Amazon Data Firehose 的更多信息,请参阅 What is Amazon Data Firehose?

将表存储桶与 Amazon 分析服务集成后,您可以执行以下操作:

  1. 配置 Firehose 以将数据传输到 S3 表中。为此,您需要创建一个支持 Firehose 访问表的 Amazon Identity and Access Management(IAM)服务角色。

  2. 创建指向表或表的命名空间的资源链接。

  3. 通过授予对资源链接的权限,向 Firehose 服务角色授予对表或表命名空间的显式权限。

  4. 创建一个 Firehose 流来将数据路由到表。

为 Firehose 创建一个角色以使用 S3 表作为目标

Firehose 需要具有特定权限的 IAM 服务角色,才能访问 Amazon Glue 表并将数据写入 S3 表。在创建 Firehose 流时,您需要提供此 IAM 角色。

  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 在左侧导航窗格中,选择策略

  3. 选择创建策略,并在策略编辑器中选择 JSON

  4. 添加以下内联策略,来授予对数据目录中所有数据库和表的权限。如果需要,则您可以仅向特定的表和数据库授予权限。要使用这一策略,请将 user input placeholders 替换为您自己的信息。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "S3TableAccessViaGlueFederation", "Effect": "Allow", "Action": [ "glue:GetTable", "glue:GetDatabase", "glue:UpdateTable" ], "Resource": [ "arn:aws:glue:region:account-id:catalog/s3tablescatalog/*", "arn:aws:glue:region:account-id:catalog/s3tablescatalog", "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/*", "arn:aws:glue:region:account-id:table/*/*" ] }, { "Sid": "S3DeliveryErrorBucketPermission", "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::error delivery bucket", "arn:aws:s3:::error delivery bucket/*" ] }, { "Sid": "RequiredWhenUsingKinesisDataStreamsAsSource", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:account-id:stream/stream-name" }, { "Sid": "RequiredWhenDoingMetadataReadsANDDataAndMetadataWriteViaLakeformation", "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": "*" }, { "Sid": "RequiredWhenUsingKMSEncryptionForS3ErrorBucketDelivery", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey" ], "Resource": [ "arn:aws:kms:region:account-id:key/KMS-key-id" ], "Condition": { "StringEquals": { "kms:ViaService": "s3.region.amazonaws.com" }, "StringLike": { "kms:EncryptionContext:aws:s3:arn": "arn:aws:s3:::error delivery bucket/prefix*" } } }, { "Sid": "LoggingInCloudWatch", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:log-group-name:log-stream:log-stream-name" ] }, { "Sid": "RequiredWhenAttachingLambdaToFirehose", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": [ "arn:aws:lambda:region:account-id:function:function-name:function-version" ] } ] }

    此策略有一些语句,支持访问 Kinesis Data Streams、调用 Lambda 函数和访问 Amazon KMS 密钥。如果您不使用这些资源中的任何一个,则可以删除相应的语句。

    如果启用了错误日志记录,则 Firehose 还会将数据传输错误发送到您的 CloudWatch 日志组和流。为此,您必须配置日志组和日志流名称。对于日志组和日志流名称,请参阅 Monitor Amazon Data Firehose Using CloudWatch Logs

  5. 创建策略后,创建一个 IAM 角色,其中 Amazon 服务可信实体类型

  6. 对于服务或使用案例,选择 Kinesis。对于使用案例,选择 Kinesis Firehose

  7. 选择下一步,然后选择之前创建的策略。

  8. 为您的角色命名。检查角色详细信息,然后选择创建角色。该角色将具有以下信任策略。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sts:AssumeRole" ], "Principal": { "Service": [ "firehose.amazonaws.com" ] } } ] }

要访问表,Amazon Data Firehose 需要一个以表的命名空间为目标的资源链接。资源链接是一个数据目录对象,它充当指向另一个数据目录资源(如数据库或表)的别名或指针。该链接存储在创建该链接的账户或区域的数据目录中。有关更多信息,请参阅《Amazon Lake Formation Developer Guide》中的 How resource links work

将表存储桶与 Amazon 分析服务集成后,可以创建资源链接,以便在 Firehose 中使用表。

您可以创建指向表命名空间的资源链接,然后提供指向 Firehose 的链接名称,以便 Firehose 可以处理链接的表。

以下 Amazon CLI 命令创建一个资源链接,您可以使用该链接将 S3 表连接到 Firehose。要使用此示例命令,请将 user input placeholders 替换为您自己的信息。

aws glue create-database --region us-east-1 \ --catalog-id "111122223333" \ --database-input \ '{ "Name": "resource-link-name", "TargetDatabase": { "CatalogId": "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket", "DatabaseName": "my_namespace" }, "CreateTableDefaultPermissions": [] }'
注意

必须分别授予对资源链接和目标(链接)命名空间的权限。有关更多信息,请参阅 授予对资源链接的权限

使用资源链接访问表时,必须分别授予对资源链接和目标(链接)命名空间或表的权限。您可以通过 Lake Formation 控制台或 Amazon CLI,向 IAM 主体授予对链接到表命名空间的资源链接的 Lake Formation 权限。

Console
  1. 打开 Amazon Lake Formation 控制台(网址为 https://console.amazonaws.cn/lakeformation/),并以数据湖管理员身份登录。有关如何创建数据湖管理员的更多信息,请参阅《Amazon Lake Formation Developer Guide》中的 Create a data lake administrator

  2. 在导航窗格中,选择数据权限,然后选择授予

  3. 授予权限页面的主体下,选择 IAM 用户和角色,然后选择您创建的用于流式传输到表的服务角色。

  4. LF 标签或目录资源下,选择命名 Data Catalog 资源

  5. 对于目录,选择您的账户 ID,即默认目录

  6. 对于数据库,选择为表命名空间创建的资源链接。

  7. 对于资源链接权限,选择描述

  8. 选择授权

CLI
  1. 务必要以数据湖管理员身份运行 Amazon CLI 命令。有关更多信息,请参阅《Amazon Lake Formation Developer Guide》中的 Create a data lake administrator

  2. 运行以下命令,向 IAM 主体授予对 S3 表存储桶中表的 Lake Formation 权限,以便主体可以访问该表。要使用此示例,请将 user input placeholders 替换为您自己的信息。DataLakePrincipalIdentifier 值可以是 IAM 用户或角色 ARN。

    aws lakeformation grant-permissions \ --principal DataLakePrincipalIdentifier=arn:aws:iam::account-id:role/role-name \ --resource Database='{CatalogId=account-id, Name=database-name}' \ --permissions DESCRIBE

设置到 S3 表的 Firehose 流

以下过程说明如何设置 Firehose 流以使用控制台向 S3 表传输数据。要设置到 S3 表的 Firehose 流,需要满足以下先决条件。

先决条件

要在配置流时向 Firehose 提供路由信息,可以使用为命名空间创建的资源链接名称作为数据库名称和该命名空间中表的名称。可以在 Firehose 流配置的“唯一键”部分使用这些值,来将数据路由到单个表。也可以使用这些值通过 JSON 查询表达式路由到表。有关更多信息,请参阅 Route incoming records to a single Iceberg table

设置到 S3 表的 Firehose 流(控制台)
  1. https://console.aws.amazon.com/firehose/ 中打开 Firehose 控制台。

  2. 选择创建 Firehose 流

  3. 对于,选择下列源之一:

    • Amazon Kinesis Data Streams

    • Amazon MSK

    • 直接 PUT

  4. 对于目标,选择 Apache Iceberg 表

  5. 输入 Firehose 流名称

  6. 配置源设置

  7. 对于目标设置,选择当前账户和要流式传输到的表的 Amazon 区域

  8. 使用唯一密钥配置、JSONQuery 表达式或在 Lambda 函数中配置数据库和表名称。有关更多信息,请参阅《Amazon Data Firehose Developer Guide》中的 Route incoming records to a single Iceberg tableRoute incoming records to different Iceberg tables

  9. 备份设置下,指定 S3 备份存储桶

  10. 对于高级设置下的现有 IAM 角色,选择您为 Firehose 创建的 IAM 角色。

  11. 选择创建 Firehose 流

有关您可以为流配置的其它设置的更多信息,请参阅《Amazon Data Firehose Developer Guide》中的 Set up the Firehose stream