使用 Athena 查询 Apache Hudi 数据集 - Amazon Athena
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Athena 查询 Apache Hudi 数据集

Apache Hudi是一个开源数据管理框架,可简化增量递增数据的处理。记录级别的插入、更新、加入和删除操作的处理更加精细,从而降低了开销。Upsert 指的是将记录插入到现有数据集中(如果它们不存在)或对数据集进行更新(如果它们存在)的功能。

Hudi 处理数据插入和更新事件,而不会创建许多可能导致分析性能问题的小文件。Apache Hudi 会自动跟踪更改并合并文件,以便它们保持最佳大小。这样就不需要构建自定义解决方案来监控许多小文件并将其重写为大文件以减少其数量。

Hudi 数据集适用于以下使用案例:

Hudi 管理的数据集使用开放存储格式存储在 Amazon S3 中。目前,Athena 可以读取压缩的 Hudi 数据集,但不能写 Hudi 数据。Athena 最高可支持将 Hudi 版本 0.8.0 与 Athena 引擎版本 2 结合使用,以及将 Hudi 版本 0.14.0与 Athena 引擎版本 3 结合使用。这可能随时更改。Athena 不保证与使用更高版本的 Hudi 创建的表具有读取兼容性。有关 Athena 引擎版本控制的更多信息,请参阅 Athena 引擎版本控制。有关 Hudi 功能和版本控制的更多信息,请参阅 Apache 网站上的 Hudi 文档

Hudi 数据集表类型

Hudi 数据集可以采用以下类型之一:

  • 写入时复制 (CoW) – 数据以列状格式存储 (Parquet),并且每次更新都会在写入过程中创建一个新版本的文件。

  • 读取时合并 (MOR) – 数据使用列式 (Parquet) 和基于行 (Avro) 的格式的组合进行存储。更新记录到基于行的 delta 文件中,并根据需要进行压缩以创建新版本的列式文件。

对于 CoW 数据集,每次更新记录时,包含该记录的文件都会使用更新后的值进行重写。对于 MoR 数据集,每次进行更新时,Hudi 仅写入已更改记录对应的行。MoR 更适合写入或更改繁重而读取量较少的工作负载。CoW 更适合更改频率较低但读取量繁重的工作负载。

Hudi 提供三个查询类型用于访问数据:

  • 快照查询 – 该查询用于查看截至给定提交或压缩操作时表的最新快照。对于 MOR 表,快照查询通过在查询时合并最新文件切片的基本文件和增量文件来显示表的最新状态。

  • 增量查询 – 查询只能看到自给定提交/压缩以来,写入表的新数据。这有效地提供了更改流以启用增量数据管道。

  • 读取优化查询 – 对于 MOR 表,查询将看到最新压缩的数据。对于 CoW 表,查询将看到最新提交的数据。

下表显示了适用于每种表类型的可用 Hudi 查询类型。

表类型 可用的 Hudi 查询类型
写入时复制 快照、增量
读取时合并 快照、增量、读取优化

目前,Athena 支持快照查询和读取优化查询,但不支持增量查询。在 MOR 表上,所有在读取优化查询中显示的 数据均已压缩。这提供了良好的性能,但不包括最新的增量提交。快照查询包含最新的数据,但会但产生一些计算开销,这使得这些查询的性能降低。

有关在表和查询类型之间权衡的更多信息,请参阅 Apache Hudi 文档中的表和查询类型

Hudi 术语变更:视图现在为查询

从发行版 0.5.1 开始,Apache Hudi 改变了一些术语。以前的视图在较新的版本中称为查询。下表总结了旧新术语之间的变更。

旧术语 新术语

CoW:读取优化视图

MoR:实时视图

快照查询

增量视图 增量查询
MoR 读取优化视图 读取优化的查询

引导启动操作中的表

从 Apache Hudi 版本 0.6.0 开始,引导启动操作功能可为现有的 Parquet 数据集提供更好的性能。引导操作只能生成元数据,使数据集保持原位,而不是重写数据集。

您可以使用 Athena 从引导启动操作中查询表,就像基于 Amazon S3 中数据的其他表一样。在您的 CREATE TABLE 语句中,在 LOCATION 子句指定 Hudi 表路径。

有关在 Amazon EMR 中使用引导启动操作创建 Hudi 表的更多信息,请参阅 Amazon 大数据博客文章:New features from Apache Hudi available in Amazon EMR(在 Amazon EMR 中可用的 Apache Hudi 新功能)。

Hudi 元数据列表

Apache Hudi 有一个元数据表,其中包含用于提高性能的索引功能,例如文件列表、使用列统计数据跳过数据以及基于布隆筛选器的索引。

在这些功能中,Athena 目前仅支持文件列表索引。文件列表索引通过从维护分区到文件映射的索引中获取信息,从而消除了“列表文件”之类的文件系统调用。这样就无需以递归方式列出表路径下的每个分区即可查看文件系统。当您处理大型数据集时,这种索引可以大大减少写入和查询期间获取文件列表时出现的延迟。还可以避免诸如 Amazon S3 LIST 调用的请求限制节流之类的瓶颈。

注意

Athena 目前不支持数据跳过或布隆筛选器索引。

启用 Hudi 元数据表

默认情况下,禁用基于元数据表的文件列表。要启用 Hudi 元数据表和相关的文件列表功能,请将 hudi.metadata-listing-enabled 表格属性设置为 TRUE

示例

以下 ALTER TABLE SET TBLPROPERTIES 示例在示例 partition_cow 表上启用元数据表。

ALTER TABLE partition_cow SET TBLPROPERTIES('hudi.metadata-listing-enabled'='TRUE')

注意事项和限制

  • Athena 不支持增量查询。

  • Athena 不支持对 Hudi 数据执行 CTAS 或者 INSERT INTO。如果您希望 Athena 支持编写 Hudi 数据集,请将反馈发送至

    有关编写 Hudi 数据的更多信息,请参阅以下资源:

  • 不支持在 Athena 的 Hudi 表上使用 MSCK REPAIR TABLE。如果您需要加载未在 Amazon Glue 中创建的 Hudi 表,请使用 ALTER TABLE ADD PARTITION

  • 不支持跳过 S3 Glacier 对象 – 如果 Apache Hudi 表中的对象属于 Amazon S3 Glacier 存储类,将 read_restored_glacier_objects 表属性设置为 false 则无效。

    例如,假设发出以下命令:

    ALTER TABLE table_name SET TBLPROPERTIES ('read_restored_glacier_objects' = 'false')

    对于 Iceberg 和 Delta Lake 表,该命令会生成错误 Unsupported table property key: read_restored_glacier_objects。对于 Hudi 表,ALTER TABLE 命令不会产生错误,但是 Amazon S3 Glacier 对象仍无法跳过。在 ALTER TABLE 命令之后运行 SELECT 查询会继续返回所有对象。

创建 Hudi 表

本节提供了 Athena 中针对 Hudi 数据的分区表和非分区表的 CREATE TABLE 语句的示例。

如果您有已在 Amazon Glue 中创建的 Hudi 表,您可以直接在 Athena 中查询它们。当您在 Athena 中创建分区 Hudi 表时,您必须运行 ALTER TABLE ADD PARTITION 以加载 Hudi 数据,然后再查询这些数据。

写入时复制(CoW)创建表示例

未分区 CoW 表

以下示例在 Athena 中创建了一个未分区的 CoW 表。

CREATE EXTERNAL TABLE `non_partition_cow`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/non_partition_cow/'

分区 CoW 表

以下示例在 Athena 中创建了一个已分区的 CoW 表。

CREATE EXTERNAL TABLE `partition_cow`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/'

以下 ALTER TABLE ADD PARTITION 示例将两个分区添加到了示例 partition_cow 表。

ALTER TABLE partition_cow ADD PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/one/' PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/two/'

读取时合并(MoR)创建表示例

Hudi 在元数据仓中为 MoR 创建了两个表:一个用于快照查询的表,一个用于读取优化查询的表。两个表都可查询。在 0.5.1 之前的 Hudi 版本中,用于读优化查询的表具有您在创建表时指定的名称。从 Hudi 版本 0.5.1 开始,表名将默认以 _ro 为后缀。用于快照查询的表的名称是指定的名称,其中附加了 _rt

未分区的读取时合并(MoR)表

以下示例在 Athena 中创建了一个未分区的 MoR 表用于读取优化查询。请注意,读取优化查询使用输入格式 HoodieParquetInputFormat

CREATE EXTERNAL TABLE `nonpartition_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/nonpartition_mor/'

以下示例在 Athena 中创建了一个未分区的 MoR 表用于快照查询。对于快照查询,请使用输入格式 HoodieParquetRealtimeInputFormat

CREATE EXTERNAL TABLE `nonpartition_mor_rt`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int, `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/nonpartition_mor/'

已分区的读取时合并(MoR)表

以下示例在 Athena 中创建了一个已分区的 MoR 表用于读取优化查询。

CREATE EXTERNAL TABLE `partition_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/'

以下 ALTER TABLE ADD PARTITION 示例将两个分区添加到了示例 partition_mor 表。

ALTER TABLE partition_mor ADD PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/one/' PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/two/'

以下示例在 Athena 中创建了一个已分区的 MoR 表用于快照查询。

CREATE EXTERNAL TABLE `partition_mor_rt`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `event_id` string, `event_time` string, `event_name` string, `event_guests` int) PARTITIONED BY ( `event_type` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/'

同样,以下 ALTER TABLE ADD PARTITION 示例将两个分区添加到了示例 partition_mor_rt 表。

ALTER TABLE partition_mor_rt ADD PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/one/' PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/two/'

其他 资源