Amazon EMR
Amazon EMR 版本指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

优化 Spark 性能

Amazon EMR 为 Spark 提供了多项性能优化功能。本主题详细介绍了各个优化功能。

有关如何设置 Spark 配置的更多信息,请参阅 配置 Spark

动态分区修剪

动态分区修剪通过针对特定的查询更准确地选择表中需要读取和处理的特定分区来提高作业性能。通过减少读取和处理的数据量,可节省大量的作业执行时间。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.24.0 和 5.25.0,您可以从 Spark 中或在创建集群时,通过设置 Spark 属性 spark.sql.dynamicPartitionPruning.enabled 来启用此功能。

这种优化功能在 Spark 2.4.2 的现有功能基础之上进行改进,只支持向下推送可以在计划时解析的静态谓词。

以下是 Spark 2.4.2 中静态谓词向下推送的示例。

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

动态分区修剪允许 Spark 引擎在运行时动态地推断哪些分区需要读取,哪些分区可以安全地消除。例如,以下查询涉及两个表:store_sales 表,其中包含所有店铺的全部总销售额(按区域分区);以及 store_regions 表,其中包含每个国家/地区的区域映射。这些表包含有关分布于全球的存储的数据,但我们只查询北美的数据。

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

如果没有动态分区修剪,此查询将读取所有区域,然后过滤出与子查询的结果匹配的区域子集。使用动态分区修剪,此查询将只读取和处理子查询中返回的区域的分区。这样,通过减少数据存储和处理较少的记录,节省了时间和资源。

展平标量子查询

这种优化功能通过对同一个表执行标量子查询来提高查询的性能。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.24.0 和 5.25.0,您可以从 Spark 中或在创建集群时,通过设置 Spark 属性 spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled 来启用此功能。当此属性设置为 true 时,查询优化程序会展平使用相同关系的聚合标量子查询(如果可能)。标量子查询通过以下方法展平:将子查询中存在的任何谓词推送到聚合函数,然后执行一个聚合(针对所有聚合函数,按每个关系)。

以下示例是一个将受益于此优化的查询示例。

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

此优化将之前的查询重写为:

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

请注意,重写的查询将只读取一次 student 表,而三个子查询的谓词将推送到 avg 函数中。

DISTINCT Before INTERSECT

这种优化可优化使用 INTERSECT 时的联接。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.24.0 和 5.25.0,您可以从 Spark 中或在创建集群时,通过设置 Spark 属性 spark.sql.optimizer.distinctBeforeIntersect.enabled 来启用此功能。使用 INTERSECT 的查询会自动转换为使用左半联接。当此属性设置为 true 时,如果查询优化程序检测到 DISTINCT 运算符可以进行左半联接 BroadcastHashJoin(而非 SortMergeJoin),它会将 DISTINCT 运算符推送到 INTERSECT 的子级。

以下示例是一个将受益于此优化的查询示例。

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

如果没有启用此属性 spark.sql.optimizer.distinctBeforeIntersect.enabled,则查询将被重写,如下所示。

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

当您启用此属性 spark.sql.optimizer.distinctBeforeIntersect.enabled 时,查询将被重写,如下所示。

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Bloom 筛选条件联接

这种优化可以通过使用从联接另一端的值生成的 Bloom 筛选条件对联接的一端进行预筛选,来提高部分联接的性能。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.25.0,您可以从 Spark 中或在创建集群时,通过将 Spark 属性 spark.sql.bloomFilterJoin.enabled 设置为 true 来启用此功能。

下面是一个可以受益于 Bloom 筛选条件的示例查询。

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

启用此功能后,Bloom 筛选条件将根据所有类别位于要查询的类别集中的项目ID构建。扫描销售表时,Bloom 筛选条件用于确定哪些销售属于肯定不在 Bloom 筛选条件定义的集中的项目。借此,可以尽早筛选出这些被标识的销售。

优化的联接重新排序

这项优化通过将涉及带筛选条件的表的联接进行重新排序来提高查询性能。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.25.0,您可以通过将 Spark 配置参数 spark.sql.optimizer.sizeBasedJoinReorder.enabled 设置为 true 来启用此功能。Spark 的默认行为是从左到右联接表,如查询中所列。此策略可能会错过首先使用筛选条件执行较小联接的机会,以便之后利用更昂贵的联接。

下面的示例查询报告了一个国家/地区所有商店的所有退回商品。如果不经过优化的联接重新排序,Spark 首先会联接两个大型表 store_salesstore_returns,然后将其与 store 联接,最终再联接 item

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

经过优化的联接重新排序,Spark 首先会联接 store_salesstore,因为 store 有一个筛选条件并且小于 store_returnsbroadcastable。然后,Spark 会联接 store_returns,最后联接 item。如果 item 有一个筛选条件并且可广播,则其也符合重新排序的条件,这会使 store_salesstore 联接,之后联接 item,并在最后联接 store_returns