Spark 结果片段缓存 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Spark 结果片段缓存

Amazon EMR 6.6.0 及更高版本包含可选的 Spark 结果片段缓存功能,该功能可自动缓存结果片段。这些结果片段是查询子树的结果的一部分,其存储在您选择的 Amazon S3 存储桶中。存储的查询结果片段将在后续查询执行时重复使用,从而加快查询速度。

结果片段缓存的工作原理是分析 Spark SQL 查询并将符合条件的结果片段缓存在指定的 S3 位置。在后续查询运行中,系统会自动检测并从 S3 中获取可用的查询结果片段。结果片段缓存不同于结果集缓存,其中,后续查询必须与原始查询完全匹配才能从缓存返回结果。当用于重复以静态数据子集为目标的查询时,结果片段缓存可显著提高性能。

请考虑以下查询,它计算 2022 年之前的订单:

select l_returnflag, l_linestatus, count(*) as count_order from lineitem where l_shipdate <= current_date and year(l_shipdate) == '2022' group by l_returnflag, l_linestatus

随着时间推移,此查询需要每天运行以报告当年的总销售额。如果没有结果片段缓存,则需要每天重新计算一年中所有日期的结果。随着时间推移,查询速度会变慢,并且在年底最慢,届时将需要重新计算所有 365 天的结果。

当您激活结果片段缓存时,将使用缓存中一年所有以前日期的结果。每天,该功能只能重新计算一天的结果。在该功能计算结果片段后,该功能将缓存片段。因此,启用缓存的查询时间很快,并且每次后续查询都保持不变。

启用 Spark 结果片段缓存

要启用 Spark 结果片段缓存,请执行以下步骤:

  1. 在 Amazon S3 中创建缓存存储桶并授权 EMRFS 的读/写访问。有关更多信息,请参阅授予对 Amazon S3 中的 EMRFS 数据的访问权

  2. 设置 EMR Spark 配置以启用该功能。

    spark.subResultCache.enabled = true spark.subResultCache.fs.root.path = s3://DOC-EXAMPLE-BUCKET/cache_dir/
  3. 为存储桶启用 S3 生命周期管理以自动清理缓存文件。

  4. 或者,配置 reductionRationThreshold 和 maxBufferSize 属性以进一步调整该功能。

    spark.sql.subResultCache.reductionRatioThreshold spark.sql.subResultCache.maxBufferSize

使用结果片段缓存时的注意事项

当您使用已缓存在 Amazon S3 中的结果而不是重新计算它们时,所节省的成本会随着使用相同缓存结果的次数而增加。对于具有大表扫描后跟筛选条件或散列聚合,并且将结果大小减少至少 8 倍(即输入大小:结果的比率至少为 8:1)的查询将从此功能受益最多。输入和结果之间的缩减率越大,成本效益就越大。只要生成结果的成本高于从 Amazon S3 获取结果的成本,缩减率较小、但在表扫描和筛选条件或聚合之间包含昂贵计算步骤的查询也将受益。默认情况下,结果片段缓存仅在检测到缩减率至少为 8:1 时才生效。

当查询重复使用缓存的结果时,此功能的好处最大。滚动和增量窗口查询就是很好的例子。例如,一个 30 天滚动窗口查询已经运行了 29 天,它只需要从其原始输入源提取 1/30 的目标数据,并将使用前 29 天的缓存结果片段。增量窗口查询将受益更多,因为窗口的开始保持固定:在每次调用查询时,需要从输入源读取的处理比例较小。

以下是使用结果片段缓存时的其他注意事项:

  • 如果查询的目标不是具有相同查询片段的相同数据,则缓存命中率较低,因此不会从此功能受益。

  • 如果查询的缩减率较低且不包含昂贵的计算步骤,则将导致缓存结果的读取开销与初始处理的开销大致相同。

  • 由于写入缓存的成本,第一个查询将始终显示较小的回归。

  • 结果片段缓存功能仅适用于 Parquet 文件。不支持其他文件格式。

  • 结果片段缓存功能缓冲区将仅尝试缓存文件拆分大小为 128 MB 或更大的扫描。在默认 Spark 配置下,如果扫描大小(正扫描的所有文件的总大小)除以执行程序内核数小于 128 MB,则结果片段缓存将被禁用。如果设置了下面所列的任何 Spark 配置,则文件拆分大小将为:

    min(maxPartitionBytes, max(openCostInBytes, scan size / minPartitionNum))
    • spark.sql.leafNodeDefaultParallelism(默认值为 spark.default.parallelism)

    • spark.sql.files.minPartitionNum(默认值为 spark.sql.leafNodeDefaultParallelism)

    • spark.sql.files.openCostInBytes

    • spark.sql.files.maxPartitionBytes

  • 结果片段缓存功能以 RDD 分区粒度缓存。前面描述的默认 8:1 缩减率是按每个 RDD 分区评估的。与每 RDD 缩减率始终低于 8:1 的工作负载相比,每 RDD 缩减率大于和低于 8:1 的工作负载的性能优势可能更小。

  • 默认情况下,结果片段缓存功能对缓存的每个 RDD 分区使用 16MB 写入缓冲区。如果每个 RDD 分区的缓存超过 16MB,则确定无法进行写入的成本可能会导致性能下降。

  • 默认情况下,结果片段缓存不会尝试缓存缩减率小于 8:1 的 RDD 分区结果,并将写入缓冲区限制为 16MB,但这两个值都可以通过以下配置进行调整:

    spark.sql.subResultCache.reductionRatioThreshold (default: 8.0) spark.sql.subResultCache.maxBufferSize (default: 16MB, max: 64MB)
  • 使用相同 EMR 发行版的多个集群可以共享同一个缓存位置。为了确保结果的正确性,结果片段缓存将不使用不同发行版的 Amazon EMR 写入的缓存结果。

  • 对于 Spark Streaming 用例或当使用 RecordServer、Apache Ranger 或 Amazon Lake Formation 时,将自动禁用结果片段缓存。

  • 结果片段缓存读/写使用 EMRFS 和 Amazon S3 存储桶。支持 CSE/SSE S3/SSE KMS 加密。