Spark world count

sbt项目

idea安装scala插件

参考:http://blog.csdn.net/a2011480169/article/details/52712421

worldCount代码编写

Scala代码版本

Main.scala

object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    //test.txt在linux上的存放位置
    val line = sc.textFile("/usr/local/spark/spark-2.2.0-bin-hadoop2.7/test.txt") 
    line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }
}

Java代码版本

        /**
         *
         * new FlatMapFunction<String, String>两个string分别代表输入和输出类型
         * Override的call方法需要自己实现一个转换的方法,并返回一个Iterable的结构
         *
         * flatmap属于一类非常常用的spark函数,简单的说作用就是将一条rdd数据使用你定义的函数给分解成多条rdd数据
         * 例如,当前状态下,lines这个rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话,
         * 可以这样写 :
         */
        //flatMap与map的区别是,对每个输入,flatMap会生成一个或多个的输出,而map只是生成单一的输出
        //用空格分割各个单词,输入一行,输出多个对象,所以用flatMap
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });
        /**
         * map 键值对 ,类似于MR的map方法
         * pairFunction<T,K,V>: T:输入类型;K,V:输出键值对
         * 表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数)
         * 需要重写call方法实现转换
         */
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            //scala.Tuple2<K,V> call(T t)
            //Tuple2为scala中的一个对象,call方法的输入参数为T,即输入一个单词s,新的Tuple2对象的key为这个单词,计数为1
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        //A two-argument function that takes arguments
        // of type T1 and T2 and returns an R.
        /**
         * 调用reduceByKey方法,按key值进行reduce
         *  reduceByKey方法,类似于MR的reduce
         *  要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算
         *  若ones有<"one", 1>, <"one", 1>,会根据"one"将相同的pair单词个数进行统计,输入为Integer,输出也为Integer
         *输出<"one", 2>
         */
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            //reduce阶段,key相同的value怎么处理的问题
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        //备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对,
        // reduce方法会对输入进来的所有数据进行两两运算

        /**
         * collect方法用于将spark的RDD类型转化为我们熟知的java常见类型
         */
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();

spark&scala兼容性问题

我的spark版本是2.2.0,spark中jars中能查看到其运行使用的scala类库编译器都是2.11.8,如下:

  • scala-compiler-2.11.8.jar
  • scala-library-2.11.8.jar
  • scalap-2.11.8.jar
  • scala-reflect-2.11.8.jar

因此在idea中项目中build.sbt指定scalaVersion := “2.11.8”应是最合适的。 我的build.sbt配置如下:

name := "helloworld"

version := "1.0"

scalaVersion := "2.11.8"

assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", xs @ _*)         => MergeStrategy.first
  case PathList("javax", "inject", xs @ _*)         => MergeStrategy.first
  case PathList("org", "aopalliance", xs @ _*)         => MergeStrategy.first
  case PathList(ps@_*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
  case PathList(ps@_*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
  case PathList(ps@_*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"

libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.0.0-rc2"

项目打包

出了利用IDE工具打包外,另一不错的选择便是通过sbt提供的插件assembly。首先在项目中project文件夹下新建plugins.sbt,附上一下配置:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

logLevel := sbt.Level.Info //日志级别,可有可无

然后在项目根目录运行:

sbt clean assembly

helloworld-assembly-1.0.jar有90M大小,OMG…

spark-submit运行

我的spark安装在: /usr/local/spark/spark-2.2.0-bin-hadoop2.7/ 然后仙剑test.txt文本,内容为:hello world hello spark 在spark根目录运行如下命令:

bin/spark-submit --class Main helloworld-assembly-1.0.jar

输出:

(spark,1)
(hello,2)
(world,1)

worldCount项目源码

https://github.com/yanlzhl/spark-worldCount

热情洋溢的程序员欢迎您以任何形式转载!