本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
运行 Flink 应用程序
在 Amazon EMR 6.13.0 及更高版本中,您可以在应用程序模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。在 Amazon EMR 6.15.0 及更高版本中,您还可以在会话模式中运行 Flink 应用程序。本页介绍您使用 EKS 上的 Amazon EMR 运行 Flink 应用程序时可用的两种方法。
主题
注意
提交 Flink 作业时,必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能,可以将其禁用。系统会默认启用该功能。
先决条件 – 在使用 Flink Native Kubernetes Operator 运行 Flink 应用程序之前,请先完成 设置 Amazon EMR on EKS 的 Flink Kubernetes Operator 和 安装 Kubernetes Operator 中的步骤。
- Application mode
-
在 Amazon EMR 6.13.0 及更高版本中,您可以在应用程序模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。
-
创建一个名为
basic-example-app-cluster.yaml
的FlinkDeployment
定义文件,如下例所示。如果您已激活并使用其中一个选项 Amazon Web Services 区域,请务必取消注释并配置配置。fs.s3a.endpoint.region
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example-app-cluster spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" #fs.s3a.endpoint.region:
OPT_IN_AWS_REGION_NAME
state.checkpoints.dir:CHECKPOINT_S3_STORAGE_PATH
state.savepoints.dir:SAVEPOINT_S3_STORAGE_PATH
flinkVersion: v1_17 executionRoleArn:JOB_EXECUTION_ROLE_ARN
emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher jobManager: storageDir:HIGH_AVAILABILITY_STORAGE_PATH
resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: # if you have your job jar in S3 bucket you can use that path as well jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: savepoint savepointTriggerNonce: 0 monitoringConfiguration: cloudWatchMonitoringConfiguration: logGroupName:LOG_GROUP_NAME
-
使用如下命令提交 Flink 部署。此操作还会创建一个名为
basic-example-app-cluster
的FlinkDeployment
对象。kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
-
访问 Flink UI。
kubectl port-forward deployments/basic-example-app-cluster 8081 -n
NAMESPACE
-
打开
localhost:8081
即可在本地查看 Flink 任务。 -
清理任务。记得清理为此任务创建的 S3 项目,例如检查点、高可用性、保存点元数据和日志。 CloudWatch
有关通过 Flink Kubernetes 运算符向 Flink 提交应用程序的更多信息,请参阅文件夹中的 Flink K uber
netes 运算符示例。 apache/flink-kubernetes-operator
GitHub -
- Session mode
-
在 Amazon EMR 6.15.0 及更高版本中,您可以在会话模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。
-
创建一个名为
basic-example-app-cluster.yaml
的FlinkDeployment
定义文件,如下例所示。如果您已激活并使用其中一个选项 Amazon Web Services 区域,请务必取消注释并配置配置。fs.s3a.endpoint.region
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example-session-cluster spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" #fs.s3a.endpoint.region:
OPT_IN_AWS_REGION_NAME
state.checkpoints.dir:CHECKPOINT_S3_STORAGE_PATH
state.savepoints.dir:SAVEPOINT_S3_STORAGE_PATH
flinkVersion: v1_17 executionRoleArn:JOB_EXECUTION_ROLE_ARN
emrReleaseLabel: "emr-6.15.0
-flink-latest" jobManager: storageDir:HIGH_AVAILABILITY_S3_STORAGE_PATH
resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 monitoringConfiguration: s3MonitoringConfiguration: logUri: cloudWatchMonitoringConfiguration: logGroupName:LOG_GROUP_NAME
-
使用如下命令提交 Flink 部署。此操作还会创建一个名为
basic-example-session-cluster
的FlinkDeployment
对象。kubectl create -f basic-example-app-cluster.yaml -n
NAMESPACE
使用以下命令确认会话集群
LIFECYCLE
是STABLE
:kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n
NAMESPACE
该输出应该类似于以下示例:
NAME JOB STATUS LIFECYCLE STATE basic-example-session-cluster STABLE
使用以下示例内容创建
FlinkSessionJob
自定义定义资源文件basic-session-job.yaml
:apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: name: basic-session-job spec: deploymentName: basic-session-deployment job: # If you have your job jar in an S3 bucket you can use that path. # To use jar in S3 bucket, set # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$
OPERATOR_EXECUTION_ROLE_ARN
) # when you install Spark operator jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar parallelism: 2 upgradeMode: stateless使用如下命令提交 Flink 会话作业。此操作将会创建一个
FlinkSessionJob
对象basic-session-job
。kubectl apply -f basic-session-job.yaml -n $NAMESPACE
使用以下命令确认会话集群
LIFECYCLE
是STABLE
,JOB STATUS
是RUNNING
:kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n
NAMESPACE
该输出应该类似于以下示例:
NAME JOB STATUS LIFECYCLE STATE basic-example-session-cluster RUNNING STABLE
-
访问 Flink UI。
kubectl port-forward deployments/basic-example-session-cluster 8081 -n
NAMESPACE
-
打开
localhost:8081
即可在本地查看 Flink 任务。 -
清理任务。记得清理为此任务创建的 S3 项目,例如检查点、高可用性、保存点元数据和日志。 CloudWatch
-