快乐学习
前程无忧、中华英才非你莫属!

Spark调优101

Spark调优101

原创2017-04-14陆昕畅游DT时代

前言

Spark已经成为大数据行业最为倚重的分析工具之一,相较于其他分析系统,它拥有计算速度快、功能完善、部署灵活等优势。但是,作为一款开源的项目,Spark并不对其运行效率负责,也就是说完成Spark程序的开发仅仅是万里长征的第一步,更艰巨的挑战来自于如何对Spark程序进行调优,从而稳定高效地执行日常计算任务。

Spark运行时架构

Spark的上手实在是太容易了,以至于我们在写第一个Spark程序时可能并不关注它的运行架构。但当我们讨论如何在集群环境中优化分布式运行的Spark程序时,这一点就变得至关重要了。本节主要为大家梳理一下Spark程序运行时涉及的各种基本概念,熟悉的同学可以跳过。

 本文中的Spark的集群资源管理都是由YARN负责,使用其他资源管理器(例如Mesos)拥有相类似的架构。下图是以yarn-cluster模式提交的Spark运行时架构示意图。

1.  Driver:驱动进程用于启动一项或多项Spark作业(Job),向执行器(Executor)分配计算任务(Task),在每项任务完成后收集结果数据。

2.  Executor: 执行器进程具体执行对数据的计算任务,在程序运行的过程中,每个Executor会不断接收Diver下发的Task;一个Spark程序在运行时通常由一个驱动程序和多个执行器构成。

3.  Container:对于YARN来说,Spark程序的Driver和Executor都是一个容器(Container),每个Container拥有一定的CPU和内存资源。

4.  Application:向Spark系统打包提交的一个程序称为一个应用(Application),每个应用可以包含多个相关的Job,也可以是一个在后台长期运行的服务器(例如响应SQL查询请求的Spark Thrift Server)。

5.  Job:一项作业是对DAG(可以理解为执行计划)的具体实现,通常由程序中的Action算子触发,例如读取、写入数据各为一项独立的Job,而多个Transformation算子(例如Map、Reduce操作)可以共同构成一项Job。

6.  Stage:一个Job可以分割为多个阶段(Stage),每个Stage是一系列Task的集,通常执行一个或多个不包含Shuffle的Transformation算子;Stage的划分原则通常以Shuffle操作为边界,一个Stage的所有Task都执行完毕之后,会在各节点的磁盘文件中写入中间结果,然后由Driver调度运行下一个Stage。

7.  Task:Task 是Spark中最小的运算单元,每个Task对整个数据集中的一小部分数据执行完全相同的代码段,一个程序分配Task的数量即整个Spark任务的并行度(Parallelism),通常每个Executor会被分配多个Task,以此提高运算速度。

8.  Shuffle:Shuffle可以看作是一种特殊的Task,常常由groupByKey、reduceByKey等算子触发,执行此类算子需要从父 RDD 中的多个 Partition 中读取数据并打散重组,因此会消耗大量计算和IO资源,是一种需要尽量避免的操作。

9.  Partition:存放在RDD中的分区数据称为Partition,从HDFS上读取数据时,默认与分布式存放的Block一一对应,通常存放数据的HDFS块大小为128MB,因此每个Partition的数据大小也是128MB,并且这个数值也是一个Task处理的数据块大小,如果不另加指定,Spark分配的Task数量与数据的分区数量是直接相关的。

Spark优化策略

 针对不同场景的Spark应用而言,调优策略可能迥然不同,归纳而言有以下几个方面:

1.  基于Spark参数的调优:直接调整Spark集群配置和运行时参数可以极大地改善了程序执行效率。

2.  基于资源管理器的参数调优:在生产环境部署下,Spark往往接受外部的集群资源管理器的调度,例如YARN,因此需要针对性地进行Spark任务有利的配置。

3.  基于程序开发的调优:重点在于开发时数据结构和算子的选择,其中重要的一个方面是shuffle操作相关的优化,要求开发者对Spark原理有较深入的理解。

4.  针对Spark各模块的优化策略:例如对于SparkSQL而言,数据存储方式与SQL语句的优化也是优化重点。

在系统调优过程中,每一项调优策略就好比一件武器,而系统的性能就是一环环的靶标,神枪手总是要优先了解自己手中的武器,并在执行特定的任务时选用最优的武器。对于优化而言,有的任务要求正中靶心(把某项任务的性能提高到极致),这时就需要选择一把高精度的狙击步枪(例如可以从以上几个方面定制详细的优化策略);而有的任务则要求在任何情况下保证摧毁目标而不必在意精度(采取某些通用的策略可普遍提高多个应用的运行性能),此时就可以选择相对简单粗暴的武器(例如通过上述1、2条直接提高某些任务的资源配比)。下文中重点介绍的这种通用的策略。

YARN参数的选择

Spark on YARN是实际生产环境下最常见的部署方式,在这种部署方式下,YARN统一负责底层资源的调度,包括Spark、Hive、MapReduce、Storm等多种类型的分布式应用。此外还有一些分布式应用没有通过YARN实现资源管理,例如HDFS、Hbase和Impala等。对于Spark运行为主的集群而言,我们需要合理配置YARN的各项参数,使之在运行Spark程序时能够达到最佳性能,同时兼顾其他相关应用,这是在部署集群时就需要考虑清楚的问题。  

首先需要评估集群节点的硬件配置,为系统及运行在系统上的其他服务预留资源。下表中我们为操作系统、Web程序、HDFS和YARN NodeManger预留了必要的CPU和内存资源,剩下的资源交由YARN管理。

 注意在YARN中,CPU的核数称为VCore(虚拟核)。不同于物理核数的概念,VCore的设计初衷是考虑到不同节点的CPU性能有高低,为计算能力更高的CPU多配置几个虚拟核得以弥补这种差异。在我们的集群中,CPU的型号是一致的,不存在性能差异,因此相应的物理核数与YARN VCore系数定为1,读者可根据自身集群情况选择参数。

完成了集群节点的基础规划之后,我们就可以对YARN的相关参数进行配置了。考虑到Spark任务对内存的需求较高,在设置内存相关的限制时我们应尽量放开。注意区分NodeManger的参数和Scheduler的参数,前者对节点资源做了限制,后者对每项提交给YARN的任务资源做了限制。

最后,每个节点的可用资源数量乘上集群的节点数量,就是所有YARN可以利用的CPU和内存资源。

  • yarn.nodemanager.resource.memory-mbYARN可以为每个节点分配的最大内存,设为320 GB。

  • yarn.nodemanager.resource.cpu-VCoresYARN可以为每个节点分配的最大VCore数量,设为28个。

  • yarn.scheduler.maximum-allocation-mb:单个任务分配Container的最大物理内存,不宜设置过小,此处设为64GB。

  • yarn.scheduler.minimum-allocation-mb:单个任务分配Container的最小物理内存,设为1024MB。

  • yarn.scheduler.maximum-allocation-VCores:单个任务分配Container最大虚拟核数,设为28个。

  • yarn.scheduler.minimum-allocation-VCores :单个任务分配Container最小虚拟核数,设为1个。

Spark参数调优

上面已经介绍了Spark作业的基本运行原理,并且完成了YARN的配置工作,接下来的一项重要工作就是对Spark运行时参数的调优。这属于一种“简单粗暴”的武器,通过为不同应用分配合适的计算资源,提升并发程度,最大化地提高运算性能。以下介绍几个最重要的资源分配参数。

  • num-executors:集群为一个Spark应用分配执行器的个数。Driver进程会在集群节点上启动多个Executor实例,如果不对该参数配置,Spark只会默认启动少量的Executor,从而导致集群内节点的闲置。Executor的个数往往与集群内Worker节点的个数有关,确保每个节点至少有一个Executor,同时也需要考虑整体的内存和CPU资源低于节点可供分配的上限。

  • executor-cores:为每个Executor分配的VCore个数。显然这个参数不能超过YARN中每个Container可以获得的VCore个数上限(yarn.scheduler.maximum-allocation-mb),并且通常只会设置为2-5个。这是由于一个HDFS任务通常仅能支持5个并发线程,当VCore数量超过5个时就会遇到明显的读写瓶颈,严重降低单个Executor处理文件的速度。

  • executor-memory:为每个Executor分配的内存大小。同样这个参数不能超过YARN中为每个Container分配内存的上限(yarn.scheduler.maximum-allocation-mb)。从1.6.0版本开始,Spark采用了新的内存模型,为Executor分配的内存通常用于三个部分:用户自行管理的内存,用于存放用户定义的数据结构;Spark的执行内存,用于执行shuffle、join等操作;以及RDD缓存空间。该参数的设置很大程度上取决于用户对内存的规划,当然内存设置过高会导致明显的JVM垃圾回收时延(GC Time)。通常我们设置在4G-16G之间。

  • spark.default.parallelism:每个Stage默认分配的Task数量,这个参数决定了整个Spark程序最终的并行度。Spark官方建议设置该参数为num-executors* executor-cores的2~3倍,比如Executor的总VCore数量为300个,那么可以设置并发都在1000,此时可以充分地利用Spark集群的资源。事实上,在处理大数据集时这个参数并不需要去额外设置。举例而言,在处理100GB数据时,数据文件分布在HDFS上800个Block中(默认每个Block的大小128MB),而Task数量与数据存放在HDFS上的Block数量是一一对应的,也就是说Spark会启动相应的800个Task,对于上述设置而言亦处于合理区间。此时如果强制要求1000并行度,势必要切分一部分Block,引入额外的时延。

按照以上参数配置的Spark应用,会被提交给YARN进行调度,在集群资源充足的情况下,每个Executor会被YARN封装为一个Container,每个Container的核数与executor-cores的值保持一致,而每个Container的内存则会略高于executor-memory所声明的数值,超出部分用于JVM堆外内存及其他必要开销(通常占10%)。此外,当Spark应用按照YARN-cluster方式提交时,Driver程序同样以一个Container的形式运行在集群内某个节点上,占用1个VCore和指定的内存。至此,基于以上参数的优化,我们就可以估算出整体的资源消耗:

    total executors = num-executors + 1 

    total VCores = num-executors * executor-cores + 1

    total memories ≈ (num-executors * executor-memory +  driver-memory)* 1.10 

 举例而言,针对一个拥有22个节点的集群,每台节点可供YARN调度的资源有32 个虚拟核和320GB内存,共计704个Vcore和6.88TB的内存可供分配。针对某个Spark应用,我们希望充分占用所有节点进行运算,首先根据节点数确定Executor的数量,比如每台节点运行3个一共66个,其中留出1个用作Driver,因此提交参数num-executors = 65。接下来参考以上讨论的原则为每个Executor分配的资源,executor-memory = 16 GB,executor-cores = 3,driver-memory = 4GB。Parallelism由Spark自动分配。最终估算出来该应用总共需要占用Vcore数量为195个,总内存约1150GB,CPU资源占YARN可供分配总量的30%左右,内存资源占到约20%。此时该Spark应用能够达到我们的性能需求,同时集群尚有足够的资源执行其他分布式任务。

./spark-submit \

<span style="margin:0;padding:0;max-width:100% !important;box-sizing:border-box !important;-webkit-box-sizing:border-box !important;word-wrap:break-word !important;color:rgb(0,128,255);font-size:14px"--<master yarn-cluster \

<span style="margin:0;padding:0;max-width:100% !important;box-sizing:border-box !important;-webkit-box-sizing:border-box !important;word-wrap:break-word !important;color:rgb(0,128,255);font-size:14px"--<class com.test.Runner \

<span style="margin:0;padding:0;max-width:100% !important;box-sizing:border-box !important;-webkit-box-sizing:border-box !important;word-wrap:break-word !important;color:rgb(0,128,255);font-size:14px"--<driver-memory 4g \

<span style="margin:0;padding:0;max-width:100% !important;box-sizing:border-box !important;-webkit-box-sizing:border-box !important;word-wrap:break-word !important;color:rgb(0,128,255);font-size:14px"--<executor-memory 16g \

<span style="margin:0;padding:0;max-width:100% !important;box-sizing:border-box !important;-webkit-box-sizing:border-box !important;word-wrap:break-word !important;color:rgb(0,128,255);font-size:14px"--<executor-cores 3 \

<span style="margin:0;padding:0;max-width:100% !important;box-sizing:border-box !important;-webkit-box-sizing:border-box !important;word-wrap:break-word !important;color:rgb(0,128,255);font-size:14px"--<num-executors 65 \

sparkAppPath/spark-processor.jar

Spark参数配置方法

除了以上几个重要的性能参数以外,Spark还有很多运行状态可以通过参数调整,详细的参数说明可见官方说明:Spark Configuration。这些运行时参数有三种方式进行提交:

1. 在程序中通过SparkContext. SparkConf对象使用set()方法配置,例如:

val conf = new SparkConf()

&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0; .setMaster("yarn-cluster") &#xa0; &#xa0; &#xa0;

&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0; .setAppName("MySpark")

2. 在spark-default.conf添加默认配置,例如:

#开启记录spark运行日志

spark.eventLog.enabled true

#使用KryoSerializer进行序列化

spark.serializerorg.apache.spark.serializer.KryoSerializer

3. 在spark-submit时通过选项指定配置,例如:

&#xa0;./bin/spark-submit \&#xa0;

&#xa0;--name "Myapp" \

&#xa0;--master yarn-cluster \

&#xa0;--conf park.eventLog.enabled=false

获取Spark运行信息的获取

&#xa0;最后谈一谈如何通过Spark UI获取程序运行信息,熟练应用这些反馈能够为相关优化快速指明方向。下面简单介绍这些页面的各自的主要功能。

  • Jobs: 呈现Spark应用分割的Job列表,可以查询正在执行和已经完成的各项作业及其运行时间。

  • Stages:可用于查看每个Stage的具体运行情况,包括执行计划(DAG)、每个Executor的任务分配情况以及每个Task的执行细节,重点关注以下几个指标:

&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;&#xa0;Duration:Task执行时长

&#xa0;&#xa0;&#xa0;&#xa0;&#xa0; &#xa0;GC Time:JVM垃圾回收时长

&#xa0;&#xa0;&#xa0;&#xa0;&#xa0; &#xa0;Input Size / Records:每个Task读取的数据大小

&#xa0;&#xa0;&#xa0;&#xa0;&#xa0; &#xa0;Write Time:每个Task写文件到磁盘的时长

&#xa0;&#xa0;&#xa0;&#xa0;&#xa0; &#xa0;Shuffle Write Size / Records:每个Task需要执行Shuffle的数据大小

&#xa0;&#xa0;&#xa0;&#xa0;&#xa0; &#xa0;Errors:Task执行过程中的报错信息

例如,过大或过小的Input Size意味着不合理的数据分区,并有可能造成部分Task执行时间过长;而GC Time时延过大可能意味着用户代码分配不足,需要调整相关设置,等等。结合这些信息往往可以定位大部分性能问题,从而有针对性地进行优化。

  • Storage:展示每个RDD缓存占用的内存和磁盘的空间,可以用于调整Spark程序中数据的缓存策略。

  • Environment:该页面下可以核查Spark应用提交的所有运行环境信息、依赖关系,以及自定义的参数等。 &#xa0; &#xa0; &#xa0; &#xa0; &#xa0; &#xa0; &#xa0;

  • Executors:该页面下包括所有Executor在执行整个应用过程中所有Task的完成情况,可用于核查是否充分占用集群资源,是否存在失败率明显偏高的Executor,并可以结合YARN Container执行的详细日志对程序进行排错。

结束语

&#xa0;&#xa0;&#xa0;&#xa0;本文从Spark运行时的原理架构入手,介绍了Spark程序的几个优化方向,重点展开说明了基于YARN和基于Spark参数的调优方法和步骤。这两项通用优化策略,重点在于充分利用集群计算资源,在大多数情况下都能适用。此外,正如文中所述,基于程序开发的调优和针对Spark专项模块的调优则属于更加精细的优化策略,这方面的相关技巧需要通过实践多加积累,希望未来有能力发表其中见解。

参考资料

[1] http://spark.apache.org/docs/latest/tuning.html

[2] Karau, H., Konwinski, A., Wendell, P., Zaharia, M., Learning Spark

[3]https://www.cloudera.com/documentation/enterprise/latest/topics/admin_spark_tuning.html

[4] http://blog.csdn.net/u012102306/article/details/51637366

-END-

声明:

本文为中国联通网研院网优网管部IT技术研究团队独家提供。

如需转载或合作,请联系管理员(luxin@dimpt.com)

长按既可添加关注

推荐公众号

打赏
赞(0) 打赏
未经允许不得转载:同乐学堂 » Spark调优101

特别的技术,给特别的你!

联系QQ:1071235258QQ群:710045715

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏

error: Sorry,暂时内容不可复制!