Submitting EMR Serverless jobs from Airflow - Amazon EMR
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China.

Submitting EMR Serverless jobs from Airflow

An official Airflow 2.0 operator is currently under development for Amazon EMR Serverless. In the meantime, you can install a preview version of the operator from the EMR Serverless Samples GitHub repository.

You can use EmrServerlessCreateApplicationOperator to create a Spark or Hive application. You can also use EmrServerlessStartJobOperator to start one or more jobs with the your new application.

To use the operator, add the following line to your requirements.txt file and update your MWAA environment to use the new file.

emr_serverless @

The following abbreviated example shows how to create an application, run multiple Spark jobs, and then stop the application. A full example is available in the EMR Serverless Samples GitHub repository.

from datetime import datetime from airflow import DAG from emr_serverless.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessDeleteApplicationOperator, ) # Replace these with your correct values JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role" S3_LOGS_BUCKET = "DOC-EXAMPLE-BUCKET" DEFAULT_MONITORING_CONFIG = { "monitoringConfiguration": { "s3MonitoringConfiguration": {"logUri": f"s3://{S3_LOGS_BUCKET}/logs/"} }, } with DAG( dag_id="example_endtoend_emr_serverless_job", schedule_interval=None, start_date=datetime(2021, 1, 1), tags=["example"], catchup=False, ) as dag: create_app = EmrServerlessCreateApplicationOperator( task_id="create_spark_app", job_type="SPARK", release_label="emr-6.7.0", config={"name": "airflow-test"}, ) application_id = create_app.output job1 = EmrServerlessStartJobOperator( task_id="start_job_1", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) job2 = EmrServerlessStartJobOperator( task_id="start_job_2", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) delete_app = EmrServerlessDeleteApplicationOperator( task_id="delete_app", application_id=application-id, trigger_rule="all_done", ) (create_app >> [job1, job2] >> delete_app)