优化 Spark 性能 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

优化 Spark 性能

Amazon EMR 为 Spark 提供了多项性能优化功能。本主题详细介绍了各个优化功能。

有关如何设置 Spark 配置的更多信息,请参阅 配置 Spark

自适应查询执行

自适应查询执行是一个基于运行时统计信息重新优化查询计划的框架。从亚马逊 EMR 5.30.0 开始,以下来自 Apache Spark 3 的自适应查询执行优化可在适用于 Spark 2 的 Apache EMR 运行时上使用。

  • 自适应连接转换

  • 随机分区的自适应合并

自适应联接转换

自适应连接转换通过基于查询阶段的运行时大小将排序合并联接操作转换为广播散列联接操作来提高查询性能。当连接的一侧小到足以在所有执行程序之间高效地广播其输出时,Broadcast-hash-连接往往表现更好,从而避免了对连接的两侧进行洗牌交换和排序的需要。自适应连接转换扩大了 Spark 自动执行广播哈希连接时的情况范围。

该功能已默认启用。它可以通过设置spark.sql.adaptive.enabledfalse,它还禁用自适应查询执行框架。Spark 决定在其中一个连接侧的运行时大小统计数据不超过时将排序合并连接转换为广播哈希连接spark.sql.autoBroadcastJoinThreshold,默认值为 10,485,760 字节(10 MiB)。

随机分区的自适应合并

随机分区的自适应合并通过合并小的连续随机分区,以避免出现太多小任务的开销,从而提高查询性能。这使您可以预先配置更多的初始洗牌分区,然后在运行时将其减少到目标大小,从而提高了分布更均匀的洗牌分区的可能性。

该功能已默认启用,除非spark.sql.shuffle.partitions被显式设置。它可以通过设置spark.sql.adaptive.coalescePartitions.enabledtrue。初始数量的洗牌分区和目标分区大小都可以使用spark.sql.adaptive.coalescePartitions.minPartitionNumspark.sql.adaptive.advisoryPartitionSizeInBytes属性。有关此功能的相关 Spark 属性的详细信息,请参阅下表。

火花自适应合并分区属性
属性 默认值 描述

spark.sql.adaptive.coalescePartitions.enabled

true,除非spark.sql.shuffle.partitions已显式设置

如果为 true,并启用了 spark.sql.适配性.适应性.为true,则 Spark 将根据目标大小合并连续的随机播放分区(由spark.sql.adaptive.advisoryPartitionSizeInBytes),以避免过多的小任务。

spark.sql.adaptive.advisoryPartitionSizeInBytes

64MB

合并时的 Shuffle 分区的指导大小(以字节为单位)。此配置仅在spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled代表这两者的true

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

合并后的最小随机放置分区数。此配置仅在spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled代表这两者的true

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1000

合并前随机播放分区的初始数量。此配置仅在spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled代表这两者的true

动态分区修剪

动态分区修剪通过针对特定的查询更准确地选择表中需要读取和处理的特定分区来提高作业性能。通过减少读取和处理的数据量,可节省大量的作业执行时间。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.24.0 和 5.25.0,您可以通过设置 Spark 属性来启用此功能。spark.sql.dynamicPartitionPruning.enabled从 Spark 中或当创建集群时。

这种优化功能在 Spark 2.4.2 的现有功能基础之上进行改进,只支持向下推送可以在计划时解析的静态谓词。

以下是 Spark 2.4.2 中静态谓词向下推送的示例。

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

动态分区修剪允许 Spark 引擎在运行时动态地推断哪些分区需要读取,哪些分区可以安全地消除。例如,以下查询涉及两个表:store_sales 表,其中包含所有店铺的全部总销售额(按区域分区);以及 store_regions 表,其中包含每个国家/地区的区域映射。这些表包含有关分布于全球的存储的数据,但我们只查询北美的数据。

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

如果没有动态分区修剪,此查询将读取所有区域,然后过滤出与子查询的结果匹配的区域子集。使用动态分区修剪,此查询将只读取和处理子查询中返回的区域的分区。这样,通过减少数据存储和处理较少的记录,节省了时间和资源。

展平标量子查询

这种优化功能通过对同一个表执行标量子查询来提高查询的性能。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.24.0 和 5.25.0,您可以通过设置 Spark 属性来启用此功能。spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled从 Spark 中或当创建集群时。当此属性设置为 true 时,查询优化程序会展平使用相同关系的聚合标量子查询(如果可能)。标量子查询通过以下方法展平:将子查询中存在的任何谓词推送到聚合函数,然后执行一个聚合(针对所有聚合函数,按每个关系)。

以下示例是一个将受益于此优化的查询示例。

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

此优化将之前的查询重写为:

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

请注意,重写的查询将只读取一次 student 表,而三个子查询的谓词将推送到 avg 函数中。

交叉前不同

这种优化可优化使用 INTERSECT 时的联接。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.24.0 和 5.25.0,您可以通过设置 Spark 属性来启用此功能。spark.sql.optimizer.distinctBeforeIntersect.enabled从 Spark 中或当创建集群时。使用 INTERSECT 的查询会自动转换为使用左半联接。当此属性设置为 true 时,如果查询优化程序检测到 DISTINCT 运算符可以进行左半联接 BroadcastHashJoin(而非 SortMergeJoin),它会将 DISTINCT 运算符推送到 INTERSECT 的子级。

以下示例是一个将受益于此优化的查询示例。

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

如果没有启用此属性 spark.sql.optimizer.distinctBeforeIntersect.enabled,则查询将被重写,如下所示。

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

当您启用此属性 spark.sql.optimizer.distinctBeforeIntersect.enabled 时,查询将被重写,如下所示。

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Bloom 筛选条件

这种优化可以通过使用从联接另一端的值生成的 Bloom 筛选条件对联接的一端进行预筛选,来提高部分联接的性能。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.25.0,您可以通过设置 Spark 属性来启用此功能。spark.sql.bloomFilterJoin.enabledtrue从 Spark 中或当创建集群时。

下面是一个可以受益于 Bloom 筛选条件的示例查询。

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

启用此功能后,Bloom 筛选条件将根据所有类别位于要查询的类别集中的项目ID构建。扫描销售表时,Bloom 筛选条件用于确定哪些销售属于肯定不在 Bloom 筛选条件定义的集中的项目。借此,可以尽早筛选出这些被标识的销售。

优化的联接重新排序

这项优化通过将涉及带筛选条件的表的联接进行重新排序来提高查询性能。对于 Amazon EMR 5.26.0,此功能已默认启用。借助 Amazon EMR 5.25.0,您可以通过设置 Spark 配置参数来启用此功能。spark.sql.optimizer.sizeBasedJoinReorder.enabled设置为 true。Spark 的默认行为是从左到右联接表,如查询中所列。此策略可能会错过首先使用筛选条件执行较小联接的机会,以便之后利用更昂贵的联接。

下面的示例查询报告了一个国家/地区所有商店的所有退回商品。如果不经过优化的联接重新排序,Spark 首先会联接两个大型表 store_salesstore_returns,然后将其与 store 联接,最终再联接 item

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

经过优化的联接重新排序,Spark 首先会联接 store_salesstore,因为 store 有一个筛选条件并且小于 store_returnsbroadcastable。然后,Spark 会联接 store_returns,最后联接 item。如果 item 有一个筛选条件并且可广播,则其也符合重新排序的条件,这会使 store_salesstore 联接,之后联接 item,并在最后联接 store_returns