首页 > 代码库 > spark ml 的例子
spark ml 的例子
一、关于spark ml pipeline与机器学习
一个典型的机器学习构建包含若干个过程
1、源数据ETL
2、数据预处理
3、特征选取
4、模型训练与验证
以上四个步骤可以抽象为一个包括多个步骤的流水线式工作,从数据收集开始至输出我们需要的最终结果。因此,对以上多个步骤、进行抽象建模,简化为流水线式工作流程则存在着可行性,对利用spark进行机器学习的用户来说,流水线式机器学习比单个步骤独立建模更加高效、易用。
受 scikit-learn 项目的启发,并且总结了MLlib在处理复杂机器学习问题的弊端(主要为工作繁杂,流程不清晰),旨在向用户提供基于DataFrame 之上的更加高层次的 API 库,以更加方便的构建复杂的机器学习工作流式应用。一个pipeline 在结构上会包含一个或多个Stage,每一个 Stage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等,这样的Stage 在 ML 里按照处理问题类型的不同都有相应的定义和实现。两个主要的stage为Transformer和Estimator。Transformer主要是用来操作一个DataFrame 数据并生成另外一个DataFrame 数据,比如svm模型、一个特征提取工具,都可以抽象为一个Transformer。Estimator 则主要是用来做模型拟合用的,用来生成一个Transformer。可能这样说比较难以理解,下面就以一个完整的机器学习案例来说明spark ml pipeline是怎么构建机器学习工作流的。
二、使用spark ml pipeline构建机器学习工作流
在此以Kaggle数据竞赛Display Advertising Challenge的数据集(该数据集为利用用户特征进行广告点击预测)开始,利用spark ml pipeline构建一个完整的机器学习工作流程。
Display Advertising Challenge的这份数据本身就不多做介绍了,主要包括3部分,numerical型特征集、Categorical类型特征集、类标签。
首先,读入样本集,并将样本集划分为训练集与测试集:
[java] view plain copy
print?
- //使用file标记文件路径,允许spark读取本地文件
- String fileReadPath = "file:\\D:\\dac_sample\\dac_sample.txt";
- //使用textFile读入数据
- SparkContext sc = Contexts.sparkContext;
- RDD<String> file = sc.textFile(fileReadPath,1);
- JavaRDD<String> sparkContent = file.toJavaRDD();
- JavaRDD<Row> sampleRow = sparkContent.map(new Function<String, Row>() {
- public Row call(String string) {
- String tempStr = string.replace("\t",",");
- String[] features = tempStr.split(",");
- int intLable= Integer.parseInt(features[0]);
- String intFeature1 = features[1];
- String intFeature2 = features[2]; String CatFeature1 = features[14];
- String CatFeature2 = features[15];
- return RowFactory.create(intLable, intFeature1, intFeature2, CatFeature1, CatFeature2);
- }
- });
- double[] weights = {0.8, 0.2};
- Long seed = 42L;
- JavaRDD<Row>[] sampleRows = sampleRow.randomSplit(weights,seed);
[java] view plain copy
print?
- List<StructField> fields = new ArrayList<StructField>();
- fields.add(DataTypes.createStructField("lable", DataTypes.IntegerType, false));
- fields.add(DataTypes.createStructField("intFeature1", DataTypes.StringType, true));
- fields.add(DataTypes.createStructField("intFeature2", DataTypes.StringType, true));
- fields.add(DataTypes.createStructField("CatFeature1", DataTypes.StringType, true));
- fields.add(DataTypes.createStructField("CatFeature2", DataTypes.StringType, true));
- //and so on
- StructType schema = DataTypes.createStructType(fields);
- DataFrame dfTrain = Contexts.hiveContext.createDataFrame(sampleRows[0], schema);//训练数据
- dfTrain.registerTempTable("tmpTable1");
- DataFrame dfTest = Contexts.hiveContext.createDataFrame(sampleRows[1], schema);//测试数据
- dfTest.registerTempTable("tmpTable2");
首先,将intFeature由string转为double,cast()方法将表中指定列string类型转换为double类型,并生成新列并命名为intFeature1Temp,
之后,需要删除原来的数据列 并将新列重命名为intFeature1,这样,就将string类型的特征转换得到double类型的特征了。
[java] view plain copy
print?
- //Cast integer features from String to Double
- dfTest = dfTest.withColumn("intFeature1Temp",dfTest.col("intFeature1").cast("double"));
- dfTest = dfTest.drop("intFeature1").withColumnRenamed("intFeature1Temp","intFeature1");
[java] view plain copy
print?
- /*特征转换,部分特征需要进行分箱,比如年龄,进行分段成成年未成年等 */
- double[] splitV = {0.0,16.0,Double.MAX_VALUE};
- Bucketizer bucketizer = new Bucketizer().setInputCol("").setOutputCol("").setSplits(splitV);
缺失值处理方面,可以使用全局的NA来统一标记缺失值:
[java] view plain copy
print?
- /*将categoricalb类型的变量的缺失值使用NA值填充*/
- String[] strCols = {"CatFeature1","CatFeature2"};
- dfTrain = dfTrain.na().fill("NA",strCols);
- dfTest = dfTest.na().fill("NA",strCols);
这一任务:
[java] view plain copy
print?
- // StringIndexer oneHotEncoder 将 categorical变量转换为 numerical 变量
- // 如某列特征为星期几、天气等等特征,则转换为七个0-1特征
- StringIndexer cat1Index = new StringIndexer().setInputCol("CatFeature1").setOutputCol("indexedCat1").setHandleInvalid("skip");
- OneHotEncoder cat1Encoder = new OneHotEncoder().setInputCol(cat1Index.getOutputCol()).setOutputCol("CatVector1");
- StringIndexer cat2Index = new StringIndexer().setInputCol("CatFeature2").setOutputCol("indexedCat2");
- OneHotEncoder cat2Encoder = new OneHotEncoder().setInputCol(cat2Index.getOutputCol()).setOutputCol("CatVector2");
可以使用VectorAssembler类完成这一任务:
[java] view plain copy
print?
- /*转换为特征向量*/
- String[] vectorAsCols = {"intFeature1","intFeature2","CatVector1","CatVector2"};
- VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature");
利用特征与类标签之间的相关性,进行特征选取:
[java] view plain copy
print?
- /*特征较多时,使用卡方检验进行特征选择,主要是考察特征与类标签的相关性*/
- ChiSqSelector chiSqSelector = new ChiSqSelector().setFeaturesCol("vectorFeature").setLabelCol("label").setNumTopFeatures(10)
- .setOutputCol("selectedFeature");
[java] view plain copy
print?
- /* 设置最大迭代次数和正则化参数 setElasticNetParam=0.0 为L2正则化 setElasticNetParam=1.0为L1正则化*/
- /*设置特征向量的列名,标签的列名*/
- LogisticRegression logModel = new LogisticRegression().setMaxIter(100).setRegParam(0.1).setElasticNetParam(0.0)
- .setFeaturesCol("selectedFeature").setLabelCol("lable");
[java] view plain copy
print?
- /*将特征转换,特征聚合,模型等组成一个管道,并调用它的fit方法拟合出模型*/
- PipelineStage[] pipelineStage = {cat1Index,cat2Index,cat1Encoder,cat2Encoder,vectorAssembler,logModel};
- Pipeline pipline = new Pipeline().setStages(pipelineStage);
- PipelineModel pModle = pipline.fit(dfTrain);
[java] view plain copy
print?
- //拟合得到模型的transform方法进行预测
- DataFrame output = pModle.transform(dfTest).select("selectedFeature", "label", "prediction", "rawPrediction", "probability");
- DataFrame prediction = output.select("label", "prediction");
- prediction.show();
[java] view plain copy
print?
- /*测试集合上的准确率*/
- long correct = prediction.filter(prediction.col("label").equalTo(prediction.col("‘prediction"))).count();
- long total = prediction.count();
- double accuracy = correct / (double)total;
- System.out.println(accuracy);
[java] view plain copy
print?
- String pModlePath = ""file:\\D:\\dac_sample\\";
- pModle.save(pModlePath);
三,梳理和总结:
上述,借助代码实现了基于spark ml pipeline的机器学习,包括数据转换、特征生成、特征选取、模型定义及模型学习等多个stage,得到的pipeline
模型后,就可以在新的数据集上进行预测,总结为两部分并用流程图表示如下:
训练阶段:
预测阶段:
借助于Pepeline,在spark上进行机器学习的数据流向更加清晰,同时每一stage的任务也更加明了,因此,无论是在模型的预测使用上、还是模型后续的改进优化上,都变得更加容易。
spark ml 的例子
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。