Running a Flink application
With Amazon EMR 6.13.0 and higher, you can run a Flink application with the Flink Kubernetes
operator in Application mode on Amazon EMR on EKS. With Amazon EMR 6.15.0 and higher, you can also run a
Flink application in Session mode. This page describes both methods that you can use to run a
Flink application with Amazon EMR on EKS.
You must create an Amazon S3 bucket to store the high-availability metadata before you
submit your Flink job. High availability is enabled by default; if you don't want to use
this feature, you can disable it.
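One way to disable high availability is through the deployment's flinkConfiguration block. This is a sketch only, assuming the upstream Flink `high-availability.type` key applies unchanged on Amazon EMR on EKS; verify the key against your Flink and EMR release before relying on it:

```yaml
# Sketch, not confirmed for every EMR release: disable high availability
# by setting the upstream Flink configuration key to NONE.
spec:
  flinkConfiguration:
    high-availability.type: NONE
```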
Prerequisite – Before you can run a Flink application with the Flink Kubernetes operator, complete
the steps in Setting up the Flink Kubernetes
operator for Amazon EMR on EKS and Installing the operator.
- Application mode
- With Amazon EMR 6.13.0 and higher, you can run a Flink application with the Flink Kubernetes
operator in Application mode on Amazon EMR on EKS.
- Create a FlinkDeployment definition file named basic-example-app-cluster.yaml
with the following example content:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example-app-cluster
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
    state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
  flinkVersion: v1_17
  executionRoleArn: JOB_EXECUTION_ROLE_ARN
  emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
  jobManager:
    storageDir: HIGH_AVAILABILITY_STORAGE_PATH
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    # if you have your job jar in an S3 bucket, you can use that path as well
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    savepointTriggerNonce: 0
  monitoringConfiguration:
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG_GROUP_NAME
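The inline comment in the job section notes that the job JAR can also be loaded from Amazon S3 instead of the image's local path. A sketch of that variant follows; the bucket and key are hypothetical placeholders, not real resources:

```yaml
# Hypothetical variant: replace the local jarURI with an S3 path to your own JAR.
# The bucket name and key below are placeholders.
job:
  jarURI: s3://amzn-s3-demo-bucket/jars/my-flink-job.jar
  parallelism: 2
  upgradeMode: savepoint
  savepointTriggerNonce: 0
```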
- Submit the Flink deployment with the following command. This also creates a
FlinkDeployment object named basic-example-app-cluster.
kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
- Access the Flink UI:
kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
- Open localhost:8081 to view your Flink jobs locally.
- Clean up the job. Remember to also clean up the S3 artifacts that were created for this
job, such as checkpointing, high-availability, and savepointing metadata, and the
CloudWatch logs.
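The clean-up step above can be sketched with kubectl and the AWS CLI. The ALL_CAPS paths are the same placeholders used in the deployment spec, and these commands assume your artifacts live only under those prefixes:

```shell
# Delete the Flink deployment (this also stops the running job).
kubectl delete -f basic-example-app-cluster.yaml -n NAMESPACE

# Remove the S3 artifacts; the prefixes are placeholders for the
# checkpoint, savepoint, and high-availability paths from the spec.
aws s3 rm --recursive CHECKPOINT_S3_STORAGE_PATH
aws s3 rm --recursive SAVEPOINT_S3_STORAGE_PATH
aws s3 rm --recursive HIGH_AVAILABILITY_STORAGE_PATH

# Delete the CloudWatch log group that collected the job logs.
aws logs delete-log-group --log-group-name LOG_GROUP_NAME
```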
For more information on submitting applications to Flink through the Flink Kubernetes
operator, see the Flink Kubernetes operator examples in the
apache/flink-kubernetes-operator repository on GitHub.
- Session mode
- With Amazon EMR 6.15.0 and higher, you can run a Flink application with the Flink Kubernetes
operator in Session mode on Amazon EMR on EKS.
- Create a FlinkDeployment definition file named basic-example-session-cluster.yaml
with the following example content:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example-session-cluster
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
    state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
  flinkVersion: v1_17
  executionRoleArn: JOB_EXECUTION_ROLE_ARN
  emrReleaseLabel: "emr-6.15.0-flink-latest"
  jobManager:
    storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri: LOG_URI
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG_GROUP_NAME
- Submit the Flink deployment with the following command. This also creates a
FlinkDeployment object named basic-example-session-cluster.
kubectl create -f basic-example-session-cluster.yaml -n NAMESPACE
Use the following command to confirm that the session cluster LIFECYCLE is STABLE:
kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
The output should be similar to the following example:
NAME JOB STATUS LIFECYCLE STATE
basic-example-session-cluster STABLE
Create a FlinkSessionJob custom resource file named basic-session-job.yaml
with the following example content:
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job
spec:
  deploymentName: basic-example-session-cluster
  job:
    # If you have your job jar in an S3 bucket, you can use that path.
    # To use a jar in an S3 bucket, set OPERATOR_EXECUTION_ROLE_ARN
    # (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
    # when you install the Flink Kubernetes operator.
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 2
    upgradeMode: stateless
Submit the Flink session job with the following command. This creates a
FlinkSessionJob object named basic-session-job.
kubectl apply -f basic-session-job.yaml -n NAMESPACE
Use the following command to confirm that the session cluster LIFECYCLE is STABLE
and the JOB STATUS is RUNNING:
kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
The output should be similar to the following example:
NAME JOB STATUS LIFECYCLE STATE
basic-example-session-cluster RUNNING STABLE
- Access the Flink UI:
kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
- Open localhost:8081 to view your Flink jobs locally.
- Clean up the job. Remember to also clean up the S3 artifacts that were created for this
job, such as checkpointing, high-availability, and savepointing metadata, and the
CloudWatch logs.
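For Session mode, the clean-up can be sketched in two steps; deleting the session job before the session cluster avoids orphaning the job. The names match the examples above, and the S3 and CloudWatch artifacts can be removed the same way as in Application mode:

```shell
# Delete the session job first, then the session cluster it ran on.
kubectl delete -f basic-session-job.yaml -n NAMESPACE
kubectl delete -f basic-example-session-cluster.yaml -n NAMESPACE
```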