首页 > 代码库 > sparkSQL1.1入门之八:sparkSQL之综合应用
sparkSQL1.1入门之八:sparkSQL之综合应用
Spark之所以万人瞩目,除了内存计算,还有其ALL-IN-ONE的特性,实现了One stack rule them all。下面简单模拟了几个综合应用场景,不仅使用了sparkSQL,还使用了其他Spark组件:
最后使用R做示意图,用3种不同的颜色表示不同的类别。
- 店铺分类,根据销售额对店铺分类
- 货品调拨,根据货品的销售数量和店铺之间的距离进行货品调拨
前者将使用sparkSQL+MLlib的聚类算法,后者将使用sparkSQL+GraphX算法。本实验采用IntelliJ IDEA调试代码,最后生成doc.jar,然后使用spark-submit提交给集群运行。
1:店铺分类
分类在实际应用中非常普遍,比如对客户进行分类、对店铺进行分类等等,对不同类别采取不同的策略,可以有效的降低企业的营运成本、增加收入。机器学习中的聚类就是一种根据不同的特征数据,结合用户指定的类别数量,将数据分成几个类的方法。下面举个简单的例子,对第五小结中的hive数据,按照销售数量和销售金额这两个特征数据,进行聚类,分出3个等级的店铺。
在IDEA中建立一个object:SQLMLlib
package doc import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.linalg.Vectors object SQLMLlib { def main(args: Array[String]) { //屏蔽不必要的日志显示在终端上 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //设置运行环境 val sparkConf = new SparkConf().setAppName("SQLMLlib") val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) //使用sparksql查出每个店的销售数量和金额 hiveContext.sql("use saledata") val sqldata = http://www.mamicode.com/hiveContext.sql("select a.locationid, sum(b.qty) totalqty,sum(b.amount) totalamount from tblStock a join tblstockdetail b on a.ordernumber=b.ordernumber group by a.locationid")>编译打包后运行:
运行过程,可以发现聚类过程都是使用200个partition:
运行完毕,使用getmerge将结果转到本地文件,并查看结果:
2:货品调拨
在商业活动中,如何将货品放在最需要的地点是一个永恒的命题。在Spark中,可以通过图计算来解决这样的问题:将销售点做为图的顶点,其属性可以是货品的销量、库存等特征;将调拨因素作为边,如距离、使用时间、调拨费用等。通过货品的轮询、调拨点的轮询来获取货品调拨的信息。下面给出一段使用sparksql和graphX综合使用的代码框架:
package doc //由于暂时手上缺少数据,本例只给出框架,以后有机会补上 import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD object SQLGraphX { def main(args: Array[String]) { //屏蔽不必要的日志显示在终端上 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //设置运行环境 val sparkConf = new SparkConf().setAppName("SQLGraphX") val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) //切换到销售数据库 hiveContext.sql("use saledata") //使用sparksql查出店铺的销量和库存,作为图的顶点 //其中locationid为VertexID,(销量,库存)为VD,一般为(Int,Int)类型 val vertexdata = http://www.mamicode.com/hiveContext.sql("select a.locationid, b.saleQty, b.InvQty From a join b on a.col1=b.col2 where conditions")>
3:小结
通过上面的代码,可以看出,程序除了最后有磁盘落地外,都是在内存中计算的。避免了多个系统中交互数据的落地过程,提高了效率。这才是spark生态系统真正强大之处:One stack rule them all。另外sparkSQL+sparkStreaming可以架构当前非常热门的Lambda架构体系,为CEP提供解决方案。也正是如此强大,才吸引了广大开源爱好者的目光,促进了spark生态的告诉发展。
最近将在炼数成金开课Spark大数据快速计算平台(第三期),本资料为新课素材。本篇近几日再完善一下。
sparkSQL1.1入门之八:sparkSQL之综合应用
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。