Aggregation operators in Spark

1. combineByKeyWithClassTag

The following operators on RDD[(K, V)] are all ultimately implemented with combineByKeyWithClassTag:

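| Operator       | Notes                                                  | Map-side combine | Intermediate result type | Output           |
|----------------|--------------------------------------------------------|------------------|--------------------------|------------------|
| reduceByKey    |                                                        | Yes              | V                        | (K, V)           |
| groupByKey     |                                                        | No               | CompactBuffer[V]         | (K, Iterable[V]) |
| combineByKey   |                                                        | Yes (by default) | C                        | (K, C)           |
| foldByKey      | reduceByKey with an initial zero value                 | Yes              | V                        | (K, V)           |
| aggregateByKey | foldByKey whose zero value may have a different type U | Yes              | U                        | (K, U)           |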

1.1 Implementation of combineByKeyWithClassTag

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

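As the implementation above shows, the three functions createCombiner, mergeValue and mergeCombiners are all ultimately used to initialize the aggregator:

- createCombiner: turns the first value seen for a key into a combiner of type C

- mergeValue: merges a further value of the same key into an existing combiner

- mergeCombiners: merges two combiners that were built for the same key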
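To make the roles of the three functions concrete, here is a minimal plain-Scala sketch that does not use Spark. The helper `combineByKey` and the object `CombineSketch` are hypothetical names for illustration; nested sequences stand in for the partitions of an RDD[(K, V)]. The four calls mirror how reduceByKey, foldByKey, aggregateByKey and groupByKey pass their arguments on to combineByKeyWithClassTag.

```scala
object CombineSketch {
  // Hypothetical local helper, not Spark API: each inner Seq plays the role of one partition.
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: build one combiner per key inside each partition.
    val mapSide: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val c = acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v))
        acc.updated(k, c)
      }
    }
    // Reduce side: merge the combiners of the same key across partitions.
    mapSide.flatMap(_.toSeq).foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }
  }

  // Two "partitions" of sample data (made up for this example).
  val data: Seq[Seq[(String, Int)]] =
    Seq(Seq("a" -> 1, "b" -> 2, "a" -> 3), Seq("a" -> 4))

  // reduceByKey(_ + _): the first value is its own combiner.
  val reduced = combineByKey[String, Int, Int](data)(v => v, _ + _, _ + _)

  // foldByKey(10)(_ + _): the zero value is merged into the first value of a key
  // in every partition where that key occurs.
  val folded = combineByKey[String, Int, Int](data)(v => 10 + v, _ + _, _ + _)

  // aggregateByKey((0, 0))(seqOp, combOp): the combiner type (sum, count) differs from V.
  val zero = (0, 0)
  val seqOp = (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1)
  val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
  val sumCount =
    combineByKey[String, Int, (Int, Int)](data)(v => seqOp(zero, v), seqOp, combOp)

  // groupByKey: collect the values. Real Spark disables map-side combine here
  // and uses CompactBuffer; a Vector is used in this sketch instead.
  val grouped = combineByKey[String, Int, Vector[Int]](data)(v => Vector(v), _ :+ _, _ ++ _)
}
```

Note that `folded` ends up with 28 for key "a" rather than 18: the key occurs in two partitions, so the zero value 10 is added twice.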

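In combineByKeyWithClassTag, whether a shuffle happens is decided by the check self.partitioner == Some(partitioner), i.e. whether the partitioner requested for this operator equals the partitioner the RDD already has. If they are equal, the values are combined in place with mapPartitions; otherwise a ShuffledRDD is created.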

1.2 CompactBuffer

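CompactBuffer works like ArrayBuffer, but uses memory more efficiently for small collections.

ArrayBuffer: by default allocates an initial array of 16 elements, which wastes a lot of space when the actual number of elements is far below 16 (array: Array[AnyRef] = new Array[AnyRef](math.max(initialSize, 1)))

CompactBuffer: stores the first two elements in fields of the object itself and only allocates an array for further elements, so space is used better when each key in a groupBy has few values

The CompactBuffer code is as follows: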

private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
  // First two elements
  private var element0: T = _
  private var element1: T = _

  // Number of elements, including our two in the main object
  private var curSize = 0

  // Array for extra elements
  private var otherElements: Array[T] = null

  def apply(position: Int): T = {
    if (position < 0 || position >= curSize) {
      throw new IndexOutOfBoundsException
    }
    if (position == 0) {
      element0
    } else if (position == 1) {
      element1
    } else {
      otherElements(position - 2)
    }
  }

  private def update(position: Int, value: T): Unit = {
    if (position < 0 || position >= curSize) {
      throw new IndexOutOfBoundsException
    }
    if (position == 0) {
      element0 = value
    } else if (position == 1) {
      element1 = value
    } else {
      otherElements(position - 2) = value
    }
  }

  def += (value: T): CompactBuffer[T] = {
    val newIndex = curSize
    if (newIndex == 0) {
      element0 = value
      curSize = 1
    } else if (newIndex == 1) {
      element1 = value
      curSize = 2
    } else {
      growToSize(curSize + 1)
      otherElements(newIndex - 2) = value
    }
    this
  }

  def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
    values match {
      // Optimize merging of CompactBuffers, used in cogroup and groupByKey
      case compactBuf: CompactBuffer[T] =>
        val oldSize = curSize
        // Copy the other buffer's size and elements to local variables in case it is equal to us
        val itsSize = compactBuf.curSize
        val itsElements = compactBuf.otherElements
        growToSize(curSize + itsSize)
        if (itsSize == 1) {
          this(oldSize) = compactBuf.element0
        } else if (itsSize == 2) {
          this(oldSize) = compactBuf.element0
          this(oldSize + 1) = compactBuf.element1
        } else if (itsSize > 2) {
          this(oldSize) = compactBuf.element0
          this(oldSize + 1) = compactBuf.element1
          // At this point our size is also above 2, so just copy its array directly into ours.
          // Note that since we added two elements above, the index in this.otherElements that we
          // should copy to is oldSize.
          System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
        }

      case _ =>
        values.foreach(e => this += e)
    }
    this
  }

  override def length: Int = curSize

  override def size: Int = curSize

  override def iterator: Iterator[T] = new Iterator[T] {
    private var pos = 0
    override def hasNext: Boolean = pos < curSize
    override def next(): T = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      pos += 1
      apply(pos - 1)
    }
  }

  /** Increase our size to newSize and grow the backing array if needed. */
  private def growToSize(newSize: Int): Unit = {
    // since two fields are hold in element0 and element1, an array holds newSize - 2 elements
    val newArraySize = newSize - 2
    val arrayMax = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
    if (newSize < 0 || newArraySize > arrayMax) {
      throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
    }
    val capacity = if (otherElements != null) otherElements.length else 0
    if (newArraySize > capacity) {
      var newArrayLen = 8L
      while (newArraySize > newArrayLen) {
        newArrayLen *= 2
      }
      if (newArrayLen > arrayMax) {
        newArrayLen = arrayMax
      }
      val newArray = new Array[T](newArrayLen.toInt)
      if (otherElements != null) {
        System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
      }
      otherElements = newArray
    }
    curSize = newSize
  }
}
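The growth policy of growToSize can be traced with a small standalone function. This is only a sketch of the arithmetic in the listing above: `GrowthSketch` and `extraArrayCapacity` are hypothetical names, and the upper bound MAX_ROUNDED_ARRAY_LENGTH is ignored for simplicity.

```scala
object GrowthSketch {
  // Length of the otherElements array once the buffer holds n elements,
  // following the doubling loop in growToSize. 0 means no array was allocated.
  def extraArrayCapacity(n: Int): Int = {
    val needed = n - 2 // the first two elements live in element0 and element1
    if (needed <= 0) 0
    else {
      var len = 8 // the array starts at 8 slots
      while (needed > len) len *= 2
      len
    }
  }
}
```

For example, a buffer with up to 2 elements needs no array at all, one with 3 to 10 elements has an 8-slot array, and the 11th element doubles the array to 16 slots.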

1.3 Aggregator

Aggregator is implemented as follows. It can be seen as a thin proxy over ExternalAppendOnlyMap, which spills data to disk when necessary.

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  def combineValuesByKey(
      iter: Iterator[_ <: Product2[K, V]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  /** Update task metrics after populating the external map. */
  private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {
    Option(context).foreach { c =>
      c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
      c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
      c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
    }
  }
}
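The relationship between the two methods may be easier to see in a stripped-down version. The following sketch assumes a purely in-memory map with no spilling and no metrics; `SimpleCombinerMap` and `MiniAggregator` are hypothetical stand-ins, not Spark classes. It shows that combineCombinersByKey is the same insert loop as combineValuesByKey, only with identity as createCombiner and mergeCombiners as mergeValue.

```scala
import scala.collection.mutable

object AggregatorSketch {
  // Hypothetical in-memory stand-in for ExternalAppendOnlyMap (never spills).
  final class SimpleCombinerMap[K, V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C) {
    private val entries = mutable.LinkedHashMap.empty[K, C]

    def insertAll(iter: Iterator[(K, V)]): Unit =
      iter.foreach { case (k, v) =>
        // First value of a key creates the combiner, later values are merged into it.
        entries(k) = entries.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v))
      }

    def iterator: Iterator[(K, C)] = entries.iterator
  }

  final case class MiniAggregator[K, V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C) {

    // Raw values in, combiners out (used where values are first combined).
    def combineValuesByKey(iter: Iterator[(K, V)]): Iterator[(K, C)] = {
      val combiners = new SimpleCombinerMap[K, V, C](createCombiner, mergeValue)
      combiners.insertAll(iter)
      combiners.iterator
    }

    // Combiners in, combiners out: the incoming "values" are already of type C.
    def combineCombinersByKey(iter: Iterator[(K, C)]): Iterator[(K, C)] = {
      val combiners = new SimpleCombinerMap[K, C, C](identity, mergeCombiners)
      combiners.insertAll(iter)
      combiners.iterator
    }
  }
}
```

A typical flow under these assumptions: call combineValuesByKey once per input partition, then feed the concatenated outputs to combineCombinersByKey to obtain one combiner per key.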

2. treeAggregate

2.1 Reading the source

def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
    }
    val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
  }
}

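The code above can be roughly divided into three stages:

- Aggregating the data within each partition

- Merging across partitions (foldByKey repartitions the data)

- The final merge of the remaining data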
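How many intermediate levels the second stage produces depends only on the initial partition count and depth. The sketch below replays the scale computation and the while condition from the listing above without Spark; `TreeLevelsSketch` and `levels` are hypothetical names introduced for illustration.

```scala
import scala.collection.mutable.ListBuffer

object TreeLevelsSketch {
  // Partition counts after the per-partition stage and after each foldByKey level.
  def levels(initialPartitions: Int, depth: Int = 2): List[Int] = {
    require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
    var numPartitions = initialPartitions
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    val result = ListBuffer(numPartitions)
    // Same stopping rule as in treeAggregate: add a level only while it still pays off.
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      result += numPartitions
    }
    result.toList
  }
}
```

With 1000 partitions and the default depth of 2, scale is 32, so one intermediate level reduces 1000 partial results to 31 before the final merge. With only 4 partitions the loop is skipped and the partial results go straight to the final merge.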

2.2 Scala's aggregate vs. Spark's aggregate/treeAggregate

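Dataset: Seq(1, 2, 3), with data denoting an RDD over it. Because the chosen zero value (1) is not the identity of the operation, the results differ:

| Code | Result | Notes |
|------|--------|-------|
| Seq(1, 2, 3).aggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2) | 7 | |
| data.repartition(1).aggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2) | 8 | the driver aggregates once more, starting from the zero value again |
| data.repartition(1).treeAggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2) | 9 | aggregated two more times, each starting from the zero value: the final fold runs once inside the partition and once on the driver |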

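The repartition in the code above directly determines how far the Spark result deviates from the Scala result: the zero value is added once per partition, so more partitions mean a larger difference.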
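The three results can be reproduced without a cluster by simulating where the zero value enters. This is a simplified model, not Spark code: `ZeroValueDemo` and its methods are hypothetical, nested sequences stand in for partitions, the operation is fixed to addition, and the treeAggregate model assumes the while loop in 2.1 is skipped (which holds for small partition counts).

```scala
object ZeroValueDemo {
  // Plain Scala: the zero value is used exactly once.
  def scalaAggregate(data: Seq[Int], zero: Int): Int =
    data.foldLeft(zero)(_ + _)

  // Model of RDD.aggregate: every partition starts from zero,
  // and the driver starts from zero again when merging partition results.
  def aggregateSim(partitions: Seq[Seq[Int]], zero: Int): Int = {
    val perPartition = partitions.map(_.foldLeft(zero)(_ + _))
    perPartition.foldLeft(zero)(_ + _)
  }

  // Model of the treeAggregate listing in 2.1 with the while loop skipped.
  def treeAggregateSim(partitions: Seq[Seq[Int]], zero: Int): Int = {
    // Stage 1: per-partition aggregation, seeded with zero.
    val partiallyAggregated = partitions.map(_.foldLeft(zero)(_ + _))
    // Final fold, executor side: each partition's single value is folded from zero.
    val folded = partiallyAggregated.map(v => zero + v)
    // Final fold, driver side: partition results are merged starting from zero.
    folded.foldLeft(zero)(_ + _)
  }
}
```

With a single partition the three functions return 7, 8 and 9 for zero value 1. Splitting the same data into three partitions gives 10 for aggregateSim and 13 for treeAggregateSim, which illustrates why the partition count matters.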

3. Code snippets

3.1 A lazily created serializer

def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
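The pattern in foldByKey combines two ideas: the zero value is serialized once so that every key can get its own fresh copy, and the deserializer is a lazy val so it is only created when the closure first runs. The following sketch illustrates both with plain Java serialization. `LazyZeroDemo`, `JavaSer` and `makeCreateZero` are hypothetical names, and `JavaSer` is only a stand-in for a Spark serializer instance, not Spark's API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LazyZeroDemo {
  // Counts how many "serializer instances" have been created so far.
  var serializerCreations = 0

  // Hypothetical stand-in for a serializer instance, backed by Java serialization.
  final class JavaSer {
    serializerCreations += 1

    def serialize(value: AnyRef): Array[Byte] = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(value)
      out.close()
      bytes.toByteArray
    }

    def deserialize[T](data: Array[Byte]): T = {
      val in = new ObjectInputStream(new ByteArrayInputStream(data))
      try in.readObject().asInstanceOf[T] finally in.close()
    }
  }

  // Returns a function that yields a fresh copy of zeroValue on every call.
  def makeCreateZero[V <: AnyRef](zeroValue: V): () => V = {
    // Eager: the zero value is turned into bytes exactly once.
    val zeroArray = new JavaSer().serialize(zeroValue)
    // Lazy: the deserializing instance is built on the first createZero() call only.
    lazy val cachedSerializer = new JavaSer()
    () => cachedSerializer.deserialize[V](zeroArray)
  }
}
```

Calling the returned function twice yields two distinct objects, so mutating one copy (for example a mutable collection used as zero value) cannot leak into the zero value of another key.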
