使用 Apache Airflow REST API - Amazon Managed Workflows for Apache Airflow
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Apache Airflow REST API

Apache Airflow 托管工作流程(亚马逊 MWAA)支持在运行 Apache Airflow v2.4.3 及更高版本的环境中使用 Apache Airflow REST API 直接与你的 Apache Airflow 环境进行交互。这使您可以通过编程方式访问和管理 Amazon MWAA 环境,从而为调用数据编排工作流程、管理 DAG 以及监控各种 Apache Airflow 组件(例如元数据数据库、触发器和调度程序)的状态提供了一种标准化的方式。

为了支持直接使用 Apache Airflow REST API,Amazon MWAA 为您提供了横向扩展 Web 服务器容量的选项,以应对不断增长的需求,无论是来自 REST API 请求、命令行界面 (CLI) 使用还是更多并发 Apache Airflow 用户界面 (UI) 用户。有关 Amazon MWAA 如何扩展 Web 服务器的更多信息,请参阅。配置 Amazon MWAA 网络服务器自动扩展

你可以使用 Apache Airflow REST API 为你的环境实现以下用例:

  • 编程访问 — 现在,您可以启动 Apache Airflow DAG 运行、管理数据集以及检索元数据数据库、触发器和调度器等各种组件的状态,而无需依赖 Apache Airflow 用户界面或 CLI。

  • 与外部应用程序和微服务集成 — REST API 支持允许您构建自定义解决方案,将您的 Amazon MWAA 环境与其他系统集成。例如,您可以启动工作流以响应来自外部系统的事件,例如已完成的数据库作业或新用户注册。

  • 集中监控 — 您可以构建监控控制面板,汇总多个 Amazon MWAA 环境中 DAG 的状态,从而实现集中监控和管理。

以下主题展示了如何获取 Web 服务器访问令牌,然后使用该令牌对 Apache Airflow REST API 进行 API 调用。在以下示例中,您将调用 API 开始新的 DAG 运行。

有关 Apache Airflow REST API 的更多信息,请参阅 Apach e Airflow REST AP I 参考。

创建 Web 服务器会话令牌

要创建 Web 服务器访问令牌,请使用以下 Python 函数。此函数首先调用 Amazon MWAA API 来获取网页登录令牌。网络登录令牌将在 60 秒后过期,然后将其交换为网络会话令牌,该令牌允许您访问网络服务器并使用 Apache Airflow REST API。

注意

会话令牌将在 12 小时后过期。

def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

调用 Apache 气流 REST API

身份验证完成后,您就有了开始向 API 端点发送请求的凭据。在下面的示例中,使用终端节点/dags/dag_id/dag

def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)