文档章节

SPARK全栈 全流程 大数据实战 之 在线异常检测(二)

hblt-j
 hblt-j
发布于 2017/01/02 19:51
字数 652
阅读 756
收藏 22

####模块二:实时黑名单过滤,实时大屏,消息预警,分类预测 因时间关系只略了些思路和样例及关键优化点

1、实时黑名单过滤,并针对线上数据量,提升系统整体吞量,并发性能,系统瓶颈优化

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// 类单例式用checkpoint实现driver容错,并设置KAFKA并发读取和SPARK并行处理,实现毫秒级PB级响应
def functionToCreateContext(): StreamingContext = {
    val ssc: StreamingContext = ???
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
    val readParallelism = 5//KAFKA并发读取使用reserver进行数
    val topics = Map("test" -> 1)//KAFKA并发读取每reserver使用线程数
    val kafkaDStreams = (1 to readParallelism).map { _ =>
        KafkaUtils.createStream(ssc, kafkaParams, topics, ...)//改用directStream可实现 Exactly Once 
      }
    //> collection of five *input* DStreams = handled by five receivers/tasks
    val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
    //> single DStream
    val processingParallelism = 20
    val processingDStream = unionDStream(processingParallelism)//SPARK并行处理
    //> single DStream but now with 20 partitions
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory,driver容错
    ssc
  }
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
val producerPool = {
  // See the full code on GitHub for details on how the pool is created
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)
}

stream.map { ... }.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // Get a producer from the shared pool//优化使用线程池实现毫秒级PB组消息高吞量,同理不同场合可使用连接池如大屏更新用redis或mapWithState,模型预测略
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
    }
    // Returning the producer to the pool also shuts it down
    producerPool.value.returnObject(p)
  })
})
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()

2.KafkaUtils的createDirectStream样例

object Iteblog {  

  val brokerAddress = "http://www.iteblog.com:9092"  

  val groupID="testGroup"  

  val kafkaParams = Map[String, String](  
    "metadata.broker.list" -> brokerAddress,  
    "group.id" -> "iteblog")  

  def main(args: Array[String]) {  

    val sparkConf = new SparkConf().setAppName("Test")  
    sparkConf.set("spark.kryo.registrator", "utils.CpcKryoSerializer")  
    val sc = new SparkContext(sparkConf)  


    val ssc = new StreamingContext(sc, Seconds(2))  
    val topicsSet = Set("iteblog")  
     //KafkaUtils的createDirectStream方法,调用底层API直接消费Kafka Partition的数据(Kafka Partition和RDD Partition 一一对应)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)  
    messages.foreachRDD(rdd => {  
      // 把RDD转成HasOffsetRanges类型(KafkaRDD extends HasOffsetRanges)  
      // OffsetRange 说明:Represents a range of offsets from a single Kafka TopicAndPartition.  
      // OffsetRange 说明: Instances of this class can be created with `OffsetRange.create()`.  
      val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  
//     offsetRanges 的实现代码(KafkaRDD中):tp:TopicAndPartition,fo:fromOffset  
//      val offsetRanges = fromOffsets.map { case (tp, fo) =>  
//        val uo = untilOffsets(tp)  
//        OffsetRange(tp.topic, tp.partition, fo, uo.offset)  
//      }.toArray  

      val kc = new KafkaCluster(kafkaParams)  
      for (offsets <- offsetsList) {  
       //TopicAndPartition 主构造参数第一个是topic,第二个是 partition id  
        val topicAndPartition = TopicAndPartition("iteblog", offsets.partition) //offsets.partition表示的是Kafka partition id  
        val o = kc.setConsumerOffsets(groupID, Map((topicAndPartition, offsets.untilOffset)))//offsets.untilOffset:是  
        if (o.isLeft) {  
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")  
        }  
      }  
    })  

    ssc.start()  
    ssc.awaitTermination()  
    ssc.stop()  
  }  
}  

© 著作权归作者所有

hblt-j
粉丝 24
博文 218
码字总数 73000
作品 0
海淀
架构师
私信 提问
SequoiaDB x Spark 新主流架构引领企业级应用

6月,汇集当今大数据界精英的Spark Summit 2017盛大召开,Spark作为当今最炙手可热的大数据技术框架,向全世界展示了最新的技术成果、生态体系及未来发展规划。 巨杉作为业内领先的分布式数据...

巨杉数据库
2017/07/03
6
0
Spark(二) -- Spark简单介绍

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45648737 spark是什么? spark开源的类Hadoop MapReduce的通用的并行计算框架 ...

jchubby
2015/05/11
0
0
上海大数据实战开发转型—程序员未来的筹码

活动将长期(每周六)举行,报名后我们会第一时间与您联系 活动流程 13:30 签到 14:00 老师分享 16:00 互动交流 16:30 活动结束 (Hadoop) (含项目实战) NoSQL专题(含HBase) Hadoop企业项...

简直是天才
2018/05/15
18
0
上海大数据实战开发转型—程序员未来的筹码

活动将长期(每周六)举行,报名后我们会第一时间与您联系 活动流程 13:30 签到 14:00 老师分享 16:00 互动交流 16:30 活动结束 (Hadoop) (含项目实战) NoSQL专题(含HBase) Hadoop企业项...

简直是天才
2018/05/15
84
0
Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈 大数据的概念与应用,正随着智能手机、平板电脑的快速流行而日渐普及,大数据中图的并行化处理一直是一个非常热门的话题。图计算正在被广泛地...

Spark亚太研究院
2014/08/29
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
783
11
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
15
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部