文档章节

sparkStreaming RDD黑名单过滤

JPblog
 JPblog
发布于 2017/09/06 15:40
字数 329
阅读 9
收藏 0
点赞 0
评论 0

1.目的

    在线过滤掉黑名单的点击,防止刷点击刷评分刷票数等行为

2.素材

    启动linux上的netcat程序

        nc -lk 9999

    输入字符

        20170901141258  tom
        20170901141301  hadoop
        20170901141306  jesse

3.代码

/**
  * Created by puwenchao on 2017-09-06.
  */

package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object streaming_blacklist {
  def main(args:Array[(String)]): Unit ={
    //设定日志等级
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建上下文,设置batch时间间隔5s
    val conf = new SparkConf().setAppName("streamingwctop3").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))


    //通过并行化数组产生黑名单RDD
    val blackList = Array(("hadoop", true),("mahout", true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)

    //从socket中接收广告点击数据:(time name),并用map转换为(name,(time,name))的格式
    val adsClickStream = ssc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)
    val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }


    //通过leftOuterJoin操作既保留了左侧用户数据,又获知了用户是否在黑名单中
    adsClickStreamFormatted.transform(userClickRDD => {
      val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)

      //filter的输入Tuple: (name,((time,name),boolean)),若_2._2存在则为黑名单需要过滤掉
      val validClicked = joinedBlackListRDD.filter(joinedItem => {
        if(joinedItem._2._2.getOrElse(false))
        {
          false
        } else {
          true
        }
      })

      validClicked.map(_._2._1)
    }).print

    /**
      * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
      */
    ssc.start()
    ssc.awaitTermination()
    
  }

}

4.输出

    (hadoop点击记录被过滤)
    -------------------------------------------
    Time: 1504682410000 ms
    -------------------------------------------
    20170901141258  tom
    20170901141306  jesse

本文转载自:http://blog.csdn.net/duan_zhihua/article/details/51307733

共有 人打赏支持
JPblog
粉丝 10
博文 51
码字总数 27225
作品 0
朝阳
程序员
第2课:通过案例对SparkStreaming 透彻理解三板斧之二:解密SparkStreaming

从昨天第一课的黑名单过滤的案例中,我們可以看見其實一個Spark Streaming 程序,里面會自動生成很多不同的作業,可以用以下的圖,去理解什麼是DStream,它跟RDD 之間有什麼不同。 簡單說 DS...

jcchoiling ⋅ 2016/05/10 ⋅ 0

12.transform以及实时黑名单过滤案例实战

transform以及实时黑名单过滤案例实战 transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,...

weixin_32265569 ⋅ 2017/11/18 ⋅ 0

Spark1.0.1 生态圈

Spark Spark是一个快速的通用大规模数据处理系统,和Hadoop MapRedeuce相比: 更好的容错性和内存计算 高速,在内存中运算100倍速度于MapReduce 易用,相同的应用程序代码量要比MapReduce少2...

时间在追我 ⋅ 2014/07/24 ⋅ 0

第1课:通过案例对Spark Streaming透彻理解

一.SparkStreaming在线另类实验 如何清晰的看到数据的流入、被处理的过程?使用一个小技巧,通过调节放大BatchInterval的方式,来降低批处理次数,以方便看清楚各个环节。我们从已写过的广告...

天蓝一枫 ⋅ 2016/05/08 ⋅ 0

Spark Streaming源码解读之数据清理 内幕

前文已经讲解很清晰,Spark Streaming是通过定时器按照DStream 的DAG 回溯出整个RDD的DAG。 细心的读者一定有一个疑问,随着时间的推移,生产越来越多的RDD,SparkStreaming是如何保证RDD的生...

柯里昂 ⋅ 2016/05/31 ⋅ 0

基于案例贯通Spark Streaming流计算框架运行源码12

上文提到Job已经创建完成。 先贴案例 再来回溯下触发过程。 定时器定时触发执行某个方法。这里是 longTime => eventLoop.post(GenerateJobs(new Time(longTime))),将一个 GenerateJobs 类型...

柯里昂 ⋅ 2016/05/12 ⋅ 0

第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 DStream的foreachRDD方法,实例化ForEachDStream对象,并将用户定义的函数foreachFunc传入到该对象中。foreachRDD方法是输出操...

葛晨鑫 ⋅ 2016/05/14 ⋅ 0

SparkStreaming 性能调优

在开发Spark Streaming应用程序时,要结合集群中各节点的配置情况尽可能地提高数据处理的实时性。在调优的过程中,一方面要尽可能利用集群资源来减少每个批处理的时间;另一方面要确保接收到...

ChinaUnicom110 ⋅ 2017/10/11 ⋅ 0

sparkStreaming SQL黑名单过滤

1.目的 在线过滤掉黑名单的点击,防止刷点击刷评分刷票数等行为 2.素材 1)mysql建立blacklist表 2) socket输入模拟点击log 启动linux上的netcat程序 nc -lk 9999 输入字符 20170901141258...

JPblog ⋅ 2017/09/30 ⋅ 0

Ls 1 - Understanding the nature of Spark Streaming

What is Spark Streaming? According to the Official Apache Spark website, Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tole......

jcchoiling ⋅ 2016/05/09 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

volatile和synchronized的区别

volatile和synchronized的区别 在讲这个之前需要先了解下JMM(Java memory Model :java内存模型):并发过程中如何处理可见性、原子性、有序性的问题--建立JMM模型 详情请看:https://baike.b...

MarinJ_Shao ⋅ 31分钟前 ⋅ 0

深入分析Kubernetes Critical Pod(一)

Author: xidianwangtao@gmail.com 摘要:大家在部署Kubernetes集群AddOn组件的时候,经常会看到Annotation scheduler.alpha.kubernetes.io/critical-pod"="",以表示这是一个关键服务,那你知...

WaltonWang ⋅ 38分钟前 ⋅ 0

原子性 - synchronized关键词

原子性概念 原子性提供了程序的互斥操作,同一时刻只能有一个线程能对某块代码进行操作。 原子性的实现方式 在jdk中,原子性的实现方式主要分为: synchronized:关键词,它依赖于JVM,保证了同...

dotleo ⋅ 45分钟前 ⋅ 0

【2018.06.22学习笔记】【linux高级知识 14.4-15.3】

14.4 exportfs命令 14.5 NFS客户端问题 15.1 FTP介绍 15.2/15.3 使用vsftpd搭建ftp

lgsxp ⋅ 55分钟前 ⋅ 0

JeeSite 4.0 功能权限管理基础(Shiro)

Shiro是Apache的一个开源框架,是一个权限管理的框架,实现用户认证、用户授权等。 只要有用户参与一般都要有权限管理,权限管理实现对用户访问系统的控制,按照安全规则或者安全策略控制用户...

ThinkGem ⋅ 昨天 ⋅ 0

python f-string 字符串格式化

主要内容 从Python 3.6开始,f-string是格式化字符串的一种很好的新方法。与其他格式化方式相比,它们不仅更易读,更简洁,不易出错,而且速度更快! 在本文的最后,您将了解如何以及为什么今...

阿豪boy ⋅ 昨天 ⋅ 0

Python实现自动登录站点

如果我们想要实现自动登录,那么我们就需要能够驱动浏览器(比如谷歌浏览器)来实现操作,ChromeDriver 刚好能够帮助我们这一点(非谷歌浏览器的驱动有所不同)。 一、确认软件版本 首先我们...

blackfoxya ⋅ 昨天 ⋅ 0

线性回归原理和实现基本认识

一:介绍 定义:线性回归在假设特证满足线性关系,根据给定的训练数据训练一个模型,并用此模型进行预测。为了了解这个定义,我们先举个简单的例子;我们假设一个线性方程 Y=2x+1, x变量为商...

wangxuwei ⋅ 昨天 ⋅ 0

容器之查看minikue的environment——minikube的环境信息

执行如下命令 mjduan@mjduandeMacBook-Pro:~/Docker % minikube docker-envexport DOCKER_TLS_VERIFY="1"export DOCKER_HOST="tcp://192.168.99.100:2376"export DOCKER_CERT_PATH="/U......

汉斯-冯-拉特 ⋅ 昨天 ⋅ 0

mysql远程连接不上

设置了root所有hosts远程登录,可是远程登录还是失败,原因可能如下: 登录本地数据库 mysql -uroot -p123456 查询用户表 mysql> select user,host,password from mysql.user; 删除密码为空的...

冰公子 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部