本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 2:配置 Apache Cassandra Spark 连接器
Apache Spark 是一个通用计算平台,您可以通过不同的方式进行配置。要配置 Spark 和 Spark Cassandra 连接器以与亚马逊 Keyspaces 集成,我们建议您从下一节中描述的最低配置设置开始,然后根据您的工作负载相应增加这些设置。
-
创建大小小于 8 MB 的 Spark 分区。
在 Spark 中,分区代表可以并行运行的原子数据块。当你使用 Spark Cassandra 连接器向亚马逊密钥空间写入数据时,Spark 分区越小,任务要写入的记录量就越少。如果 Spark 任务遇到多个错误,则在用尽指定的重试次数后它将失败。为避免重玩大型任务和重新处理大量数据,请保持 Spark 分区的大小。
-
在重试次数较大的情况下,每个执行器使用较低的并发写入次数。
由于操作超时,Amazon Keyspaces 会将容量不足的错误返回给 Cassandra 驱动程序。您无法通过更改配置的超时时间来解决容量不足导致的超时问题,因为 Spark Cassandra Connector 会尝试使用透明地重试请求
MultipleRetryPolicy
。为确保重试不会使驱动程序的连接池不堪重负,请使用较低的并发写入次数和大量重试次数。以下代码片段就是一个例子。spark.cassandra.query.retry.count = 500 spark.cassandra.output.concurrent.writes = 3
-
分解总吞吐量并将其分配到多个 Cassandra 会话中。
-
Cassandra Spark Connector 为每个 Spark 执行者创建一个会话。可以将此会话视为确定所需吞吐量和所需连接数量的规模单位。
-
定义每个执行器的内核数和每个任务的内核数时,从低处开始,然后根据需要增加。
-
将 Spark 任务失败设置为允许在出现暂时错误时进行处理。在您熟悉应用程序的流量特征和要求后,我们建议设置
spark.task.maxFailures
转换为有界值。 -
例如,以下配置可以处理每个执行器、每个会话的两个并发任务:
spark.executor.instances = configurable -> number of executors for the session. spark.executor.cores = 2 -> Number of cores per executor. spark.task.cpus = 1 -> Number of cores per task. spark.task.maxFailures = -1
-
-
关闭批处理。
-
我们建议您关闭批处理以改善随机访问模式。以下代码片段就是一个例子。
spark.cassandra.output.batch.size.rows = 1 (Default = None) spark.cassandra.output.batch.grouping.key = none (Default = Partition) spark.cassandra.output.batch.grouping.buffer.size = 100 (Default = 1000)
-
-
设置
SPARK_LOCAL_DIRS
到具有足够空间的快速本地磁盘。-
默认情况下,Spark 将地图输出文件和弹性分布式数据集 (RDD) 保存到
/tmp
文件夹。根据您的 Spark 主机的配置,这可能会导致设备上没有剩余空间风格错误。 -
要设置
SPARK_LOCAL_DIRS
名为的目录的环境变量/example/spark-dir
,你可以使用以下命令。export SPARK_LOCAL_DIRS=/example/spark-dir
-