

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# DAGs 在不同的 Amazon MWAA 环境中调用
<a name="samples-invoke-dag"></a>

以下代码示例创建了一个 Apache Airflow CLI 令牌。然后，该代码使用一个 Amazon MWAA 环境中的有向无环图（DAG）在另一个 Amazon MWAA 环境中调用 DAG。

**Topics**
+ [版本](#samples-invoke-dag-version)
+ [先决条件](#samples-invoke-dag-prereqs)
+ [Permissions](#samples-invoke-dag-permissions)
+ [依赖项](#samples-invoke-dag-dependencies)
+ [代码示例](#samples-invoke-dag-code)

## 版本
<a name="samples-invoke-dag-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-invoke-dag-prereqs"></a>

要使用本页上的代码示例，您需要以下内容：
+ 两个具有**公有网络** Web 服务器访问权限的 [Amazon MWAA 环境](get-started.md)，包括您当前的环境。
+ 上传到目标环境的 Amazon Simple Storage Service（Amazon S3）桶的示例 DAG。

## Permissions
<a name="samples-invoke-dag-permissions"></a>

要使用本页上的代码示例，环境的执行角色必须具有创建 Apache Airflow CLI 令牌的权限。您可以附加 Amazon-managed 策略`AmazonMWAAAirflowCliAccess`来授予此权限。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

有关更多信息，请参阅[Apache Airflow CLI 政策：亚马逊 MWAAAirflow CliAccess](access-policies.md#cli-access)。

## 依赖项
<a name="samples-invoke-dag-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。用于[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)安装 Apache Airflow。

## 代码示例
<a name="samples-invoke-dag-code"></a>

以下代码示例假设您在当前环境中使用 DAG 在另一个环境中调用 DAG。

1. 在您的终端，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下示例的内容并本地另存为 `invoke_dag.py`。用您自己的信息替换以下值。
   + `your-new-environment-name`— 您要调用 DAG 的另一个环境的名称。
   + `your-target-dag-id`— 您要调用 DAG 的另一个环境中的 DAG ID。

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  运行以下 Amazon CLI 命令将 DAG 复制到环境的存储桶，然后使用 Apache Airflow 用户界面触发 DAG。

   ```
   aws s3 cp {{your-dag}}.py s3://{{your-environment-bucket}}/dags/
   ```

1. 如果 DAG 成功运行，您将看到类似于 `invoke_dag_task` 的任务日志中的以下内容的输出。

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   要验证 DAG 是否已成功调用，请导航到新环境的 Apache Airflow UI，然后执行以下操作：

   1. 在该**DAGs**页面上，在列表中找到您的新目标 DAG DAGs。

   1. 在**上次运行**下，查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中 `invoke_dag` 的最新时间戳非常匹配。

   1. 在**近期任务**下，检查上次运行是否成功。