首页 > 代码库 > Spark 累加器实验
Spark 累加器实验
以下代码用 Pyspark + IPython 完成
统计日志空行的数量:
读取日志,创建RDD:
myrdd = sc.textFile("access.log")
不使用累加器:
In [68]: s = 0 In [69]: def f(x): ...: global s ...: if len(x) == 0: ...: s += 1 ...: In [70]: myrdd.foreach(f) In [71]: print (s)
得出结果为:
0
原因是python 的变量,即使是全局变量不能应用在各个计算进程(线程)中同步数据,所以需要分布式计算框架的变量来同步数据,Spark 中采用累加器来解决:
使用累加器
In [64]: s = sc.accumulator(0) In [65]: def f(x): ...: global s ...: if len(x) == 0: ...: s += 1 ...: In [66]: myrdd.foreach(f) In [67]: print (s)
得出正确结果:
14
本文出自 “恩墨大数据学院-孟硕” 博客,请务必保留此出处http://enmobda.blog.51cto.com/11983635/1940586
Spark 累加器实验
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。