文档章节

Spark Streaming结合Flume、Kafka最新最全日志分析

ericSM
 ericSM
发布于 2016/06/23 15:37
字数 396
阅读 827
收藏 5

Spark Streaming结合Flume、Kafka最新最全日志分析

1.修改相应的配置文件

按照 http://my.oschina.net/sunmin/blog/692994

整合安装Flume+Kafka+SparkStreaming

将flume/conf/producer.conf将需要监控的日志输出文件修改为本地的log 路径:
/var/log/nginx/www.eric.aysaas.com-access.log

2.导入相关 jar 包

(快捷键 Ctrl + Alt + Shift + s),点击Project Structure界面左侧的“Modules”显示下图界面

jar 包自己编译,或者去载  http://search.maven.org/#search|ga|1|g%3A%22org.apache.spark%22%20AND%20v%3A%221.6.1%22

3.新建log/KafkaTest.scala 代码如下

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * flume+kafka+SparkStreaming 实时 nginx 日志获取
  * Created by eric on 16/6/29.
  */
object KafkaLog {

  def main(agrs: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("StreamingTest")
    val ssc = new StreamingContext(sparkConf, Seconds(20))//代表一个给定的秒数的实例

    val topic = "HappyBirthDayToAnYuan"
    val topicSet = topic.split(" ").toSet

    //用 brokers and topics 创建 direct kafka stream
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    //直接从 kafka brokers 拉取信息,而不使用任何接收器.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet
    )
    val lines = messages.map(_._2)
    lines.print()
    val words: DStream[String] = lines.flatMap(_.split("\n"))
    words.count().print()

    //启动
    ssc.start()
    ssc.awaitTermination()
  }
}

4.Test

访问本地页面产生日志 http://www.eric.aysaas.com/app/admin

在这20秒内总共产生的日志行数为:

 

 

 

© 著作权归作者所有

上一篇: 王阳明心学
ericSM
粉丝 17
博文 142
码字总数 154379
作品 0
南京
项目经理
私信 提问
Spark Streaming流式处理

Spark Streaming介绍 Spark Streaming概述 Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 它可以非常容易的构建一个可扩展、具有容错机制的流式......

jiFeng丶
2018/07/26
0
0
Flume+Kafka+Sparkstreaming日志分析

最近要做一个日志实时分析的应用,采用了flume+kafka+sparkstreaming框架,先搞了一个测试Demo,本文没有分析其架构原理。   简介:flume是一个分布式,高可靠,可用的海量日志聚合系统,k...

hblt-j
2018/07/04
88
0
[Spark]Spark Streaming 指南一 Example

1. 概述 Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等许多源中提取,并且...

sjf0115
2017/03/01
0
0
[Spark]Spark Streaming 指南四 输入DStreams和Receivers

1. 输入DStream与Receiver 输入DStreams表示从源中获取输入数据流的DStreams。在指南一示例中,lines表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入DStream(除 file strea...

sjf0115
2017/03/02
0
0
地铁译:Spark for python developers ---Spark流式数据处理

先研究一下不断改变的动态环境带来的挑战,在列出流处理应用的先决条件(如,Twitter的TCP Sockets连接)之后, 结合Spark, Kafka 和 Flume 把数据放入一个低延迟,高吞吐量,可缩放的处理流...

abel_cao
01/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

前端技术之:Prisma Demo服务部署过程记录

安装前提条件: 1、已经安装了docker运行环境 2、以下命令执行记录发生在MackBook环境 3、已经安装了PostgreSQL(我使用的是11版本) 4、Node开发运行环境可以正常工作 首先需要通过Node包管...

popgis
今天
5
0
数组和链表

数组 链表 技巧一:掌握链表,想轻松写出正确的链表代码,需要理解指针获引用的含义: 对指针的理解,记住下面的这句话就可以了: 将某个变量赋值给指针,实际上就是将这个变量的地址赋值给指...

code-ortaerc
今天
4
0
栈-链式(c/c++实现)

上次说“栈是在线性表演变而来的,线性表很自由,想往哪里插数据就往哪里插数据,想删哪数据就删哪数据...。但给线性表一些限制呢,就没那么自由了,把线性表的三边封起来就变成了栈,栈只能...

白客C
今天
43
0
Mybatis Plus service

/** * @author beth * @data 2019-10-20 23:34 */@RunWith(SpringRunner.class)@SpringBootTestpublic class ServiceTest { @Autowired private IUserInfoService iUserInfoS......

一个yuanbeth
今天
5
0
php7-internal 7 zval的操作

## 7.7 zval的操作 扩展中经常会用到各种类型的zval,PHP提供了很多宏用于不同类型zval的操作,尽管我们也可以自己操作zval,但这并不是一个好习惯,因为zval有很多其它用途的标识,如果自己...

冻结not
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部