本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
DAGs 在不同的 Amazon MWAA 环境中调用
以下代码示例创建了一个 Apache Airflow CLI 令牌。然后,该代码使用一个 Amazon MWAA 环境中的有向无环图(DAG)在另一个 Amazon MWAA 环境中调用 DAG。
版本
你可以将本页上的代码示例与 Python 3. 10 中的 Apache Airflow v2 和 Python
先决条件
要使用本页上的代码示例,您需要以下内容:
-
两个具有公共网络网络服务器访问权限的 A mazon MWAA 环境,包括您当前的环境。
-
上传到目标环境的 Amazon Simple Storage Service(Amazon S3)桶的示例 DAG。
权限
要使用本页上的代码示例,环境的执行角色必须具有创建 Apache Airflow CLI 令牌的权限。您可以附加 Amazon-managed 策略AmazonMWAAAirflowCliAccess来授予此权限。
有关更多信息,请参阅Apache Airflow CLI 政策:亚马逊 MWAAAirflow CliAccess。
依赖项
要在 Apache Airflow v2 及更高版本中使用此代码示例,不需要额外的依赖关系。用于aws-mwaa-docker-images
代码示例
以下代码示例假设您在当前环境中使用 DAG 在另一个环境中调用 DAG。
-
在您的终端,导航到存储 DAG 代码的目录。例如:
cd dags -
复制以下示例的内容并本地另存为
invoke_dag.py。用您自己的信息替换以下值。-
your-new-environment-name— 您要调用 DAG 的另一个环境的名称。 -
your-target-dag-id— 您要调用 DAG 的另一个环境中的 DAG ID。
from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag() -
-
运行以下 Amazon CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow 用户界面触发 DAG。
aws s3 cpyour-dag.py s3://your-environment-bucket/dags/ -
如果 DAG 成功运行,您将在的任务日志中获得类似于以下内容的输出
invoke_dag_task。[2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
要验证 DAG 是否已成功调用,请导航到新环境的 Apache Airflow UI,然后执行以下操作:
-
在该DAGs页面上,在列表中找到您的新目标 DAG DAGs。
-
在上次运行下,查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中
invoke_dag的最新时间戳非常匹配。 -
在近期任务下,检查上次运行是否成功。
-