文档章节

sparkStreaming+sql点击前十商品

JPblog
 JPblog
发布于 2017/09/08 13:49
字数 469
阅读 7
收藏 0

1. 目的

    输出网站点击次数top10的商品信息,mysql存储商品信息表,socket模拟网站点击日志

2. 素材

    1)mysql建立product表

mysql> select * from product;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001    | phone    |
| 002    | book     |
| 003    | paper    |
+--------+----------+

    2)socket输入模拟点击log

        启动linux上的netcat程序

            nc -lk 9999

        输入字符

            9342 001 20170909
            4532 002 20170909
            7159 001 20170909
            5834 003 20170909
            5521 003 20170909
            2633 001 20170909

3. 代码

/**
  * Created by puwenchao on 2017-09-08.
  * 处理10分钟内商品点击次数Top10,商品信息存放在MySQL数据库中,点击日志来源于socket
  */
package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

//商品点击日志格式
case class AccessLog(userid: String, itemid: String, clicktime: String)

object streaming_blacklist3 {
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    //设定日志等级
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建所需context
    val sparkConf = new SparkConf().setAppName("streaming_blacklist3").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlc = new SQLContext(sc)
    val streamc = new StreamingContext(sc, Seconds(10))
    import sqlc.implicits._

    //从数据库中加载product表
    val productDF = sqlc.read.format("jdbc").options(Map(
      "url"-> "jdbc:mysql://192.168.252.141:3306/test",
      "driver"->"com.mysql.jdbc.Driver",
      "dbtable"->"product",
      "user"->"root",
      "password"-> "mysql"
    )).load()

    productDF.registerTempTable("product")

    //点击日志格式化处理的逻辑
    def parseLog(log: String): AccessLog = {
      val logInfo = log.split(" ")
      if (logInfo.length == 3) {
        AccessLog(logInfo(0),logInfo(1), logInfo(2))
      }
      else {
        AccessLog("0","0","0")
      }
    }

    //处理socket输入的点击日志
    //  val logLinesDStream = ssc.textFileStream("D:/logs_incoming")
    val logLinesDStream = streamc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)
    val accessLogsDStream = logLinesDStream.map(parseLog).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)


    //关联点击数据与产品数据,输出前十名产品信息和点击次数
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM product a JOIN " +
         " (SELECT itemid,COUNT(*) cnt FROM accessLogs GROUP BY itemid) b " +
          " ON (a.itemid=b.itemid) ORDER BY cnt DESC limit 10"
        val topTenClick = sqlc.sql(sqlStr)

        //可将前十名输出到HDFS、数据库等地方
        topTenClick.show()
      }
    })

    streamc.start()
    streamc.awaitTermination()
  }
}

4. 输出

    No logs received in this time interval
    No logs received in this time interval
    +------+--------+----+
    |itemid|itemname|cnt|
    +------+--------+----+
    |   001| phone|   3|
    |   003|  paper|   2|
    |   002|   book|   1|
    +------+--------+----+

© 著作权归作者所有

共有 人打赏支持
JPblog
粉丝 13
博文 63
码字总数 39976
作品 0
朝阳
程序员
私信 提问
利用Python分析拼多多上卖的最热的产品, 结果出乎大多数人意料!

一、缘起 当然,我们先走第一步,开个头。现在,我们就从一个切面来窥探下。 二、我们获取了多少商品条目? 三、这些商品总共卖出了多少钱? 四、销售额前十的品类是哪些? 拼多多的主打品类...

Python新世界
08/07
0
0
垂直搜索引擎中的用户行为数据价值解析

垂直搜索引擎是网站/APP里提供的搜索窗口,让用户通过搜索关键词就直达目标内容。上一篇文章中已经做过简单解释,并且从易读性角度阐述了其三个应用阶段。 此篇文章则详细讲述用户在应用垂直...

达观数据
2017/08/10
0
0
java的权重匹配算法技术求教

我是一名java开发者,最近有些技术思路需要请教各位。麻烦大家告知我相关技术资料,或者详细的解决方案即可。 特征对象:唯一编号,类型,大小,价格,标题,内容,标签,点击量,使用量,平均...

雪舞潇湘
2016/04/18
637
4
关于淘宝API 求助

最近需要开发一套基于淘宝api,实现将登陆用户在淘宝后台仓库中的商品上架,下架操作,以及查询在售商品详情,查询其他店铺中销量前十的商品信息 等内容,就是想知道,如果调用这些api的话,...

miss_all
2017/04/06
127
2
日志服务-一站式配置采集Apache访问日志

日志服务推出了数据接入向导(Wizard)功能以来,不断优化接入向导的功能,支持各种数据的采集、存储、分析、离线投递, 降低用户使用日志服务门槛。本文介绍数据接入向导一站式配置采集Apa...

沐自
08/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

数据集可视化---张量场的可视表示

中国龙-扬科
26分钟前
1
0
JavaScript创建对象方法实例小结

本文实例讲述了JavaScript创建对象方法。分享给大家供大家参考,具体如下: 最简单的方式就是创建一个Object对象,并为其添加属性和方法。 //示例代码var person=new Object()person.name=...

peakedness丶
28分钟前
3
0
GO 读写锁实现原理剖析

前言 TODO:简单说明读写锁用法及规则。 读写锁数据结构 类型定义 TODO: 源码中数据结构 TODO:讲解每个成员作用 接口定义 获取写锁 释放写锁 获取读锁 释放读锁 场景分析 写锁阻止写锁 TODO...

恋恋美食
32分钟前
2
0
Java核心(二)深入理解线程池ThreadPool

本文你将获得以下信息: 线程池源码解读 线程池执行流程分析 带返回值的线程池实现 延迟线程池实现 为了方便读者理解,本文会由浅入深,先从线程池的使用开始再延伸到源码解读和源码分析等高...

王磊的博客
34分钟前
3
0
web项目中的乱码问题原理分析

Java web开发过程经常遇到乱码,本篇我们探讨一下乱码产生的原因与解决思路。 一次完整的Web请求会有4次编解码转换,如下所示。 第一次:客户端(通常为浏览器)将字符转换成TCP字节流发向服...

fame_yao
38分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部