将 Iceberg 集群与 Flink 结合使用 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 Iceberg 集群与 Flink 结合使用

从 Amazon EMR 版本 6.9.0 开始,您可以将 Iceberg 与 Flink 集群结合使用,而无需使用开源 Iceberg Flink 集成时所需的设置步骤。

创建 Iceberg 集群

您可以使用 Amazon Web Services 管理控制台、Amazon CLI 或 Amazon EMR API 创建安装了 Iceberg 的集群。在本教程中,您将通过 Amazon CLI 在 Amazon EMR 集群上使用 Iceberg。要使用控制台创建安装了 Iceberg 的集群,请按照使用 Amazon Athena、Amazon EMR 和 Amazon Glue 构建 Apache Iceberg 数据湖中的步骤操作。

要在 Amazon EMR 上将 Iceberg 与 Amazon CLI 结合使用,请首先按照以下步骤创建一个集群。有关使用 Amazon CLI 指定 Iceberg 分类的信息,请参阅在创建集群时使用 Amazon CLI 提供配置在创建集群时,使用 Java SDK 提供配置。使用以下内容创建名为 configurations.json 的文件:

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

接下来,使用以下配置创建集群,将示例 Amazon S3 桶路径和子网 ID 替换为您自己的值:

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

您还可以创建一个其中包含 Flink 应用程序的 Amazon EMR 6.9.0 集群,并且将文件 /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar 用作 Flink 作业中的 JAR 依赖项。

SQL 客户端脚本位于 /usr/lib/flink/bin 下。您可以使用以下命令运行脚本:

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

这将启动 Flink SQL Shell。

创建 Iceberg 表

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

表 API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

写入 Iceberg 表

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

表 API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

数据流 API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

从 Iceberg 表读取

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

表 API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

数据流 API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

使用 Hive 目录

确保如 使用 Hive 元存储和 Glue 目录配置 Flink 中所述解析 Flink 和 Hive 依赖项。

向 Flink 提交作业的一种方法是使用每个作业的 Flink YARN 会话。这可以通过以下命令启动:

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
  • 使用 Amazon Glue 作为 Iceberg 的目录时,请确保您在其中创建表的数据库存在于 Amazon Glue 中。如果您使用的是类似 Amazon Lake Formation 的服务并且无法加载目录,请确保您有访问该服务的适当权限来执行命令。

  • Iceberg Glue 集成不适用于 Redshift 托管存储目录。