使用 Hudi 数据集 - Amazon EMR
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Hudi 数据集

Hudi 支持通过 Spark 在 Hudi 数据集中插入、更新和删除数据。有关更多信息,请参阅 Apache Hudi 文档中的写入 Hudi 表格

以下示例展示了如何启动交互式Sparkshell、使用Spark提交或使用 Amazon EMR 笔记本 与 Hudi 于 Amazon EMR. 您还可以使用 Hudi DeltaStreamer 用于写入数据集的实用工具或其他工具。在本节中,示例演示使用 Spark shell 处理数据集,同时使用 SSH 作为默认 hadoop 用户连接到主节点。

Spark shell

在主节点上打开 Spark shell

  1. 使用 SSH 连接主节点。有关更多信息,请参阅 使用SSH连接到管理节点Amazon EMR 管理指南.

  2. 输入以下命令以启动 Spark shell。

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
spark-submit

要提交使用 Hudi 的 Spark 应用程序,请确保将以下参数传递给 spark-submit。

spark-submit \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"\ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Amazon EMR 笔记本

要 Hudi 与 Amazon EMR 笔记本 结合使用,您必须首先将 Hudi jar 文件从本地文件系统复制到笔记本集群的主节点上的 HDFS。然后,您可以使用笔记本编辑器来配置 EMR 笔记本 以使用 Hudi。

将 Hudi 与 Amazon EMR 笔记本 结合使用

  1. 为 Amazon EMR 笔记本 创建并启动集群。有关更多信息,请参阅 正在创建 Amazon EMR 笔记本的群集Amazon EMR 管理指南.

  2. 使用 SSH 连接到集群的主节点,然后将 jar 文件从本地文件系统复制到 HDFS,如以下示例所示。在此示例中,我们在 HDFS 中创建了一个目录,以便清晰地管理文件。如果需要,您可以在 HDFS 中选择自己的目的地。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. 打开笔记本编辑器,输入以下示例中的代码,然后运行它。

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

初始化 Hudi 的 Spark 会话

在 Spark 会话中,导入以下类。这需要在每个 Spark 会话中完成一次。

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor

写入 Hudi 数据集

以下示例显示了如何编写Spark DataFrame 作为Hudi数据集。

注意

要将代码示例粘贴到 Spark shell 中,请在提示符处键入 :paste,粘贴示例,然后按 CTRL + D

您每次写 DataFrame 到Hudi数据集,您必须指定 DataSourceWriteOptions。这些选项中的许多可能在写入操作之间是相同的。以下示例使用 hudiOptions 变量指定常用选项,随后的示例使用这些选项。

// Read data from S3 and create a DataFrame with Partition and Record Key val inputDF = spark.read.format("parquet").load("s3://mybucket/mydata/parquet/") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "my_hudi_table", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "my_hudi_table", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write a DataFrame as a Hudi dataset inputDF.write   .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://mybucket/myhudidataset/")
注意

您可能会在代码示例和Hudi通知中看到“hodie”而不是Hudi。Hudi代码库广泛使用旧的“hodie”拼写。

适用于 Hudi 的 DataSourceWriteOptions 参考
选项 Description

TABLE_NAME

要在其中注册数据集的表名称。

表_类型_点_键

可选。指定数据集是否创建为 "COPY_ON_WRITE""MERGE_ON_READ"。默认值为 "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

其值将用作 recordKey 的组件 HoodieKey。实际值将通过调用获得 .toString() 字段值。可以使用点表示法指定嵌套字段,例如 a.b.c

PARTITIONPATH_FIELD_OPT_KEY

其值将用作 partitionPath 的组件 HoodieKey。实际值将通过调用获得 .toString() 字段值。

PRECOMBINE_FIELD_OPT_KEY

在实际写入之前在预合并中使用的字段。如果两个记录具有相同的键值,Hudi 为预合并选择字段值最大的记录(由 Object.compareTo(..) 确定)。

仅在元数据仓中注册 Hudi 数据集表时才需要以下选项。如果您未将 Hudi 数据集注册为 Hive 元数据仓中的表,则不需要这些选项。

DataSourceWriteOptions Hive参考
选项 Description

HIVE_ASSUME_DATE_PARTITION_OPT_KEY

有效值为 "true""false"。默认值为 "false"。当设置为 "true", Hudi 假设分区是 yyyy/mm/dd.

HIVE_DATABASE_OPT_KEY

要同步到的 Hive 数据库。默认值为 "default"

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

用于将分区字段值提取到 Hive 分区列中的类。

HIVE_PARTITION_FIELDS_OPT_KEY

数据集中用于确定 Hive 分区列的字段。

HIVE_SYNC_ENABLED_OPT_KEY

设置为 "true" 时,将向 Apache Hive 元数据仓注册数据集。默认值为 "false"

HIVE_TABLE_OPT_KEY

必填项。Hive 中要同步到的表的名称。例如:"my_hudi_table_cow"

HIVE_USER_OPT_KEY

可选。同步时要使用的 Hive 用户名。例如:"hadoop"

HIVE_PASS_OPT_KEY

可选。由 HIVE_USER_OPT_KEY 指定的用户的 Hive 密码。

HIVE_URL_OPT_KEY

Hive 元数据仓 URL。

正在更新插入数据

以下示例演示了如何编写 DataFrame 以执行upserts。与插入示例不同, OPERATION_OPT_KEY 值设置为 UPSERT_OPERATION_OPT_VAL。此外 .mode(SaveMode.Append) 指定以指示记录应附加。

inputDF4.write   .format("org.apache.hudi")   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)   .options(hudiOptions)   .mode(SaveMode.Append)   .save("s3://mybucket/myhudidataset/")

通过更新插入空负载来删除记录

要删除前面示例中指定的相同记录,此示例再次使用更新插入操作。在这种情况下,PAYLOAD_CKLASS_OPT_KEY 选项指定 EmptyHoodieRecordPayload 类。该示例使用相同的 DataFrame, inputDF4,在upsert示例中使用,以便指定相同的记录。

inputDF4.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://mybucket/myhudidataset/")