本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Hudi 数据集
Hudi 支持通过 Spark 在 Hudi 数据集中插入、更新和删除数据。有关更多信息,请参阅 Apache Hudi 文档中的写入 Hudi 表格
以下示例演示如何启动交互式 Spark 外壳、使用 Spark 提交或使用亚马逊EMR笔记本在亚马逊EMR上使用 Hudi。您也可以使用 Hudi DeltaStreamer 实用程序或其他工具写入数据集。在本节中,示例演示了在使用默认hadoop
用户连接到主节点时使用 SSH Spark shell 处理数据集。
运行spark-shell
spark-submit
、或spark-sql
使用 Amazon EMR 6.7.0 或更高版本时,请传递以下命令。
注意
亚马逊 EMR 6.7.0 使用 A pache Hudi 0.11.0-amzn-0,与之前的 Hudi
在主节点上打开 Spark Shell
-
使用连接到主节点SSH。有关更多信息,请参阅《Amazon EMR 管理指南》SSH中的 “使用连接到主节点”。
-
输入以下命令以启动 Spark shell。要使用 PySpark 外壳,请更换
spark-shell
替换为pyspark
.spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
运行spark-shell
spark-submit
、或spark-sql
使用 Amazon EMR 6.6.x 或更早版本时,请传递以下命令。
注意
-
亚马逊 EMR 6.2 和 5.31 及更高版本(Hudi 0.6.x 及更高版本)可以从配置中省略。
spark-avro.jar
-
亚马逊 EMR 6.5 和 5.35 及更高版本(Hudi 0.9.x 及更高版本)可以从配置中省略
spark.sql.hive.convertMetastoreParquet=false
。 -
亚马逊 EMR 6.6 和 5.36 及更高版本(Hudi 0.10.x 及更高版本)必须包含版本:0.10.0 Spark 指南中所述的
HoodieSparkSessionExtension
配置:--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
在主节点上打开 Spark Shell
-
使用连接到主节点SSH。有关更多信息,请参阅《Amazon EMR 管理指南》SSH中的 “使用连接到主节点”。
-
输入以下命令以启动 Spark shell。要使用 PySpark 外壳,请更换
spark-shell
替换为pyspark
.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
要将 Hudi 与 Amazon EMR Notebooks 配合使用,必须先将 Hudi jar 文件从本地文件系统复制到笔记本集群的主节点HDFS上。然后,您可以使用笔记本编辑器将EMR笔记本配置为使用 Hudi。
将 Hudi 与 Amazon EMR 笔记本一起使用
-
为 Amazon EMR 笔记本创建并启动集群。有关更多信息,请参阅《亚马逊EMR管理指南》中的为笔记本创建亚马逊EMR集群。
-
使用连接到群集的主节点,SSH然后将 jar 文件从本地文件系统复制到,HDFS如以下示例所示。在示例中,HDFS为了便于文件管理,我们在中创建了一个目录。如果需要HDFS,您可以选择自己的目的地。
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
打开笔记本编辑器,输入以下示例中的代码,然后运行它。
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
要将 Hudi 与 Amazon EMR Notebooks 配合使用,必须先将 Hudi jar 文件从本地文件系统复制到笔记本集群的主节点HDFS上。然后,您可以使用笔记本编辑器将EMR笔记本配置为使用 Hudi。
将 Hudi 与 Amazon EMR 笔记本一起使用
-
为 Amazon EMR 笔记本创建并启动集群。有关更多信息,请参阅《亚马逊EMR管理指南》中的为笔记本创建亚马逊EMR集群。
-
使用连接到群集的主节点,SSH然后将 jar 文件从本地文件系统复制到,HDFS如以下示例所示。在示例中,HDFS为了便于文件管理,我们在中创建了一个目录。如果需要HDFS,您可以选择自己的目的地。
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
打开笔记本编辑器,输入以下示例中的代码,然后运行它。
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
初始化 Hudi 的 Spark 会话
使用 Scala 时,您必须在 Spark 会话中导入以下类。这需要在每个 Spark 会话中完成一次。
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
写入 Hudi 数据集
以下示例说明如何创建 DataFrame 并将其写为 Hudi 数据集。
注意
要将代码示例粘贴到 Spark shell 中,请在提示符处键入 :paste
,粘贴示例,然后按 CTRL
+ D
。
每次向 Hudi 数据集写入时,都必须指定DataSourceWriteOptions
。 DataFrame 这些选项中的许多选项在写入操作之间可能是相同的。以下示例使用
变量指定常用选项,随后的示例使用这些选项。hudiOptions
注意
亚马逊 EMR 6.7.0 使用 A pache Hudi 0.11.0-amzn-0,与之前的 Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
注意
您可能会在代码示例和通知中看到“hoodie”而不是 Hudi。Hudi 代码库广泛使用旧的“hoodie”拼写。
DataSourceWriteOptions Hudi 的参考资料 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
选项 | 描述 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TABLE_NAME |
要在其中注册数据集的表名称。 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TABLE_TYPE_OPT_KEY |
可选。指定数据集是创建为 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
RECORDKEY_FIELD_OPT_KEY |
其值将用作 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PARTITIONPATH_FIELD_OPT_KEY |
其值将用作 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PRECOMBINE_FIELD_OPT_KEY |
在实际写入之前在预合并中使用的字段。如果两个记录具有相同的键值,Hudi 为预合并选择字段值最大的记录(由 |
仅在元数据仓中注册 Hudi 数据集表时才需要以下选项。如果您未将 Hudi 数据集注册为 Hive 元数据仓中的表,则不需要这些选项。
DataSourceWriteOptions Hive 的参考资料 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
选项 | 描述 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_DATABASE_OPT_KEY |
要同步到的 Hive 数据库。默认为 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
用于将分区字段值提取到 Hive 分区列中的类。 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_PARTITION_FIELDS_OPT_KEY |
数据集中用于确定 Hive 分区列的字段。 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_SYNC_ENABLED_OPT_KEY |
设置为 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_TABLE_OPT_KEY |
必需。Hive 中要同步到的表的名称。例如, |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_USER_OPT_KEY |
可选。同步时要使用的 Hive 用户名。例如, |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_PASS_OPT_KEY |
可选。由 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_URL_OPT_KEY |
Hive 元数据仓URL。 |
更新插入数据
以下示例演示如何通过编写 a DataFrame 来更新插入数据。与之前的插入示例不同,OPERATION_OPT_KEY
值设置为 UPSERT_OPERATION_OPT_VAL
。此外,还指定 .mode(SaveMode.Append)
以指示应追加记录。
注意
亚马逊 EMR 6.7.0 使用 A pache Hudi 0.11.0-amzn-0,与之前的 Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
删除记录
要硬删除记录,您可以更新插入一个空的负载。在这种情况下,PAYLOAD_CLASS_OPT_KEY
选项指定 EmptyHoodieRecordPayload
类。该示例使用 upsert 示例中使用的相同方法来指定相同的记录。 DataFrame updateDF
注意
亚马逊 EMR 6.7.0 使用 A pache Hudi 0.11.0-amzn-0,与之前的 Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://amzn-s3-demo-bucket/myhudidataset/
')
您还可以通过以下方式硬删除数据:将 OPERATION_OPT_KEY
设置为 DELETE_OPERATION_OPT_VAL
来删除您提交的数据集中的所有记录。有关执行软删除的说明,以及有关删除 Hudi 表中存储的数据的详细信息,请参阅 Apache Hudi 文档中的 Deletes
从 Hudi 数据集读取
要在当前时间点检索数据,Hudi 默认情况下执行快照查询。以下是查询在 写入 Hudi 数据集 中写入 S3 的数据集的示例。Replace(替换) s3://amzn-s3-demo-bucket/myhudidataset
添加表路径,为每个分区级别添加通配符星号,再加上一个星号。在此示例中,有一个分区级别,因此我们添加了两个通配符号。
注意
亚马逊 EMR 6.7.0 使用 A pache Hudi 0.11.0-amzn-0,与之前的 Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://amzn-s3-demo-bucket/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*') snapshotQueryDF.show()
递增查询
您还可以使用 Hudi 执行增量查询,以获取自给定提交时间戳以来已更改的记录流。为此,请将 QUERY_TYPE_OPT_KEY
字段设置为 QUERY_TYPE_INCREMENTAL_OPT_VAL
。然后,为 BEGIN_INSTANTTIME_OPT_KEY
添加一个值,以获取自指定时间以来写入的所有记录。递增查询的效率通常是批处理查询的十倍,因为它们只处理更改的记录。
执行增量查询时,请使用根(基)表路径,而不需要用于快照查询的通配符星号。
注意
Presto 不支持递增查询。
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://amzn-s3-demo-bucket/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset
') incQueryDF.show()
有关从 Hudi 数据集读取的更多信息,请参阅 Apache Hudi 文档中的 查询 Hudi 表