首页 > 代码库 > 使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....
使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....
package com.huawei.bigdata.spark.examples import org.apache.spark.mllib.stat.Statistics import org.apache.spark.sql.types.DoubleType import org.apache.spark.{SparkConf, SparkContext} /** * Created by wulei on 2017/8/3. */ object PointCorrPredict { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("PointCorrPredict") val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("use vio_offsite") //360111010002,360102029001 val dataFrame = sqlContext.sql("select kk_id,direct,day,hour,cnt,speed from kk_hour_scale").orderBy("day","hour") val newDataFrame = dataFrame.filter("kk_id = ‘3601110100‘and direct = ‘02‘") .orderBy(dataFrame("day").desc,dataFrame("hour").desc).select(dataFrame.col("cnt").cast(DoubleType)).limit(100) .rdd.map(row=>row.getAs[Double]("cnt")) /*val dd = newDataFrame.collect().take(3) dd.foreach(println)*/ val destinationDataFrame = sqlContext.sql("select origin_kakou,destination_kakou from kk_relation ") val newDestinationDataFrame = destinationDataFrame.filter("origin_kakou = ‘360111010002‘").select("destination_kakou").collect() for (i <- 0 until newDestinationDataFrame.length){ println(newDestinationDataFrame(i)) println(newDestinationDataFrame(i).toString().substring(1,11)) println(newDestinationDataFrame(i).toString().substring(11,13)) val tmpDataFrame = dataFrame.filter("kk_id = ‘"+ newDestinationDataFrame(i).toString().substring(1,11) +"‘ and direct = ‘"+newDestinationDataFrame(i).toString().substring(11,13)+"‘") .orderBy(dataFrame("day").desc,dataFrame("hour").desc).select(dataFrame.col("cnt").cast(DoubleType)).limit(100) .rdd.map(row=>row.getAs[Double]("cnt")) //tmpDataFrame.foreach(row => println(row)) var correlationPearson: Double = Statistics.corr(newDataFrame,tmpDataFrame)//计算不同数据之间的相关系数:皮尔逊 println("\ncorrelationPearson:" + correlationPearson) //打印结果 } println("11111") sc.stop() } }
实现代码如上,因为Statistics.corr(RDD[Double],RDD[Double]),所以SparkSQL读取后的数据生成的dataFrame必须转换,第一步是转换成RDD[Row],Row就相当于sql查询出来的一条数据,这里也转换过多次才成功,最后百度得到可以先.cast(DoubleType)的形式。问题自己接触的少,要先看本质,然后看API,然后看案例就快了。
很明显可以从问题的描述上看是组之间的元素个数对应不上,但我已经被Row=>Double转晕了头,没有静心思考琢磨,没有专注仔细的自我对话,导致自己盲目的修改代码,还依然从转换问题上改变,后来转念一想才醒悟,以此警戒自己。limit
使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。