运行 Flink 应用程序 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

运行 Flink 应用程序

Amazon EMR 6.13.0 或更高版本都支持 Flink Kubernetes Operator。按照下述步骤在 Amazon EMR on EKS 6.13.0 或更高版本上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。

注意

对于公开预览版,Amazon EMR on EKS Flink Operator 不支持公开预览版 Flink Session 作业。只能提交在自定义资源定义(CRD)flinkdeployments.flink.apache.org 中定义的 Flink Application 作业。

注意

提交 Flink 作业时,必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能,可以将其禁用。系统会默认启用该功能。

  1. 在使用 Flink Native Kubernetes Operator 运行 Flink 应用程序之前,请先完成 设置 Amazon EMR on EKS 的 Flink Kubernetes Operator安装 Operator 中的步骤。

  2. 使用以下示例内容创建 FlinkDeployment 文件定义文件 basic-example.yaml

    apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: CHECKPOINT S3 STORAGE PATH state.savepoints.dir: SAVEPOINT S3 STORAGE PATH flinkVersion: v1_17 executionRoleArn: JOB EXECUTION IAM ROLE ARN emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: storageDir: HA S3 STORAGE PATH resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: # if you have your job jar in S3 bucket you can use that path as well jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: savepoint savepointTriggerNonce: 0 monitoringConfiguration: cloudWatchMonitoringConfiguration: logGroupName: LOG GROUP NAME
  3. 使用如下命令提交 Flink 部署。此操作还会创建一个名为 basic-exampleFlinkDeployment 对象。

    kubectl create -f example.yaml -n <NAMESPACE>
  4. 访问 Flink UI。

    kubectl port-forward deployments/basic-example 8081 -n <NAMESPACE>
  5. 打开 localhost:8081 即可在本地查看 Flink 任务。

  6. 清理任务。记得清理为此任务创建的 S3 构件,例如检查点、HA、保存点元数据、CloudWatch 日志。

有关通过 Flink Kubernetes Operator 向 Flink 提交应用程序的更多信息,请在 GitHub 上参阅 apache/flink-kubernetes-operator 文件夹中的 Flink Kubernetes operator examples