文档章节

101.Spark实现聚合功能(面试题)

片刻
 片刻
发布于 2016/04/29 12:52
字数 635
阅读 1449
收藏 6
互联网公司-面试题:
/**  
举个例子,比如要统计用户的总访问次数和去除访问同一个URL之后的总访问次数,随便造了几条样例数据(四个字段:id,name,vtm,url,vtm字段本例没用,不用管)如下:

id1,user1,2,http://www.hupu.com
id1,user1,2,http://www.hupu.com
id1,user1,3,http://www.hupu.com
id1,user1,100,http://www.hupu.com
id2,user2,2,http://www.hupu.com
id2,user2,1,http://www.hupu.com
id2,user2,50,http://www.hupu.com
id2,user2,2,http://touzhu.hupu.com

根据这个数据集,我们可以写hql 实现:
select id,name, count(0) as ct,count(distinct url) as urlcount 
from table 
group by id,name;

得出结果应该是:

id1,user1,4,1
id2,user2,4,2

下面用Spark实现这个聚合功能<发现Spark还是有难度的,卧槽>
简单说说MR的解析过程:

map阶段: id和name组合为key, url为value
reduce阶段: len(urls) 出现次数, len(set(urls)) 出现用户数
由于本人是不写MR,导致面试很尴尬。
想装逼写个Spark, 发现难度很大,因为的确很多函数不熟悉。

代码如下:

import org.apache.spark.SparkContext._  
import org.apache.spark._   

object SparkDemo2 {  
  def main(args: Array[String]) {  
  
    case class User(id: String, name: String, vtm: String, url: String)  
    //val rowkey = (new RowKey).evaluate(_)  
    // val HADOOP_USER = "hdfs"  
    // 设置访问spark使用的用户名  
    // System.setProperty("user.name", HADOOP_USER);  
    // 设置访问hadoop使用的用户名  
    // System.setProperty("HADOOP_USER_NAME", HADOOP_USER);  
  
    val conf = new SparkConf().setAppName("wordcount").setMaster("local") //.setExecutorEnv("HADOOP_USER_NAME", HADOOP_USER)  
    val sc = new SparkContext(conf)  
    val data = sc.textFile("/Users/jiangzl/Desktop/test.txt")  
    val rdd1 = data.map(line => {  
      val r = line.split(",")  
      User(r(0), r(1), r(2), r(3))  
    })
    val rdd2 = rdd1.map(r => ((r.id, r.name), r))  
  
    val seqOp = (a: (Int, List[String]), b: User) => a match {  
      case (0, List()) => (1, List(b.url))  
      case _ => (a._1 + 1, b.url :: a._2)  
    }  
  
    val combOp = (a: (Int, List[String]), b: (Int, List[String])) => {  
      (a._1 + b._1, a._2 ::: b._2)  
    }
  
    println("-----------------------------------------")  
    val rdd3 = rdd2.aggregateByKey((0, List[String]()))(seqOp, combOp).map(a => {  
      (a._1, a._2._1, a._2._2.distinct.length)  
    })  
    rdd3.collect.foreach(println)  
    println("-----------------------------------------")  
    sc.stop()  
  }  
}

解决方案-报错Scala版本问题:Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef;

修改Scala版本2.11.7改为2.10.4

simple.sbt

name := "SparkDemo Project"
version := "1.0"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

———————————————————————————修改为:——————————————————————————

name := "SparkDemo Project"
version := "1.0"
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

运行过程

jiangzhongliandeMacBook-Pro:spark-1.4.1-bin-hadoop2.6 jiangzl$ ./bin/spark-submit --class "SparkDemo2" ~/Desktop/tmp/target/scala-2.11/simple-project_2.11-1.0.jar
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef;
    at SparkDemo2$.main(tmp_spark.scala)
    at SparkDemo2.main(tmp_spark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

———————————————————————————修改为:——————————————————————————

jiangzhongliandeMacBook-Pro:spark-1.4.1-bin-hadoop2.6 jiangzl$ ./bin/spark-submit --class "SparkDemo2" ~/Desktop/tmp/target/scala-2.10/sparkdemo-project_2.10-1.0.jar
16/04/29 12:40:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-----------------------------------------
((id1,user1),4,1)
((id2,user2),4,2)
-----------------------------------------

本文转载自:http://blog.csdn.net/xiao_jun_0820/article/details/44223743

片刻
粉丝 107
博文 269
码字总数 306754
作品 0
海淀
高级程序员
私信 提问
Mycat - 数据库分库分表中间件,国内最活跃的、性能最好的开源数据库中间件!

Mycat是什么 Mycat - 数据库分库分表中间件,国内最活跃的、性能最好的开源数据库中间件! 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 ...

架构之路
2017/12/05
0
0
推荐一个在线创作流程图、思维导图软件—ProcessOn

最近要画流程图,破解了半天Visio2016没搞定,2016的估计都被封了,Visio收费又过贵,又不想折腾低版本的破解,所以找了个在线画图平台ProcessOn,没想到还出乎人意料,完全可以达到预期效果...

架构之路
2017/12/18
0
1
年末干货!Java技术栈2017年度精选干货总结

Java技术栈2017年总结 2017年即将收尾了 这一年,满满的都是干货 这一年,我们的更新不曾停歇 这一年,你装逼内功应已有所成 我是小猿,下面是本年度的分享知识图谱 看完是不是有点蒙逼了?没...

架构之路
2017/12/24
0
0
java多线程--几个多线程面试题小结

  自学了一段时间的多线程知识,尝试了做了几个编程题,发现想象中很简单的功能,自己真写起来要花费远超自己想象的功夫,知识点易学,不易用啊. 面试题1:编写程序实现,子线程循环10次,接着主线...

冬至饮雪
2016/04/04
0
0
30道shell编程题目

题目出自:http://oldboy.blog.51cto.com/2561410/1632876 提供自己做的答案,仅供学习测试用。 企业面试题1:(生产实战案例):监控MySQL主从同步是否异常,如果异常,则发送短信或者邮件给...

出VU时代
2016/09/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

自建redis笔记

自建redis笔记 最近在linux安装了一下redis,特做一些笔记! 本文先单节点启动redis,然后再进行持久化配置,在次基础上,再分享搭建主从模式的配置以及Sentinel 哨兵模式及集群的搭建 单节点...

北极之北
30分钟前
4
0
vue+element之多表单验证

方法一:利用promise var p1=new Promise(function(resolve, reject) { this.$refs[form1].validate((valid) => { if(valid){ ......

沉迷代码我爱学习
32分钟前
4
0
golang 1.13 errors 包 新函数介绍

引 这次 errors 包算重量级更新。很有更能把以前的一些设计模式给推到。下面聊下用法。 error 装包 以前返回一个错误,想要保存 error 链,还要定义结构体保存以前的 error 信息。感兴趣看下...

guonaihong
41分钟前
55
1
并发编程之线程池

一、线程池 1、什么是线程池 Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。 第一:降...

codeobj
43分钟前
6
0
知识点总结思维导图模板分享,良心安利,建议收藏

思维导图经常被用在学习中,对大脑思维进行发散,对知识进行记忆。使用思维导图可以让知识更加简单更有层次。下面是利用思维导图所绘制的几款知识点总结思维导图模板,大家可以进行进行参考使...

干货趣分享
47分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部