Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon Glue 3.0 及更高版本支持数据湖的 Apache Hudi 框架。Hudi 是一个开源数据湖存储框架,简化增量数据处理和数据管道开发。本主题涵盖了在 Hudi 表中传输或存储数据时,在 Amazon Glue 中使用数据的可用功能。要了解有关 Hudi 的更多信息,请参阅 Apache Hudi 官方文档。
您可以使用 Amazon Glue 对 Amazon S3 中的 Hudi 表执行读写操作,也可以使用 Amazon Glue 数据目录处理 Hudi 表。还支持其他操作,包括插入、更新和所有 Apache Spark 操作。
Apache Hudi 0.10.1 for Amazon Glue 3.0 不支持 Read(MoR)表上的 Hudi Merge。
下表列出了 Amazon Glue 每个版本中包含的 Hudi 版本。
Amazon Glue 版本 |
支持的 Hudi 版本 |
4.0 |
0.12.1 |
3.0 |
0.10.1 |
要了解有关 Amazon Glue 支持的数据湖框架的更多信息,请参阅在 Amazon Glue ETL 任务中使用数据湖框架。
要为 Amazon Glue 启用 Hudi,请完成以下任务:
-
指定 hudi
作为 --datalake-formats
作业参数的值。有关更多信息,请参阅Amazon Glue 作业参数:
-
--conf
为 Glue 作业创建一个名为 Amazon 的密钥,并将其设置为以下值。或者,您可以在脚本中使用 SparkConf
设置以下配置。这些设置有助于 Apache Spark 正确处理 Hudi 表。
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
使用不同的 Hudi 版本
要使用 Amazon Glue 不支持的 Hudi 版本,请使用 --extra-jars
作业参数指定您自己的 Hudi JAR 文件。请勿使用 hudi
作为 --datalake-formats
作业参数的值。
以下示例脚本脚本演示了如何将 Hudi 表写入 Amazon S3,并将该表注册到 Amazon Glue 数据目录。该示例使用 Hudi Hive 同步工具来注册该表。
此示例要求您设置 --enable-glue-datacatalog
任务参数,才能将 Amazon Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 Amazon Glue 作业参数。
- Python
-
# Example: Create a Hudi table from a DataFrame
# and register the table to Glue Data Catalog
additional_options={
"hoodie.table.name": "<your_table_name>
",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field": "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": "<your_database_name>
",
"hoodie.datasource.hive_sync.table": "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"path": "s3://<s3Path/>
"
}
dataFrame.write.format("hudi") \
.options(**additional_options) \
.mode("overwrite") \
.save()
- Scala
-
// Example: Example: Create a Hudi table from a DataFrame
// and register the table to Glue Data Catalog
val additionalOptions = Map(
"hoodie.table.name" -> "<your_table_name>
",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field" -> "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> "<your_database_name>
",
"hoodie.datasource.hive_sync.table" -> "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms",
"path" -> "s3://<s3Path/>
")
dataFrame.write.format("hudi")
.options(additionalOptions)
.mode("append")
.save()
此示例从 Amazon S3 读取您在 示例:将 Hudi 表写入 Amazon S3 并将其注册到 Amazon Glue 数据目录中 中创建的 Hudi 表。
此示例要求您设置 --enable-glue-datacatalog
任务参数,才能将 Amazon Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 Amazon Glue 作业参数。
- Python
-
在本示例中,使用 GlueContext.create_data_frame.from_catalog()
方法。
# Example: Read a Hudi table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
dataFrame = glueContext.create_data_frame.from_catalog(
database = "<your_database_name>
",
table_name = "<your_table_name>
"
)
- Scala
-
在本示例中,使用 getCatalogSource 方法。
// Example: Read a Hudi table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val dataFrame = glueContext.getCatalogSource(
database = "<your_database_name>
",
tableName = "<your_table_name>
"
).getDataFrame()
}
}
此示例使用Amazon Glue Data Catalog 将 a DataFrame 插入到您在中创建的 Hudi 表中示例:将 Hudi 表写入 Amazon S3 并将其注册到 Amazon Glue 数据目录中。
此示例要求您设置 --enable-glue-datacatalog
任务参数,才能将 Amazon Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 Amazon Glue 作业参数。
- Python
-
在本示例中,使用 GlueContext.write_data_frame.from_catalog()
方法。
# Example: Upsert a Hudi table from Glue Data Catalog
from awsglue.context import GlueContext
from pyspark.context import SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
glueContext.write_data_frame.from_catalog(
frame = dataFrame,
database = "<your_database_name>
",
table_name = "<your_table_name>
",
additional_options={
"hoodie.table.name": "<your_table_name>
",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field": "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": "<your_database_name>
",
"hoodie.datasource.hive_sync.table": "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms"
}
)
- Scala
-
在本示例中,使用 getCatalogSink 方法。
// Example: Upsert a Hudi table from Glue Data Catalog
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions
import org.apacke.spark.SparkContext
object GlueApp {
def main(sysArgs: Array[String]): Unit = {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
glueContext.getCatalogSink("<your_database_name>
", "<your_table_name>
",
additionalOptions = JsonOptions(Map(
"hoodie.table.name" -> "<your_table_name>
",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>
",
"hoodie.datasource.write.precombine.field" -> "<your_precombine_field>
",
"hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>
",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> "<your_database_name>
",
"hoodie.datasource.hive_sync.table" -> "<your_table_name>
",
"hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>
",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms"
)))
.writeDataFrame(dataFrame, glueContext)
}
}
此示例使用 Spark DataFrame API 从 Amazon S3 读取 Hudi 表。
- Python
-
# Example: Read a Hudi table from S3 using a Spark DataFrame
dataFrame = spark.read.format("hudi").load("s3://<s3path/>
")
- Scala
-
// Example: Read a Hudi table from S3 using a Spark DataFrame
val dataFrame = spark.read.format("hudi").load("s3://<s3path/>
")
示例:使用 Spark 向 Amazon S3 写入 Hudi 表
- Python
-
# Example: Write a Hudi table to S3 using a Spark DataFrame
dataFrame.write.format("hudi") \
.options(**additional_options) \
.mode("overwrite") \
.save("s3://<s3Path/>
)
- Scala
-
// Example: Write a Hudi table to S3 using a Spark DataFrame
dataFrame.write.format("hudi")
.options(additionalOptions)
.mode("overwrite")
.save("s3://<s3path/>
")