文档章节

sparkStreaming RDD黑名单过滤

JPblog
 JPblog
发布于 2017/09/06 15:40
字数 329
阅读 9
收藏 0

1.目的

    在线过滤掉黑名单的点击,防止刷点击刷评分刷票数等行为

2.素材

    启动linux上的netcat程序

        nc -lk 9999

    输入字符

        20170901141258  tom
        20170901141301  hadoop
        20170901141306  jesse

3.代码

/**
  * Created by puwenchao on 2017-09-06.
  */

package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object streaming_blacklist {
  def main(args:Array[(String)]): Unit ={
    //设定日志等级
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建上下文,设置batch时间间隔5s
    val conf = new SparkConf().setAppName("streamingwctop3").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))


    //通过并行化数组产生黑名单RDD
    val blackList = Array(("hadoop", true),("mahout", true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)

    //从socket中接收广告点击数据:(time name),并用map转换为(name,(time,name))的格式
    val adsClickStream = ssc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)
    val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }


    //通过leftOuterJoin操作既保留了左侧用户数据,又获知了用户是否在黑名单中
    adsClickStreamFormatted.transform(userClickRDD => {
      val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)

      //filter的输入Tuple: (name,((time,name),boolean)),若_2._2存在则为黑名单需要过滤掉
      val validClicked = joinedBlackListRDD.filter(joinedItem => {
        if(joinedItem._2._2.getOrElse(false))
        {
          false
        } else {
          true
        }
      })

      validClicked.map(_._2._1)
    }).print

    /**
      * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
      */
    ssc.start()
    ssc.awaitTermination()
    
  }

}

4.输出

    (hadoop点击记录被过滤)
    -------------------------------------------
    Time: 1504682410000 ms
    -------------------------------------------
    20170901141258  tom
    20170901141306  jesse

本文转载自:http://blog.csdn.net/duan_zhihua/article/details/51307733

共有 人打赏支持
JPblog
粉丝 10
博文 60
码字总数 37322
作品 0
朝阳
程序员
第2课:通过案例对SparkStreaming 透彻理解三板斧之二:解密SparkStreaming

从昨天第一课的黑名单过滤的案例中,我們可以看見其實一個Spark Streaming 程序,里面會自動生成很多不同的作業,可以用以下的圖,去理解什麼是DStream,它跟RDD 之間有什麼不同。 簡單說 DS...

jcchoiling
2016/05/10
50
0
12.transform以及实时黑名单过滤案例实战

transform以及实时黑名单过滤案例实战 transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,...

weixin_32265569
2017/11/18
0
0
Spark Streaming流式处理

Spark Streaming介绍 Spark Streaming概述 Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 它可以非常容易的构建一个可扩展、具有容错机制的流式......

jiFeng丶
07/26
0
0
第1课:通过案例对Spark Streaming透彻理解

一.SparkStreaming在线另类实验 如何清晰的看到数据的流入、被处理的过程?使用一个小技巧,通过调节放大BatchInterval的方式,来降低批处理次数,以方便看清楚各个环节。我们从已写过的广告...

天蓝一枫
2016/05/08
2.2K
0
Spark Streaming源码解读之数据清理 内幕

前文已经讲解很清晰,Spark Streaming是通过定时器按照DStream 的DAG 回溯出整个RDD的DAG。 细心的读者一定有一个疑问,随着时间的推移,生产越来越多的RDD,SparkStreaming是如何保证RDD的生...

柯里昂
2016/05/31
47
0

没有更多内容

加载失败,请刷新页面

加载更多

00.编译OpenJDK-8u40的整个过程

前言 历经2天的折腾总算把OpenJDK给编译成功了,要说为啥搞这个,还得从面试说起,最近出去面试经常被问到JVM的相关东西,总感觉自己以前学的太浅薄,所以回来就打算深入学习,目标把《深入理...

凌晨一点
今天
2
0
python: 一些关于元组的碎碎念

初始化元组的时候,尤其是元组里面只有一个元素的时候,会出现一些很蛋疼的情况: def checkContentAndType(obj): print(obj) print(type(obj))if __name__=="__main__": tu...

Oh_really
昨天
6
2
jvm crash分析工具

介绍一款非常好用的jvm crash分析工具,当jvm挂掉时,会产生hs_err_pid.log。里面记录了jvm当时的运行状态以及错误信息,但是内容量比较庞大,不好分析。所以我们要借助工具来帮我们。 Cras...

xpbob
昨天
112
0
Qt编写自定义控件属性设计器

以前做.NET开发中,.NET直接就集成了属性设计器,VS不愧是宇宙第一IDE,你能够想到的都给你封装好了,用起来不要太爽!因为项目需要自从全面转Qt开发已经6年有余,在工业控制领域,有一些应用...

飞扬青云
昨天
4
0
我为什么用GO语言来做区块链?

Go语言现在常常被用来做去中心化系统(decentralised system)。其他类型的公司也都把Go用在产品的核心模块中,并且它在网站开发中也占据了一席之地。 我们在决定做Karachain的时候,考量(b...

HiBlock
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部