Spark Full Stack, End-to-End Big Data in Practice: Online Anomaly Detection (Part 2)

Original
2017/01/02 19:51

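#### Module 2: real-time blacklist filtering, real-time dashboard, message alerting, classification prediction

Due to time constraints, this part only outlines the approach, sample code, and key optimization points.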

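1. Real-time blacklist filtering, tuned for production data volumes: raising overall throughput and concurrency, and removing system bottlenecks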
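
The listing that follows concentrates on read and processing parallelism, so the filtering step itself is not shown. A minimal sketch of that step in plain Scala, assuming a hypothetical `Event` record keyed by `userId` and a blacklist small enough to hold in memory as a `Set` (neither appears in the original post):

```scala
// Hypothetical record type; the post does not show the real schema.
final case class Event(userId: String, payload: String)

object BlacklistFilter {
  // Keep only events whose user is not on the blacklist.
  // Takes and returns an Iterator so it can be applied once per partition.
  def dropBlacklisted(events: Iterator[Event], blacklist: Set[String]): Iterator[Event] =
    events.filter(e => !blacklist.contains(e.userId))
}

object BlacklistFilterDemo extends App {
  // Assumption: in a Spark job this set would be the value of a broadcast variable.
  val blacklist = Set("u2", "u4")
  val batch = Iterator(Event("u1", "a"), Event("u2", "b"), Event("u3", "c"))

  val kept = BlacklistFilter.dropBlacklisted(batch, blacklist).toList
  println(kept.map(_.userId).mkString(",")) // prints u1,u3
}
```

In a streaming job the same function could be passed to `mapPartitions` on the DStream, with the blacklist read from a broadcast variable so that it is shipped to each executor only once.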

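import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

// Factory used in a singleton-like fashion: checkpointing provides driver fault tolerance, while
// concurrent Kafka reads plus parallel Spark processing target millisecond-level latency at PB-scale volume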
def functionToCreateContext(): StreamingContext = {
    val ssc: StreamingContext = ???
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
    val readParallelism = 5 // number of receivers used to read from Kafka concurrently
    val topics = Map("test" -> 1) // number of consumer threads per receiver for each topic
    val kafkaDStreams = (1 to readParallelism).map { _ =>
        KafkaUtils.createStream(ssc, kafkaParams, topics, ...) // switching to createDirectStream makes exactly-once processing achievable
      }
    //> collection of five *input* DStreams = handled by five receivers/tasks
    val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
    //> single DStream
    val processingParallelism = 20
    val processingDStream = unionDStream.repartition(processingParallelism) // parallel processing in Spark
    //> single DStream but now with 20 partitions
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory for driver fault tolerance
    ssc
  }
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Broadcast a pool of Kafka producers so that tasks on each executor can reuse them
val producerPool = {
  // See the full code on GitHub for details on how the pool is created
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  context.sparkContext.broadcast(pool) // `ssc` is local to functionToCreateContext, so use the context obtained above
}

stream.map { ... }.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
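    // Get a producer from the shared pool. Optimization: pooling producers keeps message throughput high at low latency.
    // The same idea applies elsewhere with connection pools, e.g. Redis (or mapWithState) for dashboard updates. Model prediction is omitted.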
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
    }
    // Returning the producer to the pool also shuts it down
    producerPool.value.returnObject(p)
  })
})
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
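
The borrow/return pair in the listing above leaves the producer outside the pool if `send` throws partway through a partition. One way to guard against that is the loan pattern, sketched here in plain Scala. `SimplePool` and `withObject` are hypothetical stand-ins written for this illustration; they are not the pool used in the post, and only the method names `borrowObject` and `returnObject` are taken from it.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Minimal stand-in for a producer pool (hypothetical, for illustration only).
final class SimplePool[A](create: () => A) {
  private val idle = new ConcurrentLinkedQueue[A]()

  // Reuse an idle object if one exists, otherwise create a new one.
  def borrowObject(): A = Option(idle.poll()).getOrElse(create())

  // Put the object back so that a later borrower can reuse it.
  def returnObject(a: A): Unit = { idle.offer(a); () }

  def idleCount: Int = idle.size

  // Loan pattern: the object goes back to the pool even if `use` throws.
  def withObject[B](use: A => B): B = {
    val a = borrowObject()
    try use(a) finally returnObject(a)
  }
}

object PoolDemo extends App {
  var created = 0
  val pool = new SimplePool[StringBuilder](() => { created += 1; new StringBuilder })

  pool.withObject(_.append("x")) // normal use
  try pool.withObject[Unit](_ => throw new RuntimeException("send failed"))
  catch { case _: RuntimeException => () } // failure path still returns the object

  println(s"created=$created idle=${pool.idleCount}") // prints created=1 idle=1
}
```

Inside `foreachPartition`, the body that sends each record would go in the function passed to `withObject`, so a failed task does not shrink the pool.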

2. KafkaUtils.createDirectStream example

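import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}

object Iteblog {

  // metadata.broker.list expects host:port entries, without a URL scheme
  val brokerAddress = "www.iteblog.com:9092"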

  val groupID="testGroup"  

  val kafkaParams = Map[String, String](  
    "metadata.broker.list" -> brokerAddress,  
    "group.id" -> "iteblog")  

  def main(args: Array[String]) {  

    val sparkConf = new SparkConf().setAppName("Test")  
    sparkConf.set("spark.kryo.registrator", "utils.CpcKryoSerializer")  
    val sc = new SparkContext(sparkConf)  


    val ssc = new StreamingContext(sc, Seconds(2))  
    val topicsSet = Set("iteblog")  
     // KafkaUtils.createDirectStream uses the low-level API to consume Kafka partitions directly (Kafka partitions map one-to-one to RDD partitions)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)  
    messages.foreachRDD(rdd => {  
      // Cast the RDD to HasOffsetRanges (KafkaRDD extends HasOffsetRanges)
      // OffsetRange: represents a range of offsets from a single Kafka TopicAndPartition.
      // OffsetRange: instances of this class can be created with `OffsetRange.create()`.
      val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//     How offsetRanges is implemented (in KafkaRDD): tp = TopicAndPartition, fo = fromOffset
//      val offsetRanges = fromOffsets.map { case (tp, fo) =>  
//        val uo = untilOffsets(tp)  
//        OffsetRange(tp.topic, tp.partition, fo, uo.offset)  
//      }.toArray  

      val kc = new KafkaCluster(kafkaParams)  
      for (offsets <- offsetsList) {  
       // TopicAndPartition's primary constructor takes the topic first, then the partition id
        val topicAndPartition = TopicAndPartition("iteblog", offsets.partition) // offsets.partition is the Kafka partition id
        val o = kc.setConsumerOffsets(groupID, Map((topicAndPartition, offsets.untilOffset))) // offsets.untilOffset: exclusive end offset of this batch's range
        if (o.isLeft) {  
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")  
        }  
      }  
    })  

    ssc.start()  
    ssc.awaitTermination()  
    ssc.stop()  
  }  
}  
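
The loop above writes each partition's `untilOffset` back to the Kafka cluster once the batch has been handed to `foreachRDD`. The bookkeeping can be sketched in plain Scala without a running cluster. `BatchRange` is a hypothetical stand-in that mirrors the four fields of Spark's `OffsetRange` (`fromOffset` inclusive, `untilOffset` exclusive), and `offsetsToCommit` is an illustrative helper rather than part of any library.

```scala
// Plain-Scala stand-in mirroring the fields of Spark's OffsetRange (hypothetical type).
final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // Number of messages in the range; untilOffset is exclusive.
  def count: Long = untilOffset - fromOffset
}

object OffsetBookkeeping {
  // One entry per (topic, partition): the offset the next batch should start from.
  def offsetsToCommit(ranges: Seq[BatchRange]): Map[(String, Int), Long] =
    ranges.map(r => (r.topic, r.partition) -> r.untilOffset).toMap
}

object OffsetDemo extends App {
  val batch = Seq(
    BatchRange("iteblog", 0, 100L, 150L), // 50 new messages on partition 0
    BatchRange("iteblog", 1, 40L, 40L)    // nothing new on partition 1
  )

  val commit = OffsetBookkeeping.offsetsToCommit(batch)
  println(commit(("iteblog", 0)))  // prints 150
  println(batch.map(_.count).sum)  // prints 50
}
```

Because the commit happens after the batch is processed, a failure between processing and commit replays that batch on restart. That gives at-least-once delivery unless the output step is idempotent or transactional.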