将 Iceberg 集群与 Flink 结合使用 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Iceberg 集群与 Flink 结合使用

从 Amazon EMR 版本 6.9.0 开始,您可以将 Iceberg 与 Flink 集群结合使用,而无需使用开源 Iceberg Flink 集成时所需的设置步骤。

创建 Iceberg 集群

您可以使用 Amazon Web Services Management Console、 Amazon CLI或 Amazon EMR API 创建安装了 Iceberg 的集群。在本教程中,您将使用在 Amazon CLI Amazon EMR 集群上使用 Iceberg。要使用控制台创建安装了 Iceberg 的集群,请按照使用 Amazon Athena、Amazon EMR 和 Amazon Glue 构建 Apache Iceberg 数据湖中的步骤操作。

要将 Amazon EMR 上的 Iceberg 与一起 Amazon CLI使用,请先按照以下步骤创建一个集群。有关使用指定 Iceberg 分类的信息 Amazon CLI,请参阅创建集群 Amazon CLI 时使用提供配置在创建集群时,使用 Java SDK 提供配置。使用以下内容创建名为 configurations.json 的文件:

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

接下来,使用以下配置创建集群,将示例 Amazon S3 桶路径和子网 ID 替换为您自己的值:

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

您还可以创建一个其中包含 Flink 应用程序的 Amazon EMR 6.9.0 集群,并且将文件 /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar 用作 Flink 作业中的 JAR 依赖项。

SQL 客户端脚本位于 /usr/lib/flink/bin 下。您可以使用以下命令运行脚本:

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

这将启动 Flink SQL Shell。

创建 Iceberg 表

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.dynamodb.DynamoDbLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

表 API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

写入 Iceberg 表

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

表 API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

数据流 API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

从 Iceberg 表读取

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

表 API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

数据流 API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

使用 Hive 目录

确保如 使用 Hive 元存储和 Glue 目录配置 Flink 中所述解析 Flink 和 Hive 依赖项。

向 Flink 提交作业的一种方法是使用每个作业的 Flink YARN 会话。这可以通过以下命令启动:

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME