使用 Hudi 数据集 - Amazon EMR
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

使用 Hudi 数据集

Hudi 支持通过 Spark 在 Hudi 数据集中插入、更新和删除数据。有关更多信息,请参阅 Apache Hudi 文档中的写入 Hudi 表格

以下示例演示如何启动交互式 Spark shell、使用 Spark 提交以及如何使用 Amazon EMR 笔记本 在 Amazon EMR 上处理 Hudi。您也可以使用 Hudi DeltaStreamer 实用程序或其他工具来写入数据集。在本节中,示例演示使用 Spark shell 处理数据集,同时使用 SSH 作为默认 hadoop 用户连接到主节点。

Spark Shell

在主节点上打开 Spark shell

  1. 使用 SSH 连接主节点。有关更多信息,请参阅 Amazon EMR 管理指南 中的使用 SSH 连接到主节点

  2. 输入以下命令以启动 Spark shell。

    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
spark-submit

要提交使用 Hudi 的 Spark 应用程序,请确保将以下参数传递给 spark-submit。

spark-submit --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Amazon EMR 笔记本

要 Hudi 与 Amazon EMR 笔记本 结合使用,您必须首先将 Hudi jar 文件从本地文件系统复制到笔记本集群的主节点上的 HDFS。然后,您可以使用笔记本编辑器来配置 EMR 笔记本 以使用 Hudi。

将 Hudi 与 Amazon EMR 笔记本 结合使用

  1. 为 Amazon EMR 笔记本 创建并启动集群。有关更多信息,请参阅 Amazon EMR 管理指南 中的为笔记本创建 Amazon EMR 集群

  2. 使用 SSH 连接到集群的主节点,然后将 jar 文件从本地文件系统复制到 HDFS,如以下示例所示。在此示例中,我们在 HDFS 中创建了一个目录,以便清晰地管理文件。如果需要,您可以在 HDFS 中选择自己的目的地。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. 打开笔记本编辑器,输入以下示例中的代码,然后运行它。

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

初始化 Hudi 的 Spark 会话

在 Spark 会话中,导入以下类。这需要在每个 Spark 会话中完成一次。

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor

写入 Hudi 数据集

以下示例演示如何将 Spark DataFrame 作为 Hudi 数据集写入。

注意

要将代码示例粘贴到 Spark shell 中,请在提示符处键入 :paste,粘贴示例,然后按 CTRL + D

每次向 Hudi 数据集写入 DataFrame 时,都必须指定 DataSourceWriteOptions。这些选项中的许多选项在写入操作之间可能是相同的。以下示例使用 hudiOptions 变量指定常用选项,随后的示例使用这些选项。

// Read data from S3 and create a DataFrame with Partition and Record Key val inputDF = spark.read.format("parquet").load("s3://mybucket/mydata/parquet/") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME → "my_hudi_table", DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "dataframe_column_name1", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY ->"dataframe_column_name2", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "dataframe_column_name3" DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY → "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY → "my_hudi_table", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY → "dataframe_column_name2", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY → classOf[MultiPartKeysValueExtractor].getName ) // Write a DataFrame as a Hudi dataset inputDF.write   .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://mybucket/myhudidataset/")
注意

STORAGE_TYPE_OPT_KEY 选项在示例中将数据集类型建立为 CoW。CoW 是默认设置,因此不需要此选项;为了清晰起见,我们将其包含在内。

Hudi 的 DataSourceWriteOptions 首选项
选项 描述

PARTITIONPATH_FIELD_OPT_KEY

其值将用作 HoodieKeypartitionPath 组件的分区路径字段。实际值将通过对字段值调用 .toString() 来获得。

PRECOMBINE_FIELD_OPT_KEY

在实际写入之前在预合并中使用的字段。如果两个记录具有相同的键值,Hudi 为预合并选择字段值最大的记录(由 Object.compareTo(..) 确定)。

RECORDKEY_FIELD_OPT_KEY

其值将用作 HoodieKeyrecordKey 组件的记录键字段。实际值将通过对字段值调用 .toString() 来获得。可以使用点表示法指定嵌套字段,例如 a.b.c

STORAGE_TYPE_OPT_KEY

可选。指定数据集是创建为 "COPY_ON_WRITE" 还是 "MERGE_ON_READ"。默认为 "COPY_ON_WRITE"

TABLE_NAME

要在其中注册数据集的表名称。

仅在元数据仓中注册 Hudi 数据集表时才需要以下选项。如果您未将 Hudi 数据集注册为 Hive 元数据仓中的表,则不需要这些选项。

Hive 的 DataSourceWriteOptions 首选项
选项 描述

HIVE_ASSUME_DATE_PARTITION_OPT_KEY

有效值为 "true""false"。默认为 "false"。设置为 "true" 时,Hudi 假定分区为 yyyy/mm/dd

HIVE_DATABASE_OPT_KEY

要同步到的 Hive 数据库。默认为 "default"

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

用于将分区字段值提取到 Hive 分区列中的类。

HIVE_PARTITION_FIELDS_OPT_KEY

数据集中用于确定 Hive 分区列的字段。

HIVE_SYNC_ENABLED_OPT_KEY

设置为 "true" 时,将向 Apache Hive 元数据仓注册数据集。默认为 "false"

HIVE_TABLE_OPT_KEY

必填项。Hive 中要同步到的表的名称。例如:"my_hudi_table_cow"

HIVE_USER_OPT_KEY

可选。同步时要使用的 Hive 用户名。例如:"hadoop"

HIVE_PASS_OPT_KEY

可选。由 HIVE_USER_OPT_KEY 指定的用户的 Hive 密码。

HIVE_URL_OPT_KEY

Hive 元数据仓 URL。

正在更新插入数据

以下示例演示如何编写 DataFrame 来执行更新插入。与插入示例不同,OPERATION_OPT_KEY 值设置为 UPSERT_OPERATION_OPT_VAL。此外,还指定 .mode(SaveMode.Append) 以指示应追加记录。

inputDF4.write   .format("org.apache.hudi")   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)   .options(hudiOptions)   .mode(SaveMode.Append)   .save("s3://mybucket/myhudidataset/")

通过更新插入空负载来删除记录

要删除前面示例中指定的相同记录,此示例再次使用更新插入操作。在这种情况下,PAYLOAD_CKLASS_OPT_KEY 选项指定 EmptyHoodieRecordPayload 类。该示例使用更新插入示例中使用的相同 DataFrame inputDF4,以便指定相同的记录。

inputDF4.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://mybucket/myhudidataset/")