为 Flink Operator 和 Flink 应用程序使用高可用性(HA) - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

为 Flink Operator 和 Flink 应用程序使用高可用性(HA)

注意

Amazon EMR on EKS 中的 Flink 功能为预览版,随时可能更改。该功能作为 Amazon 服务条款中定义的预览服务提供。

Flink Job Manager

Flink 部署的高可用性(HA)允许任务继续推进,即使遇到临时错误、JobManager 崩溃,也是如此。任务将重新启动,但会从上次成功启用 HA 的检查点开始。如果不启用 HA,Kubernetes 会重启 JobManager,但任务会从新的任务开始并失去之前的进度。配置 HA 后,我们可以让 Kubernetes 将 HA 元数据存储在永久存储中,以备JobManager 出现临时故障时参考,然后从上次成功的检查点恢复任务。

Flink 任务会默认启用 HA(副本计数设置为 2,这需要您提供 S3 存储位置来永久存储 HA 元数据)。

HA 配置

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

以下是 Job Manager 中上述 HA 配置的描述(在 .spec.jobManager 下定义):

  • highAvailabilityEnabled(可选,默认值为 true):如果不想启用 HA,也不想使用提供的 HA 配置,请将其设置为 false 。您仍然可以操作“replicas”字段来手动配置 HA。

  • replicas(可选,默认值为 2):将此数字设置为大于 1 会创建其他备用 JobManager,从而更快地恢复任务。如果禁用 HA,则必须将副本计数设置为 1,否则会不断收到验证错误(如果未启用 HA,则仅支持 1 个副本)。

  • storageDir(必需):由于默认使用副本计数 2,我们必须提供永久 storageDir。目前,此字段仅接受 S3 路径作为存储位置。

Pod 区域

如果启用 HA,我们还会尝试将 Pod 配置在同一个可用区中,从而提高性能(通过将 Pod 置于相同可用区来减少网络延迟)。这是一个尽力而为的过程,即如果在调度了大多数 Pod 的可用区中没有足够的资源,那么剩余 Pod 仍会被调度,但最终可能会出现在该可用区之外的节点上。

确定主副本

如果启用了 HA,各副本会使用租约来确定哪个 JM 是主副本,并使用 K8s Configmap 作为数据存储来存储此元数据。如果要确定主副本,可以查看 Configmap 的内容,在数据下查看密钥 org.apache.flink.k8s.leader.restserver,找到带 IP 地址的 K8s Pod。您也可以使用以下 bash 命令。

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n <NAMESPACE> -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

Flink Operator 高可用性

我们为 Flink Operator 启用了高可用性,这样就可以使用备用 Flink Operator 进行故障转移,从而在发生故障时最大限度地减少 Operator 控制回路中的停机时间。默认会启用“高可用性”,启动 Operator 副本的默认数量为 2。您可以在 Helm 图表的 values.yaml 文件中配置副本字段。

以下字段支持自定义:

  • replicas(可选,默认值为 2):将此数字设置为大于 1 会创建其他备用 Operator,从而更快地恢复任务。

  • highAvailabilityEnabled(可选,默认值为 true):控制是否要启用 HA。将此参数指定为 true 可启用多可用区部署支持,并设置正确的 flink-conf.yaml 参数。

values.yaml 文件中设置以下配置可以为 Operator 禁用 HA。

... imagePullSecrets: [] replicas: 1 # set this to false if you do not want HA highAvailabilityEnabled: false ...

多可用区部署

我们在多个可用区中创 Operator Pod。这是一个软约束,如果不同可用区中没有足够的资源,您的 Operator Pod 将被调度到同一可用区中。

确定主副本

如果启用了 HA,各副本会使用租约来确定哪个 JM 是主副本,并使用 K8s Lease 来选举主副本。您可以描述租约并查看 .Spec.Holder Identity 字段来确定当前主副本

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Flink-S3 交互

配置访问凭证

请确保已为 IRSA 配置了相应的 IAM 权限来访问 S3 存储桶。

从 S3 应用程序模式获取任务 jar

Flink Operator 也支持从 S3 获取应用程序 jar。您只需在 FlinkDeployment 规范中提供 jarURI 的 S3 位置。

您也可以使用此功能来下载 PyFlink 脚本等其他构件。生成的 Python 脚本放在路径 /opt/flink/usrlib/ 下。

以下示例演示如何将此功能用于 PyFlink 任务。注意 jarURI 和 args 字段。

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Flink S3 连接器

Flink 随附两个 S3 连接器(如下所示)。以下各节旨在介绍何时使用哪个连接器。

检查点:Presto S3 连接器

  • 将 S3 方案设置为 s3p://

  • 用于检查点到 s3 的推荐连接器。

FlinkDeployment 规范示例:

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<UCKET-NAME>/flink-checkpoint/
  • 将 S3 方案设置为 s3://s3a://

  • 用于从 S3 读取和写入文件的推荐连接器(仅限实现 Flinks Filesystem 接口的 S3 连接器)。

  • 默认在 flink-conf.yaml 文件中设置 fs.s3a.aws.credentials.provider,即 com.amazonaws.auth.WebIdentityTokenCredentialsProvider。如果完全覆盖默认值 flink-conf,并且正在与 S3 进行交互,请务必使用此提供程序。

FlinkDeployment 规范示例

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless