文档章节

sparkStreaming+sql点击前十商品

JPblog
 JPblog
发布于 2017/09/08 13:49
字数 469
阅读 7
收藏 0

1. 目的

    输出网站点击次数top10的商品信息,mysql存储商品信息表,socket模拟网站点击日志

2. 素材

    1)mysql建立product表

mysql> select * from product;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001    | phone    |
| 002    | book     |
| 003    | paper    |
+--------+----------+

    2)socket输入模拟点击log

        启动linux上的netcat程序

            nc -lk 9999

        输入字符

            9342 001 20170909
            4532 002 20170909
            7159 001 20170909
            5834 003 20170909
            5521 003 20170909
            2633 001 20170909

3. 代码

/**
  * Created by puwenchao on 2017-09-08.
  * 处理10分钟内商品点击次数Top10,商品信息存放在MySQL数据库中,点击日志来源于socket
  */
package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

//商品点击日志格式
case class AccessLog(userid: String, itemid: String, clicktime: String)

object streaming_blacklist3 {
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    //设定日志等级
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建所需context
    val sparkConf = new SparkConf().setAppName("streaming_blacklist3").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlc = new SQLContext(sc)
    val streamc = new StreamingContext(sc, Seconds(10))
    import sqlc.implicits._

    //从数据库中加载product表
    val productDF = sqlc.read.format("jdbc").options(Map(
      "url"-> "jdbc:mysql://192.168.252.141:3306/test",
      "driver"->"com.mysql.jdbc.Driver",
      "dbtable"->"product",
      "user"->"root",
      "password"-> "mysql"
    )).load()

    productDF.registerTempTable("product")

    //点击日志格式化处理的逻辑
    def parseLog(log: String): AccessLog = {
      val logInfo = log.split(" ")
      if (logInfo.length == 3) {
        AccessLog(logInfo(0),logInfo(1), logInfo(2))
      }
      else {
        AccessLog("0","0","0")
      }
    }

    //处理socket输入的点击日志
    //  val logLinesDStream = ssc.textFileStream("D:/logs_incoming")
    val logLinesDStream = streamc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)
    val accessLogsDStream = logLinesDStream.map(parseLog).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)


    //关联点击数据与产品数据,输出前十名产品信息和点击次数
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM product a JOIN " +
         " (SELECT itemid,COUNT(*) cnt FROM accessLogs GROUP BY itemid) b " +
          " ON (a.itemid=b.itemid) ORDER BY cnt DESC limit 10"
        val topTenClick = sqlc.sql(sqlStr)

        //可将前十名输出到HDFS、数据库等地方
        topTenClick.show()
      }
    })

    streamc.start()
    streamc.awaitTermination()
  }
}

4. 输出

    No logs received in this time interval
    No logs received in this time interval
    +------+--------+----+
    |itemid|itemname|cnt|
    +------+--------+----+
    |   001| phone|   3|
    |   003|  paper|   2|
    |   002|   book|   1|
    +------+--------+----+

© 著作权归作者所有

共有 人打赏支持
JPblog
粉丝 14
博文 64
码字总数 40732
作品 0
朝阳
程序员
私信 提问
利用Python分析拼多多上卖的最热的产品, 结果出乎大多数人意料!

一、缘起 当然,我们先走第一步,开个头。现在,我们就从一个切面来窥探下。 二、我们获取了多少商品条目? 三、这些商品总共卖出了多少钱? 四、销售额前十的品类是哪些? 拼多多的主打品类...

Python新世界
2018/08/07
0
0
为电子商务网站添加在线支付功能

网页制作Webjx文章简介:为了方便买家们在网站购物时能够在线支付货款并有效提升网站档次,作为购物网站的站长有必要给网站增添在线支付功能,这也是商务网站发展的大势所趋。拥有在线支付功...

james_laughing
2015/01/15
0
0
实现一个扫描商品条码进行评价或留言的小程序(附源码)

1.功能介绍 对准上面的小程序码,扫一下,‘嘀’~地一声,扫码成功,打开小程序,进入小程序首页。 因为手头上没有可乐,所以我找来了一只非常专业的猫,扮演一瓶330ml的可口可乐演示给大家看...

秋収冬藏
2018/09/04
0
0
4月10日16:00

发起活动部分: 1、把“创建新的活动模板”改为“创建新活动” 2、“最多参加人数”改为“活动人数上限” 3、在我的活动里, 点击进某个活动后,活动详情下面直接显示报名的商品清单,每个商...

zq6
2015/04/10
2
2
点击加入购物车直接转到购物车页面

有的网站管理员希望自己的网站在顾客点击网站上的商品时,能够直接转到购物车页面进行结账。 1. 安装本插件前建议先安装另一个插件: http://www.mycncart.com/index.php?route=product/prod...

OpenCart中国
2015/05/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

centos7重置密码、单用户模式、救援模式、ls命令、chmod命令

在工作当中如果我们错误的配置了文件使服务器不能正常启动或者忘记密码不能登录系统,如何解决这些问题呢?重装系统是可以实现的,但是往往不能轻易重装系统的,下面用忘记密码作为例子讲解如...

李超小牛子
今天
3
0
Python如何开发桌面应用程序?Python基础教程,第十三讲,图形界面

当使用桌面应用程序的时候,有没有那么一瞬间,想学习一下桌面应用程序开发?行业内专业的桌面应用程序开发一般是C++,C#来做,Java开发的也有,但是比较少。本节课会介绍Python的GUI(图形用...

程序员补给栈
今天
5
0
kafka在的使用

一、基本概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计。 这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统...

狼王黄师傅
今天
3
0
Android JNI总结

0x01 JNI介绍 JNI是Java Native Interface的缩写,JNI不是Android专有的东西,它是从Java继承而来,但是在Android中,JNI的作用和重要性大大增强。 JNI在Android中起着连接Java和C/C++层的作...

天王盖地虎626
昨天
3
0
大数据教程(11.8)Hive1.2.2简介&初体验

上一篇文章分析了Hive1.2.2的安装,本节博主将分享Hive的体验&Hive服务端和客户端的使用方法。 一、Hive与hadoop直接的关系 Hive利用HDFS存储数据,利用MapReduce查询数据。 二、Hive与传统数...

em_aaron
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部