文档章节

sparkStreaming+sql点击前十商品

JPblog
 JPblog
发布于 2017/09/08 13:49
字数 469
阅读 6
收藏 0

1. 目的

    输出网站点击次数top10的商品信息,mysql存储商品信息表,socket模拟网站点击日志

2. 素材

    1)mysql建立product表

mysql> select * from product;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001    | phone    |
| 002    | book     |
| 003    | paper    |
+--------+----------+

    2)socket输入模拟点击log

        启动linux上的netcat程序

            nc -lk 9999

        输入字符

            9342 001 20170909
            4532 002 20170909
            7159 001 20170909
            5834 003 20170909
            5521 003 20170909
            2633 001 20170909

3. 代码

/**
  * Created by puwenchao on 2017-09-08.
  * 处理10分钟内商品点击次数Top10,商品信息存放在MySQL数据库中,点击日志来源于socket
  */
package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

//商品点击日志格式
case class AccessLog(userid: String, itemid: String, clicktime: String)

object streaming_blacklist3 {
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    //设定日志等级
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建所需context
    val sparkConf = new SparkConf().setAppName("streaming_blacklist3").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlc = new SQLContext(sc)
    val streamc = new StreamingContext(sc, Seconds(10))
    import sqlc.implicits._

    //从数据库中加载product表
    val productDF = sqlc.read.format("jdbc").options(Map(
      "url"-> "jdbc:mysql://192.168.252.141:3306/test",
      "driver"->"com.mysql.jdbc.Driver",
      "dbtable"->"product",
      "user"->"root",
      "password"-> "mysql"
    )).load()

    productDF.registerTempTable("product")

    //点击日志格式化处理的逻辑
    def parseLog(log: String): AccessLog = {
      val logInfo = log.split(" ")
      if (logInfo.length == 3) {
        AccessLog(logInfo(0),logInfo(1), logInfo(2))
      }
      else {
        AccessLog("0","0","0")
      }
    }

    //处理socket输入的点击日志
    //  val logLinesDStream = ssc.textFileStream("D:/logs_incoming")
    val logLinesDStream = streamc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)
    val accessLogsDStream = logLinesDStream.map(parseLog).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)


    //关联点击数据与产品数据,输出前十名产品信息和点击次数
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM product a JOIN " +
         " (SELECT itemid,COUNT(*) cnt FROM accessLogs GROUP BY itemid) b " +
          " ON (a.itemid=b.itemid) ORDER BY cnt DESC limit 10"
        val topTenClick = sqlc.sql(sqlStr)

        //可将前十名输出到HDFS、数据库等地方
        topTenClick.show()
      }
    })

    streamc.start()
    streamc.awaitTermination()
  }
}

4. 输出

    No logs received in this time interval
    No logs received in this time interval
    +------+--------+----+
    |itemid|itemname|cnt|
    +------+--------+----+
    |   001| phone|   3|
    |   003|  paper|   2|
    |   002|   book|   1|
    +------+--------+----+

© 著作权归作者所有

共有 人打赏支持
JPblog
粉丝 10
博文 60
码字总数 37322
作品 0
朝阳
程序员
利用Python分析拼多多上卖的最热的产品, 结果出乎大多数人意料!

一、缘起 当然,我们先走第一步,开个头。现在,我们就从一个切面来窥探下。 二、我们获取了多少商品条目? 三、这些商品总共卖出了多少钱? 四、销售额前十的品类是哪些? 拼多多的主打品类...

Python新世界
08/07
0
0
垂直搜索引擎中的用户行为数据价值解析

垂直搜索引擎是网站/APP里提供的搜索窗口,让用户通过搜索关键词就直达目标内容。上一篇文章中已经做过简单解释,并且从易读性角度阐述了其三个应用阶段。 此篇文章则详细讲述用户在应用垂直...

达观数据
2017/08/10
0
0
关于淘宝API 求助

最近需要开发一套基于淘宝api,实现将登陆用户在淘宝后台仓库中的商品上架,下架操作,以及查询在售商品详情,查询其他店铺中销量前十的商品信息 等内容,就是想知道,如果调用这些api的话,...

miss_all
2017/04/06
127
2
基于盒马鲜生改编的微信小程序

前段时间,随着马化腾现身全国多地用微信小程序乘坐公交的新闻出现,微信小程序的热度可谓是更上了一层。微信小程序现身至今,因其不用下载就可使用的方便等优点,发展趋势一直良好。 盒马鲜...

TeanLee
2017/12/15
0
0
java的权重匹配算法技术求教

我是一名java开发者,最近有些技术思路需要请教各位。麻烦大家告知我相关技术资料,或者详细的解决方案即可。 特征对象:唯一编号,类型,大小,价格,标题,内容,标签,点击量,使用量,平均...

雪舞潇湘
2016/04/18
637
4

没有更多内容

加载失败,请刷新页面

加载更多

00.编译OpenJDK-8u40的整个过程

前言 历经2天的折腾总算把OpenJDK给编译成功了,要说为啥搞这个,还得从面试说起,最近出去面试经常被问到JVM的相关东西,总感觉自己以前学的太浅薄,所以回来就打算深入学习,目标把《深入理...

凌晨一点
今天
4
0
python: 一些关于元组的碎碎念

初始化元组的时候,尤其是元组里面只有一个元素的时候,会出现一些很蛋疼的情况: def checkContentAndType(obj): print(obj) print(type(obj))if __name__=="__main__": tu...

Oh_really
昨天
6
2
jvm crash分析工具

介绍一款非常好用的jvm crash分析工具,当jvm挂掉时,会产生hs_err_pid.log。里面记录了jvm当时的运行状态以及错误信息,但是内容量比较庞大,不好分析。所以我们要借助工具来帮我们。 Cras...

xpbob
昨天
120
0
Qt编写自定义控件属性设计器

以前做.NET开发中,.NET直接就集成了属性设计器,VS不愧是宇宙第一IDE,你能够想到的都给你封装好了,用起来不要太爽!因为项目需要自从全面转Qt开发已经6年有余,在工业控制领域,有一些应用...

飞扬青云
昨天
4
0
我为什么用GO语言来做区块链?

Go语言现在常常被用来做去中心化系统(decentralised system)。其他类型的公司也都把Go用在产品的核心模块中,并且它在网站开发中也占据了一席之地。 我们在决定做Karachain的时候,考量(b...

HiBlock
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部