Spark - Task的执行过程(二)- UnsafeShuffleWriter

上一篇讲了BypassMergeSortShuffleWriter实现方式,并且知道选择BypassMergeSortShuffleWriter的时候,分区数是不能超过200的,因为每次执行的时候,会根据分区数量,先生成临时文件,如果分区数很多的话,那就会很有很多的临时文件,磁盘性能就非常不好。
UnsafeShuffleWriter也是不具备聚合功能的,但是他使用Tungsten的内存作为缓存,这样磁盘的性能就得到了大大的提升。

流程

拿到RDD的结果后,迭代每条记录,把记录的键值对写入到SerializationStream的输出流中,SerializationStream是包装了名为serBuffer的MyByteArrayOutputStream对象。

然后把字节数组交给ShuffleExternalSorter,一个专门用于对Shuffle数据进行排序的外部排序器。外部排序器拿到字节数组,发现此时没有page(即MemoryBlock),于是就创建了一个page,把字节数组放在page中。另外需要把创建的page加入到已经分配的Page列表allocatedPages,并把创建的page作为currentPage。

同时还要记录元数据信息,存放在ShuffleInMemorySorter的长整型数组中用于排序,其中高24位存储分区ID,中间13位存储页号,低27位存储偏移量。

重复着上面的步骤,迭代RDD每条记录,直至page没有足够的空间。

此时就要申请分配新的Page,把这个page加入到已经分配的Page列表allocatedPages,并把新创建的page作为currentPage,新的字节数组就往新的page里放。

RDD的结果继续迭代,ShuffleInMemorySorter中的记录数超过设定的值,或者长整型数组已经不够用但是无法扩展的时候,就需要内存中的数据将被溢出到磁盘。
首先是对内存中的数据根据分区进行排序,我们下面用0或1表示分区,XY标识页号和偏移量。经过排序后,内存中的数据就根据分区有序的排列起来。

然后根据分区,把数据写入文件中,并把文件信息写在SpillInfo中,SpillInfo记录了这个文件的信息、blockId信息以及每个分区对应文件里的长度(比如4,4)。这个SpillInfo最终存放在元数据信息的列表spills中。

数据都保存在磁盘后,那内存中的数据就可以清除回收了,包括已经分配的Page列表allocatedPages、每个page、currentPage、ShuffleInMemorySorter。

假设RDD的结果在写完3次file时结束了,那此时spills列表就有3个SpillInfo,每个SpillInfo都指向着文件,并且记录每个分区对应的长度(假设为(4,4),(3,2),(5,1))。

最终根据spilss列表中的3个SpillInfo,根据每个分区对应的长度,把每个文件分区0的内容写入到新的文件,并记录分区0的长度为12,再把每个文件分区1的内容写入到新的文件,并记录分区1的长度为7,根据两个分区的长度,生成索引文件。

和BypassMergeSortShuffleWriter比起来,假设有1万个分区,BypassMergeSortShuffleWriter是会先生成1万个临时文件,最后再进行合并,而UnsafeShuffleWriter是根据RDD结果集的数据大小,生成临时文件,大大减少了临时文件的个数。

作者:大军原文地址:https://segmentfault.com/a/1190000040505645

%s 个评论

要回复文章请先登录注册