
Getting Started with Spark – HelloWorld

1. Preface

Spark is hot right now, and more and more people are getting into big data.
After digging through plenty of material, I finally got this standalone Spark HelloWorld running.
This is not the usual HelloWorld: rather than just printing "HelloWorld", it is a word count program that counts how many times each word appears in a file and sorts the results by count. For example, given a file containing "spark hello spark", it would report that spark appears 2 times and hello appears 1 time.

The same functionality will be implemented three ways: in native Scala, in traditional Java, and in Java 8 style.

In fact, the only difference between a standalone Spark program and one running on a cluster is the runtime environment; the development workflow is exactly the same.
A later article will cover how to set up a cluster.
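As a minimal sketch of that point (MasterConfigDemo is a hypothetical class name, and spark://master:7077 is a placeholder, not a real cluster address), the only line that changes between the two environments is the master setting:

import org.apache.spark.SparkConf;

public class MasterConfigDemo {
    public static void main(String[] args) {
        // standalone run: driver and executors all live in this single JVM
        SparkConf local = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");

        // cluster run: identical program; only the master URL differs
        // ("spark://master:7077" is a placeholder for a real cluster address)
        SparkConf cluster = new SparkConf()
                .setAppName("WordCount")
                .setMaster("spark://master:7077");
    }
}

Everything else, from the word count logic down to the build setup, stays the same.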

This series will also be updated on both CSDN and Jianshu (简书) whenever I have spare time.

Corrections are welcome, fellow readers.

2. Environment Setup

C:\Users\hylexus>java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
C:\Users\hylexus>scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

Eclipse for Scala:

Scala IDE build of Eclipse SDK
Build id: 4.4.1-vfinal-2016-05-04T11:16:00Z-Typesafe

Both the Scala and Java versions here use Maven to manage dependencies. For how to create a Scala project with Maven, see my other article: http://blog.csdn.net/hylexus/article/details/52602774

Note: the spark-core_2.11 artifact pulls in a frighteningly long list of jar dependencies, so be patient while they download… ^_^

2.1 Scala version

Partial contents of pom.xml:

<properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.5</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.specs2</groupId>
        <artifactId>specs2-junit_2.11</artifactId>
        <version>2.4.16</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!-- <arg>-make:transitive</arg> -->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <compilerArguments>
                    <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
                </compilerArguments>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <!-- If you have classpath issue like NoDefClassError,... -->
                <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>

    </plugins>
</build>

2.2 Java version

The pom.xml contents are as follows:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <compilerArguments>
                    <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
                </compilerArguments>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <!-- If you have classpath issue like NoDefClassError,... -->
                <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
    </plugins>
</build>

3. Code

3.1 Scala – low-key version

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("WordCount") //
            .setMaster("local")

        val sc = new SparkContext(conf)

        val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
        // read the file contents
        val lines = sc.textFile(fileName, 1)
        // split into words; splitting on spaces only, as an example
        val words = lines.flatMap(line => line.split(" "))
        // String ===> (word, count), count == 1
        val pairs = words.map(word => (word, 1))
        // (word, 1) ==> (word, count)
        val result = pairs.reduceByKey((count, acc) => count + acc)
        // sort by count DESC; elements remain (word, count) pairs
        val sorted = result.sortBy(e => e._2, false, 1)

        sorted.foreach(e => println("[" + e._1 + "] appears " + e._2 + " times"))

        sc.stop()
    }
}

3.2 Scala – flashy version

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf
        conf.setAppName("rank test").setMaster("local")

        val sc = new SparkContext(conf)

        val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"

        sc.textFile(fileName, 1) // lines
            .flatMap(_.split(" ")) // all words
            .map(word => (word, 1)) // to pair
            .reduceByKey(_ + _) // count
            .map(e => (e._2, e._1)) // swap to (count, word) so we can sort by key
            .sortByKey(false, 1) // sort by count DESC
            .map(e => (e._2, e._1)) // swap back to (word, count)
            .foreach(e => println("[" + e._1 + "] appears " + e._2 + " times"))

        sc.stop()
    }
}
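Note the two swaps in the flashy version: sortByKey sorts by key, so the (word, count) pairs are flipped to (count, word) for sorting and flipped back before printing. The low-key version avoids these gymnastics by using sortBy with a key function instead.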

3.3 Java – traditional version

The code is almost too ugly to look at…

Anonymous inner classes everywhere…

Luckily, Java 8 lambdas come to the rescue (see the sketch after this section).

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();

        conf.setAppName("WordCounter")//
                .setMaster("local");

        String fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties";

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(fileName, 1);

        JavaRDD<String> words = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    private static final long serialVersionUID = 1L;

                    // earlier Spark versions seem to have used Iterable here instead of Iterator
                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                });

        JavaPairRDD<String, Integer> pairs = words
                .mapToPair(new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word)
                            throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });

        JavaPairRDD<String, Integer> result = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer e, Integer acc)
                            throws Exception {
                        return e + acc;
                    }
                }, 1);

        // identity map: turns the JavaPairRDD<String, Integer> into a plain
        // JavaRDD<Tuple2<String, Integer>> so that sortBy() is available below
        result.map(
                new Function<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(
                            Tuple2<String, Integer> v1) throws Exception {
                        return new Tuple2<>(v1._1, v1._2);
                    }
                })//
                .sortBy(new Function<Tuple2<String, Integer>, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Tuple2<String, Integer> v1)
                            throws Exception {
                        return v1._2;
                    }
                }, false, 1)//
                .foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Integer> e)
                            throws Exception {
                        System.out.println("[" + e._1 + "] appears " + e._2 + " times");
                    }
                });
        sc.close();

    }
}
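As promised in the preface, the same job can also be written with Java 8 lambdas. Here is a rough sketch (my own, not necessarily the exact original; WordCountJava8 is a hypothetical class name):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCounter").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties";

        sc.textFile(fileName, 1)
                // split each line into words (spaces only, as before)
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                // word ==> (word, 1)
                .mapToPair(word -> new Tuple2<>(word, 1))
                // (word, 1) ==> (word, count)
                .reduceByKey((a, b) -> a + b)
                // swap to (count, word) so we can sort by key
                .mapToPair(e -> new Tuple2<>(e._2, e._1))
                // sort by count DESC
                .sortByKey(false, 1)
                .foreach(e -> System.out.println("[" + e._2 + "] appears " + e._1 + " times"));

        sc.close();
    }
}

Each anonymous inner class from section 3.3 collapses into a one-line lambda; the logic is unchanged.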

4. Output

//...............//...............
[->] appears 6 times
[+] appears 5 times
[import] appears 5 times
[new] appears 4 times
[=] appears 4 times
//...............//...............

Reposted from http://blog.csdn.net/hylexus/article/details/52606540, by hylexus.
