首页 > 代码库 > 自定义spark GraphX中的collectNeighborIds方法
自定义spark GraphX中的collectNeighborIds方法
/**
* 自定义收集VertexId的neighborIds
* @author TongXueQiang
*/
def collectNeighborIds[T,U](edgeDirection:EdgeDirection,graph:Graph[T,U])(implicit m:scala.reflect.ClassTag[T],n:scala.reflect.ClassTag[U]):VertexRDD[Array[VertexId]] = {
val nbrs = graph.mapReduceTriplets[Array[VertexId]](
//map函数
edgeTriplets => {
val msgTosrc = http://www.mamicode.com/(edgeTriplets.srcId,Array(edgeTriplets.dstId));
val msgTodst = (edgeTriplets.dstId,Array(edgeTriplets.srcId));
edgeDirection match {
case EdgeDirection.Either =>Iterator(msgTosrc,msgTodst)
case EdgeDirection.Out => Iterator(msgTosrc)
case EdgeDirection.In => Iterator(msgTodst)
case EdgeDirection.Both => throw new SparkException("It doesn‘t make sense to collect neighbors without a " + "direction.(EdgeDirection.Both is not supported.use EdgeDirection.Either instead.)")
}
},_ ++ _)//reduce函数
nbrs
}
测试:
object Test {
System.setProperty("hadoop.home.dir","D://hadoop-2.6.2");
val conf = new SparkConf().setMaster("local").setAppName("SparkGraph");
val sc = new SparkContext(conf);
def main(args:Array[String]):Unit = {
val graph = GraphGenerators.logNormalGraph(sc,numVertices = 100).map((id,_) => id.toDouble);
collectNeighborIds(EdgeDirection.In,graph).foreach(line => {print(line._1+":"); for (elem <- line._2) {print(elem + " ")};println;});
}
}
自定义spark GraphX中的collectNeighborIds方法
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。