Call Amazon EMR on EKS with Amazon Step Functions
Step Functions can control certain Amazon services directly from the Amazon States Language. For more information about working with Amazon Step Functions and its integrations, see the following:
How the Optimized Amazon EMR on EKS integration is different than the Amazon EMR on EKS Amazon SDK integration
-
The Run a Job (.sync) integration pattern is supported.
-
There are no optimizations for the Request Response integration pattern.
-
The Wait for a Callback with the Task Token integration pattern is not supported.
Note
For integration with Amazon EMR, Step Functions has a hard-coded 60 seconds job polling frequency for the first 10 minutes and 300 seconds after that.
To integrate Amazon Step Functions with Amazon EMR on EKS, use the Amazon EMR on EKS service integration APIs. The service integration APIs are the same as the corresponding Amazon EMR on EKS APIs, but not all APIs support all integration patterns, as shown in the following table.
API | Request response | Run a job (.sync) |
---|---|---|
CreateVirtualCluster | ✓ | |
DeleteVirtualCluster | ✓ | ✓ |
StartJobRun | ✓ | ✓ |
Supported Amazon EMR on EKS APIs:
Note
There is a quota for the maximum input or result data size for a task in Step Functions. This restricts you to 262,144 bytes of data as a UTF-8 encoded string when you send to, or receive data from, another service. See Quotas related to state machine executions.
The following includes a Task
state that creates a virtual cluster.
"Create_Virtual_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::emr-containers:createVirtualCluster",
"Parameters": {
"Name": "MyVirtualCluster",
"ContainerProvider": {
"Id": "EKSClusterName",
"Type": "EKS",
"Info": {
"EksInfo": {
"Namespace": "Namespace"
}
}
}
},
"End": true
}
The following includes a Task
state that submits a job to a virtual cluster
and waits for it to complete.
"Submit_Job": {
"Type": "Task",
"Resource": "arn:aws:states:::emr-containers:startJobRun.sync",
"Parameters": {
"Name": "MyJobName",
"VirtualClusterId.$": "$.VirtualClusterId",
"ExecutionRoleArn": "arn:aws:iam::<accountId>
:role/job-execution-role",
"ReleaseLabel": "emr-6.2.0-latest",
"JobDriver": {
"SparkSubmitJobDriver": {
"EntryPoint": "s3://<mybucket>
/jobs/trip-count.py",
"EntryPointArguments": [
"60"
],
"SparkSubmitParameters": "--conf spark.driver.cores=2 --conf spark.executor.instances=10 --conf spark.kubernetes.pyspark.pythonVersion=3 --conf spark.executor.memory=10G --conf spark.driver.memory=10G --conf spark.executor.cores=1 --conf spark.dynamicAllocation.enabled=false"
}
},
"ConfigurationOverrides": {
"ApplicationConfiguration": [
{
"Classification": "spark-defaults",
"Properties": {
"spark.executor.instances": "2",
"spark.executor.memory": "2G"
}
}
],
"MonitoringConfiguration": {
"PersistentAppUI": "ENABLED",
"CloudWatchMonitoringConfiguration": {
"LogGroupName": "MyLogGroupName",
"LogStreamNamePrefix": "MyLogStreamNamePrefix"
},
"S3MonitoringConfiguration": {
"LogUri": "s3://<mylogsbucket>
"
}
}
},
"Tags": {
"taskType"
: "jobName"
}
},
"End": true
}
The following includes a Task
state that deletes a virtual cluster and waits
for the deletion to complete.
"Delete_Virtual_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::emr-containers:deleteVirtualCluster.sync",
"Parameters": {
"Id.$": "$.VirtualClusterId"
},
"End": true
}
For information on how to configure IAM when using Step Functions with other Amazon services, see IAM Policies for integrated services.