首页 > 代码库 > <Spark><Advanced Programming>
<Spark><Advanced Programming>
Introduction
- 介绍两种共享变量的方式:
- accumulators:聚集信息
- broadcast variables:高效地分布large values
- 介绍对高setup costs任务的批操作,比如查询数据库时连接数据的消耗。
Accumulators
- 当我们向Spark传送函数时(比如map()函数或给filter()的condition),他们可以使用driver program中在他们定义之外的变量。但是cluster中的每个task都get a new copy of each variable,并且更新那些副本而不会传播到driver。
- Spark的共享变量--accumulators和broadcast variables,通过两种通信模式(聚集结果和广播)放松了这种限制。
- Accumulators提供了一种简单的语法,用于从worker聚集values返回到driver program。
- Accumulators的一个最广泛的用例就是统计在job执行期间发生的events数,用于debugging。
-
val sc = new SparkContext(...) val file = sc.textFile("file.txt") val blankLines = sc.accumulator(0) // create an Accumulator[Int] initialized to 0 val callSigns = file.flatMap(line => { if (line == ""){ blankLines += 1 // Add to the accumulator } line.split(" ") })
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)注意只有在调用saveAsTextFile之后才能得到正确的count of blankLines,因为transfomation是lazy的。
- Summary of accumulators:
- 我们提供调用SparkContext.accumulator(initialValue)方法创建一个accumulator,它会返回一个org.apache.spark.Accumulator[T]对象,T是initialValue的类型;
- Spark闭包中的worker code可以通过 += 来add accumulator;
- driver program可以call accumulator的value property来访问accumulator的值。
- 注意worker node不可以访问accumulator的value --> 因为从tasks的角度来看,accumulators是write-only的变量。这样的设定使得accumulators可以被高效地实现,而不需要每次更新的时候都通信。
Accumulators and Fault Tolerance
- Spark通过re-executikng failed or slow tasks来处理failed or slow machines。
- 那么错误与Accumulators之间呢? --> 对于在actions中使用的Accumulators,Spark仅仅对每个Accumulator执行一次每个task的更新。因此,如果我们想要一个绝对可靠的value counter,而不用考虑failures或者多次赋值,那么我们必须将操作放到类似foreach()这样的操作中。
- 对于在transformation中使用的Accumulators,这种保证是不存在的。在transformation中对Accumulators的更新可能执行多次。所以对transformation中的Accumulators最好只在debug时用。[for version1.2.0]
Custom Accumulators
- Spark支持Double、Long、Float、Int等类型的Accumulators。同时还提供了一个自定义Accumulators type的API,来实现自定义Accumulators types以及支持自定义聚集操作(比如找最大值而不是add)。
- 可支持的操作必须是commutative and associative的。[感觉就像是顺序对操作不重要就可以了]
- An operation op is commutative if a op b = b op a for all values a, b.
-
An operation op is associative if (a op b) op c = a op (b op c) for all values a, b, and c.
Broadcast Variables
- Spark支持的另一种共享变量的方式:broadcast variables。允许程序高效地发送大的,只读的value给所有worker nodes。
- 你可能会想到说Spark会自动地发送闭包中所引用的变量到worker node。这是方便的,但同时也是很不高效的。因为:
- 缺省的task launching 机制是对small task sizes优化的;
- 你可能在多个并行操作中使用相同的变量,但是Spark会分别为每个Operation发送一次。考虑下面的操作:
# Look up the locations of the call signs on the # RDD contactCounts. We load a list of call sign # prefixes to country code to support this lookup.
signPrefixes = loadCallSignTable() def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes) count = sign_count[1] return (country, count) countryContactCounts = (contactCounts .map(processSignCount) .reduceByKey((lambda x, y: x+ y)))
上面的代码中,如果signPrefixes是一个很大的table,那么将该表从master传到每个slave将是很昂贵的。而且如果后期还要用到signPrefixes,它将会被再次发送到每个节点。通过将signPrefixes变成broadcast变量可以解决这个问题。如下:
// Look up the countries for each call sign for the // contactCounts RDD. We load an array of call sign // prefixes to country code to support this lookup. val signPrefixes = sc.broadcast(loadCallSignTable()) val countryContactCounts = contactCounts.map{case (sign, count) => val country = lookupInArray(sign, signPrefixes.value) (country, count) }.reduceByKey((x, y) => x + y) countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
- 总的来说,broadcast变量的使用分为以下几步:
- 创建一个Broadcast[T]的SparkContext.broadcast对象of type T。T可以是任何类型,只要它是Serializable的。
- 同value property访问该值
- 该变量只会被传送给每个node一次,而且被当做read-only来使用,也就是说更新不会propagated到其他nodes。(最简单的满足read-only方式的方法是broadcast a primitive value or a reference to an immutable object)。
- broadcast variable就是一个spark.broadcast.Broadcast[T] 类型的对象,它wraps一个类型T的值。我们可以在tasks中访问该值。该值只发送到每个节点一次,通过使用高效的BitTorrent-like communication mechanism。
Optimizing Broadcasts
- 当你broadcast a large values, it‘s important to choose a data serialization format that is both fast and compact。Scala缺省使用的JAVA Serialization库是很低效的(JAVA Serialization只对原生类型的数组高效)。因此你可以通过选择一个不同的序列化库(通过使用spark.serializer property)来优化。
Working on a Per-Partition Basis
<Spark><Advanced Programming>
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。