摸索性重写Spark的groupByKey(shuffle部分)

一开始实现这个特性的时候,我并不能预见其性能是否可以优化,但是不可以不尝试。
我姑且把它叫做工程师的调性. 

现有的工作与策略

上面的前言虽说出了口,却又觉得中二,给人一种鸣人站在所有boss面前嚎着:“这就是我的忍道!”的既视感。但总觉得不得不说出来,而且是认真的那种。尝试嘛,是一个可以让人投入他所有的激情和年华在一件事上的词,此事未必带来较大的意义,但此事在经过尝试给人以信念后,会让他这一生变得更加有趣,甚至是伟大。如果有人将此作为自己做任何事的方法论,那我必然是无比佩服他的。这些话不是我刻意在技术笔记中加上一些鸡汤(对于鸡汤这种事,我并不是向大多数网友那样厌恶,我觉得能在理性的范围内给人带来正能量的东西,它绝对差不到哪里去),恰恰是我在生活中遇到一些人和事,我需要把我的所想敞开在这儿,至于为什么不单独写,那纯属是因为我懒,写不了那么大篇幅的。当然,倘若以后心血来潮,那也保不准挤出很大段文字来谈谈我的柔情似水。

最近Deca要在论文中加入VST拆解的思想和实现。之前包括缓存RDD这类运行中定长的数据拆解已经实现,但是shuffle buffer这一类对象中存在很多变长的成员,因为在shuffle过程中,reduceBykey尚能拆解,但比如groupByKey这个算子,它是将相同key的所有value聚合起来,也就一个key后面跟一串value,在shuffle运算过程中这个KV对的V是变长的,值是不确定的。之前我们所做的工作就是对Spark应用中的对象进行拆解,转换成基本的原生类型,获取他们的类型,这样之后就可以把它们写成字节数组的形式。

Spark-1.4里groupByKey在shuffle write端可以利用到堆外的内存,也就是tungsten-sort,所有的数据都会写在堆外并在堆外排序,但是shuffle-read端Spark默认还是用的HashShuffleReader,所有的聚合操作都在堆内完成,这个我们已经实现了read端的堆外版本,聚合操作运行在堆外。大致介绍下原理,这里就用到了VST拆解的原理,我们知道shuffle read端读出来的(K,C)对的基本类型,于是先实现了一个简易的map(UnsafeUnfixedWidthAggregationFlintMap),嗯名字略长。。这是一个针对系统的定制的map,也是用到了Hash原理,不过所有操作都是在堆外进行。这个map用于存储key和valueAddress,这个valueAddress是一个long型值,我们会将K对应的一组Value在堆外开辟一片疆土用于存储他们,当然每次新来value时我们会检查是否扩容,若扩容会改变这块疆土(堆外空间)的起始地址,因为涉及到内存的拷贝,所以map中的valueAddress就是这块存储区域的初始偏移地址。valueAddress指向的存储区域结构为:

让groupByKey也有mapSideCombine

策略的依据

大家都知道的是reduceByKey是具有mapSideCombine特性的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
dep.aggregator.get.combineValuesByKey(records, context)
} else {
records
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}

for (elem <- iter) {
val bucketId = dep.partitioner.getPartition(elem._1)
shuffle.writers(bucketId).write(elem._1, elem._2)
}
}

这是HashShuffleWriter的一段代码,mapSideCombine参数会导致Map的结果调用aggregator,在写入record的时候,会判断mapSideCombine这个值,若其为真,则在shuffle的map端提前做聚合,然而这只适用于value为单个值,因为其长度和类型不变,若像groupByKey value为一组值时,这是value长度是会变化的,这对Unsafe是无法支持的。这时候有人谁说,那堆内可不可以实现mapSideCombine的groupByKey,答案是不可以,这里我看到了一段官方在代码中的注释

1
2
3
4
5
6
7
8
 * Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {

天呐,会导致OOM。但我这里还有点疑问,在map端groupByKey会引起大量相同的key的value在内存中驻留,但是这个操作在write端也会导致内存膨胀哎,略带不解。
既然这样,为何还要实现堆外版本的,我的解释是这样的:堆外所有数据都以字节数组存储,将大大压缩对象的大小,可能达到成倍的效果,其实也就是没有对象。若堆外内存充裕的时候,实现这个特性并不会导致内存爆掉,除非几十G中的数据只有一个key这种特殊情况。我们想实现堆外groupByKey的mapSideCombine一个重要原因就是,我考虑在map端也聚合的话可以减轻read端的操作和内存压力,否则reduceByKey也不会有这个特性,当然这只是猜想,性能是否有提升还是要实验数据说话。

具体的实现

首先是record的插入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void insertOrUpdateGroupRecord(
Object originKeyBaseObject,
Object keyBaseObject,
long keyBaseOffset,
int keyLengthInBytes,
int partitionId,
Object valueBaseObject,
int valueBaseOffset) throws IOException
{


if (isContain(keyBaseObject, keyBaseOffset, keyLengthInBytes, addressLength)) {
//compact buffer数组缓冲区
//key已经存在
long oldValueAddress = PlatformDependent.UNSAFE.getLong(loc.getPage(), loc.getPosition() + 4 + keyLengthInBytes);
long newValueAddress = oldValueAddress;
switch(valueType){
case 0:
newValueAddress = UnsafeBuffer.putInt(null,(Long)oldValueAddress,valueType,(Integer)valueBaseObject);
break;
case 1:
newValueAddress = UnsafeBuffer.putLong(null,(Long)oldValueAddress,valueType,(Long)valueBaseObject);
break;
case 2:
newValueAddress = UnsafeBuffer.putDouble(null,(Long)oldValueAddress,valueType,(Double)valueBaseObject);
break;

default :
assert(valueType < 2);
break;

}

//更新地址值
PlatformDependent.UNSAFE.putLong(loc.getPage(),loc.getPosition() + 4 + keyLengthInBytes,newValueAddress);
}else{

这里是key已经存在的操作,isContain类似于hashmap的contain方法,查找key是否存在,若存在key,获取到value,也就是对应key的那一组value的地址,而UnsafeBuffer就是实现那一组value存储区域的类,它负责区域的扩容与内存申请。插入记录可能会导致valueAddress的变化,所以每一次都要更新。

接着write方法会调用closeAndWriteOutput()这个方法,用于将排好序的KV写入磁盘文件中,用于下一个stage shuffle的读取,跟踪进去需要改写一下writeSortedFile这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
final Object recordPage = memoryManager.getPage(recordPointer);
final long recordOffsetInPage = memoryManager.getOffsetInPage(recordPointer);
int dataRemaining = PlatformDependent.UNSAFE.getInt(recordPage, recordOffsetInPage);
long recordReadPosition = recordOffsetInPage + 4; // skip over record length

int dataSize =dataRemaining;

//key+value/address
if(!needGroup) {
while (dataRemaining > 0) {
// logger.info("dataRemaining:"+dataRemaining);
//assert(dataRemaining == 8l);
final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
PlatformDependent.copyMemory(
recordPage,
recordReadPosition,
writeBuffer,
PlatformDependent.BYTE_ARRAY_OFFSET,
toTransfer);
writer.write(writeBuffer, 0, toTransfer);
recordReadPosition += toTransfer;
dataRemaining -= toTransfer;
}
}

原本只是往文件中一个一个的KV写入,但现在的V指的是一组value的堆外起始地址,根据这个valueAddress才能找到真实的value值,我们现在要先写入key,接着是value的总大小,后面跟对应该key的所有value,这时候就不需要写入valueAddress的地址了,因为read端不需要这个变量。也就是加上这一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//不用写address
dataRemaining-=8;
while (dataRemaining > 0) {
// logger.info("dataRemaining:"+dataRemaining);
//assert(dataRemaining == 8l);
final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
PlatformDependent.copyMemory(
recordPage,
recordReadPosition,
writeBuffer,
PlatformDependent.BYTE_ARRAY_OFFSET,
toTransfer);
writer.write(writeBuffer, 0, toTransfer);
recordReadPosition += toTransfer;
dataRemaining -= toTransfer;
}
// /**
// by kzx
//暂定int型key so 为4
int keySize = dataSize - 8;
long valueAddress = PlatformDependent.UNSAFE.getLong(recordPage, recordOffsetInPage + 4 + keySize);
//skip第一个size
long valuePosition = valueAddress + 4 ;
int valueSize = PlatformDependent.UNSAFE.getInt(null, valuePosition)-4;//减去第一个size
//结构为size,realValue...
int valueRemaining = valueSize;
while (valueRemaining > 0) {
final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, valueRemaining);
PlatformDependent.copyMemory(
null,
valuePosition,
writeBuffer,
PlatformDependent.BYTE_ARRAY_OFFSET,
toTransfer);
writer.write(writeBuffer, 0, toTransfer);
valuePosition += toTransfer;
valueRemaining -= toTransfer;
}
//记住这里写进去的valueSize没有减4
//add by kzx,free memory
PlatformDependent.UNSAFE.freeMemory(valueAddress);
}

切记不要忘了在写完一条记录之后要释放掉对应的堆外内存!

然后就是read端的改写,这里花了有点时间。众所周知,Spark是惰性执行,代码中充满着各种各样的迭代器,追踪代码时都不知道哪个迭代器被调用了。最后调试发现调用路径是这样的FlintHashShuffleReader-> BlockStoreShuffleFetcher-> ShuffleBlockFetcherIterator.这个iterator里面的next会有一个获取Key-Value对的地方,其实也就是做对应的反序列化,我要改的地方也就在读取反序列化读取Value这儿。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** Reads the object representing the value of a key-value pair. */
override def readValue[T: ClassTag](): T = {
val redundantSize = 4+4 //包含了buffer中的两个size
if (multiValue) {
val valueSize = ldis.readInt()
if(valueType == 0){
//int
val valueNum = (valueSize-redundantSize)/4
val valueArray = new Array[Integer](valueNum)
var index = 0
while(index < valueNum){
val temp = ldis.readInt()
valueArray(index) = temp
index+=1
}
valueArray.asInstanceOf[T]
}else if(valueType == 1){
...
}

从读单个value改写成读一组value。这里的size有点绕,起先我遇到的bug就是这个点造成的。

测试

事情往往总是事与愿违的,也是出乎意料的。优化的效果并不好,做了与加这个特性之前的Deca比较实验:
read端的确快了一点,原因有二:

1.减少了大量的key的查询,因为很多相同keyvalue已经在value端聚合在一起了。
2.shuffle文件比原有的size要小,减少了网络的传输。

不过效果算得上微乎其微。最要命的是,write端由于多余的聚合操作,运行时间从2.8min提高到约4min。。平心而论,算得上失败的一次尝试了。

后续

对于优化效果我是存有疑问的,read端不至于优化效果如此微小,这方面我暂时不想达到微小的工作这个水平。
//12月5日更新
后来我在代码中加了一些log,发现shuffle的map端内存中聚合是写文件花费时间的好几倍,这里的原因是原数据文件中相同key的value并不多,我查了一下大概就四十多个,也就是不存在hot key。所以shuffle文件的大小在加不加mapsideCombine的两种情况下相差并不大,所以当key的数量很少时,有mapSideCombine的groupByKey性能才回更有优势,insertSorter和write之间需要达到一种平衡。这个我正在做测试。
不过也给了一种启发,如果每个partition相同的key不多,而且每个key存在大量value时,采用mapsideCombine的groupBykey是一个不错的选择。