首页 > 代码库 > (6)Spark编程进阶

(6)Spark编程进阶

6.1 简介

累加器:用来对信息进行聚合;

广播变量:用来高效分发较大的对象

 

6.2 累加器

通常在向Spark传递函数时,可以使用驱动器程序中定义的变量,但是集群中运行的每个人物都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。Spark的两个共享变量,累加器和广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见的用途是在调试时对作业执行过程中的事件进行计数。

在Python中累加空行

file = sc.textFile(inputFile)
#创建累加器并初始化为0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines #访问全局变量
    if (line == " "):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" %blankLines.value

总结起来,累加器的用法如下所示:

  • 通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]的对象,其中T是 初始值initialValue的类型。
  • Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值。
  • 驱动器程序可以调用累加器的value属性来访问累加器的值。

在Python中使用累加器进行错误报数

#创建用来验证呼号的累加器
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)

def validateSign(sign):
    global validSignCount, invalidSignCount
    if re.match(r".....", sign):
        validSignCount += 1
        return True
    else:
        invalidSignCount += 1
        return False

#对与每个呼号的联系次数进行计数
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign:(sign,1)).reduceByKey(lambda (x,y):x+y)

#强制求值计算计数
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
    contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
    print "Too many errors:%d in %d %(invalidSignCount.value, validSignCount.value)

 6.2.1 累加器与容错性

Spark会自动重新执行失败的或较缓慢的任务来应对有错误的或者比较慢的机器。

这种情况下,对于想要在行动操作中使用的累加器,Spark只会把每个任务对各累加器修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach这样的行动操作中。

6.2.2 自定义累加器

 

6.3 广播变量

广播变量可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。

在Python使用广播变量查询国家

#查询RDD contactCounts中的呼号的对应位置,将呼号前缀
#读取为国家代码来进行查询
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts.map(processesSignCount).reduceByKey((lambda x,y : x+y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

使用广播变量的过程:

  • 通过对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象。任何可序列化的类型都可以这么实现。
  • 通过value属性访问该对象的值。
  • 变量只会被发到各个节点一次,应作为只读值处理。

 

6.4 基于分区进行操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。Spark提供基于分区的map和foreach,让你的部分代码只对RDD的每个分区运行一次,这样可以帮助降低这些操作的代价。

在Python中使用共享连接池

def processCallSigns(signs):
    """使用连接池查询呼号"""
    #创建一个连接池
    http = urllib3.PoolManager()
    #与每条呼号记录相关联的URL
    urls = map(lambda x: "http://....." %x, signs)
    #创建请求(非阻塞)
    requests = map(lambda x: (x, http.request(GET, x)), urls)
    #获取结果
    result = map(lambda x:(x[0], json.loads(x[1].data)), requests)
    #删除空的结果并返回
    return filter(lambda x:x[1] is not None, result)
   
def fetchCallSigns(input):
    """获取呼号"""
    return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))
contactsContactList = fetchCallSigns(validSigns)

 

6.5 与外部程序之间的管道

如果Java,Scala和Python都不能实现你需要的功能,那么Spark也为这种情况提供了一种通用机制,可以将数据通过管道传给用其他语言编写的程序,比如R语言脚本。

Spark在RDD上提供pipe方法,可以让我们使用任意一种语言实现Spark作业中的部分逻辑,只要他能读写Unix标准流就行。通过Pipe,你可以将RDD 猴子那个的各元素从标准输入流中以字符串形式读出,并对这些元素执行任何你需要的操作,然后把结果以字符串的形式写入标准输出,这个过程就是RDD的转化操作过程。

 

6.6 数值RDD的操作

Spark的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型,这些统计数据都会在调用stats时通过一次遍历数据计算出来,并以StatsCounter对象返回。

count          #RDD中元素的个数
mean           #元素的平均值
sum            #总和
max            #最大值
min            #最小值
variance       #元素的方差
sampleVariance #从采样中计算出的方差
stdev          #标准差
sampleStdev    #采样的标准差

用Python移除异常值

#要把String类型RDD转为数字数据,这样才能使用统计函数并移除异常值
distanceNumerics = distances.map(lambda string: float(string))
stats = distanceNumerics.stats()
stddev = stdts.stdev()
mean = stats.mean()
reasonableDistances = distanceNumerics.filter(lambda x: math.fabs(x-mean)<3*stddev)
print reasonableDistances.collect()

 

(6)Spark编程进阶