Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Iceberg 集群与 Spark 结合使用
从 Amazon EMR 版本 6.5.0 开始,您可以将 Iceberg 与 Spark 集群配合使用,无需包含引导操作。对于 Amazon 6.4.0 及更早EMR版本,您可以使用引导操作来预安装所有必要的依赖项。
在本教程中,您将使用在 Amazon EMR Spark 集群上使用 Iceberg。 Amazon CLI 要使用控制台创建安装了 Iceberg 的集群,请按照使用 Amazon Ath ena、Amazon 和 Glue 构建 Apache Iceberg 数据湖中的步骤进行操作。EMR Amazon
创建 Iceberg 集群
您可以使用 Amazon Web Services Management Console、 Amazon CLI 或 Amazon EMR API 创建安装了 Iceberg 的集群。在本教程中,您将使用在 Amazon CLI Amazon EMR 集群上使用 Iceberg。要使用控制台创建安装了 Iceberg 的集群,请按照使用 Amazon Ath ena、Amazon 和 Glue 构建 Apache Iceberg 数据湖中的步骤进行操作。EMR Amazon
要将 Amazon 上的 Iceberg EMR 与一起使用 Amazon CLI,请先按照以下步骤创建一个集群。有关使用指定 Iceberg 分类的信息 Amazon CLI,请参阅创建集群 Amazon CLI 时使用提供配置或创建集群SDK时使用 Java 提供配置。
-
创建 configurations.json
文件并输入以下内容:
[{
"Classification":"iceberg-defaults",
"Properties":{"iceberg.enabled":"true"}
}]
-
接下来,使用以下配置创建集群。将实例 Amazon S3 桶路径和子网 ID 替换为您自己的值。
aws emr create-cluster --release-label emr-6.5.0 \
--applications Name=Spark \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_Spark_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/
\
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
或者,您可以创建包含 Spark 应用程序的 Amazon EMR 集群,并将该文件/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar
作为JAR依赖项包含在 Spark 任务中。有关更多信息,请参阅提交应用程序。
要将 jar 作为 Spark 作业中的依赖项包含在内,请将以下配置属性添加到 Spark 应用程序中:
--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"
有关 Spark 作业依赖项的更多信息,请参阅 Apache Spark 文档 Running Spark on Kubernetes(在 Kubernetes 上运行 Spark)中的 Dependency Management(依赖项管理)。
为 Iceberg 初始化 Spark 会话
以下示例演示如何启动交互式 Spark 外壳、使用 Spark 提交或使用亚马逊EMR笔记本在亚马逊EMR上使用 Iceberg。
- spark-shell
-
-
使用连接到主节点SSH。有关更多信息,请参阅《Amazon EMR 管理指南》SSH中的 “使用连接到主节点”。
-
输入以下命令以启动 Spark shell。要使用 PySpark 外壳,请spark-shell
替换为pyspark
。
spark-shell \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.dev.type=hadoop" \
--conf "spark.sql.catalog.dev.warehouse=s3://amzn-s3-demo-bucket
/example-prefix
/"
- spark-submit
-
-
使用连接到主节点SSH。有关更多信息,请参阅《Amazon EMR 管理指南》SSH中的 “使用连接到主节点”。
-
输入以下命令以为 Iceberg 启动 Spark 会话。
spark-submit \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.dev.type=hadoop" \
--conf "spark.sql.catalog.dev.warehouse=s3://amzn-s3-demo-bucket
/example-prefix
/"
- EMR Studio notebooks
-
要使用 EMR Studio 笔记本初始化 Spark 会话,请使用亚马逊EMR笔记本中的%%configure
神奇命令配置 Spark 会话,如下例所示。有关更多信息,请参阅《Amazon EMR 管理指南》中的 “使用EMR笔记本魔法”。
%%configure -f
{
"conf":{
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dev.type":"hadoop",
"spark.sql.catalog.dev.warehouse":"s3://amzn-s3-demo-bucket
/example-prefix
/"
}
}
写入 Iceberg 表
以下示例说明如何创建 DataFrame 并将其写为 Iceberg 数据集。这些示例演示了以默认 hadoop 用户SSH身份连接到主节点时使用 Spark shell 处理数据集。
要将代码示例粘贴到 Spark Shell 中,请在提示符处键入 :paste
,粘贴示例,然后按 CTRL+D
。
- PySpark
-
Spark 包含一个基于 Python 的 Shell pyspark
,您可以用它来设计以 Python 编写的 Spark 程序的原型。在主节点上调用 pyspark
。
## Create a DataFrame.
data = spark.createDataFrame([
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
],["id", "creation_date", "last_update_time"])
## Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket
/example-prefix
/db/iceberg_table'""")
data.writeTo("dev.db.iceberg_table").append()
- Scala
-
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
// Create a DataFrame.
val data = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
).toDF("id", "creation_date", "last_update_time")
// Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://amzn-s3-demo-bucket
/example-prefix
/db/iceberg_table'""")
data.writeTo("dev.db.iceberg_table").append()
从 Iceberg 表读取
- PySpark
-
df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
- Scala
-
val df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
- Spark SQL
-
SELECT * from dev.db.iceberg_table LIMIT 10
将 Spark 属性配置为使用 Glue Amazon 数据目录作为 Iceberg 表元存储
要使用 Glue Amazon Catalog 作为 Iceberg 表格的元数据仓,请按如下方式设置 Spark 配置属性:
spark-submit \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://<bucket>/<prefix> \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager \
--conf spark.sql.catalog.my_catalog.lock.table=myGlueLockTable