Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon MWAA 上的 Apache Airflow 的性能调整
本主题介绍如何使用调整适用于 Apache Airflow 的亚马逊托管工作流程环境的性能。在 Amazon MWAA 上使用 Apache Airflow 配置选项
添加 Apache Airflow 配置选项。
使用以下步骤向您的环境中添加 Airflow 配置选项。
-
在 Amazon MWAA 控制台上打开环境页面。
-
选择环境。
-
选择编辑。
-
选择下一步。
-
在 Airflow 配置选项窗格中选择添加自定义配置。
-
从下拉列表中选择配置并输入值,或者键入自定义配置并输入值。
-
为每个您想要添加的配置选择添加自定义配置选项。
-
选择保存。
要了解更多信息,请参阅在 Amazon MWAA 上使用 Apache Airflow 配置选项。
Apache Airflow 计划程序
Apache Airflow 计划程序是 Apache Airflow 的核心组件。调度程序出现问题可能会导致无法 DAGs 解析和调度任务。有关 Apache Airflow 调度程序调整的更多信息,请参阅 Apache Airflow 文档网站上的微调计划程序性能。
参数
本节介绍了 Apache Airflow 调度程序(Apache Airflow v2 及更高版本)可用的配置选项及其用例。
- Apache Airflow v3
-
配置 |
应用场景 |
celery.sync_parallelism
Celery 执行程序用于同步任务状态的进程数。
默认值:1
|
您可以使用此选项通过限制 Celery 执行程序使用的进程来防止队列冲突。默认情况下,将值设置为,1 以防止在将任务日志传送到 CloudWatch 日志时出错。将该值设为 0 意味着使用最大进程数,但在传送任务日志时可能会导致错误。
|
scheduler.scheduler_idle_sleep_time
在调度程序 “循环” 中连续处理 DAG 文件之间等待的秒数。
默认值:1
|
您可以使用此选项通过延长调度器在完成检索 DAG 解析结果、查找和排队任务以及在 Executor 中执行排队任务后的休眠时间来释放调度器上的 CPU 使用量。增加此值会消耗在 Apache Airflow v2 和 Apache Airflow v3 dag_processor.parsing_processes 的环境中运行的调度器线程数。这可能会降低调度程序的解析能力 DAGs,并增加在网络服务器中填充 DAGs 所需的时间。
|
scheduler.max_dagruns_to_create_per_loop
DagRuns为每个调度程序 “循环” 创建的最大数量。 DAGs
默认值:10
|
您可以使用此选项通过减少调度程序 “循环” 的最大数量来腾出资源DagRuns用于调度任务。
|
dag_processor.解析进程
调度器可以并行运行的线程 DAGs数。
默认:使用 (2 * number of vCPUs) - 1
|
您可以使用此选项通过减少调度程序并行运行以解析 DAGs的进程数量来释放资源。如果 DAG 解析影响任务调度,我们建议将此数量保持在较低水平。您必须指定一个小于环境中 vCPU 计数的值。要了解更多信息,请参阅限制。
|
- Apache Airflow v2
-
配置 |
应用场景 |
celery.sync_parallelism
Celery 执行程序用于同步任务状态的进程数。
默认值:1
|
您可以使用此选项通过限制 Celery 执行程序使用的进程来防止队列冲突。默认情况下,将值设置为,1 以防止在将任务日志传送到 CloudWatch 日志时出错。将该值设为 0 意味着使用最大进程数,但在传送任务日志时可能会导致错误。
|
scheduler.idle_sleep_time
在调度程序 “循环” 中连续处理 DAG 文件之间等待的秒数。
默认值:1
|
您可以使用此选项通过延长调度器在完成检索 DAG 解析结果、查找和排队任务以及在 Executor 中执行排队任务后的休眠时间来释放调度器上的 CPU 使用量。增加此值会消耗在 Apache Airflow v2 和 Apache Airflow v3 scheduler.parsing_processes 的环境中运行的调度器线程数。这可能会降低调度程序的解析能力 DAGs,并增加在网络服务器中填充 DAGs 所需的时间。
|
scheduler.max_dagruns_to_create_per_loop
DagRuns为每个调度程序 “循环” 创建的最大数量。 DAGs
默认值:10
|
您可以使用此选项通过减少调度程序 “循环” 的最大数量来腾出资源DagRuns用于调度任务。
|
scheduler.parsing_processes
调度器可以并行运行的线程 DAGs数。
默认:使用 (2 * number of vCPUs) - 1
|
您可以使用此选项通过减少调度程序并行运行以解析 DAGs的进程数量来释放资源。如果 DAG 解析影响任务调度,我们建议将此数量保持在较低水平。您必须指定一个小于环境中 vCPU 计数的值。要了解更多信息,请参阅限制。
|
限制
本节介绍调整调度程序的默认参数时要考虑的限制。
- scheduler.parsing_processes、scheduler.max_threads(仅限 v2)
-
对于环境类,每个 vCPU 允许使用两个线程。必须为环境类的计划程序保留至少一个线程。如果您发现任务计划出现延迟,则可能需要提高环境等级。例如,大型环境为其计划程序设有一个 4 vCPU 的 Fargate 容器实例。这意味着可供其他进程使用的 7
个线程总数的上限。也就是说,两个线程乘以四 vCPUs,调度器本身减一。您在scheduler.max_threads
(仅限 v2)中指定的值,且scheduler.parsing_processes
不得超过环境类的可用线程数,如下所示:
-
mw1.small — 其他进程不得超过 1
个线程。剩下的线程是为调度器保留的。
-
mw1.medium — 其他进程不得超过 3
个线程。剩下的线程是为调度器保留的。
-
mw1.large — 其他进程不得超过 7
个线程。剩下的线程是为调度器保留的。
DAG 文件夹
Apache Airflow 调度程序会持续扫描您环境 DAGs 中的文件夹。任何包含的 plugins.zip
文件,或包含“Airflow”导入语句的 Python (.py
) 文件。然后,所有生成的 Python DAG 对象都将放入该文件中,由调度器处理,以确定需要安排哪些任务(如果有)。DagBag无论文件是否包含任何可行的 DAG 对象,都会进行 DAG 文件解析。
参数
本节介绍该 DAGs 文件夹(Apache Airflow v2 及更高版本)可用的配置选项及其用例。
- Apache Airflow v3
-
配置 |
应用场景 |
dag_processor.刷新间隔
必须对 DAGs 文件夹进行扫描以查找新文件的秒数。
默认值:300 秒
|
您可以使用此选项通过增加解析 DAGs 文件夹的秒数来释放资源。如果您的解析时间很长total_parse_time metrics ,这可能是由于您的 DAGs 文件夹中有大量文件所致,我们建议您增加此值。
|
dag_processor.min_file_process_inter
反映计划程序解析 DAG 并更新 DAG 之后的秒数。
默认值:30 秒
|
您可以使用此选项通过增加计划程序在解析 DAG 之前等待的秒数来释放资源。例如,如果指定 30 的值,则将每 30 秒解析一次 DAG 文件。我们建议将此秒数保持在较高水平,以减小环境上的 CPU 使用率。
|
- Apache Airflow v2
-
配置 |
应用场景 |
scheduler.dag_dir_list_interval
必须对 DAGs 文件夹进行扫描以查找新文件的秒数。
默认值:300 秒
|
您可以使用此选项通过增加解析 DAGs 文件夹的秒数来释放资源。如果您的解析时间很长total_parse_time metrics ,这可能是由于您的 DAGs 文件夹中有大量文件所致,我们建议您增加此值。
|
scheduler.min_file_process_interval
反映计划程序解析 DAG 并更新 DAG 之后的秒数。
默认值:30 秒
|
您可以使用此选项通过增加计划程序在解析 DAG 之前等待的秒数来释放资源。例如,如果指定 30 的值,则将每 30 秒解析一次 DAG 文件。我们建议将此秒数保持在较高水平,以减小环境上的 CPU 使用率。
|
DAG 文件
作为 Apache Airflow 计划程序循环的一部分,将解析单个 DAG 文件以提取 DAG Python 对象。在 Apache Airflow v2 及更高版本中,调度器可以同时解析最大数量的解析进程。在再次解析同一个文件之前,必须经过在 scheduler.min_file_process_interval
dag_processor.min_file_process_interval
(v2) 或 (v3) 中指定的秒数。
参数
本节介绍可用于 Apache Airflow DAG 文件(Apache Airflow v2 及更高版本)的配置选项及其用例。
- Apache Airflow v3
-
配置 |
应用场景 |
dag_processor.dag_file_processor_time
处理 DAG 文件超DagFileProcessor时之前的秒数。
默认值:50 秒
|
您可以使用此选项通过增加超时之前所需的时间来释放资源。DagFileProcessor如果您在 DAG 处理日志中遇到超时导致无法加载可行 DAGs 数据,我们建议您增加此值。
|
core.dagbag_import_timeout
导入 Python 文件之前的秒数超时。
默认值:30 秒
|
您可以使用此选项通过增加在导入 Python 文件以提取 DAG 对象时调度程序超时之前所需的时间来释放资源。此选项作为调度程序 “循环” 的一部分进行处理,并且包含的值必须小于中dag_processor.dag_file_processor_timeout 指定的值。
|
core.min_serialized_dag_update_interval
更新数据库 DAGs 中序列化的最小秒数。
默认值:30
|
您可以使用此选项通过增加更新数据库 DAGs 中序列化的秒数来释放资源。如果您有大量或很复杂,我们建议您增加此值 DAGs。 DAGs增加此值可减少序列化后的调度程序和数据库 DAGs 的负载。
|
core.min_serialized_dag_fetch_interval
序列化的 DAG 已加载到数据库中时从数据库中重新提取的秒数。 DagBag
默认值:10
|
通过增加序列化的 DAG 重新提取的秒数,您可以使用此选项来释放资源。该值必须大于中指定的值core.min_serialized_dag_update_interval 才能降低数据库的 “写入” 速率。增加此值可减少序列化后的 Web 服务器和数据库 DAGs 的负载。
|
- Apache Airflow v2
-
配置 |
应用场景 |
core.dag_file_processor_timeout
处理 DAG 文件超DagFileProcessor时之前的秒数。
默认值:50 秒
|
您可以使用此选项通过增加超时之前所需的时间来释放资源。DagFileProcessor如果您在 DAG 处理日志中遇到超时导致无法加载可行 DAGs 数据,我们建议您增加此值。
|
core.dagbag_import_timeout
导入 Python 文件之前的秒数超时。
默认值:30 秒
|
您可以使用此选项通过增加在导入 Python 文件以提取 DAG 对象时调度程序超时之前所需的时间来释放资源。此选项作为调度程序 “循环” 的一部分进行处理,并且包含的值必须小于中core.dag_file_processor_timeout 指定的值。
|
core.min_serialized_dag_update_interval
更新数据库 DAGs 中序列化的最小秒数。
默认值:30
|
您可以使用此选项通过增加更新数据库 DAGs 中序列化的秒数来释放资源。如果您有大量或很复杂,我们建议您增加此值 DAGs。 DAGs增加此值可减少序列化后的调度程序和数据库 DAGs 的负载。
|
core.min_serialized_dag_fetch_interval
序列化的 DAG 已加载到数据库中时从数据库中重新提取的秒数。 DagBag
默认值:10
|
通过增加序列化的 DAG 重新提取的秒数,您可以使用此选项来释放资源。该值必须大于中指定的值core.min_serialized_dag_update_interval 才能降低数据库的 “写入” 速率。增加此值可减少序列化后的 Web 服务器和数据库 DAGs 的负载。
|
任务
Apache Airflow 计划程序和工作线程都参与排队和出队任务。计划程序将已解析的准备调度的任务从无状态变为已计划状态。也在 Fargate 的计划程序容器上运行的执行程序,对这些任务进行排队并将其状态设置为已排队。当工作线程有容量时,它会从队列中提取任务并将状态设置为正在运行,然后根据任务成功还是失败将其状态更改为成功或失败。
参数
本节介绍可用于 Apache Airflow 任务的配置选项及其用例。
标记 Amazon MWAA 覆盖的默认配置选项。red
- Apache Airflow v3
-
配置 |
应用场景 |
core.parallelism
可以具有Running 状态的任务实例的最大数量。
默认:基于动态设置(maxWorkers * maxCeleryWorkers) / schedulers * 1.5 。
|
通过增加可以同时运行的任务实例数,您可以使用此选项来释放资源。指定的值必须是可用工作人员的数量乘以工作人员的任务密度。我们建议您仅在遇到大量任务处于 “正在运行” 或 “已排队” 状态时才更改此值。
|
core.execute_tasks_new_python_interpreter
确定 Apache Airflow 是通过分叉父进程还是通过创建新的 Python 进程来执行任务。
默认值:True
|
设置为 True 时,Apache Airflow 会将您对插件所做的更改识别为为执行任务而创建的新 Python 进程。
|
celery.worker_concurrency
Amazon MWAA 会覆盖此选项的 Airflow 基础安装,以作为其自动缩放组件的一部分来扩展工作人员。
默认:不适用
|
Any value specified for this option is ignored.
|
celery.worker_autoscale
工作人员的任务并发度。
默认值:
mw1.micro -3,0 mw1.small - 5,0 mw1.medium - 10,0 mw1.large - 20,0 mw1.xlarge – 40,0 mw1.2xlarge – 80,0
|
您可以使用此选项通过减少工作人员的minimum 任务并maximum 发来释放资源。无论是否有足够的资源,工作人员最多可以接受配置的maximum 并发任务。如果在没有足够资源的情况下调度任务,则任务会立即失败。我们建议为资源密集型任务更改此值,方法是将该值减少到小于默认值,以允许每个任务有更多容量。
|
- Apache Airflow v2
-
配置 |
应用场景 |
core.parallelism
可以具有Running 状态的任务实例的最大数量。
默认:基于动态设置(maxWorkers * maxCeleryWorkers) / schedulers * 1.5 。
|
通过增加可以同时运行的任务实例数,您可以使用此选项来释放资源。指定的值必须是可用工作人员的数量乘以工作人员的任务密度。我们建议您仅在遇到大量任务处于 “正在运行” 或 “已排队” 状态时才更改此值。
|
core.dag_concurrency
允许为每个 DAG 同时运行的任务实例数。
默认值:10000
|
通过增加可以并发运行的任务实例数,您可以使用此选项来释放资源。例如,如果您有一百个包含十 DAGs 个并行任务,并且您希望所有 DAGs 任务同时运行,则可以计算最大并行度,方法是可用工作线程的数量乘以中的工作人员任务密度celery.worker_concurrency ,再除以数量。 DAGs
|
core.execute_tasks_new_python_interpreter
确定 Apache Airflow 是通过分叉父进程还是通过创建新的 Python 进程来执行任务。
默认值:True
|
设置为 True 时,Apache Airflow 会将您对插件所做的更改识别为为执行任务而创建的新 Python 进程。
|
celery.worker_concurrency
Amazon MWAA 会覆盖此选项的 Airflow 基础安装,以作为其自动缩放组件的一部分来扩展工作人员。
默认:不适用
|
Any value specified for this option is ignored.
|
celery.worker_autoscale
工作人员的任务并发度。
默认值:
mw1.micro -3,0 mw1.small - 5,0 mw1.medium - 10,0 mw1.large - 20,0 mw1.xlarge – 40,0 mw1.2xlarge – 80,0
|
您可以使用此选项通过减少工作人员的minimum 任务并maximum 发来释放资源。无论是否有足够的资源,工作人员最多可以接受配置的maximum 并发任务。如果在没有足够资源的情况下调度任务,则任务会立即失败。我们建议为资源密集型任务更改此值,方法是将该值减少到小于默认值,以允许每个任务有更多容量。
|