本文共 7106 字,大约阅读时间需要 23 分钟。
由于大部分Spark计算都是在内存中完成的,所以Spark程序的瓶颈可能由集群中任意一种资源导致,如:CPU、网络带宽、或者内存等。最常见的情况是,数据能装进内存,而瓶颈是网络带宽;当然,有时候我们也需要做一些优化调整来减少内存占用,例如将RDD以序列化格式保存()。本文将主要涵盖两个主题:1.数据序列化(这对于优化网络性能极为重要);2.减少内存占用以及内存调优。同时,我们也会提及其他几个比较小的主题。
序列化在任何一种分布式应用性能优化时都扮演几位重要的角色。如果序列化格式序列化过程缓慢,或者需要占用字节很多,都会大大拖慢整体的计算效率。通常,序列化都是Spark应用优化时首先需要关注的地方。Spark着眼于要达到便利性(允许你在计算过程中使用任何Java类型)和性能的一个平衡。Spark主要提供了两个序列化库:
接口的对象,都能被序列化。同时,你还可以通过扩展
来控制序列化性能。Java序列化很灵活但性能较差,同时序列化后占用的字节数也较多。
Serializable
接口的类型,它需要你在程序中 register 需要序列化的类型,以得到最佳性能。要切换到使用 Kryo,你可以在 初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。目前,Kryo不是默认的序列化格式,因为它需要你在使用前注册需要序列化的类型,不过我们还是建议在对网络敏感的应用场景下使用Kryo。
Spark对一些常用的Scala核心类型(包括在 库的AllScalaRegistrar中)自动使用Kryo序列化格式。
如果你的自定义类型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注册:
val conf = new SparkConf().setMaster(...).setAppName(...)conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))val sc = new SparkContext(conf)
Kryo的文档( )中有详细描述了更多的高级选项,如:自定义序列化代码等。
如果你的对象很大,你可能需要增大 spark.kryoserializer.buffer 配置项()。其值至少需要大于最大对象的序列化长度。
最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间。
内存占用调优主要需要考虑3点:1.数据占用的总内存(你多半会希望整个数据集都能装进内存吧);2.访问数据集中每个对象的开销;3.垃圾回收的开销(如果你的数据集中对象周转速度很快的话)。
一般,Java对象的访问时很快的,但同时Java对象会比原始数据(仅包含各个字段值)占用的空间多2~5倍。主要原因有:
本节只是Spark内存管理的一个概要,下面我们会更详细地讨论各种Spark内存调优的具体策略。特别地,我们会讨论如何评估数据的内存使用量,以及如何改进 – 要么改变你的数据结构,要么以某种序列化格式存储数据。最后,我们还会讨论如何调整Spark的缓存大小,以及如何调优Java的垃圾回收器。
Spark中内存主要用于两类目的:执行计算和数据存储。执行计算的内存主要用于混洗(Shuffle)、关联(join)、排序(sort)以及聚合(aggregation),而数据存储的内存主要用于缓存和集群内部数据传播。Spark中执行计算和数据存储都是共享同一个内存区域(M)。如果执行计算没有占用内存,那么数据存储可以申请占用所有可用的内存,反之亦然。执行计算可能会抢占数据存储使用的内存,并将存储于内存的数据逐出内存,直到数据存储占用的内存比例降低到一个指定的比例(R)。换句话说,R是M基础上的一个子区域,这个区域的内存数据永远不会被逐出内存。然而,数据存储不会抢占执行计算的内存(否则实现太复杂了)。
这样设计主要有这么几个需要考虑的点。首先,不需要缓存数据的应用可以把整个空间用来执行计算,从而避免频繁地把数据吐到磁盘上。其次,需要缓存数据的应用能够有一个数据存储比例(R)的最低保证,也避免这部分缓存数据被全部逐出内存。最后,这个实现方式能够在默认情况下,为大多数使用场景提供合理的性能,而不需要专家级用户来设置内存使用如何划分。
虽然有两个内存划分相关的配置参数,但一般来说,用户不需要设置,因为默认值已经能够适用于绝大部分的使用场景:
确定一个数据集占用内存总量最好的办法就是,创建一个RDD,并缓存到内存中,然后再到web UI上”Storage”页面查看。页面上会展示这个RDD总共占用了多少内存。
要评估一个特定对象的内存占用量,可以用 SizeEstimator.estimate 方法。这个方法对试验哪种数据结构能够裁剪内存占用量比较有用,同时,也可以帮助用户了解广播变量在每个执行器堆上占用的内存量。
减少内存消耗的首要方法就是避免过多的Java封装(减少对象头和额外辅助字段),比如基于指针的数据结构和包装对象等。以下有几条建议:
中设置这个参数。
如果经过上面的调整后,存储的数据对象还是太大,那么你可以试试将这些对象以序列化格式存储,所需要做的只是通过 设置好存储级别,如:MEMORY_ONLY_SER。Spark会将RDD的每个分区以一个巨大的字节数组形式存储起来。以序列化格式存储的唯一缺点就是访问数据会变慢一点,因为Spark需要反序列化每个被访问的对象。如果你需要序列化缓存数据,我们强烈建议你使用Kryo(),和Java序列化相比,Kryo能大大减少序列化对象占用的空间(当然也比原始Java对象小很多)。
JVM的垃圾回收在某些情况下可能会造成瓶颈,比如,你的RDD存储经常需要“换入换出”(新RDD抢占了老RDD内存,不过如果你的程序没有这种情况的话那JVM垃圾回收一般不是问题,比如,你的RDD只是载入一次,后续只是在这一个RDD上做操作)。当Java需要把老对象逐出内存的时候,JVM需要跟踪所有的Java对象,并找出那些对象已经没有用了。概括起来就是,垃圾回收的开销和对象个数成正比,所以减少对象的个数(比如用 Int数组取代 LinkedList),就能大大减少垃圾回收的开销。当然,一个更好的方法就如前面所说的,以序列化形式存储数据,这时每个RDD分区都只包含有一个对象了(一个巨大的字节数组)。在尝试其他技术方案前,首先可以试试用序列化RDD的方式()评估一下GC是不是一个瓶颈。
如果你的作业中各个任务需要的工作内存和节点上存储的RDD缓存占用的内存产生冲突,那么GC很可能会出现问题。下面我们将讨论一下如何控制好RDD缓存使用的内存空间,以减少这种冲突。
衡量GC的影响
GC调优的第一步是统计一下,垃圾回收启动的频率以及GC所使用的总时间。给JVM设置一下这几个参数(参考Spark配置指南 – ,查看Spark作业中的Java选项参数):-verbose:gc -XX:+PrintGCDetails,就可以在后续Spark作业的worker日志中看到每次GC花费的时间。注意,这些日志是在集群worker节点上(在各节点的工作目录下stdout文件中),而不是你的驱动器所在节点。
高级GC调优
为了进一步调优GC,我们就需要对JVM内存管理有一个基本的了解:
Spark GC调优的目标就是确保老生代(Old generation )只保存长生命周期RDD,而同时新生代(Young generation )的空间又能足够保存短生命周期的对象。这样就能在任务执行期间,避免启动full GC。以下是GC调优的主要步骤:
我们的很多经验表明,GC调优的效果和你的程序代码以及可用的总内存相关。网上还有不少调优的选项说明(),但总体来说,就是控制好full GC的启动频率,就能有效减少垃圾回收开销。
一般来说集群并不会满负荷运转,除非你吧每个操作的并行度都设得足够大。Spark会自动根据对应的输入文件大小来设置“map”类算子的并行度(当然你可以通过一个SparkContext.textFile等函数的可选参数来控制并行度),而对于想 groupByKey 或reduceByKey这类 “reduce” 算子,会使用其各父RDD分区数的最大值。你可以将并行度作为构建RDD第二个参数(参考 ),或者设置 spark.default.parallelism 这个默认值。一般来说,评估并行度的时候,我们建议2~3个任务共享一个CPU。
如果RDD比内存要大,有时候你可能收到一个OutOfMemoryError,但其实这是因为你的任务集中的某个任务太大了,如reduce任务groupByKey。Spark的混洗(Shuffle)算子(sortByKey,groupByKey,reduceByKey,join等)会在每个任务中构建一个哈希表,以便在任务中对数据分组,这个哈希表有时会很大。最简单的修复办法就是增大并行度,以减小单个任务的输入集。Spark对于200ms以内的短任务支持非常好,因为Spark可以跨任务复用执行器JVM,任务的启动开销很小,因此把并行度增加到比集群中总CPU核数还多是没有任何问题的。
使用SparkContext中的广播变量相关功能()能大大减少每个任务本身序列化的大小,以及集群中启动作业的开销。如果你的Spark任务正在使用驱动器(driver)程序中定义的巨大对象(比如:静态查询表),请考虑使用广播变量替代之。Spark会在master上将各个任务的序列化后大小打印出来,所以你可以检查一下各个任务是否过大;通常来说,大于20KB的任务就值得优化一下。
数据本地性对Spark作业往往会有较大的影响。如果代码和其所操作的数据在统一节点上,那么计算速度肯定会更快一些。但如果二者不在一起,那必然需要挪动其中之一。一般来说,挪动序列化好的代码肯定比挪动一大堆数据要快。Spark就是基于这个一般性原则来构建数据本地性的调度。
数据本地性是指代码和其所处理的数据的距离。基于数据当前的位置,数据本地性可以划分成以下几个层次(按从近到远排序):
Spark倾向于让所有任务都具有最佳的数据本地性,但这并非总是可行的。某些情况下,可能会出现一些空闲的执行器(executor)没有待处理的数据,那么Spark可能就会牺牲一些数据本地性。有两种可能的选项:a)等待已经有任务的CPU,待其释放后立即在同一台机器上启动一个任务;b)立即在其他节点上启动新任务,并把所需要的数据复制过去。
而通常,Spark会等待一小会,看看是否有CPU会被释放出来。一旦等待超时,则立即在其他节点上启动并将所需的数据复制过去。数据本地性各个级别之间的回落超时可以单独配置,也可以在统一参数内一起设定;详细请参考 中的 spark.locality 相关参数。如果你的任务执行时间比较长并且数据本地性很差,你就应该试试调大这几个参数,不过默认值一般都能适用于大多数场景了。
本文是一个简短的Spark调优指南,列举了Spark应用调优一些比较重要的考虑点 – 最重要的就是,数据序列化和内存调优。对于绝大多数应用来说,用Kryo格式序列化数据能够解决大多数的性能问题。如果您有其他关于性能调优最佳实践的问题,欢迎邮件咨询( )。
该文转自
官方英文地址
本文转自shishanyuan博客园博客,原文链接: http://www.cnblogs.com/shishanyuan/p/8481854.html ,如需转载请自行联系原作者