当前位置:网站首页 > Java基础 > 正文

java 版 spark 教程



spark-sql是用来处理结构化数据的模块,是入门spark的首要模块。

技术的学习无非就是去了解它的API,但是Spark有点难,因为它的例子和网上能搜到的基本都是Scala写的。我们这里使用Java。


数据处理的第一个例子通常都是word count,就是统计一个文件里每个单词出现了几次。我们也来试一下。

 
  

 
  

还是以我们前面的例子来改:

  1. String logFile = “words”;
  2. SparkSession spark = SparkSession.builder().appName(”Simple Application”).master(”local”).getOrCreate();
  3. Dataset<String> logData = spark.read().textFile(logFile).cache();
  4. System.out.println(”行数:” + logData.count());

这里我不再使用之前的README文件,自己创建了一个words文件,内容随意写了一堆单词。

执行程序,可以正常打印出来:

 </span></p><p><span style="font-family: 宋体; font-size: 12pt">接下来我们需要把句子分割成一个个单词合在一起,然后统计每个单词出现的次数。 

 
  

 
  

 
  

 
  

我们通过Spark的flatMap先来处理一下:

  1. Dataset&lt;String&gt; words = logData.flatMap((FlatMapFunction&lt;String, String&gt;) k -&gt; Arrays.asList(k.split(”\s”)).iterator(), Encoders.STRING());
  2. System.out.println(”单词数:” + words.count());
  3. words.foreach(k -&gt; {
  4. System.out.println(”W:” + k);
  5. });

不同于Java的流,spark这个flatMap的返回值是可以直接访问结果的:

 </span></p><p></p><pre></pre><p></p><p><span style="font-family: 宋体; font-size: 12pt">再Java中使用Scala的方法总是有些怪异,Lambda表达式前面总是需要强制类型转换,只是为了指明参数类型,否则需要new一个匿名类。 

这个也花了我不少时间,后来找到一个网页&nbsp;org.apache.spark.sql.Dataset.flatMap java code examples | Tabnine

 </span></p><p><img src="https://img2020.cnblogs.com/blog///--.png" alt=""><span style="font-family: 宋体; font-size: 12pt"> </span></p><p><span style="font-family: 宋体; font-size: 12pt">再往后我迷茫了: 

  1. KeyValueGroupedDataset&lt;String, String&gt; group = words.groupByKey((Function1&lt;String, String&gt;) k -&gt; k, Encoders.STRING());

这样我已经group好了,但是返回的不是DataSet,我也不知道这个返回有啥用,怎么拿到里面的内容呢?我费了好大劲没搞定。

比如我发现count方法会返回一个DataSet:

 </span></p><p><span style="font-family: 宋体; font-size: 12pt">看起来正是我想要的,但是当我想把它输出竟然执行报错: 

  1. count.foreach(t -&gt; {
  2. &nbsp;&nbsp;&nbsp;&nbsp;System.out.println(t);
  3. });

 </span></p><p><span style="font-family: 宋体; font-size: 12pt">别说foreach了,就算想看看里面的数量(就像一开始我们查看了文件有几行那样)都会报错,错误内容一样 

  1. count.count();

查了很多资料,大意是说spark的计算方法都是分布式的,各个任务之间需要通信,通信时需要序列化来传递信息。所以上面我们能看文件行数因为类型是String,有序列化标志;现在生成的是元组,不能序列化。我尝试了各种方法,甚至自己创建新类模拟了计算过程还是不行


查了好久资料,比如&nbsp;Job aborted due to stage failure: Task not serializable: | Databricks Spark Knowledge Base (gitbooks.io)&nbsp;依然没有解决。偶然的机会找到一个令人激动的网站&nbsp;Spark Groupby Example with DataFrame — SparkByExamples&nbsp;终于解决了我的问题。

DataFrame虽然是spark提供的重要工具,但是再Java上并没有对应的类,只是把DataSet的泛型对象改成Row而已。注意这个Row没有泛型定义,所以里面有哪些列不知道

可以从一开始就把DataSet转成DataFrame:

 </span></p><p><span style="font-family: 宋体; font-size: 12pt">但是可以看到要从Row里面拿数据比较麻烦。所以目前我只在需要序列化的地方转: 

版权声明


相关文章:

  • 尚学堂 马士兵java视频教程2025-11-07 11:42:02
  • java菜鸟教程实例2025-11-07 11:42:02
  • java流 教程2025-11-07 11:42:02
  • java教程442025-11-07 11:42:02
  • java桥接模式教程2025-11-07 11:42:02
  • java教程4732025-11-07 11:42:02
  • java实战教程49讲2025-11-07 11:42:02
  • java安装教程与设置2025-11-07 11:42:02
  • java教程高级2025-11-07 11:42:02
  • java 教程 -广告2025-11-07 11:42:02