文档章节

sparkStreaming RDD黑名单过滤

JPblog
 JPblog
发布于 2017/09/06 15:40
字数 329
阅读 11
收藏 0

1.目的

    在线过滤掉黑名单的点击,防止刷点击刷评分刷票数等行为

2.素材

    启动linux上的netcat程序

        nc -lk 9999

    输入字符

        20170901141258  tom
        20170901141301  hadoop
        20170901141306  jesse

3.代码

/**
  * Created by puwenchao on 2017-09-06.
  */

package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object streaming_blacklist {
  def main(args:Array[(String)]): Unit ={
    //设定日志等级
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建上下文,设置batch时间间隔5s
    val conf = new SparkConf().setAppName("streamingwctop3").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))


    //通过并行化数组产生黑名单RDD
    val blackList = Array(("hadoop", true),("mahout", true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)

    //从socket中接收广告点击数据:(time name),并用map转换为(name,(time,name))的格式
    val adsClickStream = ssc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)
    val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }


    //通过leftOuterJoin操作既保留了左侧用户数据,又获知了用户是否在黑名单中
    adsClickStreamFormatted.transform(userClickRDD => {
      val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)

      //filter的输入Tuple: (name,((time,name),boolean)),若_2._2存在则为黑名单需要过滤掉
      val validClicked = joinedBlackListRDD.filter(joinedItem => {
        if(joinedItem._2._2.getOrElse(false))
        {
          false
        } else {
          true
        }
      })

      validClicked.map(_._2._1)
    }).print

    /**
      * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
      */
    ssc.start()
    ssc.awaitTermination()
    
  }

}

4.输出

    (hadoop点击记录被过滤)
    -------------------------------------------
    Time: 1504682410000 ms
    -------------------------------------------
    20170901141258  tom
    20170901141306  jesse

本文转载自:http://blog.csdn.net/duan_zhihua/article/details/51307733

共有 人打赏支持
JPblog
粉丝 13
博文 64
码字总数 40732
作品 0
朝阳
程序员
私信 提问
第2课:通过案例对SparkStreaming 透彻理解三板斧之二:解密SparkStreaming

从昨天第一课的黑名单过滤的案例中,我們可以看見其實一個Spark Streaming 程序,里面會自動生成很多不同的作業,可以用以下的圖,去理解什麼是DStream,它跟RDD 之間有什麼不同。 簡單說 DS...

jcchoiling
2016/05/10
50
0
12.transform以及实时黑名单过滤案例实战

transform以及实时黑名单过滤案例实战 transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,...

weixin_32265569
2017/11/18
0
0
Spark Streaming流式处理

Spark Streaming介绍 Spark Streaming概述 Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 它可以非常容易的构建一个可扩展、具有容错机制的流式......

jiFeng丶
07/26
0
0
第1课:通过案例对Spark Streaming透彻理解

一.SparkStreaming在线另类实验 如何清晰的看到数据的流入、被处理的过程?使用一个小技巧,通过调节放大BatchInterval的方式,来降低批处理次数,以方便看清楚各个环节。我们从已写过的广告...

天蓝一枫
2016/05/08
2.2K
0
Spark——Streaming源码解析之DAG定义

---title: sparkStreaming源码解析之DAG定义subtitle: sparkStream的DAG定义源码解析description: sparkStream的DAG定义源码解析keywords: [spark,streaming,源码,DAG]author: liyzdate: 20......

freeli
12/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

tomcat编译超过64k大小的jsp文件报错原因

  今天遇到一个问题,首先是在tomcat中间件上跑的web项目,一个jsp文件,因为代码行数实在是太多了,更新了几个版本之后编译报错了,页面打开都是报500的错误,500的报错,知道http协议返回...

SEOwhywhy
9分钟前
0
0
flutter http 请求客户端

1、pubspec文件管理Flutter应用程序的assets(资源,如图片、package等)。 在pubspec.yaml中,通过网址“https://pub.dartlang.org/packages/http#-installing-tab-”确认版本号后,将http(0...

渣渣曦
9分钟前
0
0
Django基本命令及moduls举例

一、Django基本命令 1.创建项目 django-admin.py startproject mysite 创建后的项目结构:- mysite - mysite #对整个程序进行配置 - init #导入包专用- settings ...

枫叶云
24分钟前
4
0
zabbix安装

rpm -ivh http://repo.webtatic.com/yum/el6/latest.rpm 安装jdk rpm -ivh (自行在网上下载rpm包) 安装php并修改相应参数 yum -y install php56w php56w-gd php56w-mysqlnd php56w-bcmath......

muoushi
25分钟前
3
0
MySQL自增属性auto_increment_increment和auto_increment_offset

MySQL的系统变量或会话变量auto_increment_increment(自增步长)和auto_increment_offset(自增偏移量)控制着数据表的自增列ID。 mysql> show tables;Empty set (0.00 sec)mysql> CREATE TA......

野雪球
59分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部