Autoscaler 参数自动调整 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Autoscaler 参数自动调整

本节介绍了各个 Amazon EMR 版本的自动调整行为。还详细介绍了不同的自动扩缩配置。

注意

Amazon EMR 7.2.0 及更高版本使用开源配置 job.autoscaler.restart.time-tracking.enabled 来实现重新缩放时间估计。重新缩放时间估计与 Amazon EMR 自动调整的功能相同,因此无需为重启时间手动分配经验值。

如果运行的是 Amazon EMR 7.1.0 或更低版本,您仍然可以使用 Amazon EMR 自动调整。

7.2.0 and higher

Amazon EMR 7.2.0 及更高版本测量会应用自动扩缩决策所需的实际重启时间。在 7.1.0 及更低版本中,必须使用配置 job.autoscaler.restart.time 来手动配置估计的最长重启时间。通过使用配置 job.autoscaler.restart.time-tracking.enabled,您只需输入第一次缩放的重启时间。之后,Operator 会记录实际的重启时间,并将其用于后续缩放。

要启用此跟踪,请使用以下命令:

job.autoscaler.restart.time-tracking.enabled: true

以下是重新缩放时间估计的相关配置。

配置 必需 默认值 描述
job.autoscaler.restart.time-tracking.enabled False 指示 Flink Autoscaler 是否应随着时间的推移自动调整配置,以优化缩放决策。请注意,Autoscaler 只能自动调整 Autoscaler 参数 restart.time
job.autoscaler.restart.time 5m Amazon EMR on EKS 使用的预期重启时间,直到 Operator 可根据之前的缩放确定实际重启时间。
job.autoscaler.restart.time-tracking.limit 15m job.autoscaler.restart.time-tracking.enabled 设置为 true 时观察到的最长重启时间。

下面是一个示例部署规范,可用来尝试重新缩放时间估计:

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autoscaler parameters job.autoscaler.enabled: "true" job.autoscaler.scaling.enabled: "true" job.autoscaler.stabilization.interval: "5s" job.autoscaler.metrics.window: "1m" job.autoscaler.restart.time-tracking.enabled: "true" job.autoscaler.restart.time: "2m" job.autoscaler.restart.time-tracking.limit: "10m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" pipeline.max-parallelism: "12" executionRoleArn: <JOB ARN> emrReleaseLabel: emr-7.8.0-flink-latest jobManager: highAvailabilityEnabled: false storageDir: s3://<s3_bucket>/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<s3_bucket>/some-job-with-back-pressure parallelism: 1 upgradeMode: stateless

要模拟背压,请使用以下部署规范。

job: jarURI: s3://<s3_bucket>/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: stateless

将以下 Python 脚本上传到 S3 存储桶。

import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()

要验证重缩时间估计是否有效,请确保已启用 Flink Operator 的 DEBUG 级别日志记录。下面的示例演示了如何更新 Helm 图表文件 values.yaml。然后重新安装更新后的 Helm 图表并再次运行 Flink 作业。

log4j-operator.properties: |+ # Flink Operator Logging Overrides rootLogger.level = DEBUG

获取主容器组(pod)的名称。

ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

运行以下命令,获取指标评估中使用的实际重启时间。

kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"

您应该会看到类似下面的日志。请注意,只有第一次缩放才会使用 job.autoscaler.restart.time。后续缩放使用观察到的重启时间。

2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M 2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
7.0.0 and 7.1.0

开源内置 Flink Autoscaler 使用大量指标来做出最佳缩放决策。但在计算时使用的默认值适用于大多数工作负载,对于给定作业来说可能不是最佳值。Amazon EMR on EKS 版本的 Flink Operator 中添加的自动调整功能会查看针对特定捕获指标观察到的历史趋势,然后相应地尝试计算为给定作业定制的最佳值。

配置 必需 默认值 描述
kubernetes.operator.job.autoscaler.autotune.enable False 指示 Flink Autoscaler 是否应随着时间的推移自动调整配置,以优化 Autoscaler 缩放决策。目前,Autoscaler 只能自动调整 Autoscaler 参数 restart.time
kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count 3 指示 Autoscaler 在 Amazon EMR on EKS 指标配置映射中保留的 Amazon EMR on EKS 历史指标数。
kubernetes.operator.job.autoscaler.autotune.metrics.restart.count 3 指示 Autoscaler 在开始计算给定作业的平均重启时间之前执行的重启次数。

要启用自动调整,必须完成以下操作:

  • kubernetes.operator.job.autoscaler.autotune.enable: 设置为 true

  • metrics.job.status.enable: 设置为 TOTAL_TIME

  • 按照在 Flink 应用程序中使用 Autoscaler 的设置启用自动扩缩功能。

下面是一个示例部署规范,可用来尝试自动调整。

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autotuning parameters kubernetes.operator.job.autoscaler.autotune.enable: "true" kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2" kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1" metrics.job.status.enable: TOTAL_TIME # Autoscaler parameters kubernetes.operator.job.autoscaler.enabled: "true" kubernetes.operator.job.autoscaler.scaling.enabled: "true" kubernetes.operator.job.autoscaler.stabilization.interval: "5s" kubernetes.operator.job.autoscaler.metrics.window: "1m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/ state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/ pipeline.max-parallelism: "4" executionRoleArn: <JOB ARN> emrReleaseLabel: emr-6.14.0-flink-latest jobManager: highAvailabilityEnabled: true storageDir: s3://<S3_bucket>/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<S3_bucket>/some-job-with-back-pressure parallelism: 1 upgradeMode: last-state

要模拟背压,请使用以下部署规范。

job: jarURI: s3://<S3_bucket>/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: last-state

将以下 Python 脚本上传到 S3 存储桶。

import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()

要验证 Autosutiner 是否正在运行,请使用以下命令。请注意,必须对 Flink Operator 使用您自己的主容器组(pod)信息。

首先,获取主容器组(pod)的名称。

ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

获取主容器组(pod)的名称后,您可以运行以下命令。

kubectl logs -n $NAMESPACE -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME> | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'

您应该会看到类似下面的日志。

[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1)) [m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S