Notes on Building a Real-Time Log Analysis Platform (Part 1)

By 季牧云, posted 10 months ago

Many companies have already built ELK-based log collection platforms, but ELK only covers collection and storage; the analysis part is missing. As an information-security practitioner, I need to run security analysis over all of the collected logs and produce results, which is what closes the loop. The company I currently work for also happens to be starting its log analysis effort, so I have been studying log analysis platforms recently. The main goal of the platform is to collect the various logs produced by production and office systems (for now mainly access logs and system logs; firewall and other device logs may be added later), analyze them in real time, find risks, and raise alerts promptly. Combining ELK with the experience of more seasoned colleagues, the tentative architecture is described below. So far only a simple demo has been run in the test environment, and many problems remain unstudied or unsolved (for example, how to route different logs into different Kafka topics); these will be worked out over time, and this post serves as my notes.

The main idea: logs are collected with Flume (they can be shipped to Flume via syslog and similar mechanisms), aggregated in Flume, and written into Kafka. Storm then reads the logs from Kafka and runs a set of real-time security detection rules (for example, checks covering parts of the OWASP Top 10). If a log line is found to be risky it is written straight into ES; regardless of risk, every log line is also written into HBase. Spark runs statistics-based rules, and when a Spark job finishes it likewise writes the problematic logs into Elasticsearch. For now Kibana is used for display. All of the components above run as clusters, which avoids single-machine failures.

The test environment currently uses access logs for testing; the access log format is as follows:

200.110.75.247 - [03/Jul/2017:10:56:34 +0800] "POST /social/getDropsById.htm HTTP/1.1" "id=9" 500 1483 0.745 "http://192.168.32.33/social/showDrops.htm?id=3" "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
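
For reference, such a line can be broken into fields with a regular expression. Below is a minimal parsing sketch; the pattern is inferred from the sample line above (with a trimmed user agent), so adjust it if your nginx log_format differs.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AccessLogParseDemo {

	// groups: 1 clientIp, 2 clientUser, 3 time, 4 method, 5 url, 6 version,
	//         7 requestBody, 8 status, 9 bytes, 10 requestTime, 11 referer, 12 userAgent
	private static final Pattern LOG_PATTERN = Pattern.compile(
			"((?:\\d+\\.){3}\\d+) (\\S+) \\[(.+?)\\] \"(\\S+) (\\S+) (\\S+)\" \"(.*?)\" "
			+ "(\\d+) (\\d+) (\\S+) \"(.*?)\" \"(.*)\"");

	public static void main(String[] args) {
		String line = "200.110.75.247 - [03/Jul/2017:10:56:34 +0800] "
				+ "\"POST /social/getDropsById.htm HTTP/1.1\" \"id=9\" 500 1483 0.745 "
				+ "\"http://192.168.32.33/social/showDrops.htm?id=3\" \"Mozilla/5.0\"";
		Matcher m = LOG_PATTERN.matcher(line);
		if (m.find()) {
			System.out.println("clientIp = " + m.group(1));
			System.out.println("time     = " + m.group(3));
			System.out.println("url      = " + m.group(5));
			System.out.println("status   = " + m.group(8));
		}
	}
}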

The conf file used by the flume agent is as follows:

mysyslog.sources = s1
mysyslog.channels = c1
mysyslog.sinks = output36 output37
mysyslog.sinkgroups = g1

mysyslog.sources.s1.type = syslogudp
mysyslog.sources.s1.port = 5140
mysyslog.sources.s1.host = 192.168.32.33
mysyslog.sources.s1.channels = c1

mysyslog.channels.c1.type = memory
mysyslog.channels.c1.capacity = 500
mysyslog.channels.c1.transactionCapacity = 100

mysyslog.sinks.output36.type = avro
mysyslog.sinks.output36.hostname = 192.168.32.36
mysyslog.sinks.output36.port = 12888

mysyslog.sinks.output37.type = avro
mysyslog.sinks.output37.hostname = 192.168.32.37
mysyslog.sinks.output37.port = 12888

mysyslog.sinks.output36.channel = c1
mysyslog.sinks.output37.channel = c1


mysyslog.sinkgroups.g1.sinks = output36 output37
mysyslog.sinkgroups.g1.processor.type = load_balance
mysyslog.sinkgroups.g1.processor.backoff = true
mysyslog.sinkgroups.g1.processor.selector = random

There are two flume collector machines in the test environment, 192.168.32.36 and 192.168.32.37. The conf file on one of them is as follows:

collector36.sources = avro-in
collector36.channels = mem-channel
collector36.sinks = kafka-out

# For each one of the sources, the type is defined
collector36.sources.avro-in.channels = mem-channel
collector36.sources.avro-in.type = avro
collector36.sources.avro-in.bind = 192.168.32.36
collector36.sources.avro-in.port = 12888


# Each sink's type must be defined
collector36.sinks.kafka-out.type = org.apache.flume.sink.kafka.KafkaSink
collector36.sinks.kafka-out.kafka.bootstrap.servers = 192.168.32.36:9092,192.168.32.37:9092
collector36.sinks.kafka-out.kafka.topic = nginx-accesslog
collector36.sinks.kafka-out.kafka.producer.acks = 1
collector36.sinks.kafka-out.channel = mem-channel


# Each channel's type is defined.
collector36.channels.mem-channel.type = memory

Flume is responsible for collecting the logs; it natively supports several ways of receiving them, for example the common case of logs forwarded via rsyslog. Once collected and aggregated by Flume, the logs are written into the Kafka cluster. The Kafka, JStorm, and Hadoop clusters all need ZooKeeper for coordination, so four machines were prepared for a ZooKeeper cluster. The configuration file on one of them is as follows:

# The number of milliseconds of each tick
tickTime=3000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/mydata
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.33=192.168.32.33:2888:3888
server.35=192.168.32.35:2888:3888
server.36=192.168.32.36:2888:3888

Create a topic named nginx-accesslog in Kafka; all of the access logs aggregated by Flume are written into this topic. Kafka's main purpose is to buffer all of the logs; JStorm then reads the logs it needs from Kafka and processes them.

bin/kafka-topics.sh --create --zookeeper 192.168.32.33:2181,192.168.32.34:2181,192.168.32.35:2181,192.168.32.36:2181/kafka --replication-factor 3 --partitions 4 --topic nginx-accesslog
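
To verify that the access logs actually land in the topic, a throwaway consumer can be pointed at the brokers above. This is a minimal sketch using the standard kafka-clients consumer API; the group id is arbitrary and not part of the original setup.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicCheck {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.32.36:9092,192.168.32.37:9092");
		props.put("group.id", "nginx-accesslog-check");   // arbitrary group id for this check
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("auto.offset.reset", "earliest");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Collections.singletonList("nginx-accesslog"));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(1000);
			for (ConsumerRecord<String, String> record : records) {
				System.out.println(record.value());
			}
		}
	}
}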

The JStorm and Spark stages that follow are where the actual log analysis happens. JStorm does the real-time analysis: each log line is analyzed as an individual event, so JStorm mainly runs vulnerability PoC matching rules, such as SQL injection and XSS. Spark runs the statistical rules, which need to look across multiple log lines, for example "within 3 minutes, some IP made more than N requests, or produced more than M non-200 responses".
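
As an illustration of the kind of statistical rule meant here, the sketch below shows the windowed-count logic for the "more than N requests from one IP within 3 minutes" check in plain Java. It is not the Spark job itself (which is not part of this post); it only demonstrates the rule.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sliding-window counter: flags an IP once it exceeds
 * 'threshold' events within 'windowMillis'.
 */
public class WindowedIpCounter {

	private final long windowMillis;
	private final int threshold;
	private final Map<String, Deque<Long>> hits = new HashMap<>();

	public WindowedIpCounter(long windowMillis, int threshold) {
		this.windowMillis = windowMillis;
		this.threshold = threshold;
	}

	/** Returns true if this event pushes the IP over the threshold. */
	public boolean record(String ip, long timestampMillis) {
		Deque<Long> q = hits.computeIfAbsent(ip, k -> new ArrayDeque<Long>());
		q.addLast(timestampMillis);
		// drop events that have fallen out of the window
		while (!q.isEmpty() && timestampMillis - q.peekFirst() > windowMillis) {
			q.pollFirst();
		}
		return q.size() > threshold;
	}

	public static void main(String[] args) {
		// 3-minute window, threshold of 100 requests
		WindowedIpCounter counter = new WindowedIpCounter(3 * 60 * 1000L, 100);
		long now = System.currentTimeMillis();
		for (int i = 0; i <= 100; i++) {
			if (counter.record("200.110.75.247", now + i)) {
				System.out.println("alert: request rate exceeded for 200.110.75.247");
			}
		}
	}
}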

The JStorm cluster consists of 3 servers. It currently runs a demo topology that first writes every access log line it reads into HBase, and then checks in turn for four classes of vulnerabilities: SQL injection, XSS, remote file inclusion, and local file inclusion. If a log line is judged to carry a security risk, it is written into ES.

For now each log line is written to HBase as a single record, without being split into fields. The HBase rowkey is the timestamp formatted as yyyy-MM-dd'T'HH:mm:ssZZ plus a counter i. The AccessHbaseBolt code is as follows:

package com.jiyufei.hlas.bolt;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/*
 * Created by jiyufei on 2017-03-27 09:46.
 */
public class AccessHbaseBolt extends BaseRichBolt {

	private static final long serialVersionUID = 4431861248787718777L;
	private final Logger logger = LoggerFactory.getLogger(this.getClass());
	private OutputCollector collector;
	private TableName tableName;
	private Connection connection;
	private String columnFamily = "access";
	private Table table;
	private Integer i;
	
	@Override
	public void execute(Tuple tuple) {
		String log = tuple.getString(0);
		String rowKey = null;
		Date time = null;
		// rowkey = the request timestamp in ISO-like format, later suffixed with "|" + a rolling counter
		SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ");
		SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss ZZ",Locale.ENGLISH);
		String pattern = "((\\d+\\.){3}\\d+)\\s(\\S+)\\s\\[(.+)\\]\\s(.*)";
		Matcher m = Pattern.compile(pattern).matcher(log);
		if(m.find()){
			try {
				time = sdf.parse(m.group(4));
				rowKey = sdf2.format(time);
			} catch (ParseException e1) {
				logger.error("[AccessHbaseBolt.execute]" + e1.getMessage());
				// fall back to the current time if the timestamp cannot be parsed
				rowKey = sdf2.format(new Date());
			}
		}else{
			rowKey = sdf2.format(new Date());
		}
		Put put = new Put(Bytes.toBytes(rowKey + "|" + i));
		
		put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("info"), Bytes.toBytes(log));
		try {
			table.put(put);
		} catch (IOException e) {
			//e.printStackTrace();
			logger.error("[AccessHbaseBolt.execute]" + e.getMessage());
		}
		collector.emit("accesslogStream",tuple,new Values(log));
		collector.ack(tuple);
		// keep the per-task counter in the 10000-99999 range so rowkeys stay fixed width
		if(i > 99999){
			i = 10000;
		}else{
			i++;
		}
	}

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
		
		this.collector = collector;
		tableName = TableName.valueOf("accesslog2");
		Configuration configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum", "192.168.32.33,192.168.32.34,192.168.32.35,192.168.32.36");
		configuration.set("hbase.zookeeper.property.clientPort", "2181");
		i = 10000;
		try {
			connection = ConnectionFactory.createConnection(configuration);
			Admin admin = connection.getAdmin();
			if(admin.tableExists(tableName)){
				System.out.println("table exists");
			}else{
				HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
				tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
				admin.createTable(tableDescriptor);
			}
			table = connection.getTable(tableName);
			//BufferedMutatorParams params = new BufferedMutatorParams(tableName);
			//mutator = connection.getBufferedMutator(params);
		} catch (IOException e) {
			//e.printStackTrace();
			logger.error("[AccessHbaseBolt.prepare]" + e.getMessage());
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("accesslogStream",new Fields("accesslog"));
	}

	public void cleanup(){
		try {
			table.close();
			connection.close();
		} catch (IOException e) {
			logger.error("[AccessHbaseBolt.cleanup]" + e.getMessage());
		}
	}
	
}

Current contents of the HBase table (screenshot omitted):

The next bolt is the split bolt, which is responsible for splitting an access log String into an AccessLog object, assigning each access log field to the corresponding field of AccessLog. The class currently has four extra fields, vulType, vulTypeId, msg, and logType, which record which vulnerability rule the log triggered, the rule id, the alert message, and the log type respectively. A rough sketch of the split bolt itself follows the AccessLog class below.

The AccessLog class:

package com.jiyufei.hlas.util;

import java.io.Serializable;


public class AccessLog implements Serializable{
	
	private static final long serialVersionUID = 7078625027459628744L;
	
	private String vulType;
	private String vulTypeId;
	private String msg;
	private String logType;
	private String clientIp;	
	private String clientUser;
	private String time;
	private String method;	
	private String url;	
	private String version;
	private String requestBody; 
	private String status;	
	private String httpBytes;
	private String requestTime;
	private String referer;
	private String userAgent;
	
	//private String hostIp;
	//private String indexName;
	//private String business;
	
	public String getVulType() {
		return vulType;
	}
	public void setVulType(String vulType) {
		this.vulType = vulType;
	}
	public String getVulTypeId() {
		return vulTypeId;
	}
	public void setVulTypeId(String vulTypeId) {
		this.vulTypeId = vulTypeId;
	}
	public String getMsg() {
		return msg;
	}
	public void setMsg(String msg) {
		this.msg = msg;
	}
	public String getLogType() {
		return logType;
	}
	public void setLogType(String logType) {
		this.logType = logType;
	}
	/*
	public String getHostIp() {
		return hostIp;
	}
	public void setHostIp(String hostIp) {
		this.hostIp = hostIp;
	}
	*/
	/*
	public String getIndexName() {
		return indexName;
	}
	public void setIndexName(String indexName) {
		this.indexName = indexName;
	}
	public String getBusiness() {
		return business;
	}
	public void setBusiness(String business) {
		this.business = business;
	}
	 */
	public String getClientIp() {
		return clientIp;
	}
	public void setClientIp(String clientIp) {
		this.clientIp = clientIp;
	}
	public String getClientUser() {
		return clientUser;
	}
	public void setClientUser(String clientUser) {
		this.clientUser = clientUser;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getMethod() {
		return method;
	}
	public void setMethod(String method) {
		this.method = method;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getVersion() {
		return version;
	}
	public void setVersion(String version) {
		this.version = version;
	}
	public String getRequestBody() {
		return requestBody;
	}
	public void setRequestBody(String requestBody) {
		this.requestBody = requestBody;
	}
	public String getStatus() {
		return status;
	}
	public void setStatus(String status) {
		this.status = status;
	}
	public String getHttpBytes() {
		return httpBytes;
	}
	public void setHttpBytes(String httpBytes) {
		this.httpBytes = httpBytes;
	}
	public String getRequestTime() {
		return requestTime;
	}
	public void setRequestTime(String requestTime) {
		this.requestTime = requestTime;
	}
	public String getReferer() {
		return referer;
	}
	public void setReferer(String referer) {
		this.referer = referer;
	}
	public String getUserAgent() {
		return userAgent;
	}
	public void setUserAgent(String userAgent) {
		this.userAgent = userAgent;
	}
	@Override
	public String toString() {
		return "AccessLog [vulType=" + vulType + ", vulTypeId=" + vulTypeId
				+ ", msg=" + msg + ", logType=" + logType 
				+ ", clientIp=" + clientIp + ", clientUser="
				+ clientUser + ", time=" + time + ", method=" + method
				+ ", url=" + url + ", version=" + version + ", requestBody="
				+ requestBody + ", status=" + status + ", httpBytes="
				+ httpBytes + ", requestTime=" + requestTime + ", referer="
				+ referer + ", userAgent=" + userAgent + "]";
	}
	
	
	
}
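
The split bolt referenced above is not listed in these notes, so here is a rough sketch of it, reusing the regular expression from the parsing example earlier and the AccessLog class just shown. The class name and the exact field mapping are assumptions, not the actual implementation.

package com.jiyufei.hlas.bolt;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.jiyufei.hlas.util.AccessLog;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AccessSplitBoltSketch extends BaseRichBolt {

	private static final long serialVersionUID = 1L;
	// same field layout as the parsing example near the top of the post
	private static final Pattern LOG_PATTERN = Pattern.compile(
			"((?:\\d+\\.){3}\\d+) (\\S+) \\[(.+?)\\] \"(\\S+) (\\S+) (\\S+)\" \"(.*?)\" "
			+ "(\\d+) (\\d+) (\\S+) \"(.*?)\" \"(.*)\"");
	private OutputCollector collector;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		String log = tuple.getStringByField("accesslog");
		Matcher m = LOG_PATTERN.matcher(log);
		if (m.find()) {
			AccessLog accessLog = new AccessLog();
			accessLog.setLogType("accesslog");
			accessLog.setClientIp(m.group(1));
			accessLog.setClientUser(m.group(2));
			accessLog.setTime(m.group(3));
			accessLog.setMethod(m.group(4));
			accessLog.setUrl(m.group(5));
			accessLog.setVersion(m.group(6));
			accessLog.setRequestBody(m.group(7));
			accessLog.setStatus(m.group(8));
			accessLog.setHttpBytes(m.group(9));
			accessLog.setRequestTime(m.group(10));
			accessLog.setReferer(m.group(11));
			accessLog.setUserAgent(m.group(12));
			collector.emit("accesslogStream", tuple, new Values(accessLog));
		}
		// lines that do not match the pattern are simply acked and dropped
		collector.ack(tuple);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("accesslogStream", new Fields("accesslog"));
	}
}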

The next four bolts perform PoC detection; if a log line matches a PoC, it is judged risky and written into ES. The SqlModule class holds the SQL-injection rules and is used to check both the url and requestBody fields. A rough sketch of SqlModule is given after the bolt below.

The AccessSqlInjectionBolt class:

package com.jiyufei.hlas.bolt;

import java.util.Map;

import com.jiyufei.hlas.module.SqlModule;
import com.jiyufei.hlas.util.AccessLog;
import com.jiyufei.hlas.util.ModuleUtil;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class AccessSqlInjectionBolt extends BaseRichBolt {

	private static final long serialVersionUID = 8359035500983257762L;
	private OutputCollector collector;
	private SqlModule sqlModule;
	private Integer status;
	
	public void execute(Tuple tuple) {
		AccessLog accessLog = (AccessLog)tuple.getValueByField("accesslog");
		status = 0;
		ModuleUtil sqlModuleUtil = null;
		if(accessLog.getRequestBody().compareTo("-") != 0){
			sqlModuleUtil = sqlModule.check(accessLog.getRequestBody());
			if(sqlModuleUtil != null){
				accessLog.setVulTypeId(String.valueOf(sqlModuleUtil.getVulTypeId()));
				accessLog.setVulType("SQL注入");
				accessLog.setMsg(sqlModuleUtil.getMsg());
				status = 1;
			}
		}
		if(status != 1){
			sqlModuleUtil =sqlModule.check(accessLog.getUrl());
			if(sqlModuleUtil != null){
				accessLog.setVulTypeId(String.valueOf(sqlModuleUtil.getVulTypeId()));
				accessLog.setVulType("SQL注入");
				accessLog.setMsg(sqlModuleUtil.getMsg());
				status = 1;
			}else{
				// no rule matched: reset the vulnerability fields
				accessLog.setVulTypeId("0");
				accessLog.setVulType("");
				accessLog.setMsg("");
			}
		}
		
		if(status == 1){			
			collector.emit("finalStream",tuple,new Values(accessLog));
			collector.ack(tuple);
		}else{
			collector.emit("accesslogStream",tuple,new Values(accessLog));	
			collector.ack(tuple);
		}
		
		
	}

	public void prepare(@SuppressWarnings("rawtypes") Map stormConfig, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		this.sqlModule = new SqlModule();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("accesslogStream",new Fields("accesslog"));
		declarer.declareStream("finalStream",new Fields("accesslog"));
	}

	public void cleanup(){
		//logFile.close();
	}
	
}
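
The SqlModule and ModuleUtil classes are not included in these notes. As a rough idea of what such a rule module could look like, here is a minimal regex-based sketch; the patterns and the ModuleUtil setters used here are assumptions, not the actual rule set.

package com.jiyufei.hlas.module;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

import com.jiyufei.hlas.util.ModuleUtil;

public class SqlModule {

	// rule id -> case-insensitive pattern; illustrative only, far from a complete rule set
	private final Map<Integer, Pattern> rules = new LinkedHashMap<Integer, Pattern>();

	public SqlModule() {
		rules.put(1001, Pattern.compile("union\\s+select", Pattern.CASE_INSENSITIVE));
		rules.put(1002, Pattern.compile("(sleep|benchmark)\\s*\\(", Pattern.CASE_INSENSITIVE));
		rules.put(1003, Pattern.compile("('|%27)\\s*(or|and)\\s+\\d+=\\d+", Pattern.CASE_INSENSITIVE));
	}

	/** Returns a ModuleUtil describing the matched rule, or null if nothing matched. */
	public ModuleUtil check(String input) {
		if (input == null) {
			return null;
		}
		for (Map.Entry<Integer, Pattern> rule : rules.entrySet()) {
			if (rule.getValue().matcher(input).find()) {
				ModuleUtil result = new ModuleUtil();
				result.setVulTypeId(rule.getKey());                 // assumed setter
				result.setMsg("matched sql rule " + rule.getKey()); // assumed setter
				return result;
			}
		}
		return null;
	}
}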

The class that writes into ES is shown below; it uses Jest.

package com.jiyufei.hlas.bolt;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Index;

import java.io.IOException;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jiyufei.hlas.util.AccessLog;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;


public class ElasticSearchBolt extends BaseRichBolt {

	private static final String ES_IP = "http://192.168.32.32:9200";
	private static final long serialVersionUID = 145040226237959431L;
	private JestClient client;
	private OutputCollector collector;
	private static Logger logger = LoggerFactory.getLogger(ElasticSearchBolt.class);
	private AccessLog accessLog;
	
	public void execute(Tuple tuple) {
		accessLog = (AccessLog)tuple.getValueByField("accesslog");
		logger.info("[ElasticSearchBolt.execute]tuple:" + accessLog.toString());
		
		Index index = new Index.Builder(accessLog).index("access-log").type(accessLog.getVulType()).build();
		try {
			
			client.execute(index);
			collector.ack(tuple);
		} catch (IOException e) {
			logger.error("[ElasticSearchBolt.execute]" + e.getMessage());
		}
	}

	public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		JestClientFactory jestClientFactory = new JestClientFactory();
		jestClientFactory.setHttpClientConfig(new HttpClientConfig.Builder(ES_IP).multiThreaded(true).build());
		client = jestClientFactory.getObject();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

}

Example main function:

        BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");
		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot,id);
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		
		spoutConfig.zkServers = Arrays.asList(new String[]{"192.168.32.33","192.168.32.34","192.168.32.35","192.168.32.36"});
		spoutConfig.zkPort = 2181;
		spoutConfig.forceFromStart = false;
		
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
		
		TopologyBuilder builder = new TopologyBuilder();
		
		builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout,1);
		/*
		builder.setBolt(SPLIT_BOLT_ID, accessSplitBolt,1).shuffleGrouping(KAFKA_SPOUT_ID);
		builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,1).shuffleGrouping(SPLIT_BOLT_ID,"accesslogStream");
		*/
		builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,2).shuffleGrouping(KAFKA_SPOUT_ID);
		builder.setBolt(SPLIT_BOLT_ID, accessSplitBolt,1).shuffleGrouping(HBASE_BOLT_ID,"accesslogStream");
		builder.setBolt(SQL_INJECTION_BOLT_ID, accessSqlInjectionBolt,1).shuffleGrouping(SPLIT_BOLT_ID,"accesslogStream");
		
		
		builder.setBolt(XSS__BOLT_ID, accessXssBolt,1).shuffleGrouping(SQL_INJECTION_BOLT_ID,"accesslogStream");
		builder.setBolt(LFI_BOLT_ID, accessLFIBolt,1).shuffleGrouping(XSS__BOLT_ID,"accesslogStream");
		builder.setBolt(RFI_BOLT_ID, accessRFIBolt,1).shuffleGrouping(LFI_BOLT_ID,"accesslogStream");
		//builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,1).shuffleGrouping(RFI_BOLT_ID,"accesslogStream");
		//builder.setBolt(MYSQL_BOLT_ID,accessMysqlBolt,1).globalGrouping(RFI_BOLT_ID,"accesslogStream");
		
		builder.setBolt("elasticsearch-bolt1", elasticSearchBolt).globalGrouping(SQL_INJECTION_BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt2", elasticSearchBolt).globalGrouping(XSS__BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt3", elasticSearchBolt).globalGrouping(LFI_BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt4", elasticSearchBolt).globalGrouping(RFI_BOLT_ID,"finalStream");

 

Tags: log analysis, security
Comments (5)

断崖逐梦: Could you share the code?

季牧云: I've put it on GitHub: https://github.com/anjiyufei/HlasExample

断崖逐梦: @季牧云 I can't access it, unfortunately. Could you provide a working address?

季牧云: New address: https://git.oschina.net/jiyufei/HlasExample.git