spark3.x 生产调优笔记

1 spark sql写入mysql非常慢

  • 有这样一个业务场景:需要将通过Spark处理之后的数据写入MySQL,并在在网页端进行可视化输出。Spark处理之后有大概40万条数据,写入MySQL却要耗费将近30分钟,这也太慢了!
  • 后来翻看了Spark向JDBC数据源写数据的那部分源码,虽然源码中的实现使用的确实是 PreparedStatement 的addBatch()方法和executeBatch()方法,但是我们再去翻看executeBatch()方法的实现后发现,它并不是每次执行一批插入,而是循环的去执行每条insert插入语句,这就造成只插入一条数据,而不是一批数据,导致大多数的时间都耗费在了与数据库的交互连接上了
  • 解决方法:
jdbc.saas.url=jdbc:mysql://172.25.1.*/saas-hospital?characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true

2 spark sql jdbc并发分区

  • jdbcDF.rdd.partitions.size # 结果返回 1。该操作的并发度为1,你所有的数据都会在一个partition中进行操作,意味着无论你给的资源有多少,只有一个task会执行任务,执行效率可想而之,并且在稍微大点的表中进行操作分分钟就会OOM。
def jdbc(
  url: String,
  table: String,
  columnName: String,    # 根据该字段分区,需要为整形,比如id等
  lowerBound: Long,      # 分区的下界
  upperBound: Long,      # 分区的上界
  numPartitions: Int,    # 分区的个数
  connectionProperties: Properties): DataFrame

# 指定字段区间分区
val predicates =
    Array(
      "2015-09-16" -> "2015-09-30",
      "2015-10-01" -> "2015-10-15",
      "2015-10-16" -> "2015-10-31",
      "2015-11-01" -> "2015-11-14",
      "2015-11-15" -> "2015-11-30",
      "2015-12-01" -> "2015-12-15"
    ).map {
      case (start, end) =>
        s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
    }

// 取得该表数据
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)

3 SparkSQL与Parquet格式兼容性

  • spark.sql.parquet.writeLegacyFormat 默认是false。如果设置为true 数据会以spark 1.4和更早的版本的格式写入。比如,decimal类型的值会被以apache parquet的fixed-length byte array格式写出,该格式是其他系统例如hive,impala等使用的。如果是false,会使用parquet的新版格式。例如,decimals会以int-based格式写出。

4 RDD复用并序列化存储

  • 必须对多次使用的 RDD 进行持久化,通过持久化将公共 RDD 的数据
    缓存到内存/磁盘中,之后对于公共 RDD 的计算都会从内存/磁盘中直接获取 RDD 数据
  • RDD 的持久化是可以进行序列化的,当内存无法将 RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中
    public static final StorageLevel MEMORY_ONLY_SER_2 
    public static final StorageLevel MEMORY_AND_DISK_SER_2 

5 RDD合理设置并行度

  • Spark 官方推荐,task 数量应该设置为 Spark 作业总 CPU core 数量的 2~3 倍。之所以没有推荐 task 数量与 CPU core 总数相等,是因为 task 的执行时间不同,有的 task 执行速度快而有的 task 执行速度慢,如果 task 数量与 CPU core 总数相等,那么执行快的 task 执行完成后,会出现 CPU core 空闲的情况。如果 task 数量设置为 CPU core 总数的 2~3 倍,那么一个task 执行完毕后,CPU core 会立刻执行下一个 task,降低了资源的浪费,同时提升了 Spark作业运行的效率。
val conf = new SparkConf().set("spark.default.parallelism", "500")

6 广播大变量

  • 默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。

7 Kryo 序列化

  • Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用
    Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注
    册需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字
    符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式了
public class MyKryoRegistrator implements KryoRegistrator {
     @Override
     public void registerClasses(Kryo kryo){
        kryo.register(StartupReportLogs.class);
     }
 }

//创建 SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化库,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
//在 Kryo 序列化库中注册自定义的类集合,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "MyKryoRegistrator");

8 foreachPartition 优化数据库操作

  • 在生产环境中,通常使用 foreachPartition 算子来完成数据库的写入,通过 foreachPartition
    算子的特性,可以优化写数据库的性能。
  • 对于我们写的 function 函数,一次处理一整个分区的数据;
  • 对于一个分区内的数据,创建唯一的数据库连接;
  • 只需要向数据库发送一次 SQL 语句和多组参数;

9 repartition 解决 SparkSQL 低并行度问题

  • Spark SQL 的并行度不允许用户自己指定,Spark SQL 自己会默认根据 hive 表对应的
    HDFS 文件的 split 个数自动设置 Spark SQL 所在的那个 stage 的并行度,用户自己通
    spark.default.parallelism 参数指定的并行度,只会在没 Spark SQL 的 stage 中生效。
  • Spark SQL 这一步的并行度和 task 数量肯定是没有办法去改变了,但是,对于
    Spark SQL 查询出来的 RDD,立即使用 repartition 算子,去重新进行分区,这样可
    以重新分区为多个 partition,从 repartition 之后的 RDD 操作,由于不再设计 Spark
    SQL,因此 stage 的并行度就会等于你手动设置的值,这样就避免了 Spark SQL 所在
    的 stage 只能用少量的 task 去处理大量数据并执行复杂的算法逻辑。


    spark sql重分区

10 reduceByKey 预聚合

  • 本地聚合后,在 map 端的数据量变少,减少了磁盘 IO,也减少了对磁盘空间的占用;
  • 本地聚合后,下一个 stage 拉取的数据量变少,减少了网络传输的数据量;
  • 本地聚合后,在 reduce 端进行数据缓存的内存占用减少;
  • 本地聚合后,在 reduce 端进行聚合的数据量减少。
  • 基于 reduceByKey 的本地聚合特征,我们应该考虑使用 reduceByKey 代替其他的 shuffle 算
    子,例如 groupByKey。

11 故障:shuffle file not found

  • 原因:Shuffle 操作中,后面 stage 的 task 想要去上一个 stage 的
    task 所在的 Executor 拉取数据,结果对方正在执行 GC,执行 GC 会导致 Executor 内所有的工作现场全部停止,比如 BlockManager、基于 netty 的网络通信等,这就会导致后面的 task拉取数据拉取了半天都没有拉取到,就会报出 shuffle file not found 的错误,而第二次再次执行就不会再出现这种错误。
# 调整 reduce 端拉取数据重试次数和 reduce 端拉取数据时间间隔这两个参数
val conf = new SparkConf()
 .set("spark.shuffle.io.maxRetries", "60")
 .set("spark.shuffle.io.retryWait", "60s")

12 故障:OOM

  • Shuffle map端:map 端缓冲的默认配置是 32KB,如果每个 task 处理 640KB 的数据,那么会发生 640/32 = 20 次溢写,如果每个 task 处理 64000KB 的数据,机会发生 64000/32=2000 此溢写,这对于性能的影响是非常严重的。
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
  • shuffle reduce端: reduce task 的 buffer 缓冲区大小决定了 reduce task 每次能够缓冲的数据量,也就是每次能够拉取的数据量。reduce 端数据拉取缓冲区的大小可以通过 park.reducer.maxSizeInFlight 参数进行设置,默认为 48MB
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")

13 Executor 堆外内存故障:OOM与task lost

  • Spark 作业处理的数据量非常大,达到几亿的数据量,此时运行 Spark作业会时不时地报错,例如 shuffle output file cannot find,executor lost,task lost,out of memory等,这可能是 Executor 的堆外内存不太够用,导致 Executor 在运行的过程中内存溢出。
  • Executor 的堆外内存主要用于程序的共享库、Perm Space、 线程 Stack 和一些 Memory mapping 等, 或者类 C 方式 allocate object。
  • stage 的 task 在运行的时候,可能要从一些 Executor 中去拉取 shuffle map output 文件,但是 Executor 可能已经由于内存溢出挂掉了,其关联的 BlockManager 也没有了,这就可能会报出 shuffle output file cannot find,executor lost,task lost,out of memory 等错误
# Executor 堆外内存的配置需要在 spark-submit :
--conf spark.yarn.executor.memoryOverhead=2048

14 spark数据倾斜

1. 数据倾斜的表现

  • Spark 作业的大部分 task 都执行迅速,只有有限的几个 task 执行的非常慢,此时可能出
    现了数据倾斜,作业可以运行,但是运行得非常慢;
  • Spark 作业的大部分 task 都执行迅速,但是有的 task 在运行过程中会突然报出 OOM,
    反复执行几次都在某一个 task 报出 OOM 错误,此时可能出现了数据倾斜,作业无法正
    常运行。

2. 定位数据倾斜问题

  • 查阅代码中的 shuffle 算子,例如 reduceByKey、countByKey、groupByKey、join 等算子,根据代码逻辑判断此处是否会出现数据倾斜;
  • 查看 Spark 作业的 log 文件,log 文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个 stage,对应的 shuffle 算子是哪一个;

3. 解决方法1 - 过滤产生数据倾斜的key

  • 在 Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的 key 进行
    过滤,滤除可能导致数据倾斜的 key 对应的数据

4. 解决方法2 - 聚合原始数据

  • 如果 Spark 作业的数据来源于 Hive 表,那么可以先在 Hive 表中对数据进行聚合,例如按照 key 进行分组,将同一 key 对应的所有 value 用一种特殊的格式拼接到一个字符串里去, 尚硅谷大数据技术之 Spark 优化 这样,一个 key 就只有一条数据了;之后,对一个 key 的所有 value 进行处理时,只需要进行 map 操作即可,无需再进行任何的 shuffle 操作。通过上述方式就避免了执行 shuffle 操作,也就不可能会发生任何的数据倾斜问题。

5. 解决方法3 - 提高 shuffle 操作中的 reduce 并行度

  • 在大部分的 shuffle 算子中,都可以传入一个并行度的设置参数,比如 reduceByKey(500),这个参数会决定 shuffle 过程中 reduce 端的并行度,在进行 shuffle 操作的时候,就会对应着
    创建指定数量的 reduce task。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。

6. 解决方法4 - 聚合算子:使用随机 key 实现双重聚合

  • 首先,通过 map 算子给每个数据的 key 添加随机数前缀,对 key 进行打散,将原先一
    样的 key 变成不一样的 key,然后进行第一次聚合,这样就可以让原本被一个 task 处理的数
    据分散到多个 task 上去做局部聚合;随后,去除掉每个 key 的前缀,再次进行聚合。

7. 解决方法5 - join算子:将 reduce join 转换为 map join

  • 将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。

8. 解决方法6 - join算子:sample 采样对倾斜 key 单独进行 join

  • 当数据量非常大时,可以考虑使用 sample 采样获取 10%的数据,然后分析这 10%的数
    据中哪个 key 可能会导致数据倾斜,然后将这个 key 对应的数据单独提取出来。

9. 解决方法7 - join算子:使用随机数扩容进行 join

  • 我们会将原先一样的 key 通过附加随机前缀变成不一样的 key,然后就可以将这些处理
    后的“不同 key”分散到多个 task 中去处理,而不是让一个 task 处理大量的相同 key。这一种
    方案是针对有大量倾斜 key 的情况,没法将部分 key 拆分出来进行单独处理,需要对整个
    RDD 进行数据扩容,对内存资源要求很高。
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容