将 Iceberg 集群与 Spark 结合使用 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Iceberg 集群与 Spark 结合使用

从 Amazon EMR 版本 6.5.0 开始,您可以将 Iceberg 用于您的 Spark 集群,无需包含引导操作。对于 Amazon EMR 版本 6.4.0 及更早版本,您可以使用引导操作来预装所有需要的依赖项。

在本教程中,您将使用在 Amazon EMR Spark 集群上使用 Iceberg。 Amazon CLI 要使用控制台创建安装了 Iceberg 的集群,请按照使用 Amazon Athena、Amazon EMR 和 Amazon Glue 构建 Apache Iceberg 数据湖中的步骤操作。

创建 Iceberg 集群

您可以使用 Amazon Web Services Management Console、 Amazon CLI 或 Amazon EMR API 创建安装了 Iceberg 的集群。在本教程中,您将使用在 Amazon CLI Amazon EMR 集群上使用 Iceberg。要使用控制台创建安装了 Iceberg 的集群,请按照使用 Amazon Athena、Amazon EMR 和 Amazon Glue 构建 Apache Iceberg 数据湖中的步骤操作。

要将 Amazon EMR 上的 Iceberg 与一起 Amazon CLI使用,请先按照以下步骤创建一个集群。有关使用指定 Iceberg 分类的信息 Amazon CLI,请参阅创建集群 Amazon CLI 时使用提供配置在创建集群时,使用 Java SDK 提供配置

  1. 创建 configurations.json 文件并输入以下内容:

    [{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
  2. 接下来,使用以下配置创建集群。将实例 Amazon S3 桶路径和子网 ID 替换为您自己的值。

    aws emr create-cluster --release-label emr-6.5.0 \ --applications Name=Spark \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_Spark_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

您还可以创建一个包含 Spark 应用程序的 Amazon EMR 集群,并且将文件 /usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar 作为 Spark 任务中的 JAR 依赖关系包含在内。有关更多信息,请参阅提交应用程序

要将 jar 作为 Spark 作业中的依赖项包含在内,请将以下配置属性添加到 Spark 应用程序中:

--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"

有关 Spark 作业依赖项的更多信息,请参阅 Apache Spark 文档 Running Spark on Kubernetes(在 Kubernetes 上运行 Spark)中的 Dependency Management(依赖项管理)。

为 Iceberg 初始化 Spark 会话

以下示例演示如何启动交互式 Spark Shell、使用 Spark 提交,或如何使用 Amazon EMR Notebooks 在 Amazon EMR 上使用 Iceberg。

spark-shell
  1. 使用 SSH 连接主节点。有关更多信息,请参阅《Amazon EMR 管理指南》中的使用 SSH 连接到主节点

  2. 输入以下命令以启动 Spark shell。要使用 PySpark 外壳,请spark-shell替换为pyspark

    spark-shell \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.dev.type=hadoop" \ --conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET/example-prefix/"
spark-submit
  1. 使用 SSH 连接主节点。有关更多信息,请参阅《Amazon EMR 管理指南》中的使用 SSH 连接到主节点

  2. 输入以下命令以为 Iceberg 启动 Spark 会话。

    spark-submit \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.dev.type=hadoop" \ --conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET/example-prefix/"
EMR Studio notebooks

要使用 EMR Studio notebooks 初始化 Spark 会话,请使用 Amazon EMR notebook 中的 %%configure 魔法命令配置 Spark 会话,如以下示例所示。有关更多信息,请参阅 Amazon EMR 管理指南中的使用 EMR Notebooks 魔法命令

%%configure -f { "conf":{ "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dev.type":"hadoop", "spark.sql.catalog.dev.warehouse":"s3://DOC-EXAMPLE-BUCKET/example-prefix/" } }

写入 Iceberg 表

以下示例说明如何创建 DataFrame 并将其写为 Iceberg 数据集。这些示例演示使用 Spark Shell 处理数据集,同时使用 SSH 作为原定设置将 hadoop 用户连接到主节点(master node)。

注意

要将代码示例粘贴到 Spark Shell 中,请在提示符处键入 :paste,粘贴示例,然后按 CTRL+D

PySpark

Spark 包含一个基于 Python 的 Shell pyspark,您可以用它来设计以 Python 编写的 Spark 程序的原型。在主节点上调用 pyspark

## Create a DataFrame. data = spark.createDataFrame([ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ],["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Iceberg dataset to the Amazon S3 location. spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame. val data = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Iceberg dataset to the Amazon S3 location. spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()

从 Iceberg 表读取

PySpark
df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Scala
val df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Spark SQL
SELECT * from dev.db.iceberg_table LIMIT 10

将 Spark 属性配置为使用 Glue Amazon 数据目录作为 Iceberg 表元存储

要使用 Glue Amazon Catalog 作为 Iceberg 表格的元数据仓,请按如下方式设置 Spark 配置属性:

spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://<bucket>/<prefix> \ --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager \ --conf spark.sql.catalog.my_catalog.lock.table=myGlueLockTable