Using Map state in Distributed mode
Step Functions provides a high-concurrency mode for the Map state known as Distributed mode. In this mode, the Map state can accept input from large-scale Amazon S3 data sources. For example, your input can be a JSON or CSV file stored in an Amazon S3 bucket, or a JSON array passed from a previous step in the workflow. A Map state set to Distributed is known as a Distributed Map state. In this mode, the Map state runs each iteration as a child workflow execution, which enables high concurrency of up to 10,000 parallel child workflow executions. Each child workflow execution has its own execution history, separate from that of the parent workflow.
Use the Map state in Distributed mode when you need to orchestrate large-scale parallel workloads that meet any combination of the following conditions:
The size of your dataset exceeds 256 KB.
The workflow's execution event history exceeds 25,000 entries.
You need a concurrency of more than 40 parallel iterations.
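As a rough illustration, the three conditions above can be written as a single check. This is only a sketch: the function name and its parameters are hypothetical, the thresholds are the ones listed above, and treating 256 KB as 256 × 1024 bytes is an assumption.

```python
# Thresholds copied from the list of conditions above.
# Assumption: "256 KB" is interpreted as 256 * 1024 bytes.
MAX_DATASET_BYTES = 256 * 1024
MAX_HISTORY_EVENTS = 25_000
MAX_PARALLEL_ITERATIONS = 40


def needs_distributed_mode(dataset_bytes, expected_history_events, desired_concurrency):
    """Return True if at least one of the listed conditions holds.

    Hypothetical helper for illustration; it is not part of any AWS SDK.
    """
    return (
        dataset_bytes > MAX_DATASET_BYTES
        or expected_history_events > MAX_HISTORY_EVENTS
        or desired_concurrency > MAX_PARALLEL_ITERATIONS
    )


# A 10 MB dataset alone is enough to call for Distributed mode.
print(needs_distributed_mode(10 * 1024 * 1024, 1_000, 10))
```

Because the conditions apply in any combination, the check uses a logical OR rather than requiring all three.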
For more information about working with large-scale parallel workloads, see Orchestrating large-scale parallel workloads in your state machines.
For an introduction to using the Distributed Map state, see the tutorial Copying large-scale CSV data using Distributed Map.
Tip
To deploy an example of a workflow that uses a Distributed Map state to your Amazon Web Services account, see Large-Scale Parallelization with Distributed Map.
Key concepts in this topic
- Distributed mode
  A processing mode of the Map state. In this mode, each iteration of the Map state runs as a child workflow execution that enables high concurrency. Each child workflow execution has its own execution history, which is separate from the parent workflow's execution history. This mode supports reading input from large-scale Amazon S3 data sources.
- Distributed Map state
  A Map state set to Distributed processing mode.
- Map workflow
  A set of steps that a Map state runs.
- Child workflow execution
  An iteration of the Distributed Map state. A child workflow execution has its own execution history, which is separate from the parent workflow's execution history.
- Map Run
  When you run a Map state in Distributed mode, Step Functions creates a Map Run resource. A Map Run refers to a set of child workflow executions that a Distributed Map state starts, and the runtime settings that control these executions. Step Functions assigns an Amazon Resource Name (ARN) to your Map Run. You can examine a Map Run in the Step Functions console. You can also invoke the DescribeMapRun API action. A Map Run also emits metrics to CloudWatch.
  For more information, see Examining Map Run.
Distributed Map state fields
To use the Distributed Map state in your workflows, specify one or more of these fields. You specify these fields in addition to the common state fields.
Type (Required)
Sets the type of state, such as Map.
ItemProcessor (Required)
Contains the following JSON objects that specify the Map state processing mode and definition.
- ProcessorConfig – A JSON object that specifies the configuration for the Map state. This object contains the following sub-fields:
  - Mode – Set to DISTRIBUTED to use the Map state in Distributed mode.
    Note
    Currently, if you use the Map state inside Express workflows, you can't set the Mode to DISTRIBUTED. However, if you use the Map state inside Standard workflows, you can set the Mode to DISTRIBUTED.
  - ExecutionType – Specifies the execution type for the Map workflow as either STANDARD or EXPRESS. You must provide this field if you specified DISTRIBUTED for the Mode sub-field. For more information about workflow types, see Standard vs. Express Workflows.
- StartAt – Specifies a string that indicates the first state in a workflow. This string is case-sensitive and must match the name of one of the state objects. This state runs first for each item in the dataset. Any execution input that you provide to the Map state passes to the StartAt state first.
- States – A JSON object containing a comma-delimited set of states. In this object, you define the Map workflow.
ItemReader (Optional)
Specifies a dataset and its location. The Map state receives its input data from the specified dataset.
In Distributed mode, you can use either a JSON payload passed from a previous state or a large-scale Amazon S3 data source as the dataset. For more information, see ItemReader.
ItemsPath (Optional)
Specifies a reference path using the JsonPath syntax to select the JSON node that contains an array of items inside the state input. In Distributed mode, you specify this field only when you use a JSON array from a previous step as your state input. For more information, see ItemsPath.
ItemSelector (Optional)
Overrides the values of individual dataset items before they're passed on to each Map state iteration.
In this field, you specify a valid JSON input that contains a collection of key-value pairs. These pairs can be static values that you define in your state machine definition, values selected from the state input using a path, or values accessed from the context object. For more information, see ItemSelector.
ItemBatcher (Optional)
Specifies that the dataset items are processed in batches. Each child workflow execution then receives a batch of these items as input. For more information, see ItemBatcher.
MaxConcurrency (Optional)
Specifies the number of child workflow executions that can run in parallel. The interpreter only allows up to the specified number of parallel child workflow executions. If you don't specify a concurrency value or set it to zero, Step Functions doesn't limit concurrency and runs up to 10,000 parallel child workflow executions.
Note
While you can specify a higher concurrency limit for parallel child workflow executions, we recommend that you don't exceed the capacity of a downstream Amazon service, such as Amazon Lambda.
ToleratedFailurePercentage (Optional)
Defines the percentage of failed items to tolerate in a Map Run. The Map Run automatically fails if it exceeds this percentage. Step Functions calculates the percentage of failed items as the total number of failed or timed out items divided by the total number of items. You must specify a value between zero and 100. For more information, see Tolerated failure threshold.
ToleratedFailureCount (Optional)
Defines the number of failed items to tolerate in a Map Run. The Map Run automatically fails if it exceeds this number. For more information, see Tolerated failure threshold.
Label (Optional)
A string that uniquely identifies a Map state. For each Map Run, Step Functions adds the label to the Map Run ARN. The following is an example of a Map Run ARN with a custom label named demoLabel:
arn:aws-cn:states:us-east-1:123456789012:mapRun:demoWorkflow/demoLabel:3c39a231-69bb-3d89-8607-9e124eddbb0b
If you don't specify a label, Step Functions automatically generates a unique label.
Note
Labels can't exceed 40 characters in length, must be unique within a state machine definition, and can't contain any of the following characters:
- Whitespace characters
- Wildcard characters (? *)
- Bracket characters (< > { } [ ])
- Special characters (: ; , \ | ^ ~ $ # % & ` ")
- Control characters (\\u0000-\\u001f or \\u007f-\\u009f)
Step Functions allows you to create names for state machines, executions, activities, and labels that contain non-ASCII characters. These non-ASCII names don't work with Amazon CloudWatch. To ensure that you can track CloudWatch metrics, choose a name that uses only ASCII characters.
ResultWriter (Optional)
Specifies the Amazon S3 location where Step Functions writes all child workflow execution results.
Step Functions consolidates all child workflow execution data, such as execution input and output, ARN, and execution status. It then exports executions with the same status to their respective files in the specified Amazon S3 location. For more information, see ResultWriter.
If you don't export the Map state results, it returns an array of all the child workflow execution results. For example: [1, 2, 3, 4, 5]
ResultPath (Optional)
Specifies where in the input to place the output of the iterations. The input is then filtered as specified by the OutputPath field, if present, before it is passed as the state's output. For more information, see Input and Output Processing.
ResultSelector (Optional)
Passes a collection of key-value pairs, where the values are static or selected from the result. For more information, see ResultSelector.
Retry (Optional)
An array of objects, called Retriers, that define a retry policy. An execution uses the retry policy if the state encounters runtime errors. For more information, see Examples using Retry and using Catch.
Note
If you define Retriers for the Distributed Map state, the retry policy applies to all of the child workflow executions the Map state started. For example, imagine your Map state started three child workflow executions, out of which one fails. When the failure occurs, the execution uses the Retry field, if defined, for the Map state. The retry policy applies to all the child workflow executions and not just the failed execution. If one or more child workflow executions fail, the Map Run fails.
When you retry a Map state, it creates a new Map Run.
Catch (Optional)
An array of objects, called Catchers, that define a fallback state. Step Functions uses the Catchers defined in Catch if the state encounters runtime errors. When an error occurs, the execution first uses any Retriers defined in Retry. If the retry policy isn't defined or is exhausted, the execution uses its Catchers, if defined. For more information, see Fallback States.
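Two of the rules in the field list above lend themselves to a quick local check before deploying a definition: the Label character constraints and the tolerated failure arithmetic. The following is a minimal sketch with hypothetical helper names. It assumes that when both ToleratedFailurePercentage and ToleratedFailureCount are supplied, exceeding either one counts as a failure, which the text above does not state explicitly.

```python
# Characters the Label note disallows, besides whitespace and control characters.
FORBIDDEN_LABEL_CHARS = set('?*<>{}[]:;,\\|^~$#%&`"')


def _is_control(ch):
    """True for the control ranges named in the Label note."""
    return 0x00 <= ord(ch) <= 0x1F or 0x7F <= ord(ch) <= 0x9F


def label_problems(label):
    """Return a list of rule violations for a Label (empty if none).

    Uniqueness within the state machine definition is not checked here.
    """
    problems = []
    if len(label) > 40:
        problems.append("longer than 40 characters")
    bad = sorted(
        {ch for ch in label
         if ch.isspace() or ch in FORBIDDEN_LABEL_CHARS or _is_control(ch)}
    )
    if bad:
        problems.append(f"contains disallowed characters: {bad!r}")
    return problems


def failed_percentage(failed_or_timed_out, total_items):
    """Failed or timed out items divided by total items, as a percentage."""
    if total_items <= 0:
        raise ValueError("total_items must be positive")
    return 100.0 * failed_or_timed_out / total_items


def exceeds_tolerance(failed_or_timed_out, total_items,
                      tolerated_percentage=None, tolerated_count=None):
    """Return True if the failures exceed any threshold that was supplied.

    Assumption: thresholds that are not supplied are simply not checked.
    """
    if tolerated_percentage is not None:
        if not 0 <= tolerated_percentage <= 100:
            raise ValueError("percentage must be between 0 and 100")
        if failed_percentage(failed_or_timed_out, total_items) > tolerated_percentage:
            return True
    if tolerated_count is not None and failed_or_timed_out > tolerated_count:
        return True
    return False


print(label_problems("demoLabel"))
print(exceeds_tolerance(6, 200, tolerated_percentage=2.5))
```

Note that a Map Run fails only when it exceeds a threshold, so a failure rate exactly equal to the tolerated percentage is still accepted in this sketch.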
Distributed Map state example
To use the Map state in Distributed mode, you must configure the following required options:
- ItemReader – Specifies the dataset and its location from which the Map state can read input.
- ItemProcessor – Specifies the following values:
  - ProcessorConfig – Set the Mode and ExecutionType to DISTRIBUTED and EXPRESS respectively. This sets the Map state's processing mode and the workflow type for child workflow executions that the Distributed Map state starts.
  - StartAt – The first state in the Map workflow.
  - States – Defines the Map workflow, which is a set of steps to repeat in each child workflow execution.
- ResultWriter – Specifies the Amazon S3 location where Step Functions writes the Distributed Map state results.
Important
Make sure that the Amazon S3 bucket you use to export the results of a Map Run is under the same Amazon Web Services account and Amazon Web Services Region as your state machine. Otherwise, your state machine execution will fail with the States.ResultWriterFailed error.
You can optionally configure additional fields as explained in Distributed Map state input and output configuration.
The following is an example of a Distributed Map state definition that specifies the dataset as a CSV file stored in an Amazon S3 bucket. It also specifies a Lambda function that processes the data in each row of the CSV file. Because this example uses a CSV file, it also specifies the location of the CSV column headers. To view the complete state machine definition of this example, see the tutorial Copying large-scale CSV data using Distributed Map.
{
  "Map": {
    "Type": "Map",
    "ItemReader": {
      "ReaderConfig": {
        "InputType": "CSV",
        "CSVHeaderLocation": "FIRST_ROW"
      },
      "Resource": "arn:aws-cn:states:::s3:getObject",
      "Parameters": {
        "Bucket": "Database",
        "Key": "csv-dataset/ratings.csv"
      }
    },
    "ItemProcessor": {
      "ProcessorConfig": {
        "Mode": "DISTRIBUTED",
        "ExecutionType": "EXPRESS"
      },
      "StartAt": "LambdaTask",
      "States": {
        "LambdaTask": {
          "Type": "Task",
          "Resource": "arn:aws-cn:states:::lambda:invoke",
          "OutputPath": "$.Payload",
          "Parameters": {
            "Payload.$": "$",
            "FunctionName": "arn:aws-cn:lambda:us-west-2:123456789012:function:processCSVData"
          },
          "End": true
        }
      }
    },
    "Label": "Map",
    "End": true,
    "ResultWriter": {
      "Resource": "arn:aws-cn:states:::s3:putObject",
      "Parameters": {
        "Bucket": "myOutputBucket",
        "Prefix": "csvProcessJobs"
      }
    }
  }
}
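Before deploying a definition like the one above, it can be handy to confirm locally that the ItemProcessor fields described in this topic are present and consistent. The following is a minimal sketch, not an official validator: the function name is hypothetical, it checks only Type, Mode, ExecutionType, and StartAt, and it uses a trimmed copy of the example definition.

```python
import json


def check_distributed_map(state):
    """Return a list of problems found in a Map state definition (a dict).

    Hypothetical helper; it covers only the fields discussed in this topic.
    """
    problems = []
    if state.get("Type") != "Map":
        problems.append("Type must be Map")
    processor = state.get("ItemProcessor")
    if not isinstance(processor, dict):
        return problems + ["ItemProcessor is required"]
    config = processor.get("ProcessorConfig", {})
    if config.get("Mode") != "DISTRIBUTED":
        problems.append("ProcessorConfig.Mode must be DISTRIBUTED")
    if config.get("ExecutionType") not in ("STANDARD", "EXPRESS"):
        problems.append("ProcessorConfig.ExecutionType must be STANDARD or EXPRESS")
    if processor.get("StartAt") not in processor.get("States", {}):
        problems.append("StartAt must name a state in States")
    return problems


# Trimmed copy of the example definition above.
definition = json.loads("""
{
  "Map": {
    "Type": "Map",
    "ItemProcessor": {
      "ProcessorConfig": {"Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS"},
      "StartAt": "LambdaTask",
      "States": {"LambdaTask": {"Type": "Task", "End": true}}
    },
    "Label": "Map",
    "End": true
  }
}
""")

# An empty list means none of the checked rules were violated.
print(check_distributed_map(definition["Map"]))
```

Parsing the definition with json.loads also catches malformed JSON, such as a string literal broken across lines, before the definition reaches Step Functions.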
Distributed Map state input and output configuration
You can configure the input and output for a Distributed Map state using its fields, such as ItemReader and ResultWriter.
The following examples show how you can configure the input and output in a Distributed Map state. To view the complete state machine definition containing these examples, see the tutorial Copying large-scale CSV data using Distributed Map.
Note
Based on your use case, you may not need to apply all of these fields. For more information about the following fields, see Map state input and output fields.
IAM policies to run Distributed Map state
When you include a Distributed Map state in your workflows, Step Functions needs appropriate permissions to allow the state machine role to invoke the StartExecution API action for the Distributed Map state.
The following IAM policy example grants the least privileges required to your state machine role for running the Distributed Map state.
Note
Make sure that you replace stateMachineName with the name of the state machine in which you're using the Distributed Map state. For example, arn:aws-cn:states:us-west-2:123456789012:stateMachine:mystateMachine.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "states:StartExecution"
      ],
      "Resource": [
        "arn:aws-cn:states:region:accountID:stateMachine:stateMachineName"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "states:DescribeExecution",
        "states:StopExecution"
      ],
      "Resource": "arn:aws-cn:states:region:accountID:execution:stateMachineName/*"
    }
  ]
}
In addition, make sure that you have the least privileges necessary to access the Amazon resources used in the Distributed Map state, such as Amazon S3 buckets. For information, see IAM policies for using Distributed Map state.
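If you manage several state machines, filling in the placeholders of the policy above by hand is error-prone. The following sketch generates the same policy document from a Region, account ID, and state machine name. The function name is hypothetical, and the default partition of aws-cn simply mirrors the ARNs used in this topic.

```python
import json


def distributed_map_policy(region, account_id, state_machine_name, partition="aws-cn"):
    """Build the policy shown above as a Python dict.

    Hypothetical helper; the partition default follows this topic's examples.
    """
    base = f"arn:{partition}:states:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["states:StartExecution"],
                "Resource": [f"{base}:stateMachine:{state_machine_name}"],
            },
            {
                "Effect": "Allow",
                "Action": ["states:DescribeExecution", "states:StopExecution"],
                "Resource": f"{base}:execution:{state_machine_name}/*",
            },
        ],
    }


policy = distributed_map_policy("us-west-2", "123456789012", "mystateMachine")
print(json.dumps(policy, indent=2))
```

The resulting JSON can be pasted into the state machine role's policy. It does not cover the additional permissions needed for resources such as Amazon S3 buckets.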
Distributed Map state execution
When you run a Map state in Distributed mode, Step Functions creates a Map Run resource. A Map Run refers to a set of child workflow executions that a Distributed Map state starts. You can view a Map Run in the Step Functions console. You can also invoke the DescribeMapRun API action. A Map Run also emits metrics to CloudWatch.
The Step Functions console provides a Map Run Details page, which displays all the information related to a Distributed Map state execution. For example, you can view the status of the Distributed Map state's execution, the Map Run ARN, the statuses of the items processed in the child workflow executions started by the Distributed Map state, and a list of all the child workflow executions. The console shows this information in a dashboard format.
For more information about viewing a Distributed Map state's execution in the console, see Examining Map Run.
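When you work with Map Runs outside the console, for example when grouping log entries by label, it can help to split a Map Run ARN into its parts. The following sketch follows the ARN layout of the example shown for the Label field. The function name is hypothetical, and it assumes that the state machine name contains no "/" character.

```python
def parse_map_run_arn(arn):
    """Split a Map Run ARN into its components.

    Expected layout, taken from the Label example in this topic:
    arn:<partition>:states:<region>:<account>:mapRun:<stateMachine>/<label>:<id>
    Assumption: the state machine name contains no "/".
    """
    parts = arn.split(":")
    if len(parts) != 8 or parts[0] != "arn" or parts[2] != "states" or parts[5] != "mapRun":
        raise ValueError(f"not a Map Run ARN: {arn!r}")
    state_machine, separator, label = parts[6].partition("/")
    if not separator or not state_machine or not label:
        raise ValueError(f"missing state machine name or label: {arn!r}")
    return {
        "partition": parts[1],
        "region": parts[3],
        "account_id": parts[4],
        "state_machine": state_machine,
        "label": label,
        "map_run_id": parts[7],
    }


example = (
    "arn:aws-cn:states:us-east-1:123456789012:mapRun:"
    "demoWorkflow/demoLabel:3c39a231-69bb-3d89-8607-9e124eddbb0b"
)
print(parse_map_run_arn(example)["label"])
```

Splitting on ":" is safe for the label portion because labels can't contain a colon.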