A First Look at MLlib, the Machine Learning Library in Spark 1.0.0

Some of the MLlib code in this article comes from the official Spark 1.0.0 documentation.

MLlib is Spark's library of machine learning algorithms and applications, covering classification, regression, clustering, collaborative filtering, dimensionality reduction, and more. This article walks through creating sbt projects in Scala that implement machine learning algorithms, and running them both locally and on a cluster. (Beginners may find it helpful to first enter the code line by line in the interactive Spark shell to get familiar with Scala and the RDD API.) For background on sbt itself, see the sbt documentation.

1. SVM (Linear Support Vector Machine)

  • Create a SimpleSVM directory and, inside it, set up the directory structure shown below.

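A minimal sketch of the standard sbt layout this corresponds to, assuming the usual src/main/scala convention:

    SimpleSVM/
    ├── simple.sbt
    └── src/
        └── main/
            └── scala/
                └── SimpleApp.scala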

  • simple.sbt contains the following:
name := "SimpleSVM Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

PS: since this application calls into MLlib, take special care to add spark-mllib to libraryDependencies; otherwise the project will not compile.

  • SimpleApp.scala contains the following:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SimpleSVM Application")
    val sc = new SparkContext(conf)

    // Load training data in LIBSVM format.
    val data = MLUtils.loadLibSVMFile(sc, "mllib/test50.txt")

    // Split the data into a training set (60%) and a test set (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Train an SVM model with SGD.
    val numIterations = 100
    val model = SVMWithSGD.train(training, numIterations)

    // Clear the default threshold so predict() returns raw scores.
    model.clearThreshold()

    // Score the test set.
    val scoreAndLabels = test.map { point =>
      val score = model.predict(point.features)
      (score, point.label)
    }

    // Evaluate the classifier with area under the ROC curve.
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    val auROC = metrics.areaUnderROC()

    println("Area under ROC = " + auROC)
  }
}

PS: because the Hadoop path was configured earlier when setting up Spark, the input path mllib/test50.txt actually refers to a file in HDFS. Where it resolves depends on the default-filesystem <name>/<value> setting (typically fs.default.name) in Hadoop's core-site.xml, which is an easy place to make mistakes. You therefore need to put test50.txt into HDFS first (e.g. with hadoop fs -put). Note also that test50.txt must be in LIBSVM input format, for example:
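LIBSVM format holds one example per line: a label followed by sparse index:value pairs, with 1-based, strictly increasing feature indices. The lines below are made-up values, purely for illustration:

0 1:0.5 3:1.2 7:0.8
1 2:0.1 4:2.3 7:0.6
1 1:1.0 5:0.4 6:0.9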

  • Compile:

     cd ~/SimpleSVM

  sbt package     # packaging; this can take a while and ends with a [success] line

  PS: on success many files are generated, including target/scala-2.10/simplesvm-project_2.10-1.0.jar

  • Run locally:

  spark-submit --class "SimpleApp" --master local target/scala-2.10/simplesvm-project_2.10-1.0.jar

  • Run on the cluster:

      spark-submit --class "SimpleApp" --master spark://master:7077 target/scala-2.10/simplesvm-project_2.10-1.0.jar

  • Result: the job prints the evaluation metric as a line of the form "Area under ROC = <value>".

PS: if you want to adjust the regularization applied by the algorithm, modify SimpleApp.scala as follows:

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer.
  setNumIterations(200).
  setRegParam(0.1).
  setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)
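Swapping in the L1Updater replaces the default L2 penalty with an L1 penalty, which tends to drive many weights to exactly zero and so produces sparser models; setRegParam controls how strongly the penalty is weighted against the hinge loss.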

2. Logistic Regression

Likewise, to implement logistic regression you only need to replace SVMWithSGD in SimpleApp.scala with LogisticRegressionWithSGD, as sketched below.
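A minimal sketch of the substitution, keeping the rest of the SVM example unchanged:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Train a logistic regression model with SGD instead of a linear SVM.
val model = LogisticRegressionWithSGD.train(training, numIterations)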

3. Collaborative Filtering

The project layout is the same as above (the jar names below assume the sbt project name was changed to "SimpleCF Project"); to implement collaborative filtering, modify SimpleApp.scala as follows:

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SimpleCF Application")
    val sc = new SparkContext(conf)

    // Load and parse the data: each line is "user,item,rating".
    val data = sc.textFile("mllib/test.data")
    val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
      Rating(user.toInt, item.toInt, rate.toDouble)
    })

    // Build a recommendation model using ALS.
    val rank = 10
    val numIterations = 5
    val model = ALS.train(ratings, rank, numIterations, 0.01)

    // Predict a rating for every (user, product) pair in the data set.
    val usersProducts = ratings.map { case Rating(user, product, rate) =>
      (user, product)
    }
    val predictions =
      model.predict(usersProducts).map { case Rating(user, product, rate) =>
        ((user, product), rate)
      }

    // Join the true ratings with the predictions and compute the MSE.
    val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
      ((user, product), rate)
    }.join(predictions)
    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
      val err = (r1 - r2)
      err * err
    }.mean()

    println("Mean Squared Error = " + MSE)
  }
}

PS: as before, mllib/test.data is stored in HDFS. Each line is a user,item,rating triple, for example:
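A few illustrative lines in the comma-separated format the parser above expects (the values are made up):

1,1,5.0
1,2,1.0
2,1,5.0
2,2,1.0
3,1,1.0
3,2,5.0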

  • Run locally:

  spark-submit --class "SimpleApp" --master local target/scala-2.10/simplecf-project_2.10-1.0.jar

  • Run on the cluster:

      spark-submit --class "SimpleApp" --master spark://master:7077 target/scala-2.10/simplecf-project_2.10-1.0.jar

  • Result: the job prints a line of the form "Mean Squared Error = <value>".

PS: an alpha parameter can be brought in via ALS.trainImplicit:

val alpha = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, alpha)
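trainImplicit targets implicit-feedback data (counts of clicks, views, or purchases rather than explicit ratings); in the implicit-feedback formulation of ALS, alpha sets how much confidence is placed in the observed interactions.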
Likewise, code for the clustering and dimensionality-reduction algorithms can be found in the official MLlib guide.

This is an original blog post; please credit the source when reposting.