故障排除:Apache Airflow v2 中的 DAG、运算符、连接和其他问题 - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

故障排除:Apache Airflow v2 中的 DAG、运算符、连接和其他问题

本页上的主题介绍了在 Amazon Managed Workflows for Apache Airflow 环境中可能遇到的 Apache Airflow v2 Python 依赖项、自定义插件、DAG、运算符、连接、任务和 Web 服务器问题的解决方案。

连接

以下主题描述了在使用 Apache Airflow 连接或其他 Amazon 数据库时可能收到的错误。

我无法连接 Secrets Manager

我们建议您完成以下步骤:

  1. 了解如何为 Apache Airflow 连接和变量创建密钥,请参阅 使用密钥配置 Apache Airflow 连接 Amazon Secrets Manager

  2. 要了解如何使用 Apache Airflow 变量(test-variable)的密钥,请参阅 为 Apache Airflow 变量使用 Amazon Secrets Manager 中的密钥

  3. 要了解如何使用密钥进行 Apache Airflow 连接(myconn),请参阅 使用 Amazon Secrets Manager 中的密钥进行 Apache Airflow 连接

如何在我的执行角色策略中配置 secretsmanager:ResourceTag/<tag-key> Secrets Manager 条件或资源限制?

注意

适用于 Apache Airflow 版本 2.0 及更早版本。

目前,由于 Apache Airflow 中存在已知问题,您无法通过在环境的执行角色中使用条件密钥或其他资源限制来限制对 Secrets Manager 密钥的访问。

我无法连接 Snowflake

我们建议您完成以下步骤:

  1. 使用 GitHub 上的 aws-mwaa-local-runner 在本地测试 DAG、自定义插件和 Python 依赖项。

  2. 将以下条目添加到适合环境的 requirements.txt 中。

    apache-airflow-providers-snowflake==1.3.0
  3. 将以下导入添加至 DAG:

    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

确保 Apache Airflow 连接对象包含以下键值对:

  1. 连接 ID:snowflake_conn

  2. 连接类型:Snowflake

  3. 主机:<my account>.<my region if not us-west-2>.snowflakecomputing.com

  4. Schema:<my schema>

  5. 登录:<my user name>

  6. 密码:********

  7. 端口:<port, if any>

  8. 附加依赖项:

    { "account": "<my account>", "warehouse": "<my warehouse>", "database": "<my database>", "region": "<my region if not using us-west-2 otherwise omit this line>" }

例如:

>>> import json >>> from airflow.models.connection import Connection >>> myconn = Connection( ... conn_id='snowflake_conn', ... conn_type='Snowflake', ... host='YOUR_ACCOUNT.YOUR_REGION.snowflakecomputing.com', ... schema='YOUR_SCHEMA' ... login='YOUR_USERNAME', ... password='YOUR_PASSWORD', ... port='YOUR_PORT' ... extra=json.dumps(dict(account='YOUR_ACCOUNT', warehouse='YOUR_WAREHOUSE', database='YOUR_DB_OPTION', region='YOUR_REGION')), ... )

我无法在 Airflow UI 中看到我的连接

Apache Airflow 在 Apache Airflow UI 中提供了连接模板。无论连接类型如何,它都使用此模板来生成连接 URI 字符串。如果 Apache Airflow UI 中没有连接模板,则可以使用备用连接模板来生成连接 URI 字符串,例如使用 HTTP 连接模板。

我们建议您完成以下步骤:

  1. 在 Apache Airflow UI 中查看 Amazon MWAA 提供的连接类型,请参阅 安装在 Amazon MWAA 环境中的 Apache Airflow 提供程序包

  2. 在 CLI 中查看创建 Apache Airflow 连接的命令,请参阅 Apache Airflow CLI 命令参考

  3. 要了解如何交替使用 Apache Airflow UI 中的连接模板来处理 Amazon MWAA 上的 Apache Airflow UI 中没有的连接类型,请参阅 连接类型概述

Web 服务器

以下主题描述了您在 Amazon MWAA 上的 Apache Airflow Web 服务器上可能收到的错误。

我在访问 Web 服务器时看到 5xx 错误

我们建议您完成以下步骤:

  1. 检查 Apache Airflow 配置选项。验证您指定为 Apache Airflow 配置选项的键值对(例如 Amazon Secrets Manager)是否配置正确。要了解更多信息,请参阅 我无法连接 Secrets Manager

  2. 查看 requirements.txt。验证在 requirements.txt 中列出的 Airflow “Extras”程序包和其他库是否与 Apache Airflow 版本兼容。

  3. 探索在 requirements.txt 文件中指定 Python 依赖项的方法,请参阅 在 requirements.txt 中管理 Python 依赖项

我看到“计划程序似乎未运行”错误

如果计划程序似乎未运行,或者最后一个“心跳”是在几个小时前收到的,则 DAG 可能不会出现在 Apache Airflow 中,也不会调度新任务。

我们建议您完成以下步骤:

  1. 确认 VPC 安全组允许入站访问端口 5432。需要使用此端口才能连接到环境的 Amazon Aurora PostgreSQL 元数据数据库。添加此规则后,给 Amazon MWAA 几分钟,错误就会消失。要了解更多信息,请参阅 Amazon MWAA 上的 VPC 安全

    注意
    • Aurora PostgreSQL 元数据库是 Amazon MWAA 服务架构的一部分,在 Amazon Web Services 账户 中不可见。

    • 与数据库相关的错误通常是计划程序失败的症状,而不是根本原因。

  2. 如果计划程序未运行,则可能是由于多种因素造成的,例如依赖项安装失败计划程序过载。在 CloudWatch Logs 中查看相应的日志组,确认 DAG、插件和要求是否正常运行。要了解更多信息,请参阅 Amazon MWAA 的监控和指标

任务

以下主题描述了在环境中执行 Apache Airflow 任务时可能收到的错误。

我看到我的任务卡顿或者没有完成

如果 Apache Airflow 任务 “卡顿” 或未完成,我们建议您执行以下步骤:

  1. 可能定义了大量 DAG。减少 DAG 的数量并执行环境更新(例如更改日志级别)以强制重置。

    1. 无论是否启用 DAG,Airflow 都会对其进行解析。如果您使用的容量超过环境容量的 50%,则可能会开始让 Apache Airflow 计划程序不堪重负。这会导致 CloudWatch 指标中的总解析时间过长,或者在 CloudWatch Logs 中导致 DAG 处理时间过长。还有其他优化 Apache Airflow 配置的方法,这些方法不在本指南的讨论范围之内。

    2. 要详细了解调整环境性能我们建议的最佳实践,请参阅 Amazon MWAA 上的 Apache Airflow 的性能调整

  2. 队列中可能有大量任务。这通常表现为处于“无”状态的大量且不断增长的任务,或者在 CloudWatch 的排队任务和/或待处理任务中显示为大量任务。出现此错误的原因如下:

    1. 要运行的任务是否多于环境的运行能力,和/或在自动扩缩有时间检测任务并部署额外的工作线程之前排队的任务数是否很多。

    2. 如果要运行的任务多于环境的运行容量,我们建议减少 DAG 同时运行的任务数量,和/或增加 Apache Airflow 工作线程的最小数量。

    3. 如果在自动扩缩有时间检测和部署额外工作线程之前有大量任务排队,我们建议错开任务部署和/或增加 Apache Airflow 工作线程的最小数量。

    4. 您可以使用 Amazon Command Line Interface(Amazon CLI)中的 update-environment 命令来更改在环境中运行的工作线程的最小或最大数量。

      aws mwaa update-environment --name MyEnvironmentName --min-workers 2 --max-workers 10
    5. 要详细了解调整环境性能我们建议的最佳实践,请参阅 Amazon MWAA 上的 Apache Airflow 的性能调整

  3. 在执行过程中,可能会删除一些显示为任务日志但因在 Apache Airflow 中无进一步的指示而停止的任务。出现此错误的原因如下:

    1. 是否存在短暂的时刻,此时 1)当前任务超出当前环境容量,然后是 2)几分钟无任务执行或排队,最后 3)新任务正在排队。

    2. Amazon MWAA 自动扩缩通过添加更多工作线程来应对第一种情况。在第二种情况下,它会移除额外的工作线程。某些正在排队的任务可能会导致工作线程处于移除过程中,这些任务将在容器被删除时结束。

    3. 我们建议增加环境中工作线程的最小数量。另一种选择是调整 DAG 和任务的计时,以确保这些情况不会发生。

    4. 您还可以将最小工作线程数设置为等于环境中的最大工作线程数,从而有效地禁用自动扩缩。使用 Amazon Command Line Interface(Amazon CLI)中的 update-environment 命令,将最小和最大工作线程数设置为相等来禁用自动扩缩

      aws mwaa update-environment --name MyEnvironmentName --min-workers 5 --max-workers 5
    5. 要详细了解调整环境性能我们建议的最佳实践,请参阅 Amazon MWAA 上的 Apache Airflow 的性能调整

  4. 如果任务停留在“正在运行”状态,也可以清除任务或将其标记为成功或失败。这允许环境的自动扩缩组件缩减运行在环境上的工作线程的数量。下图显示了滞留任务的示例。

    这是滞留任务的图像。
    1. 选择滞留任务的圆圈,然后选择清除(如图所示)。这允许 Amazon MWAA 缩减工作线程;否则,如果仍有排队的任务,Amazon MWAA 无法确定启用或禁用了哪些 DAG,也无法缩减。

      Apache Airflow 操作
  5. 要详细了解 Apache Airflow 任务生命周期,请参阅《Apache Airflow 参考指南》概念

CLI

以下主题介绍了您在 Amazon Command Line Interface 中运行 Airflow CLI 命令时可能收到的错误。

在 CLI 中触发 DAG 时我看到“503”错误

Airflow CLI 在 Apache Airflow Web 服务器上运行,该服务器的并发性有限。通常,它最多可以同时运行 4 个 CLI 命令。

为什么 dags backfill Apache Airflow CLI 命令会失败? 是否有解决方法?

注意

以下内容仅适用于 Apache Airflow v2.0.2 环境。

与其他 Apache Airflow CLI 命令一样,backfill 命令会在处理任何 DAG 之前在本地解析所有 DAG,无论该 CLI 操作适用于哪个 DAG。在使用 Apache Airflow v2.0.2 的 Amazon MWAA 环境中,由于在 CLI 命令运行时,Web 服务器上尚未安装插件和要求,因此解析操作将失败,并且不会调用 backfill 操作。如果环境中没有任何要求或插件,则 backfill 操作将成功。

为了能够运行 backfill CLI 命令,我们建议在 bash 运算符中调用它。在 bash 运算符中,从工作线程启动 backfill,允许 DAG 成功解析,因为所有必要的要求和插件都可用并已安装。以下示例演示如何使用 BashOperator 创建 DAG 来运行 backfill

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="backfill_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="airflow dags backfill my_dag_id" )

运算符

以下主题描述了您在使用运算符时可能收到的错误。

我在使用 S3Transform 运算符时遇到了 PermissionError: [Errno 13] Permission denied 错误

如果您尝试使用 S3Transform 运算符运行 shell 脚本并收到 PermissionError: [Errno 13] Permission denied 错误,我们建议您执行以下步骤。以下步骤假设您已有一个 plugins.zip 文件。如果您要创建新的 plugins.zip,请参阅 安装自定义插件

  1. 使用 GitHub 上的 aws-mwaa-local-runner 在本地测试 DAG、自定义插件和 Python 依赖项。

  2. 创建“转换”脚本。

    #!/bin/bash cp $1 $2
  3. (可选)macOS 和 Linux 用户可能需要运行以下命令以确保脚本可执行。

    chmod 777 transform_test.sh
  4. 将脚本添加到 plugins.zip。

    zip plugins.zip transform_test.sh
  5. 按照将 plugins.zip 上传到 Amazon S3 中的步骤进行操作。

  6. 按照在 Amazon MWAA 控制台上指定 plugins.zip 版本中的步骤操作。

  7. 创建以下 DAG 文件。

    from airflow import DAG from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") with DAG (dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: file_transform = S3FileTransformOperator( task_id='file_transform', transform_script='/usr/local/airflow/plugins/transform_test.sh', source_s3_key='s3://YOUR_S3_BUCKET/files/input.txt', dest_s3_key='s3://YOUR_S3_BUCKET/files/output.txt' )
  8. 按照将 DAG 代码上传到 Amazon S3 中的步骤进行操作。