首页 > 代码库 > 【转载】Spark学习 & 机器学习

【转载】Spark学习 & 机器学习

继续Spark学习,开始的文章:http://www.cnblogs.com/charlesblc/p/6106603.html

参考了这个系列的文章:

http://www.cnblogs.com/shishanyuan/p/4699644.html

《倾情大奉送--Spark入门实战系列》实验数据下载在上面那篇开始的文章有说明。

 

先看了上手实验的一部分,因为之前Spark已经安装好了,见 http://www.cnblogs.com/charlesblc/p/6014158.html

上手实验,参考:http://www.cnblogs.com/shishanyuan/p/4721102.html

启动Spark Shell

发现有的参数比如--executor-memory不能用,所以直接:

$ ./bin/spark-shell --master spark://10.117.146.12:7077 

得到以下输出:

技术分享

 

然后在 dashboard 也能够看到任务信息:

技术分享

3.3 转换与操作

3.3.1 并行化集合例子演示

在该例子中通过parallelize方法定义了一个从1~10的数据集,然后通过map(_*2)对数据集每个数乘以2,接着通过filter(_%3==0)过滤被3整除的数字,最后使用toDebugString显示RDD的LineAge,并通过collect计算出最终的结果。

val num=sc.parallelize(1 to 10)

val doublenum = num.map(_*2)

val threenum = doublenum.filter(_ % 3 == 0)

threenum.toDebugString

threenum.collect

 

在下图运行结果图中,我们可以看到RDD的LineAge演变,通过paralelize方法建立了一个ParalleCollectionRDD,使用map()方法后该RDD为MappedRDD,接着使用filter()方法后转变为FilteredRDD。

技术分享

 

但是我的spark-shell运行失败,不能成功。我觉得是我机器的shell版本太低导致的。 

因为用下面的命令是成功的:

bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999

 

所以决定用scala打包成jar来运行。需要再Intellij里面使用scala插件。在欢迎页面的configure里面选择plugin, 点击左下角的"Install Jetbrain plugin",然后在搜索框输入scala,能够看到scala插件。

技术分享

但是有时候网络不好,或者fuck GFW的原因。需要先下下来,然后点最右边的 "Install plugin from disk"... Fuck GFW.

这时候,注意版本一定要合适,可以参考看到的scala插件图里面的版本。

技术分享

 

安装好之后,新建项目的地方,就有Scala的选项了:

技术分享

 

SBT应该是Scala的一个构建工具。 我们建项目的时候,先不选择SBT吧。 

建好之后,出来了项目目录结构:

技术分享

然后建一个package: com.spark.my,然后建一个文件(因为没找到scala class)Hello.scala。Intellij自动检测出没有配置Scala库。这时候,就要下一个scala的库。http://www.scala-lang.org/download/ 这里下载了 scala-2.12.0.tgz,解压到 Data/Work/Installed里面。然后再Intellij的项目里面,配置scala类库到刚刚解压目录里面的lib目录即可。

 

然后再Hello.scala里面写代码:

package com.spark.my

object Hello {
  def main(args: Array[String]): Unit = {
    println("hello world")
  }

}

运行,会稍微慢一点,出结果:

hello world

Process finished with exit code 0

 

然后改成Spark里面要用到的。需要先导入jar包。

File->Project Structure->Libraries->点绿色‘+‘->java->找到spark-assembly-1.0.0-hadoop2.2.0.jar->OK

按照类似的方法导入scala-compiler.jar,  scala-library.jar, scala-reflect.jar  这些位于scala的安装目录下的lib目录。

 

但是发现Spark 2以上,不再包含 spark-assembly 这个jar了。也不知道依赖关系会多复杂,所以还是用maven来处理吧。

新建了一个maven项目,把刚刚的代码拷贝了一遍,是可以的。

另外,发现scala要从object入口,从class入口是不行的。

 

其中pom.xml内容:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.spark.my</groupId>
    <artifactId>scala-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.1</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

</project>

目录结构:

技术分享

 

Count.scala内容如下:

package com.spark.my

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by baidu on 16/11/28.
  */
object Count {
  def main(args: Array[String]): Unit = {
    //println("hello");
    val conf = new SparkConf()

    val sc = new SparkContext(conf)

    val num=sc.parallelize(1 to 10)

    val doublenum = num.map(_*2)

    val threenum = doublenum.filter(_ % 3 == 0)

    threenum.toDebugString

    threenum.collect
  }
}

配置artifacts

File->Project Structure->Artifacts->点绿色‘+‘->jar->From modules ...->在Main Classes中输入HdfsWC->OK

如下图所示将Extracted xxxx选中,点红色‘-‘,将这些移除->OK

 

然后发现报错,找不到xxx类。所以只好把所有的jar都打包进去(同时确保了jar包的版本都正确)。打包好是一个90多M的jar包,拷贝到机器上。

同时加上一个打印语句

println("threenum: %s".format(threenum.toDebugString))

 加在了 threenum.collect 前面。看看效果。

结果:可以打印:

threenum: (2) MapPartitionsRDD[2] at filter at Count.scala:19 []
 |  MapPartitionsRDD[1] at map at Count.scala:17 []
 |  ParallelCollectionRDD[0] at parallelize at Count.scala:15 []

 

然后再加对输出数组的打印:

threenum.collect

    println("threenum size=" + threenum.count())
    for (elem <- threenum) {
      print(elem + ",")
    }
    println("all elements done.")

输出结果:

16/11/28 03:51:21 INFO scheduler.DAGScheduler: Job 1 finished: count at Count.scala:25, took 0.038559 s
threenum size=3
result:6
result:12
result:18
all elements done.
16/11/28 03:51:21 INFO spark.SparkContext: Invoking stop() from shutdown hook

 

以下语句和collect一样,都会触发作业运行

num.reduce (_ + _)

num.take(5)

num.first

num.count

num.take(5).foreach(println)

 

另外,如果要支持 java 和 scala混合build,可以看看这篇文章:

http://www.cnblogs.com/yjmyzz/p/4694219.html

Scala和Java实现Word Count,参考:

http://blog.csdn.net/bluejoe2000/article/details/41556979

要看spark sql怎么写,参考:

http://blog.csdn.net/kinger0/article/details/46562473

 

 

然后看的是机器学习这一块,因为偏理论,可以先看完。其他的实践,再看。

http://www.cnblogs.com/shishanyuan/p/4747761.html

 

【转载】Spark学习 & 机器学习