使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录访问 Amazon S3 表 - Amazon Simple Storage Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录访问 Amazon S3 表

可以使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录客户端目录,从开源查询引擎(例如 Apache Spark)访问 S3 表。适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录是由 Amazon 实验室托管的开源库。它的工作原理是将查询引擎中的 Apache Iceberg 操作(例如表发现、元数据更新以及添加或移除表)转换为 S3 表类数据存储服务类 API 操作。

适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录以名为 s3-tables-catalog-for-iceberg.jar 的 Maven JAR 形式分发。可以从 Amazon Labs GitHub repository 中构建客户端目录 JAR,也可以从 Maven 下载它。连接到表时,在为 Apache Iceberg 初始化 Spark 会话的过程中,客户端目录 JAR 将用作依赖关系。

将适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录与 Apache Spark 结合使用

初始化 Spark 会话时,可以使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录客户端目录,从开源应用程序连接到表。在会话配置中,您可以指定 Iceberg 和 Amazon S3 依赖关系,并创建将表存储桶用作元数据仓库的自定义目录。

先决条件
使用适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录初始化 Spark 会话
  • 使用以下命令初始化 Spark。要使用该命令,请将适用于 Apache Iceberg 的 Amazon S3 表类数据存储服务目录版本号替换为 Amazon Labs GitHub repository 中的最新版本,并将表存储桶 ARN 替换为您自己的表存储桶 ARN。

    spark-shell \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.4 \ --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \ --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:us-east-1:111122223333:bucket/amzn-s3-demo-table-bucket \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

使用 Spark SQL 查询 S3 表

使用 Spark,您可以对 S3 表运行 DQL、DML 和 DDL 操作。查询表时,请使用完全限定的表名称,包括遵循以下规律的会话目录名称:

CatalogName.NamespaceName.TableName

以下示例查询显示了您可以与 S3 表进行交互的一些方式。要在查询引擎中使用这些示例查询,请将用户输入占位符值替换为您自己的信息。

使用 Spark 查询表
  • 创建命名空间

    spark.sql(" CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.my_namespace")
  • 创建表

    spark.sql(" CREATE TABLE IF NOT EXISTS s3tablesbucket.my_namespace.`my_table` ( id INT, name STRING, value INT ) USING iceberg ")
  • 查询表

    spark.sql(" SELECT * FROM s3tablesbucket.my_namespace.`my_table` ").show()
  • 将数据插入表中

    spark.sql( """ INSERT INTO s3tablesbucket.my_namespace.my_table VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """)
  • 将现有数据文件加载到表中

    1. 将数据读入 Spark。

      val data_file_location = "Path such as S3 URI to data file" val data_file = spark.read.parquet(data_file_location)
    2. 将数据写入 Iceberg 表。

      data_file.writeTo("s3tablesbucket.my_namespace.my_table").using("Iceberg").tableProperty ("format-version", "2").createOrReplace()