首页 > 代码库 > scala spark 机器学习初探

scala spark 机器学习初探

Transformer: 是一个抽象类包含特征转换器, 和最终的学习模型, 需要实现transformer方法 通常transformer为一个RDD增加若干列, 最终转化成另一个RDD, 1. 特征转换器通常处理一个dataset, 把其中一列数据转化成一列新的数据。 并且把新的数据列添加到dataset后面,产生一个新的dataset输出。 2. 一个学习模型转换器用来处理一个数据集, 读取包含特征向量的那一列数据, 为每一个特征向量预测一个结果标签, 把预测结果标签作为一个新的数据列里添加到数据集合后面, 并且输出结果。

Estimator: 机器学习算法抽象类, 需要实现fit()方法, fit方法会处理一个RDD, 产生一个Transformer。 例如LogistricRegression是一个Estimator, 调用fit方法训练出来一个logistricRegressionModel对象, 这个是一个Transformer. Transformer和Estimator都是无状态的。 每个实例都有一个唯一ID 流水线作业 机器学习领域, 普遍采用一组算法来处理和学习数据, 例如一个简单的文本文档处理过程包括以下几个步骤 分词 把文本词转换成数字特征向量 用特征向量和标签训练一个模型出来。

Spark ML可以用PipeLine表示这些过程。PipeLine是工程性质的东西, 感觉类似于工厂模式,可以把整个过程, 还有每个步骤上的transformer, estimator拼装起来。

在spark-shell命令行里直接输入下面的代码, 就可以执行了。StringIndexer可以把一个属性列里的值映射成数值类型。但是逻辑回归分类器默认数据数据是连续的,并且是有序的, 所以StringIndexer生成的数字, 还需要进一步处理。  这里用OneHotEncoder,独热编码即 One-Hot 编码,又称一位有效编码,其方法是使用N位状态寄存器来对N个状态进行编码,每个状态都由他独立的寄存器位,并且在任意时候,其中只有一位有效。

可以这样理解,对于每一个特征,如果它有m个可能值,那么经过独热编码后,就变成了m个二元特征。并且,这些特征互斥,每次只有一个激活。因此,数据会变成稀疏的。

这样做的好处主要有:

  1. 解决了分类器不好处理属性数据的问题

  2. 在一定程度上也起到了扩充特征的作用

 

import org.apache.spark.ml.feature._

import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.mllib.linalg.{Vector, Vectors}

import org.apache.spark.mllib.regression.LabeledPoint

 

val df=    sqlContext.createDataFrame(Seq(

      (0, "a"),

      (1, "b"),

      (2, "c"),

      (3, "a"),

      (4, "a"),

      (5, "c"),

      (6, "d"))).toDF("id", "category")

    val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex").fit(df)

    val indexed = indexer.transform(df)

    indexed.select("category", "categoryIndex").show()

    val encoder = new OneHotEncoder().setInputCol("categoryIndex").setOutputCol("categoryVec")

    val encoded = encoder.transform(indexed)

    val data = http://www.mamicode.com/encoded.rdd.map { x =>

      {

        val featureVector = Vectors.dense(x.getAs[org.apache.spark.mllib.linalg.SparseVector]("categoryVec").toArray)

        val label = x.getAs[java.lang.Integer]("id").toDouble

        LabeledPoint(label, featureVector)

      }

    } 

var result = sqlContext.createDataFrame(data)

scala> result.show()

+-----+-------------+

|label|     features|

+-----+-------------+

|  0.0|[1.0,0.0,0.0]|

|  1.0|[0.0,0.0,1.0]|

|  2.0|[0.0,1.0,0.0]|

|  3.0|[1.0,0.0,0.0]|

|  4.0|[1.0,0.0,0.0]|

|  5.0|[0.0,1.0,0.0]|

|  6.0|[0.0,0.0,0.0]|

+-----+-------------+

整个features列就变成了一个稀疏矩阵。

scala spark 机器学习初探