运行 Flink 应用程序 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

运行 Flink 应用程序

在 Amazon EMR 6.13.0 及更高版本中,您可以在应用程序模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。在 Amazon EMR 6.15.0 及更高版本中,您还可以在会话模式中运行 Flink 应用程序。本页介绍您使用 EKS 上的 Amazon EMR 运行 Flink 应用程序时可用的两种方法。

主题
    注意

    提交 Flink 作业时,必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能,可以将其禁用。系统会默认启用该功能。

    先决条件 – 在使用 Flink Native Kubernetes Operator 运行 Flink 应用程序之前,请先完成 设置 Amazon EMR on EKS 的 Flink Kubernetes Operator安装 Operator 中的步骤。

    Application mode

    在 Amazon EMR 6.13.0 及更高版本中,您可以在应用程序模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。

    1. 使用以下示例内容创建 FlinkDeployment 文件定义文件 basic-example-app-cluster.yaml

      apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example-app-cluster spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH flinkVersion: v1_17 executionRoleArn: JOB_EXECUTION_ROLE_ARN emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher jobManager: storageDir: HIGH_AVAILABILITY_STORAGE_PATH resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: # if you have your job jar in S3 bucket you can use that path as well jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: savepoint savepointTriggerNonce: 0 monitoringConfiguration: cloudWatchMonitoringConfiguration: logGroupName: LOG_GROUP_NAME
    2. 使用如下命令提交 Flink 部署。此操作还会创建一个名为 basic-example-app-clusterFlinkDeployment 对象。

      kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
    3. 访问 Flink UI。

      kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
    4. 打开 localhost:8081 即可在本地查看 Flink 任务。

    5. 清理任务。记得清理为此任务创建的 S3 项目,例如检查点、高可用性、保存点元数据和日志。 CloudWatch

    有关通过 Flink Kubernetes 运算符向 Flink 提交应用程序的更多信息,请参阅文件夹中的 Flink K uber netes 运算符示例。apache/flink-kubernetes-operator GitHub

    Session mode

    在 Amazon EMR 6.15.0 及更高版本中,您可以在会话模式下在 EKS 上的 Amazon EMR 上使用 Flink Kubernetes Operator 来运行 Flink 应用程序。

    1. 使用以下示例内容创建 FlinkDeployment 文件定义文件 basic-example-session-cluster.yaml

      apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example-session-cluster spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH flinkVersion: v1_17 executionRoleArn: JOB_EXECUTION_ROLE_ARN emrReleaseLabel: "emr-6.15.0-flink-latest" jobManager: storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 monitoringConfiguration: s3MonitoringConfiguration: logUri: cloudWatchMonitoringConfiguration: logGroupName: LOG_GROUP_NAME
    2. 使用如下命令提交 Flink 部署。此操作还会创建一个名为 basic-example-session-clusterFlinkDeployment 对象。

      kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
    3. 使用以下命令确认会话集群 LIFECYCLESTABLE

      kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE

      该输出应该类似于以下示例:

      NAME                              JOB STATUS   LIFECYCLE STATE
      basic-example-session-cluster                          STABLE
    4. 使用以下示例内容创建 FlinkSessionJob 自定义定义资源文件 basic-session-job.yaml

      apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: name: basic-session-job spec: deploymentName: basic-session-deployment job: # If you have your job jar in an S3 bucket you can use that path. # To use jar in S3 bucket, set # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN) # when you install Spark operator jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar parallelism: 2 upgradeMode: stateless
    5. 使用如下命令提交 Flink 会话作业。此操作将会创建一个 FlinkSessionJob 对象 basic-session-job

      kubectl apply -f basic-session-job.yaml -n $NAMESPACE
    6. 使用以下命令确认会话集群 LIFECYCLESTABLEJOB STATUSRUNNING

      kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE

      该输出应该类似于以下示例:

      NAME                              JOB STATUS   LIFECYCLE STATE
      basic-example-session-cluster     RUNNING      STABLE
    7. 访问 Flink UI。

      kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
    8. 打开 localhost:8081 即可在本地查看 Flink 任务。

    9. 清理任务。记得清理为此任务创建的 S3 项目,例如检查点、高可用性、保存点元数据和日志。 CloudWatch