Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Apache Airflow REST API
Apache Airflow 托管工作流程(亚马逊 MWAA)支持在运行 Apache Airflow v2.4.3 及更高版本的环境中使用 Apache Airflow REST API 直接与你的 Apache Airflow 环境进行交互。这使您可以通过编程方式访问和管理您的 Amazon MWAA 环境,从而为调用数据编排工作流程、管理和监控各种 Apache Airflow 组件(例如元数据数据库 DAGs、触发器和调度程序)的状态提供了一种标准化的方式。
为了在使用 Apache Airflow REST API 时支持可扩展性,Amazon MWAA 为您提供了横向扩展网络服务器容量的选项,以应对不断增长的需求,无论是来自 REST API 请求、命令行界面 (CLI) 使用还是更多并发的 Apache Airflow 用户界面 (UI) 用户。有关 Amazon MWAA 如何扩展网络服务器的更多信息,请参阅。配置 Amazon MWAA 网络服务器自动扩展
您可以使用 Apache Airflow REST API 实现环境的以下应用场景:
-
编程访问 – 您现在可以在不依赖 Apache Airflow 用户界面或 CLI 的情况下,启动 Apache Airflow DAG 运行、管理数据集以及检索元数据数据库、触发器和调度器等各种组件的状态。
-
与外部应用程序和微服务集成 – 由于支持 REST API,您可以构建自定义解决方案以将您的 Amazon MWAA 环境与其他系统集成。例如,您可以启动工作流以响应来自外部系统的事件,例如已完成的数据库作业或新用户注册等。
-
集中监控 — 您可以构建监控控制面板,汇总多 DAGs 个 Amazon MWAA 环境中的状态,从而实现集中监控和管理。
有关 Apache Airflow REST API 的更多信息,请参阅 Apache Airflo w R EST API 参考。
通过使用InvokeRestApi
,您可以使用凭据访问 Apache Airflow REST API。 Amazon 或者,您也可以通过获取 Web 服务器访问令牌,然后使用该令牌调用它来访问它。
如果您在使用该InvokeRestApi
操作时遇到消息错误,Update your environment to use InvokeRestApi
则表示您需要更新您的 Amazon MWAA 环境。当您的 Amazon MWAA 环境与该功能相关的最新更改不兼容时,就会发生此错误。InvokeRestApi
要解决此问题,请更新您的 Amazon MWAA 环境以纳入该功能的必要更改。InvokeRestApi
该InvokeRestApi
操作的默认超时持续时间为 10 秒。如果操作未在这 10 秒的时间范围内完成,则操作将自动终止,并会引发错误。确保您的 REST API 调用设计为在此超时时间内完成,以避免遇到错误。
为了在使用 Apache Airflow REST API 时支持可扩展性,Amazon MWAA 提供了水平扩缩 Web 服务器容量的选项,以满足增加的需求,无论是来自 REST API 请求、命令行界面(CLI)的使用还是并发 Apache Airflow 用户界面(UI)用户数量的增加。有关 Amazon MWAA 如何扩展 Web 服务器的更多信息,请参阅。配置 Amazon MWAA 网络服务器自动扩展
您可以使用 Apache Airflow REST API 实现环境的以下应用场景:
-
编程访问 – 您现在可以在不依赖 Apache Airflow 用户界面或 CLI 的情况下,启动 Apache Airflow DAG 运行、管理数据集以及检索元数据数据库、触发器和调度器等各种组件的状态。
-
与外部应用程序和微服务集成 – 由于支持 REST API,您可以构建自定义解决方案以将您的 Amazon MWAA 环境与其他系统集成。例如,您可以启动工作流以响应来自外部系统的事件,例如已完成的数据库作业或新用户注册等。
-
集中监控 — 您可以构建监控控制面板,汇总多 DAGs 个 Amazon MWAA 环境中的状态,从而实现集中监控和管理。
有关 Apache Airflow REST API 的更多信息,请参阅《Apache Airflo w R EST API 参考》。
通过使用InvokeRestApi
,您可以使用凭据访问 Apache Airflow REST API。 Amazon 您也可以通过获取 Web 服务器访问令牌,然后使用该令牌进行调用的方式来访问。
-
如果您在使用该InvokeRestApi
操作时遇到消息错误,Update your environment to use InvokeRestApi
则表示您需要更新您的 Amazon MWAA 环境。当您的 Amazon MWAA 环境与该功能相关的最新更改不兼容时,就会发生此错误。InvokeRestApi
要解决此问题,请更新您的 Amazon MWAA 环境以纳入该功能的必要更改。InvokeRestApi
-
该InvokeRestApi
操作的默认超时持续时间为 10 秒。如果操作未在这 10 秒的时间范围内完成,则操作将自动终止,并会引发错误。确保您的 REST API 调用设计为在此超时时间内完成,以避免遇到错误。
响应负载大小不能超过 6 MB。如果超过此限制,则会RestApi
失败。
使用以下示例对 Apache Airflow REST API 进行 API 调用并开始新的 DAG 运行:
授予对 Apache Airflow REST API 的访问权限:airflow:InvokeRestApi
要 Amazon 使用证书访问 Apache Airflow REST API,您必须在 IAM 策略airflow:InvokeRestApi
中授予权限。在以下策略示例中,指定Admin
Op
、User
、Viewer
、或Public
角色{airflow-role}
以自定义用户访问权限级别。有关更多信息,请参阅 Apache Air flow 参考指南中的默认角色。
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Sid": "AllowMwaaRestApiAccess",
"Effect": "Allow",
"Action": "airflow:InvokeRestApi",
"Resource": [
"arn:aws:airflow:us-east-1
:111122223333
:role/{your-environment-name}/{airflow-role}"
]
}
]
}
配置私有 Web 服务器时,无法从 Virtual Private Cloud (VPC) 外部调用该InvokeRestApi
操作。您可以使用 aws:SourceVpc
键对此操作执行更精细的访问控制。有关更多信息,请参阅 a ws: SourceVpc。
调用 Apache Airflow REST API
以下示例脚本介绍了如何使用 Apache Airflow REST API 列出环境中的 DAGs 可用变量以及如何创建 Apache Airflow 变量:
import boto3
env_name = "MyAirflowEnvironment"
def list_dags(client):
request_params = {
"Name": env_name,
"Path": "/dags",
"Method": "GET",
"QueryParameters": {
"paused": False
}
}
response = client.invoke_rest_api(
**request_params
)
print("Airflow REST API response: ", response['RestApiResponse'])
def create_variable(client):
request_params = {
"Name": env_name,
"Path": "/variables",
"Method": "POST",
"Body": {
"key": "test-restapi-key",
"value": "test-restapi-value",
"description": "Test variable created by MWAA InvokeRestApi API",
}
}
response = client.invoke_rest_api(
**request_params
)
print("Airflow REST API response: ", response['RestApiResponse'])
if __name__ == "__main__":
client = boto3.client("mwaa")
list_dags(client)
create_variable(client)
创建网络服务器会话令牌并调用 Apache Airflow REST API
要创建网络服务器访问令牌,请使用以下 Python 函数。该函数会首先调用 Amazon MWAA API 来获取 Web 登录令牌。网络登录令牌将在 60 秒后过期,然后将其交换为网络会话令牌,该令牌允许您访问网络服务器并使用 Apache Airflow REST API。如果您需要每秒 10 个事务(TPS)以上的节流容量,则可以使用此方法访问 Apache Airflow REST API。
会话令牌将在 12 小时后过期。
从 Apache Airflow v2 到 v3 的以下代码示例中的主要变化是:
-
REST API 路径已从/api/v1
更改为 /api/v2
-
登录路径已从/aws_maa/login
更改为 /pluginsv2/aws_mwaa/login
-
登录响应response.cookies["_token"]
包含令牌信息,您必须在后续的 API 调用中使用这些信息
-
对于 REST API 调用,您必须在标头中传递jwt_token
信息,如下所示:
headers = {
"Authorization": f"Bearer {jwt_token}",
"Content-Type": "application/json"
}
- Apache Airflow v3
-
def get_token_info(region, env_name):
logging.basicConfig(level=logging.INFO)
try:
# Initialize MWAA client and request a web login token
mwaa = boto3.client('mwaa', region_name=region)
response = mwaa.create_web_login_token(Name=env_name)
# Extract the web server hostname and login token
web_server_host_name = response["WebServerHostname"]
web_token = response["WebToken"]
# Construct the URL needed for authentication
login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login"
login_payload = {"token": web_token}
# Make a POST request to the MWAA login url using the login payload
response = requests.post(
login_url,
data=login_payload,
timeout=10
)
# Check if login was successful
if response.status_code == 200:
# Return the hostname and the session cookie
return (
web_server_host_name,
response.cookies['_token']
)
else:
# Log an error
logging.error("Failed to log in: HTTP %d", response.status_code)
return None
except requests.RequestException as e:
# Log any exceptions raised during the request to the MWAA login endpoint
logging.error("Request failed: %s", str(e))
return None
except Exception as e:
# Log any other unexpected exceptions
logging.error("An unexpected error occurred: %s", str(e))
return None
- Apache Airflow v2
-
def get_token_info(region, env_name):
logging.basicConfig(level=logging.INFO)
try:
# Initialize MWAA client and request a web login token
mwaa = boto3.client('mwaa', region_name=region)
response = mwaa.create_web_login_token(Name=env_name)
# Extract the web server hostname and login token
web_server_host_name = response["WebServerHostname"]
web_token = response["WebToken"]
# Construct the URL needed for authentication
login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login"
login_payload = {"token": web_token}
# Make a POST request to the MWAA login url using the login payload
response = requests.post(
login_url,
data=login_payload,
timeout=10
)
# Check if login was successful
if response.status_code == 200:
# Return the hostname and the session cookie
return (
web_server_host_name,
response.cookies['_token']
)
else:
# Log an error
logging.error("Failed to log in: HTTP %d", response.status_code)
return None
except requests.RequestException as e:
# Log any exceptions raised during the request to the MWAA login endpoint
logging.error("Request failed: %s", str(e))
return None
except Exception as e:
# Log any other unexpected exceptions
logging.error("An unexpected error occurred: %s", str(e))
return None
身份验证完成后,您就有了开始向 API 终端节点发送请求的凭据。在下一节的示例中,使用终端节点dags/{dag_name}/dagRuns
。
- Apache Airflow v3
-
def trigger_dag(region, env_name, dag_id):
"""
Triggers a DAG in a specified MWAA environment using the Airflow REST API.
Args:
region (str): AWS region where the MWAA environment is hosted.
env_name (str): Name of the MWAA environment.
dag_id (str): ID of the DAG to trigger.
"""
logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}")
# Retrieve the web server hostname and token for authentication
try:
web_server_host_name, jwt_token = get_token_info(region, env_name)
if not jwt_token:
logging.error("Authentication failed, no jwt token retrieved.")
return
except Exception as e:
logging.error(f"Error retrieving token info: {str(e)}")
return
# Prepare headers and payload for the request
request_headers = {
"Authorization": f"Bearer {jwt_token}",
"Content-Type": "application/json" # Good practice to include, even for GET
}
# sample request body input
json_body = {"logical_date": "2025-09-17T14:15:00Z"}
# Construct the URL for triggering the DAG
url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns"
# Send the POST request to trigger the DAG
try:
response = requests.post(url, headers=request_headers, json=json_body)
# Check the response status code to determine if the DAG was triggered successfully
if response.status_code == 200:
logging.info("DAG triggered successfully.")
else:
logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
except requests.RequestException as e:
logging.error(f"Request to trigger DAG failed: {str(e)}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Check if the correct number of arguments is provided
if len(sys.argv) != 4:
logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}")
sys.exit(1)
region = sys.argv[1]
env_name = sys.argv[2]
dag_id = sys.argv[3]
# Trigger the DAG with the provided arguments
trigger_dag(region, env_name, dag_id)
- Apache Airflow v2
-
def trigger_dag(region, env_name, dag_name):
"""
Triggers a DAG in a specified MWAA environment using the Airflow REST API.
Args:
region (str): AWS region where the MWAA environment is hosted.
env_name (str): Name of the MWAA environment.
dag_name (str): Name of the DAG to trigger.
"""
logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")
# Retrieve the web server hostname and session cookie for authentication
try:
web_server_host_name, session_cookie = get_session_info(region, env_name)
if not session_cookie:
logging.error("Authentication failed, no session cookie retrieved.")
return
except Exception as e:
logging.error(f"Error retrieving session info: {str(e)}")
return
# Prepare headers and payload for the request
cookies = {"session": session_cookie}
json_body = {"conf": {}}
# Construct the URL for triggering the DAG
url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns"
# Send the POST request to trigger the DAG
try:
response = requests.post(url, cookies=cookies, json=json_body)
# Check the response status code to determine if the DAG was triggered successfully
if response.status_code == 200:
logging.info("DAG triggered successfully.")
else:
logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
except requests.RequestException as e:
logging.error(f"Request to trigger DAG failed: {str(e)}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Check if the correct number of arguments is provided
if len(sys.argv) != 4:
logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}")
sys.exit(1)
region = sys.argv[1]
env_name = sys.argv[2]
dag_name = sys.argv[3]
# Trigger the DAG with the provided arguments
trigger_dag(region, env_name, dag_name)