
Using the Apache Airflow REST API

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) supports interacting with your Apache Airflow environments directly using the Apache Airflow REST API for environments running Apache Airflow v2.4.3 and later. This lets you access and manage your Amazon MWAA environments programmatically, providing a standardized way to invoke data orchestration workflows, manage your DAGs, and monitor the status of various Apache Airflow components such as the metadata database, triggerer, and scheduler.

To support direct use of the Apache Airflow REST API, Amazon MWAA gives you the option to horizontally scale web server capacity to handle increased demand, whether that demand comes from REST API requests, command line interface (CLI) usage, or more concurrent Apache Airflow user interface (UI) users. For more information on how Amazon MWAA scales web servers, see Configuring Amazon MWAA web server automatic scaling.
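For example, you can adjust the web server scaling range for an existing environment with the UpdateEnvironment API. The following is a minimal sketch using the AWS SDK for Python (Boto3); the region, environment name, and server counts are illustrative assumptions, not recommended values.

import boto3

# Minimal sketch: widen the web server scaling range for an existing
# environment. The environment name and counts below are illustrative.
mwaa = boto3.client("mwaa", region_name="us-east-1")

response = mwaa.update_environment(
    Name="MyAirflowEnvironment",
    MinWebservers=2,  # baseline number of web servers
    MaxWebservers=5,  # upper bound that Amazon MWAA can scale out to
)
print(response["Arn"])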

You can use the Apache Airflow REST API to implement the following use cases for your environments:

  • Programmatic access – You can now start Apache Airflow DAG runs, manage datasets, and retrieve the status of various components such as the metadata database, triggerers, and schedulers without relying on the Apache Airflow UI or CLI.

  • Integrate with external applications and microservices – REST API support allows you to build custom solutions that integrate your Amazon MWAA environments with other systems. For example, you can start workflows in response to events from external systems, such as completed database jobs or new user sign-ups.

  • Centralized monitoring – You can build monitoring dashboards that aggregate the status of your DAGs across multiple Amazon MWAA environments, enabling centralized monitoring and management.

The following topics show how to obtain a web server access token, and then use that token to make calls to the Apache Airflow REST API. In the following example, you call the API to start a new DAG run.

For more information about the Apache Airflow REST API, see The Apache Airflow REST API Reference.

Create a web server session token

To create a web server access token, use the following Python function. The function first calls the Amazon MWAA API to obtain a web login token. The web login token, which expires after 60 seconds, is then exchanged for a web session token that lets you access the web server and use the Apache Airflow REST API. The function requires the boto3 and requests libraries.

Note

The session token expires after 12 hours.

import logging

import boto3
import requests


def get_session_info(region, env_name):
    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize MWAA client and request a web login token
        mwaa = boto3.client('mwaa', region_name=region)
        response = mwaa.create_web_login_token(Name=env_name)

        # Extract the web server hostname and login token
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]

        # Construct the URL needed for authentication
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Make a POST request to the MWAA login url using the login payload
        response = requests.post(
            login_url,
            data=login_payload,
            timeout=10
        )

        # Check if login was successful
        if response.status_code == 200:
            # Return the hostname and the session cookie
            return (
                web_server_host_name,
                response.cookies["session"]
            )
        else:
            # Log an error
            logging.error("Failed to log in: HTTP %d", response.status_code)
            return None
    except requests.RequestException as e:
        # Log any exceptions raised during the request to the MWAA login endpoint
        logging.error("Request failed: %s", str(e))
        return None
    except Exception as e:
        # Log any other unexpected exceptions
        logging.error("An unexpected error occurred: %s", str(e))
        return None
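To confirm that the token exchange works, you can use the returned hostname and session cookie to call a read-only endpoint. The following is a minimal sketch that lists the DAGs in an environment using the GET /dags endpoint; it assumes the get_session_info function above, and the region and environment name are illustrative.

import requests

# Minimal sketch: reuse the session cookie from get_session_info to list DAGs.
# The region and environment name are illustrative assumptions.
session_info = get_session_info("us-east-1", "MyAirflowEnvironment")
if session_info:
    web_server_host_name, session_cookie = session_info
    response = requests.get(
        f"https://{web_server_host_name}/api/v1/dags",
        cookies={"session": session_cookie},
        timeout=10,
    )
    response.raise_for_status()
    for dag in response.json()["dags"]:
        print(dag["dag_id"], "is_paused:", dag["is_paused"])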

Call the Apache Airflow REST API

After authentication is complete, you have the credentials to start sending requests to the API endpoints. The example below uses the /dags/{dag_id}/dagRuns endpoint to start a new DAG run.

import logging
import sys

import requests


def trigger_dag(region, env_name, dag_name):
    """
    Triggers a DAG in a specified MWAA environment using the Airflow REST API.

    Args:
        region (str): AWS region where the MWAA environment is hosted.
        env_name (str): Name of the MWAA environment.
        dag_name (str): Name of the DAG to trigger.
    """
    logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")

    # Retrieve the web server hostname and session cookie for authentication
    try:
        web_server_host_name, session_cookie = get_session_info(region, env_name)
        if not session_cookie:
            logging.error("Authentication failed, no session cookie retrieved.")
            return
    except Exception as e:
        logging.error(f"Error retrieving session info: {str(e)}")
        return

    # Prepare the session cookie and payload for the request
    cookies = {"session": session_cookie}
    json_body = {"conf": {}}

    # Construct the URL for triggering the DAG
    url = f"https://{web_server_host_name}/api/v1/dags/{dag_name}/dagRuns"

    # Send the POST request to trigger the DAG
    try:
        response = requests.post(url, cookies=cookies, json=json_body, timeout=10)
        # Check the response status code to determine if the DAG was triggered successfully
        if response.status_code == 200:
            logging.info("DAG triggered successfully.")
        else:
            logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
    except requests.RequestException as e:
        logging.error(f"Request to trigger DAG failed: {str(e)}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Check that the correct number of arguments is provided
    if len(sys.argv) != 4:
        logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    dag_name = sys.argv[3]

    # Trigger the DAG with the provided arguments
    trigger_dag(region, env_name, dag_name)
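The same pattern extends to the centralized monitoring use case described earlier. The following is a minimal sketch that polls the Apache Airflow GET /health endpoint, which reports the status of components such as the metadata database and scheduler, across several environments. It reuses the get_session_info function from the previous topic; the region and environment names are illustrative assumptions.

import logging

import requests


def check_environment_health(region, env_name):
    """Return the Airflow health payload for one environment, or None on failure."""
    session_info = get_session_info(region, env_name)
    if not session_info:
        logging.error("Could not authenticate to environment %s", env_name)
        return None
    web_server_host_name, session_cookie = session_info

    response = requests.get(
        f"https://{web_server_host_name}/api/v1/health",
        cookies={"session": session_cookie},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()


# Illustrative environment names; replace them with your own.
for env_name in ["etl-prod", "etl-staging"]:
    health = check_environment_health("us-east-1", env_name)
    if health:
        for component, details in health.items():
            print(f"{env_name}: {component} -> {details.get('status')}")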