首页 > 代码库 > (3)RDD编程
(3)RDD编程
1.RDD基础
弹性分布式数据集,简称RDD,是一个不可变的分布式对象集合。在Spark中,对数据的所有操作不外乎创建RDD,转化已有RDD以及调用RDD操作进行求值。
每一个RDD都被分为多个分区,这些分区运行在集群中的不同节点上,RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义对象。
用户可以使用两种方法创建RDD:读取一个外部数据集,或者在驱动程序里分发驱动器程序中的对象集合。
创建出来以后,RDD支持两种类型的操作:转化操作和行动操作。转化操作和行动操作的区别在于Spark计算RDD的方式不同,虽然你可以在任何时候定义新的RDD,但是Spark只会惰性计算这些RDD,他们只有在第一次在一个行动操作中用到时,才会真正计算。
转化操作返回的是RDD,而行动操作返回的是其他的数据类型。
在实际操作中,你会经常用到persist来把数据的一部分读取到内存中,并反复查询这部分数据。
pythonlines.persist()
总的来说,每个Spark程序或shell挥霍都按如下方式工作:
1.从外部数据创建出输入RDD;
2.使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD;
3.告诉Spark对需要被重用的中间结果RDD执行persist操作;
4.使用行动操作来触发一次并行计算,Spark会对计算进行优化后再执行
2. 创建RDD
把程序中一个已有的集合传给SparkContext的parallelize方法。
lines = sc.parallelize(["pandas", "i like pandas"])
或者从外部创建RDD:
lines = sc.textFile("/path/to/README.md")
3. RDD操作
3.1 转化操作
RDD的转化操作是返回新的RDD的操作,转化出来的RDD是惰性求值的,只有在行动操作中用到这些RDD时才会被计算。
filter转化操作:
1 inputRDD = sc.textFile("log.txt") 2 errorsRDD = inputRDD.filter(lambda x: "error" in x)
filter操作不会改变已有的inputRDD中的数据,实际上,该操作会返回一个全新的RDD,inputRDD在后面的程序中还可以继续使用。
还有另一个转化操作union来打印出包含error或warning的行数:
1 errorsRDD = inputRDD.filter(lambda x: "error" in x) 2 warningRDD = inputRDD.filter(lambda x: "warning" in x) 3 badlinesRDD = errorsRDD.union(warningRDD)
由此可见,转化操作对于RDD数量是没有限制的。
3.2 行动操作
我们用count来返回计数结果,用take来收集RDD中的一些元素:
1 print "Input had" + badlinesRDD.count() + "concerning lines" 2 print "Here are 10 examples:" 3 for line in badLinesRDD.take(10): 4 print line
我们也可以用collect函数来获取整个RDD中的数据,但是这个函数一般不能用在大规模的数据上。
3.3 惰性求值
惰性求值意味着RDD在被调用行动操作之前Spark不会开始计算。当我们对RDD进行转化操作时,操作不会立即执行,想法Spark会在内部记录下所有要求执行的操作的相关信息。
3.4 向Spark传递数据(Python)
传递较短的函数时,可以使用lambda表达式来传递,
word = rdd.filter(lambda s: "error" in s) def containsError(s): return "error" in s word = rdd.filter(containsError)
3.5 常见的转化操作和行动操作
3.5.1针对各个元素的转化操作
转化操作map接收一个函数,把这个函数用于RDD中的每一个元素,将函数的返回结果作为结果RDD中对应元素的值。map的返回值类型不需要和输入类型一样。
nums = sc.parallelize([1,2,3,4]) squared = nums.map(lambda x: x*x).collcet() for num in squared: print "%i" % (num)
如果我们希望对每一个输入元素生成多个输出元素,实现该功能的操作叫做flatMap。
lines = sc.parallelize(["hello world", "hi"]) words = lines.flatMap(lambda line: line.split(" ")) words.first() #返回"hello"
flatMap返回的是由各个列表中的元素组成的RDD,而不是一个由列表组成的RDD。
3.5.2伪集合操作
一个数据对{1,2,3,4}的RDD进行基本的RDD转化操作
map() #将函数应用于RDD中的每个元素,将返回值构成新的RDD flatMap() #将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD filter() #返回一个由通过传给filter的函数的元素组成的RDD distinct() #去重 sample() #对RDD采样,以及是否替换 union() #生成一个包含两个RDD钟所有元素的RDD intersection() #求两个RDD共同的元素的RDD subtract() #移除一个RDD中的内容 cartestian() #与另一个RDD的笛卡尔积
3.5.3行动操作
reduce函数的作用是接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD数据病返回一个同样类型的新元素。
sum = rdd.reduce(lambda x,y: x+y)
3.6持久化(缓存)
RDD是惰性求值的,当我们希望能够多次使用同一个RDD的时候,如果简单地对RDD调用行动操作,Spark每次都会重算RDD以及它的所有依赖。这在迭代算法中消耗格外的大,因为迭代算法常常会多次使用同一组数据。为了避免这种多次计算,可以让Spark对数据进行持久化。
当我们让Spark持久化储存一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。出于不同的目的,我们可以为RDD选择不同的持久化的级别。在Python中,我们会始终序列化要持久化储存的数据,所以持久化级别默认值就是以序列化后的对象在JVM维空间中
(3)RDD编程