首页 > 代码库 > 【转载】Spark学习 & 机器学习
【转载】Spark学习 & 机器学习
继续Spark学习,开始的文章:http://www.cnblogs.com/charlesblc/p/6106603.html
参考了这个系列的文章:
http://www.cnblogs.com/shishanyuan/p/4699644.html
《倾情大奉送--Spark入门实战系列》实验数据下载在上面那篇开始的文章有说明。
先看了上手实验的一部分,因为之前Spark已经安装好了,见 http://www.cnblogs.com/charlesblc/p/6014158.html
上手实验,参考:http://www.cnblogs.com/shishanyuan/p/4721102.html
启动Spark Shell
发现有的参数比如--executor-memory不能用,所以直接:
$ ./bin/spark-shell --master spark://10.117.146.12:7077
得到以下输出:
然后在 dashboard 也能够看到任务信息:
3.3 转换与操作
3.3.1 并行化集合例子演示
在该例子中通过parallelize方法定义了一个从1~10的数据集,然后通过map(_*2)对数据集每个数乘以2,接着通过filter(_%3==0)过滤被3整除的数字,最后使用toDebugString显示RDD的LineAge,并通过collect计算出最终的结果。
val num=sc.parallelize(1 to 10) val doublenum = num.map(_*2) val threenum = doublenum.filter(_ % 3 == 0) threenum.toDebugString threenum.collect
在下图运行结果图中,我们可以看到RDD的LineAge演变,通过paralelize方法建立了一个ParalleCollectionRDD,使用map()方法后该RDD为MappedRDD,接着使用filter()方法后转变为FilteredRDD。
但是我的spark-shell运行失败,不能成功。我觉得是我机器的shell版本太低导致的。
因为用下面的命令是成功的:
bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
所以决定用scala打包成jar来运行。需要再Intellij里面使用scala插件。在欢迎页面的configure里面选择plugin, 点击左下角的"Install Jetbrain plugin",然后在搜索框输入scala,能够看到scala插件。
但是有时候网络不好,或者fuck GFW的原因。需要先下下来,然后点最右边的 "Install plugin from disk"... Fuck GFW.
这时候,注意版本一定要合适,可以参考看到的scala插件图里面的版本。
安装好之后,新建项目的地方,就有Scala的选项了:
SBT应该是Scala的一个构建工具。 我们建项目的时候,先不选择SBT吧。
建好之后,出来了项目目录结构:
然后建一个package: com.spark.my,然后建一个文件(因为没找到scala class)Hello.scala。Intellij自动检测出没有配置Scala库。这时候,就要下一个scala的库。http://www.scala-lang.org/download/ 这里下载了 scala-2.12.0.tgz,解压到 Data/Work/Installed里面。然后再Intellij的项目里面,配置scala类库到刚刚解压目录里面的lib目录即可。
然后再Hello.scala里面写代码:
package com.spark.my object Hello { def main(args: Array[String]): Unit = { println("hello world") } }
运行,会稍微慢一点,出结果:
hello world
Process finished with exit code 0
然后改成Spark里面要用到的。需要先导入jar包。
File->Project Structure->Libraries->点绿色‘+‘->java->找到spark-assembly-1.0.0-hadoop2.2.0.jar->OK
按照类似的方法导入scala-compiler.jar, scala-library.jar, scala-reflect.jar 这些位于scala的安装目录下的lib目录。
但是发现Spark 2以上,不再包含 spark-assembly 这个jar了。也不知道依赖关系会多复杂,所以还是用maven来处理吧。
新建了一个maven项目,把刚刚的代码拷贝了一遍,是可以的。
另外,发现scala要从object入口,从class入口是不行的。
其中pom.xml内容:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.spark.my</groupId> <artifactId>scala-demo</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.12</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.1</version> </dependency> </dependencies> <repositories> <repository> <id>aliyunmaven</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> </repositories> </project>
目录结构:
Count.scala内容如下:
package com.spark.my import org.apache.spark.{SparkConf, SparkContext} /** * Created by baidu on 16/11/28. */ object Count { def main(args: Array[String]): Unit = { //println("hello"); val conf = new SparkConf() val sc = new SparkContext(conf) val num=sc.parallelize(1 to 10) val doublenum = num.map(_*2) val threenum = doublenum.filter(_ % 3 == 0) threenum.toDebugString threenum.collect } }
配置artifacts
File->Project Structure->Artifacts->点绿色‘+‘->jar->From modules ...->在Main Classes中输入HdfsWC->OK
如下图所示将Extracted xxxx选中,点红色‘-‘,将这些移除->OK
然后发现报错,找不到xxx类。所以只好把所有的jar都打包进去(同时确保了jar包的版本都正确)。打包好是一个90多M的jar包,拷贝到机器上。
同时加上一个打印语句
println("threenum: %s".format(threenum.toDebugString))
加在了 threenum.collect 前面。看看效果。
结果:可以打印:
threenum: (2) MapPartitionsRDD[2] at filter at Count.scala:19 [] | MapPartitionsRDD[1] at map at Count.scala:17 [] | ParallelCollectionRDD[0] at parallelize at Count.scala:15 []
然后再加对输出数组的打印:
threenum.collect println("threenum size=" + threenum.count()) for (elem <- threenum) { print(elem + ",") } println("all elements done.")
输出结果:
16/11/28 03:51:21 INFO scheduler.DAGScheduler: Job 1 finished: count at Count.scala:25, took 0.038559 s threenum size=3 result:6 result:12 result:18 all elements done. 16/11/28 03:51:21 INFO spark.SparkContext: Invoking stop() from shutdown hook
以下语句和collect一样,都会触发作业运行
num.reduce (_ + _)
num.take(5)
num.first
num.count
num.take(5).foreach(println)
另外,如果要支持 java 和 scala混合build,可以看看这篇文章:
http://www.cnblogs.com/yjmyzz/p/4694219.html
Scala和Java实现Word Count,参考:
http://blog.csdn.net/bluejoe2000/article/details/41556979
要看spark sql怎么写,参考:
http://blog.csdn.net/kinger0/article/details/46562473
然后看的是机器学习这一块,因为偏理论,可以先看完。其他的实践,再看。
http://www.cnblogs.com/shishanyuan/p/4747761.html
【转载】Spark学习 & 机器学习