
Some Problems Encountered in Spark Development

1. StackOverflowError

Problem: a simplified version of the code:

for (day <- days) {

  rdd = rdd.union(sc.textFile(s"/path/to/$day") .... )

}

The scenario: I was trying to merge a fairly large number of files into one big RDD, and that caused the stack overflow.

Solution: clearly the chained calls built up too deep a lineage; I later split the work into several smaller jobs and merged their results. Note that repeated union can also leave the final RDD with far too many partitions.
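Besides splitting into smaller jobs, another option (my suggestion, not from the original post) is to avoid the chained calls entirely: SparkContext.union takes a whole sequence of RDDs and builds a single flat union, so the lineage depth stays constant no matter how many days there are. A minimal sketch, assuming a hypothetical day list and /path/to/<day> layout:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("union-demo").setMaster("local[*]"))

    // Hypothetical day list and paths, for illustration only.
    val days: Seq[String] = Seq("2016-01-01", "2016-01-02", "2016-01-03")

    // One RDD per day...
    val perDay: Seq[RDD[String]] = days.map(day => sc.textFile(s"/path/to/$day"))

    // ...merged in one step: a single flat UnionRDD rather than a
    // chain of nested unions, so nothing recurses per day.
    val merged: RDD[String] = sc.union(perDay)

    sc.stop()
  }
}
```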

2. java.io.FileNotFoundException: /tmp/spark-90507c1d-e98 ..... temp_shuffle_98deadd9-f7c3-4a12 (No such file or directory), or similar

Error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 76.0 failed 4 times, most recent failure: Lost task 0.3 in stage 76.0 (TID 341, 10.5.0.90): java.io.FileNotFoundException: /tmp/spark-90507c1d-e983-422d-9e01-74ff0a5a2806/executor-360151d5-6b83-4e3e-a0c6-6ddc955cb16c/blockmgr-bca2bde9-212f-4219-af8b-ef0415d60bfa/26/temp_shuffle_98deadd9-f7c3-4a12-9a30-7749f097b5c8 (No such file or directory)

Scenario: roughly the same code as above:

for (day <- days) {

  rdd = rdd.union(sc.textFile(s"/path/to/$day") .... )

}

rdd.map( ... )

Solution: even a plain map was failing, so I suspected there were too many temporary shuffle files. A check of rdd.partitions.length showed over 4,000 partitions; the basic idea is therefore to reduce the partition count.

One option is to set the partition count at read time, while unioning:

for (day <- days) {

  rdd = rdd.union(sc.textFile(s"/path/to/$day", numPartitions) .... )

} // I originally assumed that since the RDDs use the same default hash partitioning with the same partition count, the unioned RDD's partition count would not grow; pasting the source below in case I'm wrong. In fact, the PartitionerAwareUnionRDD branch only applies when every input RDD defines the same partitioner, and RDDs from textFile define none, so a plain UnionRDD is built and the partition counts still add up.

  /** Build the union of a list of RDDs. */
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
    val partitioners = rdds.flatMap(_.partitioner).toSet
    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
      /* If every RDD has the same partitioner, a PartitionerAwareUnionRDD is built:
       * m RDDs with p partitions each will be unified to a single RDD with p partitions. */
      new PartitionerAwareUnionRDD(this, rdds)
    } else {
      new UnionRDD(this, rdds)
    }
  }
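A quick way to see which branch you land in (a sketch with hypothetical paths; exact partition counts depend on your files): RDDs produced by textFile carry no partitioner, so the condition above fails and a plain UnionRDD is built, whose partition count is the sum of its parents'.

```scala
// Hypothetical paths, for illustration only.
val a = sc.textFile("/path/to/2016-01-01")
val b = sc.textFile("/path/to/2016-01-02")

println(a.partitioner) // None: textFile RDDs define no partitioner

// With no common partitioner, sc.union falls into the UnionRDD
// branch of the source above, so partition counts simply add up.
val u = sc.union(Seq(a, b))
println(u.partitions.length == a.partitions.length + b.partitions.length) // true
```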

Or repartition at the end:

for (day <- days) {

  rdd = rdd.union(sc.textFile(s"/path/to/$day") .... )

}

rdd = rdd.repartition(numPartitions) // repartition returns a new RDD; reassign it
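Since repartition always performs a full shuffle, coalesce is worth considering too (my suggestion, not from the original post): with the default shuffle = false it merges existing partitions locally, which is usually much cheaper when the goal is only to reduce the partition count.

```scala
// Merge down to numPartitions without a shuffle: each new partition
// is a local combination of several old ones.
val merged = rdd.coalesce(numPartitions)

// coalesce with shuffle = true redistributes records evenly and is
// equivalent to rdd.repartition(numPartitions).
val rebalanced = rdd.coalesce(numPartitions, shuffle = true)
```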

 
