快乐学习
前程无忧、中华英才非你莫属!

Spark编程模型:spark-shell与IDEA实战

1、Spark编程模型

作者:石山园  出处:http://www.cnblogs.com/shishanyuan/

1.1 术语定义

l应用程序(Application: 基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor

l驱动程序(Driver Program:运行Applicationmain()函数并且创建SparkContext,通常用SparkContext代表Driver Program

l执行单元(Executor: 是为某Application运行在Worker Node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的Executors

l集群管理程序(Cluster Manager): 在集群上获取资源的外部服务(例如:StandaloneMesosYarn)

l操作(Operation:作用于RDD的各种操作分为TransformationAction

1.2 模型组成

Spark应用程序可分两部分:Driver部分和Executor部分

1.2.1 Driver部分

Driver部分主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;在Executor部分运行完毕后,需要将SparkContext关闭。

1.2.2 Executor部分

Spark应用程序的Executor部分是对数据的处理,数据分三种:

1.2.2.1 原生数据

包含原生的输入数据和输出数据

l对于输入原生数据,Spark目前提供了两种:

Ø  Scala集合数据集:如Array(1,2,3,4,5)Spark使用parallelize方法转换成RDD

Ø  Hadoop数据集:Spark支持存储在hadoop上的文件和hadoop支持的其他文件系统,如本地文件、HBaseSequenceFileHadoop的输入格式。例如Spark使用txtFile方法可以将本地文件或HDFS文件转换成RDD

l对于输出数据,Spark除了支持以上两种数据,还支持scala标量

Ø  生成Scala标量数据,如count(返回RDD中元素的个数)、reducefold/aggregate;返回几个标量,如take(返回前几个元素)。

Ø  生成Scala集合数据集,如collect(把RDD中的所有元素倒入 Scala集合类型)、lookup(查找对应key的所有值)。

Ø  生成hadoop数据集,如saveAsTextFilesaveAsSequenceFile

1.2.2.2 RDD

RDD具体在下一节中详细描述,RDD提供了四种算子:

l输入算子:将原生数据转换成RDD,如parallelizetxtFile

l转换算子:最主要的算子,是Spark生成DAG图的对象,转换算子并不立即执行,在触发行动算子后再提交给driver处理,生成DAG -->  Stage --> Task  </span--<> Worker执行。

l缓存算子:对于要多次使用的RDD,可以缓冲加快运行速度,对重要数据可以采用多备份缓存。

l行动算子:将运算结果RDD转换成原生数据,如countreducecollectsaveAsTextFile等。

1.2.2.3 共享变量

Spark运行时,一个函数传递给RDD内的patition操作时,该函数所用到的变量在每个运算节点上都复制并维护了一份,并且各个节点之间不会相互影响。但是在Spark Application中,可能需要共享一些变量,提供Task或驱动程序使用。Spark提供了两种共享变量:

l广播变量(Broadcast Variables:可以缓存到各个节点的共享变量,通常为只读

– 广播变量缓存到各个节点的内存中,而不是每个 Task

– 广播变量被创建后,能在集群中运行的任何函数调用

– 广播变量是只读的,不能在被广播后修改

– 对于大数据集的广播, Spark 尝试使用高效的广播算法来降低通信成本

使用方法:

val broadcastVar = sc.broadcast(Array(1, 2, 3))

l累计器:只支持加法操作的变量,可以实现计数器和变量求和。用户可以调用SparkContext.accumulator(v)创建一个初始值为v的累加器,而运行在集群上的Task可以使用“+=”操作,但这些任务却不能读取;只有驱动程序才能获取累加器的值。

使用方法:

val accum = sc.accumulator(0)

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum  + = x)

accum.value

val num=sc.parallelize(1 to 100)

2RDD

2.1 术语定义

l弹性分布式数据集(RDD): Resillient Distributed DatasetSpark的基本计算单元,可以通过一系列算子进行操作(主要有TransformationAction操作);

l有向无环图(DAG):Directed Acycle graph,反应RDD之间的依赖关系;

l有向无环图调度器(DAG Scheduler):根据Job构建基于StageDAG,并提交StageTaskScheduler

l任务调度器(Task Scheduler):将Taskset提交给worker(集群)运行并回报结果;

l窄依赖(Narrow dependency):子RDD依赖于父RDD中固定的data partition

l宽依赖(Wide Dependency):子RDD对父RDD中的所有data partition都有依赖。

2.2 RDD概念

RDDSpark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDDSpark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法交互式数据挖掘来说,效率提升非常大。

RDD 最适合那种在数据集上的所有元素都执行相同操作的批处理式应用。在这种情况下, RDD 只需记录血统中每个转换就能还原丢失的数据分区,而无需记录大量的数据操作日志。所以 RDD 不适合那些需要异步、细粒度更新状态的应用 ,比如 Web 应用的存储系统,或增量式的 Web 爬虫等。对于这些应用,使用具有事务更新日志和数据检查点的数据库系统更为高效。

2.2.1 RDD的特点

1.来源:一种是从持久存储获取数据,另一种是从其他RDD生成

2.只读:状态不可变,不能修改

3.分区:支持元素根据 Key 来分区 ( Partitioning ) ,保存到多个结点上,还原时只会重新计算丢失分区的数据,而不会影响整个系统

4.路径:在 RDD 中叫世族或血统 ( lineage ) ,即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的

5.持久化:可以控制存储级别(内存、磁盘等)来进行持久化

6.操作:丰富的动作 ( Action ) ,如CountReduceCollectSave 

2.2.2 RDD基础数据类型

目前有两种类型的基础RDD:并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行计算。 Hadoop数据集(Hadoop Datasets):在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统即可。这两种类型的RDD都可以通过相同的方式进行操作,从而获得子RDD等一系列拓展,形成lineage血统关系图。

1. 并行化集合

并行化集合是通过调用SparkContextparallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。例如,下面的解释器输出,演示了如何从一个数组创建一个并行集合。

例如:val rdd = sc.parallelize(Array(1 to 10)) 根据能启动的executor的数量来进行切分多个slice,每一个slice启动一个Task来进行处理。

val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的数量

2. Hadoop数据集

Spark可以将任何Hadoop所支持的存储资源转化成RDD,如本地文件(需要网络文件系统,所有的节点都必须能访问到)、HDFSCassandraHBaseAmazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。

(1)使用textFile()方法可以将本地文件或HDFS文件转换成RDD

支持整个文件目录读取,文件可以是文本或者压缩文件(gzip等,自动执行解压缩并加载数据)。如textFile(”file:///dfs/data”)

支持通配符读取,例如:

val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");

val rdd2=rdd1.map(_.split("t")).filter(_.length==6)

rdd2.count()

......

14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903

......

textFile()可选第二个参数slice,默认情况下为每一个block分配一个slice。用户也可以通过slice指定更多的分片,但不能使用少于HDFS block的分片数。

(2)使用wholeTextFiles()读取目录里面的小文件,返回(用户名、内容)对

(3)使用sequenceFile[K,V]()方法可以将SequenceFile转换成RDDSequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)

(4)使用SparkContext.hadoopRDD方法可以将其他任何Hadoop输入类型转化成RDD使用方法。一般来说,HadoopRDD中每一个HDFS block都成为一个RDD分区。

此外,通过Transformation可以将HadoopRDD等转换成FilterRDD(依赖一个父RDD产生)和JoinedRDD(依赖所有父RDD)等。

2.2.3 例子:控制台日志挖掘

假设网站中的一个 WebService 出现错误,我们想要从数以 TB  HDFS 日志文件中找到问题的原因,此时我们就可以用 Spark 加载日志文件到一组结点组成集群的 RAM 中,并交互式地进行查询。以下是代码示例:

首先行 1  HDFS 文件中创建出一个 RDD ,而行 2 则衍生出一个经过某些条件过滤后的 RDD 。行 3 将这个 RDD errors 缓存到内存中,然而第一个 RDD lines 不会驻留在内存中。这样做很有必要,因为 errors 可能非常小,足以全部装进内存,而原始数据则会非常庞大。经过缓存后,现在就可以反复重用 errors 数据了。我们这里做了两个操作,第一个是统计 errors 中包含 MySQL 字样的总行数,第二个则是取出包含 HDFS 字样的行的第三列时间,并保存成一个集合。

这里要注意的是前面曾经提到过的 Spark 的延迟处理。 Spark 调度器会将 filter  map 这两个转换保存到管道,然后一起发送给结点去计算。

2.3 转换与操作

对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD

l转换(Transformations) (如:map, filter, groupBy, join)Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

l操作(Actions) (如:count, collect, save)Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

2.3.1 转换

reduce(func)

通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行

collect()

Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDDCollect返回,很可能会让Driver程序OOM

count()

返回数据集的元素个数

take(n)

返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)

first()

返回数据集的第一个元素(类似于take1

saveAsTextFile(path)

将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本

saveAsSequenceFile(path)

将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了HadoopWritable接口,或隐式可以转换为WritableSpark包括了基本类型的转换,例如IntDoubleString等等)

foreach(func)

在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互

2.3.2 操作

map(func)

返回一个新的分布式数据集,由每个原元素经过func函数转换后组成

filter(func)

返回一个新的数据集,由经过func函数后返回值为true的原元素组成

flatMap(func)

类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

flatMap(func)

类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

sample(withReplacement,  frac, seed)

根据给定的随机种子seed,随机抽样出数量为frac的数据

union(otherDataset)

返回一个新的数据集,由原数据集和参数联合而成

groupByKey([numTasks])

在一个由(K,V)对组成的数据集上调用,返回一个(KSeq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task

reduceByKey(func,  [numTasks])

在一个(KV)对的数据集上使用,返回一个(KV)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。

join(otherDataset,  [numTasks])

在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集

groupWith(otherDataset,  [numTasks])

在类型为(K,V)(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup

cartesian(otherDataset)

笛卡尔积。但在数据集TU上调用时,返回一个(TU)对的数据集,所有元素交互进行笛卡尔积。

flatMap(func)

类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

2.4 依赖类型

 RDD 中将依赖划分成了两种类型:窄依赖 (Narrow Dependencies) 和宽依赖 (Wide Dependencies) 。窄依赖是指 父 RDD 的每个分区都只被子RDD 的一个分区所使用 。相应的,那么宽依赖就是指父 RDD 的分区被多个子 RDD 的分区所依赖。例如, Map 就是一种窄依赖,而 Join 则会导致宽依赖 ( 除非父 RDD  hash-partitioned ,见下图 ) 

l窄依赖(Narrow Dependencies 

Ø  RDD 的每个分区依赖于常数个父分区(即与数据规模无关)

Ø  输入输出一对一的算子,且结果RDD 的分区结构不变,主要是map flatMap

Ø  输入输出一对一,但结果RDD 的分区结构发生了变化,如union coalesce

Ø  从输入中选择部分元素的算子,如filter distinct subtract sample

l宽依赖(Wide Dependencies 

Ø  RDD 的每个分区依赖于所有父RDD 分区

Ø  对单个RDD 基于Key 进行重组和reduce,如groupByKey reduceByKey 

Ø  对两个RDD 基于Key 进行join 和重组,如join

2.5 RDD缓存

Spark可以使用 persist  cache 方法将任意 RDD 缓存到内存、磁盘文件系统中。缓存是容错的,如果一个 RDD 分片丢失,可以通过构建它的transformation自动重构。被缓存的 RDD 被使用的时,存取速度会被大大加速。一般的executor内存60% cache, 剩下的40%task

Spark中,RDD类可以使用cache()  persist() 方法来缓存。cache()persist()的特例,将该RDD缓存到内存中。而persist可以指定一个StorageLevelStorageLevel的列表可以在StorageLevel 伴生单例对象中找到:

object StorageLevel {

  val NONE = new StorageLevel(false, false, false, false)

  val DISK_ONLY = new StorageLevel(true, false, false, false)

  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

  val MEMORY_ONLY = new StorageLevel(false, true, false, true)

  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

  val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon

}

 

// 其中,StorageLevel 类的构造器参数如下:

class StorageLevel private(  private var useDisk_ : Boolean,  private var useMemory_ : Boolean,  private var useOf

Spark的不同StorageLevel ,目的满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:

l如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快;

l如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问;

l尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快;

l如果你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续的运行任务,而不需要等待丢失的分区被重新计算;

l如果你想要定义你自己的存储级别(比如复制因子为3而不是2),可以使用StorageLevel 单例对象的apply()方法;

l在不使用cached RDD的时候,及时使用unpersist方法来释放它。

3RDD动手实战

在这里我们将对RDD的转换与操作进行动手实战,首先通过实验我们能够观测到转换的懒执行,并通过toDebugString()去查看RDDLineAge,查看RDD在运行过程中的变换过程,接着演示了从文件读取数据并进行大数据经典的单词计数实验,最后对搜狗提供的搜索数据进行查询,在此过程中演示缓存等操作。

3.1 启动Spark Shell

3.1.1 启动Hadoop

在随后的实验中将使用到HDFS文件系统,需要进行启动

$cd /app/hadoop/hadoop-2.2.0/sbin

$./start-dfs.sh

3.1.2 启动Spark

$cd /app/hadoop/spark-1.1.0/sbin

$./start-all.sh

3.1.3 启动Spark Shell

spark客户端(这里在hadoop1节点),使用spark-shell连接集群,各个Excetor分配的核数和内存可以根据需要进行指定

$cd /app/hadoop/spark-1.1.0/bin

$./spark-shell --master spark://hadoop1:7077 --executor-memory 1024m --driver-memory 1024m

启动后查看启动情况,如下图所示:

3.2 上传测试数据

搜狗日志数据可以从http://download.labs.sogou.com/dl/q.html 下载,其中完整版大概2GB左右,文件中字段分别为:访问时间\t用户ID\t[查询词]\tURL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL。其中SogouQ1.txtSogouQ2.txtSogouQ3.txt分别是用head -n 或者tail -nSogouQ数据日志文件中截取,分别包含100万,200万和1000万笔数据,这些测试数据也放在该系列配套资源的data\sogou目录下。

搜狗日志数据放在data\sogou下,把该目录下的SogouQ1.txtSogouQ2.txtSogouQ3.txt解压,然后通过下面的命令上传到HDFS/sogou目录中

cd /home/hadoop/upload/

ll sogou

tar -zxf *.gz

hadoop fs -mkdir /sogou

hadoop fs -put sogou/SogouQ1.txt /sogou

hadoop fs -put sogou/SogouQ2.txt /sogou

hadoop fs -put sogou/SogouQ3.txt /sogou

hadoop fs -ls /sogou

3.3 转换与操作

3.3.1 并行化集合例子演示

在该例子中通过parallelize方法定义了一个从1~10的数据集,然后通过map(_*2)对数据集每个数乘以2,接着通过filter(_%3==0)过滤被3整除的数字,最后使用toDebugString显示RDDLineAge,并通过collect计算出最终的结果。

val num=sc.parallelize(1 to 10)

val doublenum = num.map(_*2)

val threenum = doublenum.filter(_ % 3 == 0)

threenum.toDebugString

threenum.collect

在下图运行结果图中,我们可以看到RDDLineAge演变,通过paralelize方法建立了一个ParalleCollectionRDD,使用map()方法后该RDDMappedRDD,接着使用filter()方法后转变为FilteredRDD

 

在下图中使用collect方法时触发运行作业,通过任务计算出结果

以下语句和collect一样,都会触发作业运行

num.reduce (_ + _)

num.take(5)

num.first

num.count

num.take(5).foreach(println)

运行的情况可以通过页面进行监控,在Spark Stages页面中我们可以看到运行的详细情况,包括运行的Stage id号、Job描述、提交时间、运行时间、Stage情况等,可以点击作业描述查看更加详细的情况:

在这个页面上我们将看到三部分信息:作业的基本信息、Executor信息和Tasks的信息。特别是Tasks信息可以了解到作业的分片情况,运行状态、数据获取位置、耗费时间及所处的Executor等信息

3.3.2 Shuffle操作例子演示

在该例子中通过parallelize方法定义了K-V键值对数据集合,通过sortByKey()进行按照Key值进行排序,然后通过collect方法触发作业运行得到结果。groupByKey()为按照Key进行归组,reduceByKey(_+_)为按照Key进行累和,这三个方法的计算和前面的例子不同,因为这些RDD类型为宽依赖,在计算过程中发生了Shuffle动作。

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

kv1.sortByKey().collect

kv1.groupByKey().collect

kv1.reduceByKey(_+_).collect

调用groupByKey()运行结果

调用reduceByKey ()运行结果

我们在作业运行监控界面上能够看到:每个作业分为两个Stage,在第一个Stage中进行了Shuffle Write,在第二个Stage中进行了Shuffle Read

Stage详细运行页面中可以观察第一个Stage运行情况,内容包括:Stage运行的基本信息、每个Executor运行信息和任务的运行信息,特别在任务运行中我们可以看到任务的状态、数据读取的位置、机器节点、耗费时间和Shuffle Write时间等。

在下面进行了distinctunionjoincogroup等操作中涉及到Shuffle过程

val kv2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5)))

kv2.distinct.collect

kv1.union(kv2).collect

 

val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))

kv1.join(kv3).collect

kv1.cogroup(kv3).collect

3.3.3文件例子读取

这个是大数据经典的例子,在这个例子中通过不同方式读取HDFS中的文件,然后进行单词计数,最终通过运行作业计算出结果。本例子中通过toDebugString可以看到RDD的变化,

第一步   按照文件夹读取计算每个单词出现个数

在该步骤中RDD的变换过程为:HadoopRDD->MappedRDD-> FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD

val text = sc.textFile("hdfs://hadoop1:9000/class3/directory/")

text.toDebugString

val words=text.flatMap(_.split(" "))

val wordscount=words.map(x=>(x,1)).reduceByKey(_+_)

wordscount.toDebugString

wordscount.collect

RDD类型的变化过程如下:

l  首先使用textFile()读取HDFS数据形成MappedRDD,这里有可能有疑问,从HDFS读取的数据不是HadoopRDD,怎么变成了MappedRDD。回答这个问题需要从Spark源码进行分析,在sparkContext类中的textFile()方法读取HDFS文件后,使用了map()生成了MappedRDD

l  然后使用flatMap()方法对文件内容按照空格拆分单词,拆分形成FlatMappedRDD

l  其次使用map(x=>(x(1),1))对上步骤拆分的单词形成(单词,1)数据对,此时生成的MappedRDD,最后使用reduceByKey()方法对单词的频度统计,由此生成ShuffledRDD,并由collect运行作业得出结果。

 

第二步   按照匹配模式读取计算单词个数

val rdd2 = sc.textFile("hdfs://hadoop1:9000/class3/directory/*.txt")

rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

第三步   读取gz压缩文件计算单词个数

val rdd3 = sc.textFile("hdfs://hadoop1:8000/class2/test.txt.gz")

rdd3.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

3.3.4 搜狗日志查询例子演示

搜狗日志数据可以从http://download.labs.sogou.com/dl/q.html 下载,其中完整版大概2GB左右,文件中字段分别为:访问时间\t用户ID\t[查询词]\tURL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL。其中SogouQ1.txtSogouQ2.txtSogouQ3.txt分别是用head -n 或者tail -nSogouQ数据日志文件中截取,分别包含100万,200万和1000万笔数据,这些测试数据也放在该系列配套资源的data\sogou目录下。

第一步   上传测试数据

搜狗日志数据放在data\sogou下,把该目录下的SogouQ1.txtSogouQ2.txtSogouQ3.txt解压,然后通过下面的命令上传到HDFS/sogou目录中

cd /home/hadoop/upload/

ll sogou

tar -zxf *.gz

hadoop fs -mkdir /sogou

hadoop fs -put sogou/SogouQ1.txt /sogou

hadoop fs -put sogou/SogouQ2.txt /sogou

hadoop fs -put sogou/SogouQ3.txt /sogou

hadoop fs -ls /sogou

第二步   查询搜索结果排名第1点击次序排在第2的数据

val rdd1 = sc.textFile("hdfs://hadoop1:9000/sogou/SogouQ1.txt")

val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)

rdd2.count()

val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)

rdd3.count()

rdd3.toDebugString

该命令运行的过程如下:

l  首先使用textFile()读入SogouQ1.txt文件,读入后由HadoopRDD转变为MadppedRDD

l  然后通过rdd1.map(_.split("\t"))对读入数据使用\t分隔符进行拆分,拆分后RDD类型不变即为MadppedRDD,对这些拆分后的数据使用filter(_.length==6)过滤每行为6个字段的数据,这时数据变为FilteredRDD

l  运行rdd2.count()启动对rdd2计数的作业,通过运行结果可以看到该数据集为100条;

l  rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)表示对rdd2的数据的第4个字段搜索结果排名第一,第5个字段点击次序排在第二的数据进行过滤,通过count()方法运行作业得出最终的结果;

使用toDebugString可以查看rdd3RDD详细变换过程,如下图所示:

第三步   Session查询次数排行榜并把结果保存在HDFS

val rdd4 = rdd2.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)). sortByKey(false).map(x=>(x._2,x._1))

rdd4.toDebugString

rdd4.saveAsTextFile("hdfs://hadoop1:9000/class3/output1")

该命令运行的过程如下:

l  rdd4的生成比较复杂,我们分步骤进行解析,轴线map(x=>(x(1),1))是获取每行的第二个字段(用户Session)计数为1,然后reduceByKey(_+_)是安排Key进行累和,即按照用户Session号进行计数求查询次数,其次map(x=>(x._2,x._1))是把KeyValue位置互换,为后面排序提供条件,使用sortByKey(false)对数据进行按Key值进行倒排,此时需要注意的是Key为查询次数,最后通过map(x=>(x._2,x._1)再次交换KeyValue位置,得到了(用户Session号,查询次数)结果。该过程RDD的变化如下图所示:

l  计算的结果通过如下命令可以查看到,可以看到由于输入数据存放在2个节点上,所以结果也分为两个文件

hadoop fs -ls /class3/output1

这是使用HDFSgetmerge合并这两个文件并进行查看

$cd /app/hadoop/hadoop-2.2.0/bin

$hdfs dfs -getmerge hdfs://hadoop1:9000/class3/output1 /home/hadoop/upload/result

$cd /home/hadoop/upload/

$head result

4、 安装IntelliJ IDEA

IDEA 全称 IntelliJ IDEA,是java语言开发的集成环境,IntelliJ在业界被公认为最好的java开发工具之一,尤其在智能代码助手、代码自动提示、重构、J2EE支持、AntJUnitCVS整合、代码审查、创新的GUI设计等方面的功能可以说是超常的。IDEAJetBrains公司的产品,这家公司总部位于捷克共和国的首都布拉格,开发人员以严谨著称的东欧程序员为主。

IDEA每个版本提供CommunityUltimate两个版本,如下图所示,其中Community是完全免费的,而Ultimate版本可以使用30天,过这段时间后需要收费。从安装后使用对比来看,下载一个Community版本足够了。

4.1  安装软件

4.1.1 下载IDEA安装文件

可以到Jetbrains官网http://www.jetbrains.com/idea/download/,选择最新的安装文件。由于以后的练习需要在Linux开发Scala应用程序,选择Linux系统IntelliJ IDEA14,如下图所示:

】在该系列配套资源的install目录下分别提供了ideaIC-14.0.2.tar.gz(社区版)ideaIU-14.0.2.tar.gz(正式版)安装文件,对于Scala开发来说两个版本区别不大

4.1.2 解压缩并移动目录

把下载的安装文件上传到目标机器,用如下命令解压缩IntelliJ IDEA安装文件,并迁移到/app目录下:

cd /home/hadoop/upload

tar -zxf ideaIU-14.0.2.tar.gz

sudo mv idea-IU-139.659.2 /app/idea-IU

4.1.3配置/etc/profile环境变量

使用如下命令打开/etc/profile文件:

sudo vi /etc/profile

确认JDK配置变量正确配置(参见第2节《Spark编译与部署》中关于基础环境搭建介绍):

export JAVA_HOME=/usr/lib/java/jdk1.7.0_55

export PATH=$PATH:$JAVA_HOME

4.2 配置Scala环境

4.2.1 启动IntelliJ IDEA

可以通过两种方式启动IntelliJ IDEA

l  IntelliJ IDEA安装所在目录下,进入bin目录双击idea.sh启动IntelliJ IDEA

l  在命令行终端中,进入$IDEA_HOME/bin目录,输入./idea.sh进行启动

IDEA初始启动目录如下,IDEA默认情况下并没有安装Scala插件,需要手动进行安装,安装过程并不复杂,下面将演示如何进行安装。

4.2.2 下载Scala插件

参见上图,在启动界面上选择“Configure-->Plugins"选项,然后弹出插件管理界面,在该界面上列出了所有安装好的插件,由于Scala插件没有安装,需要点击”Install JetBrains plugins"进行安装,如下图所示:

待安装的插件很多,可以通过查询或者字母顺序找到Scala插件,选择插件后在界面的右侧出现该插件的详细信息,点击绿色按钮"Install plugin”安装插件,如下图所示:

安装过程将出现安装进度界面,通过该界面了解插件安装进度,如下图所示:

 

安装插件后,在启动界面中选择创建新项目,弹出的界面中将会出现"Scala"类型项目,选择后将出现提示创建的项目是仅Scala代码项目还是SBT代码项目,如下图所示:

4.2.3 设置界面主题

IntelliJ IDEA12开始起推出了Darcula 主题的全新用户界面,该界面以黑色为主题风格得到很多开发人员的喜爱,下面我们将介绍如何进行配置。在主界面中选择File菜单,然后选择Setting子菜单,如下图所示:

 

在弹出的界面中选择Appearance &BehaviorAppearance,其中Theme中选择Darcula主题,如下图所示:

保存该主题重新进入,可以看到如下图样式的开发工具,是不是很酷!

5    使用IDEA编写例子

5.1 创建项目

5.1.1 设置项目基本信息

IDEA菜单栏选择File->New Project,出现如下界面,选择创建Scala项目:

在项目的基本信息填写项目名称、项目所在位置、Project SDKScala SDK,在这里设置项目名称为class3,关于Scala SDK的安装参见第2节《Spark编译与部署》下Spark编译安装介绍:

5.1.2 设置Modules

创建该项目后,可以看到现在还没有源文件,只有一个存放源文件的目录src以及存放工程其他信息的杂项。通过双击src目录或者点击菜单上的项目结构图标打开项目配置界面,如下图所示:

Modules设置界面中,src点击右键选择“新加文件夹”添加src->main->scala目录:

Modules设置界面中,分别设置main->scala目录为Sources类型:

5.1.3 配置Library

选择Library目录,添加Scala SDK Library,这里选择scala-2.10.4版本

添加Java Library,这里选择的是在$SPARK_HOME/lib/spark-assembly-1.1.0-hadoop2.2.0.jar文件,添加完成的界面如下:

5.2 例子1:直接运行

Spark编程模型(上)<span lang="EN-US" style="margin: 0px; padding: 0px;"--<概念及Shell试验》中使用Spark-Shell进行了搜狗日志的查询,在这里我们使用IDEASession查询次数排行榜进行重新练习,可以发现借助专业的开发工具可以方便快捷许多。

5.2.1 编写代码

src->main->scala下创建class3包,在该包中添加SogouResult对象文件,具体代码如下:

 1 package class3
 2 
 3 import org.apache.spark.SparkContext._
 4 import org.apache.spark.{SparkConf, SparkContext}
 5 
 6 object SogouResult{
 7   def main(args: Array[String]) {
 8     if (args.length == 0) {
 9       System.err.println("Usage: SogouResult <file1> <file2>")
10       System.exit(1)
11     }
12 
13     val conf = new SparkConf().setAppName("SogouResult").setMaster("local")
14     val sc = new SparkContext(conf)
15 
16     //session查询次数排行榜17     val rdd1 = sc.textFile(args(0)).map(_.split("\t")).filter(_.length==6)
18 val rdd2=rdd1.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
19     rdd2.saveAsTextFile(args(1))
20     sc.stop()
21   }
22 }

  

5.2.2 编译代码

代码在运行之前需要进行编译,可以点击菜单Build->Make Project或者Ctrl+F9对代码进行编译,编译结果会在Event Log进行提示,如果出现异常可以根据提示进行修改

5.2.3 运行环境配置

SogouResult首次运行或点击菜单Run->Edit Configurations打开"运行/调试 配置界面"

运行SogouResult时需要输入搜狗日志文件路径和输出结果路径两个参数,需要注意的是HDFS的路径参数路径需要全路径,否则运行会报错:

l  搜狗日志文件路径:使用上节上传的搜狗查询日志文件hdfs://hadoop1:9000/sogou/SogouQ1.txt

l  输出结果路径:hdfs://hadoop1:9000/class3/output2

5.2.4 运行结果查看

启动Spark集群,点击菜单Run->Run或者Shift+F10运行SogouResult,在运行结果窗口可以运行情况。当然了如果需要观察程序运行的详细过程,可以加入断点,使用调试模式根据程序运行过程。

使用如下命令查看运行结果,该结果和上节运行的结果一致

hadoop fs -ls /class3/output2 

hadoop fs -cat /class3/output2/part-00000 | less

5.3  例子2:打包运行

上个例子使用了IDEA直接运行结果,在该例子中将使用IDEA打包程序进行执行

5.3.1 编写代码

class3包中添加Join对象文件,具体代码如下:

 1 package class3
 2  
 3 import org.apache.spark.SparkContext._
 4 import org.apache.spark.{SparkConf, SparkContext}
 5  
 6 object Join{
 7   def main(args: Array[String]) {
 8     if (args.length == 0) {
 9       System.err.println("Usage: Join <file1> <file2>")
10       System.exit(1)
11     }
12  
13     val conf = new SparkConf().setAppName("Join").setMaster("local")
14     val sc = new SparkContext(conf)
15  
16     val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
17     case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float,lng: Float)
18     case class Click (d: java.util.Date, uuid: String, landing_page: Int)
19     val reg = sc.textFile(args(0)).map(_.split("\t")).map(r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat)))
20 val clk = sc.textFile(args(1)).map(_.split("\t")).map(c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt)))
21     reg.join(clk).take(2).foreach(println)
22  
23     sc.stop()
24   }
25 }

5.3.2 生成打包文件

第一步   配置打包信息

在项目结构界面中选择"Artifacts",在右边操作界面选择绿色"+"号,选择添加JAR包的"From modules with dependencies"方式,出现如下界面,在该界面中选择主函数入口为Join

第二步   填写该JAR包名称和调整输出内容

【注意】的是默认情况下"Output Layout"会附带Scala相关的类包,由于运行环境已经有Scala相关类包,所以在这里去除这些包只保留项目的输出内容

第三步   输出打包文件

点击菜单Build->Build Artifacts,弹出选择动作,选择Build或者Rebuild动作

第四步   复制打包文件到Spark根目录下

cd /home/hadoop/IdeaProjects/out/artifacts/class3

cp LearnSpark.jar  /app/hadoop/spark-1.1.0/

ls /app/hadoop/spark-1.1.0/

5.3.3 运行查看结果

通过如下命令调用打包中的Join方法,运行结果如下:

cd /app/hadoop/spark-1.1.0

bin/spark-submit --master spark://hadoop1:7077 --class class3.Join --executor-memory 1g LearnSpark.jar hdfs://hadoop1:9000/class3/join/reg.tsv hdfs://hadoop1:9000/class3/join/clk.tsv

6、问题解决

6.1 出现"*** is already defined as object ***"错误

编写好SogouResult后进行编译,出现"Sogou is already as object SogouResult"的错误,

出现这个错误很可能不是程序代码的问题,很可能是使用Scala JDK版本问题,作者在使用scala-2.11.4遇到该问题,换成scala-2.10.4后重新编译该问题得到解决,需要检查两个地方配置:LibrariesGlobal Libraries分别修改为scala-2.10.4

打赏
赞(0) 打赏
未经允许不得转载:同乐学堂 » Spark编程模型:spark-shell与IDEA实战

特别的技术,给特别的你!

联系QQ:1071235258QQ群:710045715

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏

error: Sorry,暂时内容不可复制!