spark2原理分析—shuffle框架的实现概要分析

概述

本文分析spark2的shuffle过程的实现的一个概要。

shuffle过程介绍

shuffle总体流程

spark2的shuffle过程可以分为shuffle write和shuffle read。shuffle write把map阶段计算完成的数据写入到本地。而shuffle read是从不同的计算节点获取shuffle write计算出来的数据,这样就会发生网络的数据传输和磁盘的i/o。

为什么shuffle的代价很大

由于shuffle过程有可能要做以下一些事情:

  • 重新进行数据分区
  • 数据传输
  • 数据压缩
  • 磁盘I/O

spark2的shuffle框架介绍

spark2中的框架主要包括以下几个部分:

  • ShuffleManager

    这是一个接口,负责管理shuffle相关的组件,比如:通过它来注册shuffle的操作函数,获取writer和reader等。在sparkenv中注册,通过sprkconf进行配置,配置参数是:spark.shuffle.manager,默认是sort,也就是:SortShuffleManager类。在早期的spark版本中,也实现过hashmanager后来全部统一成sort。

  • ShuffleReader

    在reduce任务中去获取来自多个mapper任务的合并记录数据。实现该接口的类只有一个:BlockStoreShuffleReader。

  • ShuffleWriter

    在mapper任务中把记录到shuffle系统。这是一个抽象类,实现该抽象类的有:SortShuffleWriter,UnsafeShuffleWriter,BypassMergeSortShuffleWriter三个。

  • ShuffleBlockResolver

    该接口的实现类需要理解:如何为逻辑的shuffle块标识(map,reduce,shuffle等)获取数据。实现者可以通过文件或文件片段来封装shuffle数据。当获取到shuffle数据时,BlockStore使用它来抽象不同的shuffle实现。该接口的实现类为:IndexShuffleBlockResolver。

ShuffleManager的实现

ShuffleManager接口

private[spark] trait ShuffleManager {

  /**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  /**
   * Remove a shuffle's metadata from the ShuffleManager.
   * @return true if the metadata removed successfully, otherwise false.
   */
  def unregisterShuffle(shuffleId: Int): Boolean

  /**
   * Return a resolver capable of retrieving shuffle block data based on block coordinates.
   */
  def shuffleBlockResolver: ShuffleBlockResolver

  /** Shut down this ShuffleManager. */
  def stop(): Unit
}

在spark2.4中实现该接口的类只有一个:SortShuffleManager。下面我们来看一下该类的具体实现。

ShuffleHandle

shuffle的一个不透明句柄(handle),由ShuffleManager用于将有关它的信息传递给任务(task)。

ShuffleBlockFetcherIterator

一个获取多个块的迭代器。 对于本地块,它从本地块管理器(local blocks manager)获取。对于远程块,它使用提供的BlockTransferService服务来获取。

它会创建(BlockID,InputStream)元组的迭代器,以便调用者可以在收到块时以流水线方式处理块。

SortShuffleManager类介绍

实现概要

在基于排序的shuffle中,输入的数据会根据其目标分区ID进行排序,然后写入单个map输出文件。 Reducers获取此文件的连续区域,以便读取它们map的输出部分。 如果map输出数据太大而无法放入内存,则可以将输出的已排序数据的一部分写到磁盘,并合并那些磁盘上的文件以生成最终输出文件。

基于排序的shuffle有两个不同的写路径用于生成其映射输出文件:

  • 序列化排序:在满足以下所有三个条件时使用:

    • shuffle依赖项指定没有聚合或输出顺序
    • shuffle序列化程序支持重新定位序列化值(目前这是由KryoSerializer和Spark SQL的自定义序列化程序支持)。
    • shuffle产生的输出分区少于16777216。
  • 反序列化排序:用于处理所有其他情况。

序列化排序模式

在序列化排序模式下,输入记录会在它被传递给shuffler writer时被序列化,并在排序期间以序列化形式进行缓冲。 该写入模式实现了几个优化:

  • 它的排序操作是序列化的二进制数据而不是Java对象,这样可以减少内存消耗和GC开销。 此优化要求记录序列化程序具有某些属性,以允许重新排序序列化记录,而无需反序列化。 有关详细信息,请参见SPARK-4550,其中首次提出并实施了此优化。
  • 它使用专门的缓存高效排序器([[ShuffleExternalSorter]])来排序压缩记录指针和分区ID的数组。 通过在排序数组中每个记录仅使用8个字节的空间,这样就可以把更多的数组放到内存中。
  • 溢出合并过程对属于同一分区的序列化记录块进行操作,并且在合并期间不需要对记录进行反序列化。
  • 当溢出压缩编解码器支持压缩数据的连接时,溢出合并只是简单地连接序列化和压缩的溢出分区以生成最终输出分区。 这允许使用高效的数据复制方法,如NIO的transferTo,并避免在合并期间分配解压缩或复制缓冲区的需要。

ShuffleWriter的实现

ShuffleWriter抽象类

该接口有三个实现类:

  • BypassMergeSortShuffleWriter
  • SortShuffleWriter
  • UnsafeShuffleWriter

该抽象类的实现代码如下:

private[spark] abstract class ShuffleWriter[K, V] {
  /** Write a sequence of records to this task's output */
  @throws[IOException]
  def write(records: Iterator[Product2[K, V]]): Unit

  /** Close this writer, passing along whether the map completed */
  def stop(success: Boolean): Option[MapStatus]
}

SortShuffleWriter类介绍

在mapper任务中把记录到shuffle系统。在该类中,写出数据时会使用外部排序对数据进行排序。

ShuffleReader的实现

ShuffleReader抽象类

该抽象类实现:在reduce任务中获取以从映射器中读取组合记录。目前只有一个实现类:BlockStoreShuffleReader。

private[spark] trait ShuffleReader[K, C] {
  /** Read the combined key-values for this reduce task */
  def read(): Iterator[Product2[K, C]]

  /**
   * Close this reader.
   * TODO: Add this back when we make the ShuffleReader a developer API that others can implement
   * (at which point this will likely be necessary).
   */
  // def stop(): Unit
}

BlockStoreShuffleReader介绍

该类实现了ShuffleReader抽象类,其实就只是实现了一个read函数,该函数返回一个迭代器。
该实现类的主要功能是:当其他节点的块存储(block store)请求分区时,将会读取一个shuffle过程中的多个分区,分区的范围是[startPartition,endPartition]。

总结

本文讲述了spark2的shuffle框架的实现概要分析。主要分析了spark2中的shuffle框架的实现接口和相关实现类的大概实现逻辑。接下来的文章会对shuffle框架的三个部分的详细实现进行分析。


更多精彩内容