本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 2:配置 Apache Cassandra Spark Connector
Apache Spark 是一种通用计算平台,可以用不同的方式进行配置。要配置 Spark 和 Spark Cassandra Connector 以便与 Amazon Keyspaces 集成,我们建议您从下一节介绍的最低配置设置开始,然后根据您的工作负载提高配置设置。
-
创建大小小于 8 的 Spark 分区MBs。
在 Spark 中,分区代表可以并行运行的原子数据块。当您使用 Spark Cassandra Connector 向 Amazon Keyspaces 写入数据时,Spark 分区越小,任务要写入的记录量就越小。如果 Spark 任务遇到多个错误,则在进行指定次数的重试后,任务将会失败。为避免重新执行大型任务和重新处理大量数据,请将 Spark 分区的大小保持在较小的范围内。
-
每个执行程序采用较低的并发写入次数和较高的重试次数。
如果操作超时,Amazon Keyspaces 会向 Cassandra 驱动程序返回容量不足错误。您无法通过更改配置的超时持续时间来解决因容量不足而导致的超时问题,因为 Spark Cassandra Connector 会尝试使用
MultipleRetryPolicy
以透明方式重试请求。为确保重试不会让驱动程序的连接池不堪重负,请让每个执行程序采用较低的并发写入次数和较高的重试次数。以下代码段是一个这方面的示例。spark.cassandra.query.retry.count = 500 spark.cassandra.output.concurrent.writes = 3
-
分解总吞吐量并将其分配到多个 Cassandra 会话中。
-
Cassandra Spark Connector 会为每个 Spark 执行程序创建一个会话。我们可以把这一会话视为确定所需吞吐量和所需连接数量的扩展单元。
-
在定义每个执行程序的内核数和每个任务的内核数时,请从低开始并根据需要增加数量。
-
设置 Spark 任务失败次数以便在出现暂时错误时继续处理。在您熟悉应用程序的流量特征和要求后,我们建议将
spark.task.maxFailures
设置为一个有界值。 -
例如,以下配置可以让每个执行程序在每个会话中处理两个并发任务:
spark.executor.instances = configurable -> number of executors for the session. spark.executor.cores = 2 -> Number of cores per executor. spark.task.cpus = 1 -> Number of cores per task. spark.task.maxFailures = -1
-
-
关闭批处理。
-
我们建议您关闭批处理以改进随机访问模式。以下代码段是一个这方面的示例。
spark.cassandra.output.batch.size.rows = 1 (Default = None) spark.cassandra.output.batch.grouping.key = none (Default = Partition) spark.cassandra.output.batch.grouping.buffer.size = 100 (Default = 1000)
-
-
将
SPARK_LOCAL_DIRS
设置为具有足够空间的本地高速磁盘。-
默认情况下,Spark 会将地图输出文件和弹性分布式数据集 (RDDs) 保存到
/tmp
文件夹。根据您的 Spark 主机的配置,这可能会导致设备没有剩余空间这一类型的错误。 -
要将
SPARK_LOCAL_DIRS
环境变量设置为名为/example/spark-dir
的目录,您可以使用以下命令。export SPARK_LOCAL_DIRS=/example/spark-dir
-