使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录访问 Amazon S3 表
可以使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录客户端目录,从开源查询引擎(例如 Apache Spark)访问 S3 表。适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录是由 Amazon 实验室托管的开源库。它的工作原理是将查询引擎中的 Apache Iceberg 操作(例如表发现、元数据更新以及添加或移除表)转换为 S3 表类数据存储服务类 API 操作。
适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录以名为 s3-tables-catalog-for-iceberg.jar
的 Maven JAR 形式分发。可以从 Amazon Labs GitHub repository
将适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录与 Apache Spark 结合使用
初始化 Spark 会话时,可以使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录客户端目录,从开源应用程序连接到表。在会话配置中,您可以指定 Iceberg 和 Amazon S3 依赖关系,并创建将表存储桶用作元数据仓库的自定义目录。
先决条件
可以访问表存储桶和 S3 表类数据存储服务操作的 IAM 身份。有关更多信息,请参阅 S3 表类数据存储服务的访问管理。
使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录初始化 Spark 会话
-
使用以下命令初始化 Spark。要使用该命令,请将适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录
版本号
替换为 Amazon Labs GitHub repository中的最新版本,并将 表存储桶 ARN
替换为您自己的表存储桶 ARN。spark-shell \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:
0.1.4
\ --conf spark.sql.catalog.s3tablesbucket
=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.s3tablesbucket
.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \ --conf spark.sql.catalog.s3tablesbucket
.warehouse=arn:aws:s3tables:us-east-1
:111122223333
:bucket/amzn-s3-demo-table-bucket
\ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
使用 Spark SQL 查询 S3 表
使用 Spark,您可以对 S3 表运行 DQL、DML 和 DDL 操作。查询表时,请使用完全限定的表名称,包括遵循以下规律的会话目录名称:
CatalogName
.NamespaceName
.TableName
以下示例查询显示了您可以与 S3 表进行交互的一些方式。要在查询引擎中使用这些示例查询,请将用户输入占位符
值替换为您自己的信息。
使用 Spark 查询表
-
创建命名空间
spark.sql(" CREATE NAMESPACE IF NOT EXISTS
s3tablesbucket
.my_namespace
") -
创建表
spark.sql(" CREATE TABLE IF NOT EXISTS
s3tablesbucket
.my_namespace
.`my_table
` ( id INT, name STRING, value INT ) USING iceberg ") -
查询表
spark.sql(" SELECT * FROM
s3tablesbucket
.my_namespace
.`my_table
` ").show() -
将数据插入表中
spark.sql( """ INSERT INTO
s3tablesbucket
.my_namespace
.my_table
VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) -
将现有数据文件加载到表中
将数据读入 Spark。
val data_file_location = "Path such as S3 URI to data file" val data_file = spark.read.parquet(
data_file_location
)将数据写入 Iceberg 表。
data_file.writeTo("
s3tablesbucket
.my_namespace
.my_table
").using("Iceberg").tableProperty ("format-version", "2").createOrReplace()