在不同的 Amazon MWAA 环境中调用 DAG - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在不同的 Amazon MWAA 环境中调用 DAG

以下代码示例创建了一个 Apache Airflow CLI 令牌。然后,该代码使用一个 Amazon MWAA 环境中的有向无环图(DAG)在另一个 Amazon MWAA 环境中调用 DAG。

版本

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用本页上的代码示例,您需要以下内容:

  • 两个具有公有网络 Web 服务器访问权限的 Amazon MWAA 环境,包括您当前的环境。

  • 上传到目标环境的 Amazon Simple Storage Service(Amazon S3)桶的示例 DAG。

权限

要使用本页上的代码示例,环境的执行角色必须具有创建 Apache Airflow CLI 令牌的权限。您可以附加Amazon托管策略AmazonMWAAAirflowCliAccess来授予此权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

有关更多信息,请参阅 Apache Airflow CLI 政策:AmazonmWAA AirflowCliAccess

附属物

代码示例

以下代码示例假设您在当前环境中使用 DAG 在另一个环境中调用 DAG。

  1. 在您的终端,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下示例的内容并本地另存为 invoke_dag.py。用您自己的信息替换以下值。

    • your-new-environment-name— 您要调用 DAG 的另一个环境的名称。

    • your-target-dag-id— 您要调用 DAG 的另一个环境中的 DAG ID。

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. 运行以下 Amazon CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow UI 触发 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果 DAG 成功运行,您将看到类似于 invoke_dag_task 的任务日志中的以下内容的输出。

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    要验证 DAG 是否已成功调用,请导航到新环境的 Apache Airflow UI,然后执行以下操作:

    1. DAG 页面上,在 DAG 列表中找到新的目标 DAG。

    2. 上次运行下,查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中 invoke_dag 的最新时间戳非常匹配。

    3. 近期任务下,检查上次运行是否成功。