文档章节

Spark小试牛刀

羊八井
 羊八井
发布于 2015/09/23 00:51
字数 1083
阅读 262
收藏 8

Spark小试牛刀

随着项目的运营,收集了很多的用户数据。最近业务上想做些社交图谱相关的产品,但因为数据很多、很杂,传统的数据库查询已经满足不了业务的需求。 试着用Spark来做,权当练练手了。

安装Spark

因为有Scala的开发经验,所以就不用官方提供的二进制包了,自编译scala 2.11版本。

下载Spark:http://ftp.cuhk.edu.hk/pub/packages/apache.org/spark/spark-1.5.0/spark-1.5.0.tgz

tar zxf spark-1.5.0.tgz
cd spark-1.5.0
./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package

以上命令完成Spark基于scala 2.11版本的编译。可以运行自带的一个示例程序来验证安装是否成功。

./bin/run-example SparkPi

编写Standalone application

使用sbt来构建一个可提交的简单Spark程序,功能是计算每个用户加入的群组,并把结果保存下来。project/Build.scala配置文件如下:

import _root_.sbt.Keys._
import _root_.sbt._
import sbtassembly.AssemblyKeys._

object Build extends Build {

  override lazy val settings = super.settings :+ {
    shellPrompt := (s => Project.extract(s).currentProject.id + " > ")
  }

  lazy val root = Project("spark-mongodb", file("."))
    .settings(
      scalaVersion := "2.11.7",
      assemblyJarName in assembly := "spark-mongodb.jar",
      assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false),
      libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % verSpark % "scopeProvidedTest,
        "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0" excludeAll(
            ExclusionRule(organization = "javax.servlet"), 
            ExclusionRule(organization = "commons-beanutils"), 
            ExclusionRule(organization = "org.apache.hadoop")))
    )
  
  private val scopeProvidedTest = "provided,test"
  private val verSpark = "1.5.0"
}

数据存储在MongoDB数据库中,所以我们还需要使用mongo-hadoop连接器来访问MongoDB数据库。

示例程序

示例程序非常的简单,把数据从数据库里全部读出,使用map来把每条记录里用户ID对应加入的群组ID转换成一个Set,再使用 reduceByKey来把相同用户ID的set合并到一起,存入数据库即可。

import com.mongodb.BasicDBObject
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject

import scala.collection.JavaConverters._

object QQGroup {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("QQGroup")
    val sc = new SparkContext(sparkConf)

    val inputConfig = new Configuration()
    inputConfig.set("mongo.input.uri", "mongodb://192.168.31.121:27017/db.userGroup")
    inputConfig.set("mongo.input.fields", """{"userId":1, "groupId":1, "_id":0}""")
    inputConfig.set("mongo.input.noTimeout", "true")

    val documentRDD = sc.newAPIHadoopRDD(
      inputConfig,
      classOf[MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])

    val userRDD = documentRDD.map { case (_, doc) =>
      (getValue(doc, "userId"), getValue(doc, "groupId"))
    }.reduceByKey(_ ++ _)

    val resultRDD = userRDD.map { case (userId, groupIds) =>
      val o = new BasicDBObject()
      o.put("groupIds", groupIds.asJava)
      userId -> o
    }

    val outputConfig = new Configuration()
    outputConfig.set("mongo.output.uri", "mongodb://192.168.31.121:27017/db_result.userGroup")

    resultRDD.saveAsNewAPIHadoopFile(
      "file://this-is-completely-unused",
      classOf[Object],
      classOf[BSONObject],
      classOf[MongoOutputFormat[Object, BSONObject]],
      outputConfig)
  }

  def getValue(dbo: BSONObject, key: String) = {
    val value = dbo.get(key)
    if (value eq null) "" else value.asInstanceOf[String]
  }
}

MongoDB官方提供了Hadoop连接器,Spark可以使用mongo-hadoop连接器来读、写MongoDB数据库。 主要的输入配置荐有:

  • mongo.input.uri: MongoDB的连接URI
  • mongo.input.fields: 指定返回哪些数据,与db.query里的第2个参数功能一样
  • mongo.input.query: MongoDB的查询参数

相应的MongoDB也提供了一系列的输出参数,如:

  • mongo.output.uri: MongoDB的连接URI

sc.newAPIHadoopRDD()方法有4个参数,分别为:配置、输入格式化类、待映射数据主键类型、待映射数据类型。

主要的操作代码:

    val userRDD = documentRDD.map { case (_, doc) =>
      (getValue(doc, "userId"), Set(getValue(doc, "groupId")))
    }.reduceByKey(_ ++ _)

    val resultRDD = userRDD.map { case (userId, groupIds) =>
      val o = new BasicDBObject()
      o.put("groupIds", groupIds.asJava)
      userId -> o
    }

先使用map方法获取userIdgroupId,并把groupId转换为一个Set

在把数据转换成Tuple2,就是一个KV的形式以后,我们就可以调用一系列的转换方法来对RDD进行操作,这里使用reduceByKey方法来将同一个userId的所以value都合并在一起。这样我们就有了所有用户对应加入的群组 的一个RDD集了。

(RDD上有两种类型的操作。一种是“变换”,它只是描述了待进行的操作指令,并不会触发实际的计算;另一种是“动作”, 它将触发实际的计算动作,这时候系统才会实际的从数据源读入数据,操作内存,保存数据等)

最后使用resultRDD.saveAsNewAPIHadoopFile()方法来把计算结果存入MongoDB,这里的一个参数:用于指定 HDFS的存储位置并不会使用到,因为mongo-hadoop将会使用mongo.output.uri指定的存储URI连接地址来保存数据。

© 著作权归作者所有

羊八井

羊八井

粉丝 93
博文 41
码字总数 51095
作品 0
朝阳
技术主管
私信 提问
Apache Spark 2.4.0 正式发布

Apache Spark 2.4 与昨天正式发布,Apache Spark 2.4 版本是 2.x 系列的第五个版本。 如果想及时了解 Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号: itebloghadoop Apache Spa...

Spark
2018/11/09
0
0
SPARK 源码分析技术分享(带bilibili视频)

SPARK 源码分析技术分享 (带bilibili视频) 【本站点正在持续更新中…2018-12-05…】 SPARK 1.6.0-cdh5.15.0 Hadoop 2.6.0-cdh5.15.0 spark-scala-maven 微信(技术交流) : thinktothings SPA...

thinktothings
2018/12/02
0
0
Spark 1.2.2/1.3.1 发布,开源集群计算系统

Spark 1.2.2 和 Spark 1.3.1 发布啦!这两个版本是维护版本,超过 90 位开发者在维护 Spark。 Spark 1.2.2 包括稳定性方面的 bug 修复: Spark Core Thread safety problem in Netty shuffl...

oschina
2015/04/20
2.8K
2
Spark成为大数据高手进阶步骤

什么是Spark Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapRedu...

MoksMo
2015/11/05
2.1K
1
微软发布 .Net for Apache Spark :用什么语言开发大数据都可以

4 月 24 日,在 Spark+AI 峰会 上,我们很高兴地宣布推出 .NET for Apache Spark。Spark 是一种流行的开源分布式处理引擎,用于分析大型数据集。Spark 可用于处理批量数据、实时流、机器学习...

开源大数据EMR
05/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

nginx学习笔记

中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯。 是连接两个独立应用程序或独立系统的软件。 web请求通过中间件可以直接调用操作系统,也可以经过中间件把请求分发到多...

码农实战
今天
5
0
Spring Security 实战干货:玩转自定义登录

1. 前言 前面的关于 Spring Security 相关的文章只是一个预热。为了接下来更好的实战,如果你错过了请从 Spring Security 实战系列 开始。安全访问的第一步就是认证(Authentication),认证...

码农小胖哥
今天
11
0
JAVA 实现雪花算法生成唯一订单号工具类

import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.util.Calendar;/** * Default distributed primary key generator. * * <p> * Use snowflake......

huangkejie
昨天
12
0
PhotoShop 色调:RGB/CMYK 颜色模式

一·、 RGB : 三原色:红绿蓝 1.通道:通道中的红绿蓝通道分别对应的是红绿蓝三种原色(RGB)的显示范围 1.差值模式能模拟三种原色叠加之后的效果 2.添加-颜色曲线:调整图像RGB颜色----R色增强...

东方墨天
昨天
11
1
将博客搬至CSDN

将博客搬至CSDN

算法与编程之美
昨天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部