首页 > 代码库 > Spark开发指南

Spark开发指南

Spark开发指南

从高的层面来看,事实上每个Spark的应用,都是一个Driver类,通过执行用户定义的main函数。在集群上执行各种并发操作和计算

Spark提供的最基本的抽象,是一个弹性分布式数据集(RDD),它是一种特殊集合。能够分布在集群的节点上。以函数式编程操作集合的方式,进行各种各样的并发操作。它能够由hdfs上的一个文件创建而来,或者是Driver程序中,从一个已经存在的集合转换而来。

用户能够将数据集缓存在内存中,让它被有效的重用。进行并发操作。

最后,分布式数据集能够自己主动的从结点失败中恢复。再次进行计算。

Spark的第二个抽象。是并行计算中使用的共享变量。默认来说,当Spark并发执行一个函数时。它是以多个的task,在不同的结点上执行,它传递每个变量的一个拷贝,到每个独立task使用到的函数中。因此这些变量并不是共享的。然而有时候,我们须要在任务中可以被共享的变量。或者在任务与驱动程序之间共享。

Spark支持两种类型的共享变量:

     广播变量:      能够在内存的全部结点中被訪问,用于缓存变量(仅仅读)

     累加器:          仅仅能用来做加法的变量,比如计数和求和

本指南通过一些例子展示这些特征。

读者最好是熟悉Scala,尤其是闭包的语法。请留意。Spark能够通过Spark-Shell的解释器进行交互式执行。你可能会须要它。

接入Spark

为了写一个Spark的应用,你须要将Spark和它的依赖。增加到CLASSPATH中。

最简单的方法。就是执行sbt/sbt assembly来编译Spark和它的依赖,打到一个Jar里面core/target/scala_2.9.1/spark-core-assembly-0.0.0.jar,然后将它增加到你的CLASSPATH中。或者你能够选择将spark公布到maven的本地缓存中,使用sbt/sbt publish。

它将在组织org.spark-project下成为一个spark-core.

另外,你会须要导入一些Spark的类和隐式转换。 将以下几行增加到你程序的顶部

import spark.SparkContext

import SparkContext._

初始化Spark

写Spark程序须要做的第一件事情,就是创建一个SparkContext对象,它将告诉Spark怎样訪问一个集群。这个一般是通过以下的构造器来实现的:

new SparkContext(master, jobName, [sparkHome], [jars])

Master參数是一个字符串。指定了连接的Mesos集群。或者用特殊的字符串“local”来指明用local模式执行。如以下的描写叙述一般,JobName是你任务的名称。当在集群上执行的时候,将会在Mesos的Web UI监控界面显示。后面的两个參数,是用在将你的代码。部署到mesos集群上执行时使用的,后面会提到。

在Spark的解释器中。一个特殊的SparkContext变量已经为你创建。变量名字叫sc。创建你自己的SparkContext是不会生效的。

你能够通过设置MASTER环境变量。来让master连接到须要的上下文。

MASTER=local; ./spark-shell

Master的命名

Master的名字能够是下面3个格式中的一种

Master Name

Meaning

local

本地化执行Spark,使用一个Worker线程(没有并行)

local[K]

本地化执行Spark。使用K个Worker线程(依据机器的CPU核数设定)

HOST:PORT

将Spark连接到指定的Mesos Master,在集群上执行。Host參数是Mesos Master的Hostname。 port是master配置的port。默觉得5050.

注意:在早期的Mesos版本号(spark的old-mesos分支)。你必须使用master@HOST:PORT.

集群部署

假设你想你的任务执行在一个集群上,你须要指定2个可选參数:

  • SparkHome:Spark在集群机器上的安装路径(必须所有一致)
  • Jars:在本地机器上。包括了你任务的代码和依赖的Jars文件列表。 Spark会把它们部署到全部的集群结点上。

    你须要使用自己的编译系统将你的作业,打包成一套jars文件。比如,假设你使用sbt,那么sbt-assembly插件是一个好方法。将你的代码和依赖,变成一个单一的jar文件。

    假设有一些类库是公用的,须要在不同的作业间共享。你可能须要手工复制到mesos的结点上,在conf/spark-env中,通过设置SPARK_CLASSPATH环境变量指向它们。

    具体信息能够參考配置

    分布式数据集 

    Spark环绕的核心概念。是弹性分布式数据集(RDD),一个有容错机制,能够被并行操作的集合。眼下有两种类型的RDD: 并行集合(Parrallelized Collections),接收一个已经存在的Scala集合,在它上面执行各种并发计算; Hadoop数据集(Hadoop DataSets),在一个文件的每条记录上,执行各种函数。仅仅要文件系统是Hdfs,或者hadoop支持的随意存储系统。

    这两种RDD都能够通过同样的方式进行操作。

    并行集合

    并行集合是通过调用SparkContext的parallelize方法。在一个已经存在的Scala集合(仅仅要是seq对象就能够)上创建而来。集合的对象将会被拷贝来创建一个分布式数据集,能够被并行操作。以下通过spark解释器的样例。展示怎样从一个数组创建一个并发集合

    scala> val data = http://www.mamicode.com/Array(1, 2, 3, 4, 5)

    data: Array[Int] = Array(1, 2, 3, 4, 5)

    scala> val distData = http://www.mamicode.com/sc.parallelize(data)

    distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

    一旦被创建,分布数据集(distData)能够被并行操作。

    比如,我们能够调用distData.reduce(_ +_) 来将数组的元素相加。

    我们会在兴许的分布式数据集做进一步描写叙述。

    创建并行集合的一个重要參数,是slices的数目,它指定了将数据集切分为几份。

    在集群模式中,Spark将会在一份slice上起一个Task。典型的,你能够在集群中的每一个cpu上。起2-4个Slice (也就是每一个cpu分配2-4个Task)。一般来说,Spark会尝试依据集群的状况,来自己主动设定slices的数目。

    然而,你也能够手动的设置它,通过parallelize方法的第二个參数(比如:sc.parallelize(data, 10)).

    Hadoop数据集

    Spark能够创建分布式数据集,从不论什么存储在HDFS文件系统或者Hadoop支持的其他文件系统(包含本地文件,Amazon S3, Hypertable。 HBase等等)上的文件。 Spark能够支持Text File, SequenceFiles 及其他不论什么Hadoop输入格式

    文本文件的RDDs能够通过SparkContext的textFile方法创建,该方法接受文件的URI地址(或者机器上的文件本地路径,或者一个hdfs://, sdn://,kfs://,其他URI).这里是一个调用样例:

    scala> val distFile = sc.textFile(“data.txt”)

    distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08

    一旦被创建。distFile能够进行数据集操作。

    比如。我们能够使用例如以下的map和reduce操作将全部行数的长度相加:

    distFile.map(_.size).reduce(_ + _ )

    方法也接受可选的第二參数。来控制文件的分片数目。默认来说,Spark为每一块文件创建一个分片(HDFS默认的块大小为64MB),可是你能够通过传入一个更大的值来指定很多其它的分片。注意。你不能指定一个比块个数更少的片值(和hadoop中,Map数不能小于Block数一样)

        对于SequenceFiles。使用SparkContext的sequenceFile[K, V]方法,K和V是文件里的key和values类型。

    他们必须是Hadoop的Writable的子类。比如IntWritable和Text。另外,Spark同意你指定几种原生的通用Writable类型,比如:sequencFile[Int, String]会自己主动读取IntWritable和Texts

    最后,对于其它类型的Hadoop输入格式,你能够使用SparkContext.hadoopRDD方法,它能够接收随意类型的JobConf和输入格式类,键类型和值类型。

    依照对Hadoop作业一样的方法。来设置输入源就能够了。

    分布式数据集操作

    分布式数据集支持两种操作:

         转换(transformation):依据现有的数据集创建一个新的数据集

         动作(actions):在数据集上执行计算后,返回一个值给驱动程序

    比如,Map是一个转换。将数据集的每个元素。都经过一个函数进行计算后,返回一个新的分布式数据集作为结果。而还有一方面。Reduce是一个动作,将数据集的全部元素,用某个函数进行聚合,然后将终于结果返回驱动程序。而并行的reduceByKey还是返回一个分布式数据集

    全部Spark中的转换都是惰性的。也就是说。并不会立即发生计算。

    相反的,它仅仅是记住应用到基础数据集上的这些转换(Transformation)。而这些转换(Transformation),仅仅会在有一个动作(Action)发生。要求返回结果给驱动应用时,才真正进行计算。这个设计让Spark更加有效率的执行。比如,我们能够实现。通过map创建一个数据集。然后再用reduce,而仅仅返回reduce的结果给driver。而不是整个大的数据集。

    spark提供的一个重要转换操作是Caching。当你cache一个分布式数据集时,每一个节点会存储该数据集的全部片,并在内存中计算。并在其他操作中重用。这将会使得兴许的计算更加的高速(一般是10倍),缓存是spark中一个构造迭代算法的关键工具,也能够在解释器中交互使用。

    以下的表格列出眼下支持的转换和动作:

    转换(Transformations)

    Transformation

    Meaning

    map(func)

    返回一个新的分布式数据集,由每一个原元素经过func函数转换后组成

    filter(func)

    返回一个新的数据集,由经过func函数后返回值为true的原元素组成

    flatMap(func)

    类似于map,可是每个输入元素,会被映射为0到多个输出元素(因此。func函数的返回值是一个Seq。而不是单一元素)

    sample(withReplacement, frac, seed)

    依据给定的随机种子seed,随机抽样出数量为frac的数据

    union(otherDataset)

    返回一个新的数据集,由原数据集和參数联合而成

    groupByKey([numTasks])

    在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下。使用8个并行任务进行分组,你能够传入numTask可选參数。依据数据量设置不同数目的Task

    (groupByKey和filter结合,能够实现类似Hadoop中的Reduce功能)

    reduceByKey(func, [numTasks])

    在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key同样的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是能够通过第二个可选參数来配置的。

    join(otherDataset, [numTasks])

    在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每一个key中的全部元素都在一起的数据集

    groupWith(otherDataset, [numTasks])

    在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集。组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其他框架,称为CoGroup

    cartesian(otherDataset)

    笛卡尔积。

    但在数据集T和U上调用时。返回一个(T,U)对的数据集,全部元素交互进行笛卡尔积。

    sortByKey([ascendingOrder])

    在类型为( K, V )的数据集上调用,返回以K为键进行排序的(K,V)对数据集。升序或者降序由boolean型的ascendingOrder參数决定

    (类似于Hadoop的Map-Reduce中间阶段的Sort,按Key进行排序)

    Actions(动作)

    Action

    Meaning

    reduce(func)

    通过函数func聚集数据集中的全部元素。Func函数接受2个參数。返回一个值。这个函数必须是关联性的。确保能够被正确的并发运行

    collect()

    在Driver的程序中,以数组的形式。返回数据集的全部元素。这一般会在使用filter或者其他操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,非常可能会让Driver程序OOM

    count()

    返回数据集的元素个数

    take(n)

    返回一个数组。由数据集的前n个元素组成。

    注意,这个操作眼下并不是在多个节点上,并行运行,而是Driver程序所在机器,单机计算全部的元素

    (Gateway的内存压力会增大,须要慎重使用)

    first()

    返回数据集的第一个元素(类似于take(1))

    saveAsTextFile(path)

    将数据集的元素。以textfile的形式,保存到本地文件系统,hdfs或者不论什么其他hadoop支持的文件系统。Spark将会调用每一个元素的toString方法。并将它转换为文件里的一行文本

    saveAsSequenceFile(path)

    将数据集的元素,以sequencefile的格式,保存到指定的文件夹下。本地系统,hdfs或者不论什么其他hadoop支持的文件系统。RDD的元素必须由key-value对组成。并都实现了Hadoop的Writable接口,或隐式能够转换为Writable(Spark包含了基本类型的转换,比如Int,Double,String等等)

    foreach(func)

    在数据集的每个元素上。执行函数func。这通经常使用于更新一个累加器变量,或者和外部存储系统做交互

    缓存

        调用RDD的cache()方法,能够让它在第一次计算后,将结果保持存储在内存。数据集的不同部分。将会被存储在计算它的不同的集群节点上。让兴许的数据集使用更快。

    缓存是有容错功能的,假设任一分区的RDD数据丢失了,它会被使用原来创建它的转换,再计算一次(不须要所有又一次计算,仅仅计算丢失的分区)

    Shared Variables

    共享变量

    一般来说,当一个函数被传递给Spark操作(比如map和reduce)。一般是在集群结点上执行。在函数中使用到的全部变量,都做分别拷贝,供函数操作,而不会互相影响。这些变量会被复制到每一台机器,而在远程机器上。在对变量的全部更新。都不会被传播回Driver程序。然而,Spark提供两种有限的共享变量,供两种公用的使用模式:广播变量和累加器

    广播变量

    广播变量同意程序猿保留一个仅仅读的变量,缓存在每一台机器上。而非每一个任务保存一份拷贝。他们能够使用,比如,给每一个结点一个大的输入数据集。以一种高效的方式。Spark也会尝试,使用一种高效的广播算法。来降低沟通的损耗。

        广播变量是从变量V创建的,通过调用SparkContext.broadcast(v)方法。这个广播变量是一个v的分装器,它的仅仅能够通过调用value方法获得。

    例如以下的解释器模块展示了怎样应用:

        scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

    broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

    scala> broadcastVar.value

    res0: Array[Int] = Array(1, 2, 3)

    在广播变量被创建后,它能在集群执行的不论什么函数上,被代替v值进行调用。从而v值不须要被再次传递到这些结点上。另外。对象v不能在被广播后改动,是仅仅读的。从而保证全部结点的变量,收到的都是一模一样的。

    累加器

    累加器是仅仅能通过组合操作“加”起来的变量,能够高效的被并行支持。他们能够用来实现计数器(如同MapReduce中)和求和。

    Spark原生就支持Int和Double类型的计数器,程序猿能够加入新的类型。

    一个计数器。能够通过调用SparkContext.accumulator(V)方法来创建。

    执行在集群上的任务。能够使用+=来加值。然而,它们不能读取计数器的值。当Driver程序须要读取值的时候,它能够使用.value方法。

    例如以下的解释器。展示了怎样利用累加器,将一个数组里面的全部元素相加

    scala> val accum = sc.accumulator(0)

    accum: spark.Accumulator[Int] = 0

    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

    scala> accum.value

    res2: Int = 10

  • Spark开发指南