文档章节

sparkStreaming+sql点击前十商品

JPblog
 JPblog
发布于 2017/09/08 13:49
字数 469
阅读 6
收藏 0
点赞 0
评论 0

1. 目的

    输出网站点击次数top10的商品信息,mysql存储商品信息表,socket模拟网站点击日志

2. 素材

    1)mysql建立product表

mysql> select * from product;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001    | phone    |
| 002    | book     |
| 003    | paper    |
+--------+----------+

    2)socket输入模拟点击log

        启动linux上的netcat程序

            nc -lk 9999

        输入字符

            9342 001 20170909
            4532 002 20170909
            7159 001 20170909
            5834 003 20170909
            5521 003 20170909
            2633 001 20170909

3. 代码

/**
  * Created by puwenchao on 2017-09-08.
  * 处理10分钟内商品点击次数Top10,商品信息存放在MySQL数据库中,点击日志来源于socket
  */
package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

//商品点击日志格式
case class AccessLog(userid: String, itemid: String, clicktime: String)

object streaming_blacklist3 {
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    //设定日志等级
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建所需context
    val sparkConf = new SparkConf().setAppName("streaming_blacklist3").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlc = new SQLContext(sc)
    val streamc = new StreamingContext(sc, Seconds(10))
    import sqlc.implicits._

    //从数据库中加载product表
    val productDF = sqlc.read.format("jdbc").options(Map(
      "url"-> "jdbc:mysql://192.168.252.141:3306/test",
      "driver"->"com.mysql.jdbc.Driver",
      "dbtable"->"product",
      "user"->"root",
      "password"-> "mysql"
    )).load()

    productDF.registerTempTable("product")

    //点击日志格式化处理的逻辑
    def parseLog(log: String): AccessLog = {
      val logInfo = log.split(" ")
      if (logInfo.length == 3) {
        AccessLog(logInfo(0),logInfo(1), logInfo(2))
      }
      else {
        AccessLog("0","0","0")
      }
    }

    //处理socket输入的点击日志
    //  val logLinesDStream = ssc.textFileStream("D:/logs_incoming")
    val logLinesDStream = streamc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)
    val accessLogsDStream = logLinesDStream.map(parseLog).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)


    //关联点击数据与产品数据,输出前十名产品信息和点击次数
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM product a JOIN " +
         " (SELECT itemid,COUNT(*) cnt FROM accessLogs GROUP BY itemid) b " +
          " ON (a.itemid=b.itemid) ORDER BY cnt DESC limit 10"
        val topTenClick = sqlc.sql(sqlStr)

        //可将前十名输出到HDFS、数据库等地方
        topTenClick.show()
      }
    })

    streamc.start()
    streamc.awaitTermination()
  }
}

4. 输出

    No logs received in this time interval
    No logs received in this time interval
    +------+--------+----+
    |itemid|itemname|cnt|
    +------+--------+----+
    |   001| phone|   3|
    |   003|  paper|   2|
    |   002|   book|   1|
    +------+--------+----+

© 著作权归作者所有

共有 人打赏支持
JPblog
粉丝 10
博文 51
码字总数 27225
作品 0
朝阳
程序员
垂直搜索引擎中的用户行为数据价值解析

垂直搜索引擎是网站/APP里提供的搜索窗口,让用户通过搜索关键词就直达目标内容。上一篇文章中已经做过简单解释,并且从易读性角度阐述了其三个应用阶段。 此篇文章则详细讲述用户在应用垂直...

达观数据 ⋅ 2017/08/10 ⋅ 0

java的权重匹配算法技术求教

我是一名java开发者,最近有些技术思路需要请教各位。麻烦大家告知我相关技术资料,或者详细的解决方案即可。 特征对象:唯一编号,类型,大小,价格,标题,内容,标签,点击量,使用量,平均...

雪舞潇湘 ⋅ 2016/04/18 ⋅ 4

基于盒马鲜生改编的微信小程序

前段时间,随着马化腾现身全国多地用微信小程序乘坐公交的新闻出现,微信小程序的热度可谓是更上了一层。微信小程序现身至今,因其不用下载就可使用的方便等优点,发展趋势一直良好。 盒马鲜...

TeanLee ⋅ 2017/12/15 ⋅ 0

为电子商务网站添加在线支付功能

网页制作Webjx文章简介:为了方便买家们在网站购物时能够在线支付货款并有效提升网站档次,作为购物网站的站长有必要给网站增添在线支付功能,这也是商务网站发展的大势所趋。拥有在线支付功...

james_laughing ⋅ 2015/01/15 ⋅ 0

关于淘宝API 求助

最近需要开发一套基于淘宝api,实现将登陆用户在淘宝后台仓库中的商品上架,下架操作,以及查询在售商品详情,查询其他店铺中销量前十的商品信息 等内容,就是想知道,如果调用这些api的话,...

miss_all ⋅ 2017/04/06 ⋅ 2

android操作前的登录检查

小弟有个问题,类似京东或者其他一些有会员系统的应用,在进行一些操作的时候会检查登录状态。比如点击我的订单或者点击某个商品的收藏按钮的时候,如果发现没有登录,就会自动跳转到登录界面...

林泳坛 ⋅ 2015/10/16 ⋅ 2

点击加入购物车直接转到购物车页面

有的网站管理员希望自己的网站在顾客点击网站上的商品时,能够直接转到购物车页面进行结账。 1. 安装本插件前建议先安装另一个插件: http://www.mycncart.com/index.php?route=product/prod...

OpenCart中国 ⋅ 2015/05/28 ⋅ 0

设置金蝶kis记账王会计科目的教程

完成新建账套工作之后,金蝶KIS记账王进入初始化设置界面,需要录入的初始资料包括币别、会计科目初始数据和明细科目。下文着重讲解如何设置会计科目,在初始化界面点击“会计科目”,进入“...

石沉大海 ⋅ 2016/12/07 ⋅ 0

细化迭代二之输入设计

细化迭代二之输入设计 1,登陆设计 (1),该超市员工号为6位数字,如020001~工号输入也只能填写数字~ (2),密码使用<input type=“password”/> (3),具有表单验证,按登陆的时候,会检验...

Allen_Chou ⋅ 2015/04/17 ⋅ 0

刷关键词是什么意思?有什么用?真的可以刷么?

     在这个浮躁的社会,不管各行各业都越来越讲究一个“快”字,不论从事什么行业都在追求“快速见效”,当然了这也符合咱们正常的心理和思维方式。刷关键词是什么意思?在SEO的领域说起...

seoyyedu ⋅ 2017/11/19 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

CENTOS7防火墙命令记录

安装Firewall命令: yum install firewalld firewalld-config Firewall开启常见端口命令: firewall-cmd --zone=public --add-port=80/tcp --permanent firewall-cmd --zone=public --add-po......

cavion ⋅ 21分钟前 ⋅ 0

【C++】【STL】利用chromo来测量程序运行时间与日志时间打印精确到微秒

直接上代码吧,没啥好说的。头疼。 #include <iostream>#include <string>#include <ctime>#include <sstream>#include <iomanip>#include <thread>#include <chrono>using ......

muqiusangyang ⋅ 24分钟前 ⋅ 0

Mac环境下svn的使用

在Windows环境中,我们一般使用TortoiseSVN来搭建svn环境。在Mac环境下,由于Mac自带了svn的服务器端和客户端功能,所以我们可以在不装任何第三方软件的前提下使用svn功能,不过还需做一下简...

故久呵呵 ⋅ 34分钟前 ⋅ 0

破解公司回应苹果“USB限制模式”:已攻破

本周四,苹果发表声明称 iOS 中加入了一项名为“USB 限制模式”的功能,可以防止 iPhone 在连接其他设备的时候被破解,并且强调这一功能并不是针对 FBI 等执法部门,为的是保护用户数据安全。...

六库科技 ⋅ 35分钟前 ⋅ 0

MyBtais整合Spring Boot整合,TypeHandler对枚举类(enum)处理

概要 问题描述 我想用枚举类来表示用户当前状态,枚举类由 code 和 msg 组成,但我只想把 code 保存到数据库,查询处理,能知道用户当前状态,这应该怎么做呢?在 Spring 整合MyBatis 的时候...

Wenyi_Feng ⋅ 54分钟前 ⋅ 0

synchronized与Lock的区别

# <center>王梦龙的读书笔记第一篇</center> ## <center>-synchronized与Lock的区别</centre> ###一、从使用场景来说 + synchronized 是能够注释代码块、类、方法但是它的加锁是和解锁使用一......

我不想加班 ⋅ 今天 ⋅ 0

VConsole的使用

手机端控制台打印输出,方便bug的排查。 首先需要引入vconsole.min.js 文件,然后在文件中创造实例。就能直接使用了。 var vConsole = new VConsole(); vConsole的文件地址...

大美琴 ⋅ 今天 ⋅ 0

Java NIO之字符集

1 字符集和编解码的概念 首先,解释一下什么是字符集。顾名思义,就是字符的集合。它的初衷是把现实世界的符号映射为计算机可以理解的字节。比如我创造一个字符集,叫做sex字符集,就包含两个...

士别三日 ⋅ 今天 ⋅ 0

Spring Bean基础

1、Bean之间引用 <!--如果Bean配置在同一个XML文件中,使用local引用--><ref bean="someBean"/><!--如果Bean配置在不同的XML文件中,使用ref引用--><ref local="someBean"/> 其实两种......

霍淇滨 ⋅ 今天 ⋅ 0

05、基于Consul+Upsync+Nginx实现动态负载均衡

1、Consul环境搭建 下载consul_0.7.5_linux_amd64.zip到/usr/local/src目录 cd /usr/local/srcwget https://releases.hashicorp.com/consul/0.7.5/consul_0.7.5_linux_amd64.zip 解压consu......

北岩 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部