spark 参数调优(version 2.2.0) 持续更新...
1 Shuffle Behavior
—conf "spark.shuffle.compress=false"
默认true 设置shuffle 网络io 是否压缩 若网络io成为瓶颈建议true 若为cpu计算密集型 建议false
如果下游的Task通过网络获取上游Shuffle Map Task的结果的网络IO成为瓶颈,那么就需要考虑将它设置为true:通过压缩数 据来减少网络IO。由于上游Shuffle Map Task和下游的Task现阶段是不会并行处理的,即上游Shuffle Map Task处理完成, 然后下游的Task才会开始执行。因此如果需要压缩的时间消耗就是Shuffle MapTask压缩数据的时间 + 网络传输的时间 + 下游Task解压的时间;而不需要压缩的时间消耗仅仅是网络传输的时间。因此需要评估压缩解压时间带来的时间消耗和因为数据 压缩带来的时间节省。如果网络成为瓶颈,比如集群普遍使用的是千兆网络,那么可能将这个选项设置为true是合理的; 如果计算是CPU密集型的,那么可能将这个选项设置为false才更好。
--conf "spark.shuffle.spill.compress=false"
默认true 设置磁盘 disk io 是否压缩 若磁盘io成为瓶颈则建议设置true
如果设置为true,代表处理的中间结果在spill到本地硬盘时都会进行压缩,在将中间结果取回进行merge的时候,要进行解压。 因此要综合考虑CPU由于引入压缩解压的消耗时间和Disk IO因为压缩带来的节省时间的比较。在Disk IO成为瓶颈的场景下, 这个被设置为true可能比较合适;如果本地硬盘是SSD,那么这个设置为false可能比较合适。
2 Spark UI
--conf "spark.ui.retainedJobs=20"
ui上保存的job总数 默认为1000
--conf "spark.ui.retainedStages=40"
ui上保存的stages总数 默认为1000
--conf "spark.sql.ui.retainedExecutions=0"
ui sql tab上保存的sql记录数,如果不需要调优平时可以直接关掉
--conf "spark.streaming.ui.retainedBatches=40"
ui streaming tab上保存的批处理日志数,默认为1000
--conf "spark.ui.retainedStages=40"
ui上保存的stages总数 默认为1000
全部调低大概可以节省几百M左右内存
3 Compression and Serialization
--conf "spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec"
默认lz4
用于压缩内部数据的编解码器,例如RDD分区,事件日志,广播变量和随机输出。默认情况下, Spark提供三种编解码器:lz4,lzf和snappy。还可以使用完全限定的类名来指定编解码器,
org.apache.spark.io.LZ4CompressionCodec (非常快速的压缩算法)
org.apache.spark.io.LZFCompressionCodec
org.apache.spark.io.SnappyCompressionCodec (快速压缩和解压缩)
-
snappy 从各个方面是相对均衡的压缩算法,使用场景比较多
-
lz4 是了解到各方面都比较优越的算法,但是hbase 由于许可证的原因,至少目前用不了这个东西
4 Spark Streaming
--conf "spark.streaming.stopGracefullyOnShutdown=true"
优雅的关闭spark streaming 任务 这样可以在diver 退出之前处理完所有已经已经接收的数据
--conf "spark.streaming.kafka.consumer.cache.enabled=false"
如果spark的批次时间batchTime超过了kafka的心跳时间(30s),需要增加hearbeat.interval.ms以及 session.timeout.ms。加入batchTime是5min,那么就需要调整group.max.session.timeout.ms。 消费者缓存默认为最大64条,如果希望处理超过(64*executor数量)kafka的分区, 可以调节spark.streaming.kafka.consumer.cache.maxCapacity 这个参数。 另外,可以调节spark.streaming.kafka.consumer.cache.enable=false来禁止缓存,可以解决Spark-19185的bug。
--conf "spark.streaming.backpressure.enabled=true"
Spark Streaming Back Pressure (反压) spark 1.5 引入
- spark.streaming.backpressure.initialRate: 启用反压机制时每个接收器接收第一批数据的初始最大速率。默认值没有设置。
- spark.streaming.backpressure.rateEstimator:速率估算器类,默认值为 pid ,目前 Spark 只支持这个,大家可以根据自己的需要实现。
- spark.streaming.backpressure.pid.proportional:用于响应错误的权重(最后批次和当前批次之间的更改)。默认值为1,只能设置成非负值。
- spark.streaming.backpressure.pid.integral:错误积累的响应权重,具有抑制作用(有效阻尼)。默认值为 0.2 ,只能设置成非负值。
- spark.streaming.backpressure.pid.derived:对错误趋势的响应权重。 这可能会引起 batch size 的波动,可以帮助快速增加/减少容量。默认值为0,只能设置成非负值。
- spark.streaming.backpressure.pid.minRate:可以估算的最低费率是多少。默认值为 100,只能设置成非负值。
5 Memory Management 内存管理
堆外内存 Spark 2.0 引入
spark 2.0 后 加入了堆外内存 也就是jvm以外的操作 避免了gc问题 以及jvm 内的类型转换 所以在spark 数据类型设置的时候 最好初始化就配置好尽量不要更改 列入 string format date , int to string 这种类型转换操作 因为所有的算子操作都是基于 堆外内存二进制数据执行计算的
在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同, 所有运行中的并发任务共享存储内存和执行内存。
动态内存分配 spark 1.6 引入
Spark中内存主要用于两类目的:执行计算和数据存储。执行计算的内存主要用于Shuffle、关联(join)、排序(sort)以及聚合(aggregation),而数据存储的内存主要用于缓存和集群内部数据传播。Spark中执行计算和数据存储都是共享同一个内存区域(M)。 如果执行计算没有占用内存,那么数据存储可以申请占用所有可用的内存,反之亦然。执行计算可能会抢占数据存储使用的内存,并将存储于内存的数据逐出内存,直到数据存储占用的内存比例降低到一个指定的比例(R)。 换句话说,R是M基础上的一个子区域,这个区域的内存数据永远不会被逐出内存。然而,数据存储不会抢占执行计算的内存。
这样设计主要有这么几个需要考虑的点。首先,不需要缓存数据的应用可以把整个空间用来执行计算,从而避免频繁地把数据吐到磁盘上。其次,需要缓存数据的应用能够有一个数据存储比例(R)的最低保证,也避免这部分缓存数据被全部逐出内存。最后,这个实现方式能够在默认情况下,为大多数使用场景提供合理的性能,而不需要专家级用户来设置内存使用如何划分。
虽然有两个内存划分相关的配置参数,但一般来说,用户不需要设置,因为默认值已经能够适用于绝大部分的使用场景:
spark.memory.fraction:表示上面M的大小,其值为相对于JVM堆内存的比例(默认0.75)。剩余的25%是为其他用户的数据结构、Spark内部元数据以及避免OOM错误的安全预留空间。 spark.memory.storageFraction:表示上面R的大小,其值为相对于M的一个比例(默认0.5)。R是M中专门用于缓存数据块的部分,这部分数据块永远不会因执行计算任务而逐出内存。
last 疑难杂症解决
spark on yarn cluster ==>file.encoding 字符集是 ANSI_X3.4-1968
--conf 'spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8'
--conf 'spark.driver.extraJavaOptions=-Dfile.encoding=utf-8'