文档章节

实时日志分析平台搭建笔记(二)

季牧云
 季牧云
发布于 2017/07/04 10:18
字数 704
阅读 57
收藏 1
点赞 0
评论 0

    接上一篇。ES采用两台服务器的集群,创建一个indices,名称为access-log,jstorm和spark处理后的日志写入其中。

注意:创建完access-log后,在写入数据前,需要运行以下命令,"time"为时间字段,"vulType"为_type

curl -XPUT '192.168.32.32:9200/access-log?pretty' -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "vulType": {
      "properties": {
        "time": {
          "type":   "date",
          "format": "date_time_no_millis"
        }
      }
    }
  }
}
'

伪造一些日志,可以在kibana中看到jstorm执行后的结果

以上是jstorm实时检测的流程。还有一路是spark通过MR分析多条日志,进行统计规则的检测。

spark首先去hbase中读取数据,随后进行Map/Reduce,统计出触发风险规则的日志。例如测试统计三分钟内访问次数超过10次的IP。

获取hbase中的accesslog

public List<String> getTableByDate(String startRow,String endRow){
		Scan s = new Scan();
		List<String> logList = new ArrayList<String>();
		s.setStartRow(Bytes.toBytes(startRow));
		s.setStopRow(Bytes.toBytes(endRow));
		try {
			ResultScanner resultScanner = table.getScanner(s);
			for(Result rs:resultScanner){
				NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> navigableMap = rs.getMap();
				for(Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry:navigableMap.entrySet()){
	                NavigableMap<byte[], NavigableMap<Long, byte[]>> map =entry.getValue();
	                for(Map.Entry<byte[], NavigableMap<Long, byte[]>> en:map.entrySet()){
	                    NavigableMap<Long, byte[]> ma = en.getValue();
	                    for(Map.Entry<Long, byte[]>e: ma.entrySet()){
	                    	logList.add(Bytes.toString(e.getValue()));
	                    }
	                }
	            }
			}
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
		return logList;

随后将accesslog映射到类AccessLog中

public void analysesLog(String startKey,String endKey){
		logList = hbaseOperator.getTableByDate(startKey, endKey);
		listLength = logList.size();
		accesslogList = new ArrayList<AccessLog>(listLength);
		String patternstr = "((\\d+\\.){3}\\d+)\\s(\\S+)\\s\\[(.+)\\]\\s\"((\\S+)\\s(.*)\\s(\\S+))\"\\s\"(.*)\"\\s(\\d+)\\s(\\d+)\\s(\\S+)\\s\"(.*)\"\\s\"(.*)\"";
		pattern = Pattern.compile(patternstr);
		for(int i=0;i<listLength;i++){
			m = pattern.matcher(logList.get(i));
			if(m.find()){
				AccessLog accessLog = new AccessLog();
				accessLog.setLogType("access-log");
				accessLog.setClientIp(m.group(1));
				accessLog.setClientUser(m.group(3));
				SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ");
				SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss ZZ",Locale.ENGLISH);
				Date time = null;
				try {
					time = sdf.parse(m.group(4));
					accessLog.setTime(sdf2.format(time));	
				} catch (ParseException e) {
					accessLog.setTime(sdf2.format(new Date()));
					//logger.error("[LogSplit.execute]:" + e.getMessage());
				}
				accessLog.setMethod(m.group(6));
				accessLog.setUrl(m.group(7));
				accessLog.setVersion(m.group(8));
				accessLog.setRequestBody(m.group(9));
				accessLog.setStatus(m.group(10));
				accessLog.setHttpBytes(m.group(11));
				accessLog.setRequestTime(m.group(12));
				accessLog.setReferer(m.group(13));
				accessLog.setUserAgent(m.group(14));
				accesslogList.add(accessLog);
			}
		}
		analysesIp();
		
		
	}

映射完成后调用analysesIp()进行Map/Reduce操作,并将命中的IP写入到ES中

public void analysesIp(){
		if(!accesslogList.isEmpty()){
			List<String> ipList = new ArrayList<String>(listLength);
			Iterator<AccessLog> iterator = accesslogList.iterator();
			while (iterator.hasNext()) {
				ipList.add(iterator.next().getClientIp());
			}
			JavaRDD<String> ipRdd = sparkContext.parallelize(ipList);
			JavaPairRDD<String, Integer> clientIpRdd = ipRdd.mapToPair(initCount);
			JavaPairRDD<String, Integer> sumRdd = clientIpRdd.reduceByKey(sum);
			Map<String, Integer> ipMap = sumRdd.collectAsMap();
			SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ");
			AccessLog accessLog = new AccessLog();
			accessLog.setLogType("accesslog");
			accessLog.setVulType("IP访问异常");
			accessLog.setVulTypeId(Integer.toString(RuleLength.START_RULE_LENGTH));
			for(Entry<String, Integer> entry:ipMap.entrySet()){
				if(entry.getValue() > 30){
					accessLog.setTime(sdf2.format(new Date()));
					accessLog.setClientIp(entry.getKey());
					accessLog.setMsg("源IP在3分钟内共访问了" + entry.getValue() + "次");
					elasticSearch.inputData(accessLog);
				}
			}
			
		}
	}

用到两个简单的map/reduce函数,一个进行初始化,将单个IP初始化一个元组(IP,1)

第二个将相同的IP进行累加,并记录出现的次数,累加后的效果为(IP1,10) (IP2,3)

private static PairFunction<String, String, Integer> initCount = new PairFunction<String, String, Integer>() {

		/**
		 * 
		 */
		private static final long serialVersionUID = -6290488020645730311L;

		public Tuple2<String, Integer> call(String x){
			return new Tuple2<String, Integer>(x, 1);
		}
	};
	
	private static Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {
		
		/**
		 * 
		 */
		private static final long serialVersionUID = 391813718009018019L;

		@Override
		public Integer call(Integer x, Integer y) throws Exception {
			
			return x + y;
		}
	};

写入ES后,可看到的效果如下

© 著作权归作者所有

共有 人打赏支持
季牧云
粉丝 20
博文 26
码字总数 20788
作品 0
浦东
其他
基于Flume+Log4j+Kafka的日志采集架构方案

http://www.linuxidc.com/Linux/2016-05/131402.htm (一)kafka-jstorm集群实时日志分析 之 ---------kafka实时日志处理 ELK 实现 Java 分布式系统日志分析架构 ELK(ElasticSearch, Logstas...

cccyb ⋅ 2016/12/12 ⋅ 0

2017年12月19-20日 阿里云 飞天/智能 云栖大会-北京峰会 会议笔记

2017年12月19-20日,在北京国家会议中心举办了云栖大会北京峰会,19日为Tech Insight,20日为主论坛和其他分论坛。场次很多,内容很丰富,自己的一些参会笔记整理如下: 20171219 上午 阿里云...

海洋的云 ⋅ 2017/12/21 ⋅ 0

Logstash+Redis+Elasticsearch+Kibana+Nginx搭建日志分析系统

前言: 随着实时分析技术的发展及成本的降低,用户已经不仅仅满足于离线分析。目前我们服务的用户包括微博、微盘、云存储、弹性计算平台等十多个部门的多个产品的日志搜索分析业务,每天处理...

小柒2012 ⋅ 2016/03/02 ⋅ 0

Kafka是个奇葩!——Linkin论文学习笔记

是个消息中间件吗?那和市面上其他一堆堆的中间件例如ActiveMQ, RabbitMQ有什么区别? 答案只有一个: Kafka是个集群的消息中间件+存储,一个节点可以存储几T的数据! 为啥一个中间件需要存储...

难易 ⋅ 2014/08/26 ⋅ 9

全球敏捷运维峰会丨Gdevops北京站,不一样的端午等你来!

堵塞的高速、拥挤的景区、匆忙的到此一游……在接下来的端午小长假,不如换种过法?你可知道,6月11日在北京,DBAplus社群联合了运维帮、Linux中国战略开启Gdevops全球敏捷运维峰会第二站! ...

DBAplus社群 ⋅ 2016/05/18 ⋅ 0

ELK + Filebeat 搭建日志系统

Elasticsearch 分布式搜索和分析引擎。具有高可伸缩、高可靠和易管理等特点。基于 Apache Lucene 构建,能对大容量的数据进行接近实时的存储、搜索和分析操作。 Logstash 日志收集器。搜集各...

BeckJin ⋅ 2017/12/10 ⋅ 0

ELK(ElasticSearch+Logstash+ Kibana)搭建实时日志分析平台

ELK(ElasticSearch+Logstash+ Kibana)搭建实时日志分析平台 一、准备工具:(Centos7) 1、Elasticsearch:ElasticSearch是一个基于Lucene构建的开源,分布式,RESTful搜索引擎。设计用于云计算...

XiaoBingZ ⋅ 2017/11/06 ⋅ 0

CTO详细讲解海量日志处理ELK

ELK实时日志分析平台之Elasticsearch简介 Elasticsearch是一个高度灵活的开源全文检索和分析引擎。它能够迅速(几乎是实时地)地存储、查找和分析大规模数据。通常被用在有复杂的搜索要求的系...

Java架构分享 ⋅ 05/23 ⋅ 0

ELK实时日志分析平台环境部署-2

一、ELK搭建篇 官网地址:https://www.elastic.co/cn/ 官网权威指南:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html 安装指南:https://www.elastic.co/guide/en/e......

顶风走千里 ⋅ 2017/11/29 ⋅ 0

大数据经典学习路线(及供参考)

转:https://blog.csdn.net/yuexianchang/article/details/52468291 目录(?)[+]

junzixing1985 ⋅ 04/15 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

RabbitMQ学习以及与Spring的集成(三)

本文介绍RabbitMQ与Spring的简单集成以及消息的发送和接收。 在RabbitMQ的Spring配置文件中,首先需要增加命名空间。 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 其次是模...

onedotdot ⋅ 20分钟前 ⋅ 0

JAVA实现仿微信红包分配规则

最近过年发红包拜年成为一种新的潮流,作为程序猿对算法的好奇远远要大于对红包的好奇,这里介绍一种自己想到的一种随机红包分配策略,还请大家多多指教。 算法介绍 一、红包金额限制 对于微...

楠木楠 ⋅ 32分钟前 ⋅ 0

Python 数电表格格式化 xlutils xlwt xlrd的使用

需要安装 xlutils xlwt xlrd 格式化前 格式化后 代码 先copy读取的表格,然后按照一定的规则修改,将昵称中的学号提取出来替换昵称即可 from xlrd import open_workbookfrom xlutils.copy ...

阿豪boy ⋅ 今天 ⋅ 0

面试题:使用rand5()生成rand7()

前言 读研究生这3 年,思维与本科相比变化挺大的,这几年除了看论文、设计方案,更重要的是学会注重先思考、再实现,感觉更加成熟吧,不再像个小P孩,人年轻时总会心高气傲。有1 道面试题:给...

初雪之音 ⋅ 今天 ⋅ 0

Docker Toolbox Looks like something went wrong

Docker Toolbox 重新安装后提示错误:Looks like something went wrong in step ´Checking if machine default exists´ 控制面板-->程序与应用-->启用或关闭windows功能:找到Hyper-V,如果处......

随你疯 ⋅ 今天 ⋅ 0

Guacamole 远程桌面

本文将Apache的guacamole服务的部署和应用,http://guacamole.apache.org/doc/gug/ 该链接下有全部相关知识的英文文档,如果水平ok,可以去这里仔细查看。 一、简介 Apache Guacamole 是无客...

千里明月 ⋅ 今天 ⋅ 0

nagios 安装

Nagios简介:监控网络并排除网络故障的工具:nagios,Ntop,OpenVAS,OCS,OSSIM等开源监控工具。 可以实现对网络上的服务器进行全面的监控,包括服务(apache、mysql、ntp、ftp、disk、qmail和h...

寰宇01 ⋅ 今天 ⋅ 0

AngularDart注意事项

默认情况下创建Dart项目应出现以下列表: 有时会因为不知明的原因导致列表项缺失: 此时可以通过以下步骤解决: 1.创建项目涉及到的包:stagehand 2.执行pub global activate stagehand或pub...

scooplol ⋅ 今天 ⋅ 0

Java Web如何操作Cookie的添加修改和删除

创建Cookie对象 Cookie cookie = new Cookie("id", "1"); 修改Cookie值 cookie.setValue("2"); 设置Cookie有效期和删除Cookie cookie.setMaxAge(24*60*60); // Cookie有效时间 co......

二营长意大利炮 ⋅ 今天 ⋅ 0

【每天一个JQuery特效】淡入淡出显示或隐藏窗口

我是JQuery新手爱好者,有时间就练练代码,防止手生,争取每天一个JQuery练习,在这个博客记录下学习的笔记。 本特效主要采用fadeIn()和fadeOut()方法显示淡入淡出的显示效果显示或隐藏元...

Rhymo-Wu ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部