亚马逊上的 Apache Airflow 的性能调整 MWAA - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

亚马逊上的 Apache Airflow 的性能调整 MWAA

本页介绍了在使用 在 Amazon MWAA 上使用 Apache Airflow 配置选项 调整 Amazon MWAA 环境的性能时,我们推荐的最佳实践。

添加 Apache Airflow 配置选项。

以下过程将指导您完成将 Airflow 配置选项添加到环境中的步骤。

  1. 在 Amazon MWAA 控制台上打开 “环境” 页面

  2. 选择环境。

  3. 选择编辑

  4. 选择下一步

  5. Airflow 配置选项窗格中选择添加自定义配置。

  6. 从下拉列表中选择配置并输入值,或者键入自定义配置并输入值。

  7. 为每个您想要添加的配置选择添加自定义配置选项

  8. 选择保存

要了解更多信息,请参阅 在 Amazon MWAA 上使用 Apache Airflow 配置选项

Apache Airflow 计划程序

Apache Airflow 计划程序是 Apache Airflow 的核心组件。调度程序出现问题可能会导致无法DAGs解析和调度任务。有关 Apache Airflow 计划程序调整的更多信息,请参阅 Apache Airflow 文档网站中的微调计划程序性能

参数

本节介绍可用于 Apache Airflow 计划程序的配置选项及其用例。

Apache Airflow v2
版本 配置选项 默认值 描述 应用场景

v2

celery.sync_parallelism

1

Celery 执行程序用于同步任务状态的进程数。

您可以使用此选项通过限制 Celery 执行程序使用的进程来防止队列冲突。默认情况下,将值设置为,1以防止在将任务日志传送到 CloudWatch 日志时出错。将该值设为 0 意味着使用最大进程数,但在传送任务日志时可能会导致错误。

v2

scheduler.processor_poll_interval

1

调度器 “循环” 中连续处理DAG文件之间等待的秒数。

您可以使用此选项通过延长调度器在完成检索解DAG析结果、查找和排队任务以及在 Executor 中执行排队任务后的休眠时间来腾出调度器上的CPU使用量。增加此值会消耗在 Apache Airflow v2 的 scheduler.parsing_processes 和 Apache Airflow v1 的 scheduler.max_threads 的环境中运行的计划程序线程数。这可能会降低调度程序的解析能力DAGs,并增加出现在 Web DAGs 服务器上的时间。

v2

scheduler.max_dagruns_to_create_per_loop

10

每个调度程序 “循环” DAGs 要创建DagRuns的最大数量。

您可以使用此选项通过减少调度程序 “循环” 的最大数量来腾出资源DagRuns用于调度任务。

v2

scheduler.parsing_processes

使用以下公式进行设置:(2 * number of vCPUs) - 1默认情况下。

调度器可以并行运行的线程DAGs数。

您可以使用此选项通过减少 S cheduler 并行运行以解析DAGs的进程数量来释放资源。如果DAG解析影响任务调度,我们建议将此数字保持在较低水平。在您的环境中,您必须指定一个小于 v CPU 计数的值。要了解更多信息,请参阅限制

限制

本节介绍调整计划程序的默认参数时应考虑的限制。

scheduler.parsing_processes、scheduler.max_threads

CPU对于一个环境类,每个 v 允许有两个线程。必须为环境类的计划程序保留至少一个线程。如果您发现任务计划出现延迟,则可能需要增加环境。例如,大型环境的调度器有一个 4 v CPU Fargate 容器实例。这意味着可供其他进程使用的 7 个线程总数的上限。也就是说,两个线程乘以四vCPUs,调度器本身减一。您在 scheduler.max_threadsscheduler.parsing_processes 中指定的值不得超过环境类的可用线程数(如下所示:

  • mw1.small — 其他进程不得超过 1 个线程。剩下的线程是为计划程序保留的。

  • mw1.medium — 其他进程不得超过 3 个线程。剩下的线程是为计划程序保留的。

  • mw1.large — 其他进程不得超过 7 个线程。剩下的线程是为计划程序保留的。

DAG文件夹

Apache 气流调度器会持续扫描您环境中的DAGs文件夹。任何包含的 plugins.zip 文件,或包含“Airflow”导入语句的 Python (.py) 文件。然后,所有生成的 Python DAG 对象都将放入该文件中,由调度器处理,以确定需要调度哪些任务(如果有)。DagBag无论文件是否包含任何可行的DAG对象,都会进行 DAG 文件解析。

参数

本节介绍该DAGs文件夹的可用配置选项及其用例。

Apache Airflow v2
版本 配置选项 默认值 描述 应用场景

v2

scheduler.dag_dir_list_interval

300 秒

扫描该DAGs文件夹以查找新文件的秒数。

您可以使用此选项通过增加解析DAGs文件夹的秒数来释放资源。如果您发现解析时间较长(这可能是由于您的DAGs文件夹中有total_parse_time metrics大量文件所致),我们建议您增加此值。

v2

scheduler.min_file_process_interval

30 秒

将反映调度器解析 a DAG 并对其进行更新的秒数。DAG

您可以使用此选项通过增加调度程序在解析之前等待的秒数来释放资源。DAG例如,如果将值指定为30,则每 30 秒钟就会解析一次DAG文件。我们建议将此数字保持在较高水平,以减少环境中的CPU使用量。

DAG档案

作为 Apache Airflow 调度器循环的一部分,将解析单个DAG文件以提取 Python 对象。DAG在 Apache Airflow v2 及更高版本中,计划程序可以同时解析的解析进程的最大数量。在再次解析同一个文件之前,scheduler.min_file_process_interval 中指定的秒数必须传递。

参数

本节介绍可用于 Apache Airflow DAG 文件的配置选项及其用例。

Apache Airflow v2
版本 配置选项 默认值 描述 应用场景

v2

core.dag_file_processor_timeout

50 秒

处理DAG文件超DagFileProcessor时之前的秒数。

您可以使用此选项通过增加超时之前所需的时间来释放资源。DagFileProcessor如果您在DAG处理日志中看到超时导致无法加载可行DAGs数据,我们建议您增加此值。

v2

core.dagbag_import_timeout

30 秒

导入 Python 文件之前的秒数超时。

您可以使用此选项通过增加在导入 Python 文件来提取DAG对象时调度程序超时之前所需的时间来释放资源。此选项作为计划程序“循环”的一部分进行处理,并且必须包含一个小于core.dag_file_processor_timeout 中指定值的值。

v2

core.min_serialized_dag_update_interval

30

更新数据库DAGs中序列化的最小秒数。

您可以使用此选项通过增加更新数据库DAGs中序列化的秒数来释放资源。如果您有大量或很复杂,我们建议您增加此值DAGs。DAGs增加此值可减少序列化后的调度程序和数据库DAGs的负载。

v2

core.min_serialized_dag_fetch_interval

10

序列化DAG后从数据库中重新获取序列化的秒数。 DagBag

您可以使用此选项通过增加重新获取序列化的DAG秒数来释放资源。该值必须大于 core.min_serialized_dag_update_interval 中指定的值才能降低数据库的“写入”速率。增加此值可减少序列化后的 Web 服务器和数据库DAGs的负载。

任务

Apache Airflow 计划程序和工作线程都参与排队和出队任务。计划程序将已解析的准备调度的任务从状态变为已计划状态。也在 Fargate 的计划程序容器上运行的执行程序,对这些任务进行排队并将其状态设置为已排队。当工作线程有容量时,它会从队列中提取任务并将状态设置为正在运行,然后根据任务成功还是失败将其状态更改为成功失败

参数

本节介绍可用于 Apache Airflow 任务的配置选项及其用例。

Amazon MWAA 覆盖的默认配置选项已标记 red.

Apache Airflow v2
版本 配置选项 默认值 描述 应用场景

v2

core.parallelism

动态设置基于(maxWorkers * maxCeleryWorkers) / schedulers * 1.5

状态为“正在运行”的最大任务实例数。

通过增加可以同时运行的任务实例数,您可以使用此选项来释放资源。指定的值应为可用工作线程数“乘以”工作线程任务密度。我们建议您仅在看到大量任务处于“正在运行”或“已排队”状态时才更改此值。

v2

core.dag_concurrency

10000

允许同时运行的每个DAG任务实例的数量。

通过增加可以并发运行的任务实例数,您可以使用此选项来释放资源。例如,如果您有一百DAGs个包含十个并行任务,并且您希望所有任务同时运行,则可以DAGs将最大并行度计算为可用 Workers 的数量 “乘以” W or kers 任务密度celery.worker_concurrency,除以DAGs(例如 100)的数量。

v2

core.execute_tasks_new_python_interpreter

True

确定 Apache Airflow 是通过分叉父进程还是通过创建新的 Python 进程来执行任务。

设置为 True 时,Apache Airflow 会将您对插件所做的更改识别为为执行任务而创建的新 Python 进程。

v2

celery.worker_concurrency

不适用

Amazon MWAA 会覆盖此选项的 Airflow 基础安装,以作为其自动缩放组件的一部分扩展 Workers。

Any value specified for this option is ignored.

v2

celery.worker_autoscale

mw1.small - 5,0

mw1.medium - 10,0

mw1.large - 20,0

mw1.xlarge-40,0

mw1.2 xlarge-80,0

工作线程的任务并发度。

通过降低工作线程minimummaximum任务并发度,您可以使用此选项来释放资源。无论是否有足够的资源这样做,工作线程最多可以接受配置的 maximum 并发任务。如果在没有足够资源的情况下调度任务,则任务会立即失败。我们建议为资源密集型任务更改此值,方法是将该值减少到小于默认值,以允许每个任务有更多容量。