本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Delta Lake 集群与 Flink 结合使用
从 Amazon EMR 6.11 版本开始,您可以将 Delta Lake 与您的 Flink 集群结合使用。以下示例使用在 Amazon EMR Flink 集群上使用 Delta Lake。 Amazon CLI
注意
当你将 Delta Lake 与 Flink 集群配合使用时,亚马逊 EMR 支持 Flink DataStream API。
创建 Delta Lake 集群
-
创建文件
delta_configurations.json
并输入以下内容:[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
使用以下配置创建集群。在该 URL 中,将
example Amazon S3 bucket path
和subnet ID
替换为您自己的值。aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
初始化 Flink yarn 会话
要初始化 Flink yarn 会话,请运行以下命令:
flink-yarn-session -d
使用 Delta Lake 创建 Flink 作业
以下示例展示如何使用 sbt 或 Maven 在 Delta Lake 中构建 Flink 作业。
通过 Flink Datastream API 写入 Delta 表
使用以下示例创建要写入 DeltaSink 到带有 a 的表中 deltaTablePath:
public static DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath, RowType rowType) { Configuration configuration = new Configuration(); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new org.apache.flink.core.fs.Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }
通过 Flink Datastream API 从 Delta 表中读取
使用以下示例创建要从表中读 DeltaSource 取的有界值,其中带有 deltaTablePath:
public static DataStream<RowData> createBoundedDeltaSourceAllColumns( StreamExecutionEnvironment env, String deltaTablePath) { Configuration configuration = new Configuration(); DeltaSource<RowData> deltaSource = DeltaSource .forBoundedRowData( new org.apache.flink.core.fs.Path(
deltaTablePath
), configuration) .build(); return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); }
使用对 Delta Lake 独立版的多集群支持创建接收器
使用以下示例创建 DeltaSink 支持deltaTablePath
和多集群
public DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath) { Configuration configuration = new Configuration(); configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "
delta_log
"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1
"); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }
运行 Flink 作业
使用下列命令以运行您的作业:
flink run FlinkJob.jar