使用 Hudi 数据集 - Amazon EMR
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Hudi 数据集

Hudi 支持通过 Spark 在 Hudi 数据集中插入、更新和删除数据。有关更多信息,请参阅 Apache Hudi 文档中的编写 Hudi 表

以下示例演示如何在 Amazon EMR 笔记本 上启动交互式 Spark shell、使用 Spark 提交或使用 Hudi 处理 Amazon EMR。您还可以使用 Hudi DeltaStreamer 实用程序或其他工具来写入数据集。在本节中,示例演示使用 Spark shell 处理数据集,同时使用 SSH 作为默认 hadoop 用户连接到主节点。

注意

Hudi 0.6.0 包含 spark-avro 程序包作为不同名称下的依赖项。使用 EMR 5.31.0 及更高版本时,您不必在配置中包含 spark-avro.jar

spark-shell

在主节点上打开 Spark shell

  1. 使用 SSH 连接主节点。有关更多信息,请参阅 https://docs.amazonaws.cn/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html 中的使用 SSH 连接到主节点Amazon EMR 管理指南。

  2. 输入以下命令以启动 Spark shell。要使用 PySpark shell,请替换 spark-shell 替换为 pyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
spark-submit

要提交使用 Hudi 的 Spark 应用程序,请确保将以下参数传递给 spark-submit。

spark-submit \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"\ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Amazon EMR 笔记本

要 Hudi 与 Amazon EMR 笔记本 结合使用,您必须首先将 Hudi jar 文件从本地文件系统复制到笔记本集群的主节点上的 HDFS。然后,您可以使用笔记本编辑器来配置 EMR 笔记本 以使用 Hudi.

将 Hudi 与 Amazon EMR 笔记本 结合使用

  1. 为 创建并启动集群。Amazon EMR 笔记本. 有关更多信息,请参阅 中的Amazon EMR为笔记本创建 集群。Amazon EMR 管理指南

  2. 使用 SSH 连接到集群的主节点,然后将 jar 文件从本地文件系统复制到 HDFS,如以下示例所示。在此示例中,我们在 HDFS 中创建了一个目录,以便清晰地管理文件。如果需要,您可以在 HDFS 中选择自己的目的地。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. 打开笔记本编辑器,输入以下示例中的代码,然后运行它。

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

初始化 Hudi 的 Spark 会话

使用 Scala 时,请确保在 Spark 会话中导入以下类。这需要在每个 Spark 会话中完成一次。

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor

写入 Hudi 数据集

以下示例演示如何创建 DataFrame 并将其作为 Hudi 数据集写入。

注意

要将代码示例粘贴到 Spark shell 中,请在提示符处键入 :paste,粘贴示例,然后按 CTRL + D.

每次将 DataFrame 写入到 Hudi 数据集时,都必须指定 DataSourceWriteOptions。 这些选项中的许多选项在写入操作之间可能是相同的。以下示例使用 hudiOptions 变量指定常用选项,随后的示例使用这些选项。

Scala
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "my_hudi_table", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "my_hudi_table", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset inputDF.write   .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/")
PySpark
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'my_hudi_table', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'my_hudi_table', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write .format('org.apache.hudi') .option('hoodie.datasource.write.operation', 'insert') .options(**hudiOptions) .mode('overwrite') .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')
注意

您可能会在代码示例和通知中看到“hodie”,而不是 Hudi。Hudi 代码库广泛使用旧的“方法”拼写。

DataSourceWriteOptions 适用于 的 参考 Hudi
选项 描述

TABLE_NAME

要在其中注册数据集的表名称。

表类型_OPT_KEY

可选。指定数据集是创建为 "COPY_ON_WRITE" 还是 "MERGE_ON_READ"。 默认值为 "COPY_ON_WRITE"

RECORDKEY_FIELD_OPT_KEY

其值将用作 recordKeyHoodieKey组件的记录键字段。 实际值将通过对字段值调用 .toString() 来获取。可以使用点表示法指定嵌套字段,例如 a.b.c.

PARTITIONPATH_FIELD_OPT_KEY

其值将用作 partitionPath 的组件HoodieKey的分区路径字段。 实际值将通过对字段值调用 .toString() 来获取。

PRECOMBINE_FIELD_OPT_KEY

在实际写入之前在预合并中使用的字段。如果两个记录具有相同的键值,Hudi 为预合并选择字段值最大的记录(由 Object.compareTo(..). 确定)。

仅在元数据仓中注册 Hudi 数据集表时才需要以下选项。如果您未将 Hudi 数据集注册为 Hive 元数据仓中的表,则不需要这些选项。

DataSourceWriteOptions 适用于 Hive 的 参考
选项 描述

HIVE_DATABASE_OPT_KEY

要同步到的 Hive 数据库。默认值为 "default"

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

用于将分区字段值提取到 Hive 分区列中的类。

HIVE_PARTITION_FIELDS_OPT_KEY

数据集中用于确定 Hive 分区列的字段。

HIVE_SYNC_ENABLED_OPT_KEY

设置为 "true" 时,将向 Apache Hive 元数据仓注册数据集。默认值为 "false"

HIVE_TABLE_OPT_KEY

必填项。Hive 中要同步到的表的名称。例如,"my_hudi_table_cow"

HIVE_USER_OPT_KEY

可选。同步时要使用的 Hive 用户名。例如,"hadoop"

HIVE_PASS_OPT_KEY

可选。由 指定的用户的 Hive 密码。HIVE_USER_OPT_KEY.

HIVE_URL_OPT_KEY

Hive 元数据仓 URL。

更新插入数据

以下示例演示如何通过编写 DataFrame 来更新插入数据。 与上一个插入示例不同,OPERATION_OPT_KEY 值设置为 UPSERT_OPERATION_OPT_VAL。 此外,指定 .mode(SaveMode.Append) 以指示应附加记录。

Scala
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) updateDF.write   .format("org.apache.hudi")   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)   .options(hudiOptions)   .mode(SaveMode.Append)   .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/")
PySpark
# Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write .format('org.apache.hudi') .option('hoodie.datasource.write.operation', 'upsert') .options(**hudiOptions) .mode('append') .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

删除记录

要硬删除记录,您可以更新插入空负载。在这种情况下,PAYLOAD_CLASS_OPT_KEY 选项指定 EmptyHoodieRecordPayload 类。该示例使用更新插入示例中使用的相同 DataFrame updateDF 指定相同的记录。

Scala
updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/")
PySpark
updateDF.write .format('org.apache.hudi') .option('hoodie.datasource.write.operation', 'upsert') .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') .options(**hudiOptions) .mode('append') .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

您还可以通过将 OPERATION_OPT_KEY 设置为 DELETE_OPERATION_OPT_VAL 来硬删除您提交的数据集中的所有记录,从而硬删除数据。有关执行软删除的说明以及有关删除存储在 Hudi 表中的数据的更多信息,请参阅 Apache Hudi 文档中的删除

从 Hudi 数据集读取

为了检索当前时间点的数据,Hudi 默认执行快照查询。以下是查询写入 写入 Hudi 数据集 中的 S3 的数据集的示例。Replace s3://mybucket/myhudidataset 替换为您的表路径,并为每个分区级别添加通配符星号(加上一个额外的星号)。在此示例中,有一个分区级别,因此我们添加了两个通配符符号。

Scala
val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" + "/*/*") snapshotQueryDF.show()
PySpark
snapshotQueryDF = spark.read .format('org.apache.hudi') .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset' + '/*/*') snapshotQueryDF.show()

增量查询

您还可以使用 Hudi 执行增量查询,以获取自给定提交时间戳以来发生更改的记录流。为此,请将 QUERY_TYPE_OPT_KEY 字段设置为 QUERY_TYPE_INCREMENTAL_OPT_VAL。 然后,为 BEGIN_INSTANTTIME_OPT_KEY 添加一个值,以获取自指定时间以来写入的所有记录。增量查询通常比其批处理对应查询的效率高十倍,因为它们仅处理更改的记录。

执行增量查询时,请使用不带用于快照查询的通配符星号的根 (基本) 表路径。

注意

Presto 不支持增量查询。

Scala
val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" ); incQueryDF.show()
PySpark
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read .format('org.apache.hudi') .options(**readOptions) .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset') incQueryDF.show()

有关从 Hudi 数据集读取数据的更多信息,请参阅 Apache Hudi 文档中的查询 Hudi 表