Use a Delta Lake cluster with Flink
With Amazon EMR release 6.11 and higher, you can use Delta Lake with your Flink cluster. The following examples use the Amazon CLI to work with Delta Lake on an Amazon EMR Flink cluster.
Note
Amazon EMR supports the Flink DataStream API when you use Delta Lake with a Flink cluster.
Create a Delta Lake cluster
-
Create a file, delta_configurations.json, with the following content:

[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
Create a cluster with the following configuration. Replace the example Amazon S3 bucket path and the subnet ID with your own.

aws emr create-cluster \
--release-label emr-6.11.0 \
--applications Name=Flink \
--configurations file://delta_configurations.json \
--region us-east-1 \
--name My_Flink_Delta_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 3 \
--service-role EMR_DefaultRole_V2 \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
Initialize a Flink YARN session
To initialize a Flink YARN session, run the following command:
flink-yarn-session -d
Build a Flink job with Delta Lake
The following examples show how to use sbt or Maven to build your Flink job with Delta Lake.
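As a sketch, a Maven pom.xml for such a job might declare the Delta Lake Flink connector along with the Flink dependencies that the examples below use. The artifact versions here are assumptions; align them with the Flink and Delta connector versions bundled with your Amazon EMR release.

<!-- Versions are assumptions; match them to your EMR release. -->
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-flink</artifactId>
    <version>0.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>1.16.0</version>
    <scope>provided</scope>
</dependency>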
Write to a Delta table with Flink DataStream API
Use the following example to create a DeltaSink that writes to the table at deltaTablePath:

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public static DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    // DeltaSink.forRowData takes the table path, a Hadoop Configuration,
    // and the logical row type of the records to write.
    Configuration configuration = new Configuration();
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                new org.apache.flink.core.fs.Path(deltaTablePath),
                configuration,
                rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
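For context, a minimal driver that wires this helper into a job might look like the following sketch. The sample schema, the in-memory source, and the bucket path are assumptions for illustration; a real job would read from Kafka, files, or another source.

// Additional imports assumed: java.util.Arrays,
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
// org.apache.flink.table.data.GenericRowData, org.apache.flink.table.data.StringData,
// org.apache.flink.table.types.logical.IntType, org.apache.flink.table.types.logical.VarCharType
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical logical schema of the target Delta table: (id INT, name STRING).
RowType rowType = new RowType(Arrays.asList(
        new RowType.RowField("id", new IntType()),
        new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH))));

// Toy in-memory source for illustration only.
DataStream<RowData> stream = env.fromElements(
        (RowData) GenericRowData.of(1, StringData.fromString("a")),
        GenericRowData.of(2, StringData.fromString("b")));

createDeltaSink(stream, "s3://amzn-s3-demo-bucket/delta-table/", rowType);
env.execute("write-to-delta");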
Read from a Delta table with Flink DataStream API
Use the following example to create a bounded DeltaSource that reads from the table at deltaTablePath:

import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
        StreamExecutionEnvironment env,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    // A bounded source reads the table's current snapshot and then finishes.
    DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                new org.apache.flink.core.fs.Path(deltaTablePath),
                configuration)
            .build();
    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
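As a usage sketch, the bounded source can back a simple batch-style job that reads the whole table once. The table path here is an assumption for illustration.

// Hypothetical driver that reads the table's current snapshot and prints each row.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> rows = createBoundedDeltaSourceAllColumns(
        env, "s3://amzn-s3-demo-bucket/delta-table/");
rows.print();
env.execute("read-from-delta");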
Sink creation with multi-cluster support for Delta Lake standalone
Use the following example to create a DeltaSink that writes to the table at deltaTablePath with multi-cluster support:

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    // Use the DynamoDB-backed LogStore so writers on multiple clusters
    // can safely commit to the same table.
    configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1");
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                new Path(deltaTablePath),
                configuration,
                rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
Run the Flink job
Use the following command to run your job:
flink run FlinkJob.jar