使用 Lambda 函数调用 DAG - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Lambda 函数调用 DAG

以下代码示例使用 Amazon Lambda 函数获取 Apache Airflow CLI 令牌并在 Amazon MWAA 环境中调用有向无环图(DAG)。

版本

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用此代码示例,您必须:

注意

如果 Lambda 函数和 Amazon MWAA 环境处于同一 VPC 中,则可以在私有网络上使用此代码。对于本配置,Lambda 函数的执行角色需要获得调用 Amazon Elastic Compute Cloud(Amazon EC2)CreateNetworkInterface API 操作的权限。您可以使用 AWSLambdaVPCAccessExecutionRole Amazon 托管策略添加此权限。

权限

要使用本页上的代码示例,Amazon MWAA 环境的执行角色需要访问权限才能执行 airflow:CreateCliToken 操作。您可以使用 AmazonMWAAAirflowCliAccess Amazon 托管策略添加此权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

有关更多信息,请参阅 Apache Airflow CLI 政策:AmazonmWAA AirflowCliAccess

依赖项

代码示例

  1. 打开 Amazon Lambda 控制台,地址:https://console.aws.amazon.com/lambda/

  2. Functions 列表中选择 Lambda 函数。

  3. 在函数页面上,复制以下代码并将以下代码替换为资源名称:

    • YOUR_ENVIRONMENT_NAME – Amazon MWAA 环境名称。

    • YOUR_DAG_NAME – 您想调用的 DAG 名称。

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. 选择 Deploy(部署)。

  5. 选择 Test,使用 Lambda 控制台调用函数。

  6. 要验证 Lambda 是否成功调用了 DAG,请使用 Amazon MWAA 控制台导航到环境的 Apache Airflow UI 界面,然后执行以下操作:

    1. DAG 页面上,在 DAG 列表中找到新的目标 DAG。

    2. 上次运行下,查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中 invoke_dag 的最新时间戳非常匹配。

    3. 近期任务下,检查上次运行是否成功。