首页 > 代码库 > (3)RDD编程

(3)RDD编程

1.RDD基础

弹性分布式数据集,简称RDD,是一个不可变的分布式对象集合。在Spark中,对数据的所有操作不外乎创建RDD,转化已有RDD以及调用RDD操作进行求值。

每一个RDD都被分为多个分区,这些分区运行在集群中的不同节点上,RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义对象。

用户可以使用两种方法创建RDD:读取一个外部数据集,或者在驱动程序里分发驱动器程序中的对象集合。

创建出来以后,RDD支持两种类型的操作:转化操作和行动操作。转化操作和行动操作的区别在于Spark计算RDD的方式不同,虽然你可以在任何时候定义新的RDD,但是Spark只会惰性计算这些RDD,他们只有在第一次在一个行动操作中用到时,才会真正计算。

转化操作返回的是RDD,而行动操作返回的是其他的数据类型。

在实际操作中,你会经常用到persist来把数据的一部分读取到内存中,并反复查询这部分数据。

pythonlines.persist()

总的来说,每个Spark程序或shell挥霍都按如下方式工作:

1.从外部数据创建出输入RDD;

2.使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD;

3.告诉Spark对需要被重用的中间结果RDD执行persist操作;

4.使用行动操作来触发一次并行计算,Spark会对计算进行优化后再执行

 

2. 创建RDD

把程序中一个已有的集合传给SparkContext的parallelize方法。

lines = sc.parallelize(["pandas", "i like pandas"])

或者从外部创建RDD:

lines = sc.textFile("/path/to/README.md")

 

3. RDD操作

3.1 转化操作

RDD的转化操作是返回新的RDD的操作,转化出来的RDD是惰性求值的,只有在行动操作中用到这些RDD时才会被计算。

filter转化操作:

1 inputRDD = sc.textFile("log.txt")
2 errorsRDD = inputRDD.filter(lambda x: "error" in x)

filter操作不会改变已有的inputRDD中的数据,实际上,该操作会返回一个全新的RDD,inputRDD在后面的程序中还可以继续使用。

还有另一个转化操作union来打印出包含error或warning的行数:

1 errorsRDD = inputRDD.filter(lambda x: "error" in x)
2 warningRDD = inputRDD.filter(lambda x: "warning" in x)
3 badlinesRDD = errorsRDD.union(warningRDD)

由此可见,转化操作对于RDD数量是没有限制的。

3.2 行动操作

我们用count来返回计数结果,用take来收集RDD中的一些元素:

1 print "Input had" + badlinesRDD.count() + "concerning lines"
2 print "Here are 10 examples:"
3 for line in badLinesRDD.take(10):
4     print line

我们也可以用collect函数来获取整个RDD中的数据,但是这个函数一般不能用在大规模的数据上。

3.3 惰性求值

惰性求值意味着RDD在被调用行动操作之前Spark不会开始计算。当我们对RDD进行转化操作时,操作不会立即执行,想法Spark会在内部记录下所有要求执行的操作的相关信息。

3.4 向Spark传递数据(Python)

传递较短的函数时,可以使用lambda表达式来传递,

word = rdd.filter(lambda s: "error" in s)
def containsError(s):
    return "error" in s
word = rdd.filter(containsError)

3.5 常见的转化操作和行动操作

3.5.1针对各个元素的转化操作

转化操作map接收一个函数,把这个函数用于RDD中的每一个元素,将函数的返回结果作为结果RDD中对应元素的值。map的返回值类型不需要和输入类型一样。

nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x: x*x).collcet()
for num in squared:
    print "%i" % (num)

如果我们希望对每一个输入元素生成多个输出元素,实现该功能的操作叫做flatMap。

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() #返回"hello"

flatMap返回的是由各个列表中的元素组成的RDD,而不是一个由列表组成的RDD。

3.5.2伪集合操作

一个数据对{1,2,3,4}的RDD进行基本的RDD转化操作

map()                #将函数应用于RDD中的每个元素,将返回值构成新的RDD
flatMap()            #将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD
filter()             #返回一个由通过传给filter的函数的元素组成的RDD
distinct()           #去重
sample()             #对RDD采样,以及是否替换
union()              #生成一个包含两个RDD钟所有元素的RDD
intersection()       #求两个RDD共同的元素的RDD
subtract()           #移除一个RDD中的内容
cartestian()         #与另一个RDD的笛卡尔积

3.5.3行动操作

reduce函数的作用是接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD数据病返回一个同样类型的新元素。

sum = rdd.reduce(lambda x,y: x+y)

 

3.6持久化(缓存)

RDD是惰性求值的,当我们希望能够多次使用同一个RDD的时候,如果简单地对RDD调用行动操作,Spark每次都会重算RDD以及它的所有依赖。这在迭代算法中消耗格外的大,因为迭代算法常常会多次使用同一组数据。为了避免这种多次计算,可以让Spark对数据进行持久化。

当我们让Spark持久化储存一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。出于不同的目的,我们可以为RDD选择不同的持久化的级别。在Python中,我们会始终序列化要持久化储存的数据,所以持久化级别默认值就是以序列化后的对象在JVM维空间中

(3)RDD编程