Workload partitioning with bounded execution
Errors in Spark applications commonly arise from inefficient Spark scripts, distributed in-memory execution of large-scale transformations, and dataset abnormalities. Driver or executor out-of-memory (OOM) issues can have many causes, such as data skew, listing too many objects, or large data shuffles. These issues often appear when you process large amounts of backlog data with Spark.
Amazon Glue helps you resolve OOM issues and simplify your ETL processing with workload partitioning. With workload partitioning enabled, each ETL job run picks up only unprocessed data, with an upper bound on the dataset size or the number of files to be processed in that run. Future job runs process the remaining data. For example, if 1,000 files need to be processed, you can set the number of files per run to 500 and split the work across two job runs.
Workload partitioning is supported only for Amazon S3 data sources.
Enabling workload partitioning
You can enable bounded execution by manually setting the options in your script or by adding catalog table properties.
To enable workload partitioning with bounded execution in your script:
- To avoid reprocessing data, enable job bookmarks in the new job or existing job. For more information, see Tracking Processed Data Using Job Bookmarks.
- Modify your script and set the bounded limit in the additional options in the Amazon Glue getSource API. You should also set the transformation context for the job bookmark to store the state element. For example (a complete job script sketch follows these examples):

  Python

  glueContext.create_dynamic_frame.from_catalog(
      database = "database",
      table_name = "table_name",
      redshift_tmp_dir = "",
      transformation_ctx = "datasource0",
      additional_options = {
          "boundedFiles": "500",  # value must be a string
          # "boundedSize": "1000000000"  # unit is bytes
      }
  )

  Scala

  val datasource0 = glueContext.getCatalogSource(
      database = "database",
      tableName = "table_name",
      redshiftTmpDir = "",
      transformationContext = "datasource0",
      additionalOptions = JsonOptions(
          Map("boundedFiles" -> "500")  // value must be a string
          // "boundedSize" -> "1000000000"  // unit is bytes
      )
  ).getDynamicFrame()

  val connectionOptions = JsonOptions(
      Map("paths" -> List(baseLocation), "boundedFiles" -> "30")
  )
  val source = glueContext.getSource("s3", connectionOptions, "datasource0", "")
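For context, the following is a minimal sketch of a complete PySpark job script that combines job bookmarks with bounded execution. It assumes job bookmarks are enabled in the job configuration; the database and table names are placeholders.

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Resolve the job name passed by Amazon Glue; job bookmarks are keyed to this job.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)

# job.init()/job.commit() are required for job bookmarks to record processed files.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read at most 500 unprocessed files from the catalog table in this run.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="database",            # placeholder
    table_name="table_name",        # placeholder
    transformation_ctx="datasource0",
    additional_options={"boundedFiles": "500"},
)

# ... transformations and writes go here ...

# Committing the job advances the bookmark past the files processed in this run.
job.commit()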
To enable workload partitioning with bounded execution in your Data Catalog table:
- Set the key-value pairs in the parameters field of your table structure in the Data Catalog. For more information, see Viewing and Editing Table Details. (A programmatic sketch follows the note below.)
- Set the upper limit for the dataset size or the number of files processed:
  - Set boundedSize to the target size of the dataset in bytes. The job run will stop after reaching the target size from the table.
  - Set boundedFiles to the target number of files. The job run will stop after processing the target number of files.
Note
You should only set one of boundedSize or boundedFiles, as only a single boundary is supported.
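As an alternative to editing the table in the console, the following is a minimal sketch that sets the boundedFiles table parameter programmatically with boto3. The database and table names are placeholders, and the subset of TableInput fields copied here is an assumption that may need adjusting for your table definition.

import boto3

glue = boto3.client("glue")

# Fetch the current table definition so the existing settings can be preserved.
table = glue.get_table(DatabaseName="database", Name="table_name")["Table"]

# Build a TableInput from the existing definition; update_table replaces the table definition,
# so carry over the fields you want to keep.
table_input = {
    k: v
    for k, v in table.items()
    if k in ("Name", "Description", "Owner", "Retention", "StorageDescriptor",
             "PartitionKeys", "TableType", "Parameters")
}

# Add the bounded-execution parameter (the value must be a string).
table_input.setdefault("Parameters", {})["boundedFiles"] = "500"

glue.update_table(DatabaseName="database", TableInput=table_input)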
Setting up an Amazon Glue trigger to automatically run the job
After you enable bounded execution, you can set up an Amazon Glue trigger to run the job automatically and incrementally load the data in sequential runs. Go to the Amazon Glue console, create a trigger, set the schedule, and attach it to your job. The trigger then automatically starts the next job run and processes the next batch of data.
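If you prefer to script this step, the following is a minimal sketch that creates a scheduled trigger with boto3. The trigger name, schedule, and job name are placeholders.

import boto3

glue = boto3.client("glue")

# Create a scheduled trigger that starts the bounded-execution job every hour.
glue.create_trigger(
    Name="bounded-execution-trigger",        # hypothetical trigger name
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",            # hourly; adjust to your backlog size
    Actions=[{"JobName": "your-glue-job"}],  # hypothetical job name
    StartOnCreation=True,
)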
You can also use Amazon Glue workflows to orchestrate multiple jobs to process data from different partitions in parallel. For more information, see Amazon Glue Triggers and Amazon Glue Workflows.
For more information on use cases and options, see the blog post Optimizing Spark applications with workload partitioning in Amazon Glue.