本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Autoscaler 参数自动调整
本节介绍了各个 Amazon EMR 版本的自动调整行为。还详细介绍了不同的自动扩缩配置。
注意
Amazon EMR 7.2.0 及更高版本使用开源配置 job.autoscaler.restart.time-tracking.enabled
来实现重新缩放时间估计。重新缩放时间估计与 Amazon EMR 自动调整的功能相同,因此无需为重启时间手动分配经验值。
如果运行的是 Amazon EMR 7.1.0 或更低版本,您仍然可以使用 Amazon EMR 自动调整。
- 7.2.0 and higher
-
Amazon EMR 7.2.0 及更高版本测量会应用自动扩缩决策所需的实际重启时间。在 7.1.0 及更低版本中,必须使用配置
job.autoscaler.restart.time
来手动配置估计的最长重启时间。通过使用配置job.autoscaler.restart.time-tracking.enabled
,您只需输入第一次缩放的重启时间。之后,Operator 会记录实际的重启时间,并将其用于后续缩放。要启用此跟踪,请使用以下命令:
job.autoscaler.restart.time-tracking.enabled: true
以下是重新缩放时间估计的相关配置。
配置 必需 默认值 描述 job.autoscaler.restart.time-tracking.enabled 否 False 指示 Flink Autoscaler 是否应随着时间的推移自动调整配置,以优化缩放决策。请注意,Autoscaler 只能自动调整 Autoscaler 参数 restart.time
。job.autoscaler.restart.time 否 5m Amazon EMR on EKS 使用的预期重启时间,直到 Operator 可根据之前的缩放确定实际重启时间。 job.autoscaler.restart.time-tracking.limit 否 15m 当 job.autoscaler.restart.time-tracking.enabled
设置为true
时观察到的最长重启时间。下面是一个示例部署规范,可用来尝试重新缩放时间估计:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autoscaler parameters job.autoscaler.enabled: "true" job.autoscaler.scaling.enabled: "true" job.autoscaler.stabilization.interval: "5s" job.autoscaler.metrics.window: "1m" job.autoscaler.restart.time-tracking.enabled: "true" job.autoscaler.restart.time: "2m" job.autoscaler.restart.time-tracking.limit: "10m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" pipeline.max-parallelism: "12" executionRoleArn:
<JOB ARN>
emrReleaseLabel: emr-7.8.0-flink-latest jobManager: highAvailabilityEnabled: false storageDir: s3://<s3_bucket>
/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<s3_bucket>
/some-job-with-back-pressure parallelism: 1 upgradeMode: stateless要模拟背压,请使用以下部署规范。
job: jarURI: s3://
<s3_bucket>
/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: stateless将以下 Python 脚本上传到 S3 存储桶。
import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()
要验证重缩时间估计是否有效,请确保已启用 Flink Operator 的
DEBUG
级别日志记录。下面的示例演示了如何更新 Helm 图表文件values.yaml
。然后重新安装更新后的 Helm 图表并再次运行 Flink 作业。log4j-operator.properties: |+ # Flink Operator Logging Overrides rootLogger.level = DEBUG
获取主容器组(pod)的名称。
ip=$(kubectl get configmap -n $NAMESPACE
<job-name>
-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"运行以下命令,获取指标评估中使用的实际重启时间。
kubectl logs
<FLINK-OPERATOR-POD-NAME>
-c flink-kubernetes-operator -n<OPERATOR-NAMESPACE>
-f | grep "Restart time used in scaling summary computation"您应该会看到类似下面的日志。请注意,只有第一次缩放才会使用
job.autoscaler.restart.time
。后续缩放使用观察到的重启时间。2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M 2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
- 7.0.0 and 7.1.0
-
开源内置 Flink Autoscaler 使用大量指标来做出最佳缩放决策。但在计算时使用的默认值适用于大多数工作负载,对于给定作业来说可能不是最佳值。Amazon EMR on EKS 版本的 Flink Operator 中添加的自动调整功能会查看针对特定捕获指标观察到的历史趋势,然后相应地尝试计算为给定作业定制的最佳值。
配置 必需 默认值 描述 kubernetes.operator.job.autoscaler.autotune.enable 否 False 指示 Flink Autoscaler 是否应随着时间的推移自动调整配置,以优化 Autoscaler 缩放决策。目前,Autoscaler 只能自动调整 Autoscaler 参数 restart.time
。kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count 否 3 指示 Autoscaler 在 Amazon EMR on EKS 指标配置映射中保留的 Amazon EMR on EKS 历史指标数。 kubernetes.operator.job.autoscaler.autotune.metrics.restart.count 否 3 指示 Autoscaler 在开始计算给定作业的平均重启时间之前执行的重启次数。 要启用自动调整,必须完成以下操作:
-
将
kubernetes.operator.job.autoscaler.autotune.enable:
设置为true
-
将
metrics.job.status.enable:
设置为TOTAL_TIME
-
按照在 Flink 应用程序中使用 Autoscaler 的设置启用自动扩缩功能。
下面是一个示例部署规范,可用来尝试自动调整。
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autotuning parameters kubernetes.operator.job.autoscaler.autotune.enable: "true" kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2" kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1" metrics.job.status.enable: TOTAL_TIME # Autoscaler parameters kubernetes.operator.job.autoscaler.enabled: "true" kubernetes.operator.job.autoscaler.scaling.enabled: "true" kubernetes.operator.job.autoscaler.stabilization.interval: "5s" kubernetes.operator.job.autoscaler.metrics.window: "1m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/ state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/ pipeline.max-parallelism: "4" executionRoleArn: <JOB ARN> emrReleaseLabel: emr-6.14.0-flink-latest jobManager: highAvailabilityEnabled: true storageDir: s3://<S3_bucket>/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<S3_bucket>/some-job-with-back-pressure parallelism: 1 upgradeMode: last-state
要模拟背压,请使用以下部署规范。
job: jarURI: s3://<S3_bucket>/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: last-state
将以下 Python 脚本上传到 S3 存储桶。
import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()
要验证 Autosutiner 是否正在运行,请使用以下命令。请注意,必须对 Flink Operator 使用您自己的主容器组(pod)信息。
首先,获取主容器组(pod)的名称。
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
获取主容器组(pod)的名称后,您可以运行以下命令。
kubectl logs -n $NAMESPACE -c flink-kubernetes-operator --follow <
YOUR-FLINK-OPERATOR-POD-NAME
> | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'您应该会看到类似下面的日志。
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1)) [m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
-