
Computing the Pearson correlation with Spark's corr fails with "Can only zip RDDs with same number of elements in each partition"

package com.huawei.bigdata.spark.examples

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by wulei on 2017/8/3.
  */
object PointCorrPredict {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("PointCorrPredict")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.sql("use vio_offsite")
    //360111010002,360102029001
    val dataFrame = sqlContext.sql("select kk_id,direct,day,hour,cnt,speed from kk_hour_scale").orderBy("day","hour")
    val newDataFrame = dataFrame.filter("kk_id = '3601110100' and direct = '02'")
      .orderBy(dataFrame("day").desc, dataFrame("hour").desc)
      .select(dataFrame.col("cnt").cast(DoubleType)) // cast first so the Row => Double mapping below works
      .limit(100)
      .rdd.map(row => row.getAs[Double]("cnt"))
    /* val dd = newDataFrame.collect().take(3)
       dd.foreach(println) */
    val destinationDataFrame = sqlContext.sql("select origin_kakou,destination_kakou from kk_relation ")
    val newDestinationDataFrame = destinationDataFrame.filter("origin_kakou = '360111010002'").select("destination_kakou").collect()
    for (i <- 0 until newDestinationDataFrame.length) {
      println(newDestinationDataFrame(i))
      // Row.toString looks like "[360102029001]": chars 1-10 are the 10-digit kk_id, 11-12 the 2-digit direct
      println(newDestinationDataFrame(i).toString().substring(1, 11))
      println(newDestinationDataFrame(i).toString().substring(11, 13))
      val tmpDataFrame = dataFrame.filter("kk_id = '" + newDestinationDataFrame(i).toString().substring(1, 11)
                         + "' and direct = '" + newDestinationDataFrame(i).toString().substring(11, 13) + "'")
        .orderBy(dataFrame("day").desc, dataFrame("hour").desc)
        .select(dataFrame.col("cnt").cast(DoubleType))
        .limit(100)
        .rdd.map(row => row.getAs[Double]("cnt"))
      //tmpDataFrame.foreach(row => println(row))
      // Pearson correlation coefficient between the two count series
      val correlationPearson: Double = Statistics.corr(newDataFrame, tmpDataFrame)
      println("\ncorrelationPearson:" + correlationPearson) // print the result
    }
    println("11111") // debug marker: the loop completed

    sc.stop()
  }
}

The implementation is shown above. Because the signature is Statistics.corr(RDD[Double], RDD[Double]), the DataFrame returned by the Spark SQL query must be converted first. The first step is converting to RDD[Row], where each Row corresponds to one record of the query result. I went through several failed conversions before finding, via a web search, that casting the column with .cast(DoubleType) first does the trick. When a problem is unfamiliar, look at its essence first, then the API, then examples; things go much faster that way.
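For reference, here is a minimal sketch of that conversion pattern on its own, reusing the dataFrame and the cnt column from the code above:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.DoubleType

// Cast the column to Double at the SQL level, then unwrap each Row into a Double.
val cnts: RDD[Double] = dataFrame
  .select(dataFrame.col("cnt").cast(DoubleType))
  .rdd                                   // DataFrame => RDD[Row]
  .map(row => row.getAs[Double]("cnt"))  // Row => Double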

From the error message it is obvious that the element counts of the two groups do not line up. But I was already dizzy from the Row => Double conversion, did not calm down and think it through, did not hold a focused, careful dialogue with myself, and so kept blindly editing the code, still attacking it from the conversion angle. Only later did it dawn on me where the real problem was, and I note it here as a warning to myself: limit. limit(100) gives no guarantee that the two resulting RDDs end up with the same number of elements in each partition (or even the same total count, if one query returns fewer than 100 rows).
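One way out, sketched below under the assumption that both series are small (limit(100) caps them at 100 elements): pull both to the driver, trim them to a common length, and rebuild each as a single-partition RDD so the zip inside corr cannot fail. This illustrates the idea; it is not the fix from the original post:

// newDataFrame and tmpDataFrame are the RDD[Double] values built above.
val xs = newDataFrame.collect()
val ys = tmpDataFrame.collect()
val n  = math.min(xs.length, ys.length)  // guard against one side having fewer rows
val x  = sc.parallelize(xs.take(n), 1)   // one partition each => equal counts per partition
val y  = sc.parallelize(ys.take(n), 1)
val correlationPearson = Statistics.corr(x, y)

Calling .repartition(1) on both RDDs instead would equalize the partitioning, but corr would still fail whenever the two series differ in total length, so trimming to a common length first is the safer move.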
