Many companies have already built ELK-based log collection platforms, but ELK on its own only covers collection and storage and lacks the analysis piece. As an information security practitioner I need to run security analysis over all collected logs and produce actionable results; only then is the loop closed. My current company also happens to be kicking off its log analysis work, so I have recently been looking into log analysis platforms. The main goal of the platform is to collect the various logs produced by production and office systems (currently mainly access logs and system logs; firewall and other device logs may be added later), analyze them in real time, find risks and raise alerts promptly. Combining ELK with the experience of seasoned colleagues, the current tentative architecture is described below. So far only a simple demo has been run in the test environment, and many questions remain unexplored or unresolved (for example, how to route different logs into different Kafka topics); these will be filled in gradually, and this post serves as my notes.
The main idea: Flume collects the logs, which can be shipped to it via syslog and similar mechanisms; after being aggregated in Flume they are written into Kafka. Storm then reads the logs from Kafka and runs a set of real-time detection rules (for example checks drawn from the OWASP Top 10). If a log line is found to be risky it is written straight into ES; whether risky or not, every log line is also written into HBase. Spark runs statistics-based rules, and once a Spark job finishes, the problematic logs it found are likewise written into Elasticsearch. Kibana is used for visualization for now. All of the components above run as clusters, which avoids single-machine failures.
The test environment currently uses access logs for testing; the access log format looks like this:
200.110.75.247 - [03/Jul/2017:10:56:34 +0800] "POST /social/getDropsById.htm HTTP/1.1" "id=9" 500 1483 0.745 "http://192.168.32.33/social/showDrops.htm?id=3" "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
The conf file used on the flume agent is as follows:
mysyslog.sources = s1
mysyslog.channels = c1
mysyslog.sinks = output36 output37
mysyslog.sinkgroups = g1
mysyslog.sources.s1.type = syslogudp
mysyslog.sources.s1.port = 5140
mysyslog.sources.s1.host = 192.168.32.33
mysyslog.sources.s1.channels = c1
mysyslog.channels.c1.type = memory
mysyslog.channels.c1.capacity = 500
mysyslog.channels.c1.transactionCapacity = 100
mysyslog.sinks.output36.type = avro
mysyslog.sinks.output36.hostname = 192.168.32.36
mysyslog.sinks.output36.port = 12888
mysyslog.sinks.output37.type = avro
mysyslog.sinks.output37.hostname = 192.168.32.37
mysyslog.sinks.output37.port = 12888
mysyslog.sinks.output36.channel = c1
mysyslog.sinks.output37.channel = c1
mysyslog.sinkgroups.g1.sinks = output36 output37
mysyslog.sinkgroups.g1.processor.type = load_balance
mysyslog.sinkgroups.g1.processor.backoff = true
mysyslog.sinkgroups.g1.processor.selector = random
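How the application servers ship their logs to this agent is not covered in the post. As one hedged example, an rsyslog rule that tails an nginx access log (hypothetical file path and tag) and forwards it over UDP to the agent's syslogudp source on 192.168.32.33:5140 could look like this:
# hypothetical /etc/rsyslog.d/nginx-access.conf
$ModLoad imfile
$InputFileName /var/log/nginx/access.log
$InputFileTag nginx-access:
$InputFileStateFile stat-nginx-access
$InputRunFileMonitor
# single @ = UDP, matching the agent's syslogudp source and port
*.* @192.168.32.33:5140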
There are two flume-collector machines in the test environment, 192.168.32.36 and 192.168.32.37; the conf file of one of them is as follows:
collector36.sources = avro-in
collector36.channels = mem-channel
collector36.sinks = kafka-out
# For each one of the sources, the type is defined
collector36.sources.avro-in.channels = mem-channel
collector36.sources.avro-in.type = avro
collector36.sources.avro-in.bind = 192.168.32.36
collector36.sources.avro-in.port = 12888
# Each sink's type must be defined
collector36.sinks.kafka-out.type = org.apache.flume.sink.kafka.KafkaSink
collector36.sinks.kafka-out.kafka.bootstrap.servers = 192.168.32.36:9092,192.168.32.37:9092
collector36.sinks.kafka-out.kafka.topic = nginx-accesslog
collector36.sinks.kafka-out.kafka.producer.acks = 1
collector36.sinks.kafka-out.channel = mem-channel
# Each channel's type is defined.
collector36.channels.mem-channel.type = memory
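The post does not show how the agents are launched; assuming the file above is saved as conf/collector36.conf, the stock flume-ng launcher would be invoked roughly like this (the --name value must match the component prefix, collector36):
bin/flume-ng agent --conf conf --conf-file conf/collector36.conf --name collector36 -Dflume.root.logger=INFO,console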
Flume's role is log collection, and it natively supports many ways of receiving logs, for example the common approach of forwarding logs via rsyslog (as sketched above). After Flume has collected and aggregated the logs, they are written into the Kafka cluster. The Kafka, JStorm and Hadoop clusters are all coordinated by ZooKeeper, so four machines were set aside to build a ZooKeeper ensemble. The configuration file of one of the nodes is shown below:
# The number of milliseconds of each tick
tickTime=3000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/mydata
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.33=192.168.32.33:2888:3888
server.35=192.168.32.35:2888:3888
server.36=192.168.32.36:2888:3888
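The config file omits one step that is easy to forget: each node also needs a myid file under dataDir whose content matches its server.N id. For example, on 192.168.32.33:
echo 33 > /opt/zookeeper/mydata/myid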
A topic named nginx-accesslog is created in Kafka, and all the access logs aggregated by Flume are written into it. Kafka's main purpose is to buffer all the logs; JStorm then reads the logs it needs from Kafka and processes them.
bin/kafka-topics.sh --create --zookeeper 192.168.32.33:2181,192.168.32.34:2181,192.168.32.35:2181,192.168.32.36:2181/kafka --replication-factor 3 --partitions 4 --topic nginx-accesslog
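To check that the collectors are actually producing into the topic, the stock console consumer can be pointed at the same ZooKeeper chroot (a quick sanity check, not part of the original setup):
bin/kafka-console-consumer.sh --zookeeper 192.168.32.33:2181/kafka --topic nginx-accesslog --from-beginning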
The JStorm and Spark stages downstream are where the real log analysis happens. JStorm is responsible for real-time analysis: each log line is analyzed on its own, so JStorm mainly runs vulnerability PoC matching rules such as SQL injection and XSS checks. Spark runs the statistical rules, which need to look across many log lines, for example "within 3 minutes a given IP made more than N requests" or "returned more than M non-200 responses".
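The Spark side is not implemented in this demo yet. Purely as an illustration of the kind of windowed rule described above, a minimal sketch using the old receiver-based Spark Streaming Kafka API might look like the following; the batch interval, consumer group and the threshold of 100 requests are all made-up values.
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public class AccessLogRateRule {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("access-log-rate-rule");
        // 30-second micro-batches (placeholder value)
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));

        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("nginx-accesslog", 1);
        // receiver-based Kafka stream; the ZooKeeper connect string reuses the /kafka chroot from above
        JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
                jssc, "192.168.32.33:2181,192.168.32.35:2181,192.168.32.36:2181/kafka",
                "spark-accesslog-rule", topics);

        // the client IP is the first whitespace-separated field of the access log line
        JavaPairDStream<String, Integer> hitsPerIp = stream
                .mapToPair(t -> new Tuple2<>(t._2().split("\\s+")[0], 1))
                .reduceByKeyAndWindow((a, b) -> a + b, Durations.minutes(3), Durations.seconds(30));

        // IPs with more than 100 hits in the last 3 minutes (N=100 is a placeholder);
        // in the real platform these would be written to Elasticsearch instead of printed
        hitsPerIp.filter(t -> t._2() > 100).print();

        jssc.start();
        jssc.awaitTermination();
    }
}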
The JStorm cluster consists of 3 servers. It currently runs a demo topology: the demo first writes every access log it reads into HBase, then checks it in turn for four classes of vulnerabilities: SQL injection, XSS, remote file inclusion and local file inclusion. If a log line is judged to be a security risk, it is written into ES.
For now each log line is written into HBase as one record, without splitting. The HBase rowkey is the timestamp formatted as yyyy-MM-dd'T'HH:mm:ssZZ plus a counter i. The AccessHbaseBolt code is as follows:
package com.jiyufei.hlas.bolt;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/*
*Created by jiyufei on 2017-03-27 09:46:08 .
*/
public class AccessHbaseBolt extends BaseRichBolt {
private static final long serialVersionUID = 4431861248787718777L;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private OutputCollector collector;
private TableName tableName;
private Connection connection;
private String columnFamily = "access";
private Table table;
private Integer i;
@Override
public void execute(Tuple tuple) {
String log = tuple.getString(0);
String rowKey = null;
Date time = null;
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss ZZ",Locale.ENGLISH);
String pattern = "((\\d+\\.){3}\\d+)\\s(\\S+)\\s\\[(.+)\\]\\s(.*)";
Matcher m = Pattern.compile(pattern).matcher(log);
if(m.find()){
try {
time = sdf.parse(m.group(4));
rowKey = sdf2.format(time);
} catch (ParseException e1) {
e1.printStackTrace();
}
}else{
rowKey = sdf2.format(new Date());
}
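// note: the rowkey is a second-resolution timestamp plus a per-instance counter, so two bolt
// instances handling logs within the same second can generate the same key and overwrite each other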
Put put = new Put(Bytes.toBytes(rowKey + "|" + i));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("info"), Bytes.toBytes(log));
try {
table.put(put);
} catch (IOException e) {
//e.printStackTrace();
logger.error("[AccessHbaseBolt.execute]" + e.getMessage());
}
collector.emit("accesslogStream",tuple,new Values(log));
collector.ack(tuple);
if(i > 99999){
i = 10000;
}else{
i++;
}
}
@Override
public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
tableName = TableName.valueOf("accesslog2");
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "192.168.32.33,192.168.32.34,192.168.32.35,192.168.32.36");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
i = 10000;
try {
connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
if(admin.tableExists(tableName)){
System.out.println("table exists");
}else{
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
admin.createTable(tableDescriptor);
}
table = connection.getTable(tableName);
//BufferedMutatorParams params = new BufferedMutatorParams(tableName);
//mutator = connection.getBufferedMutator(params);
} catch (IOException e) {
//e.printStackTrace();
logger.error("[AccessHbaseBolt.prepare]" + e.getMessage());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("accesslogStream",new Fields("accesslog"));
}
public void cleanup(){
try {
table.close();
connection.close();
} catch (IOException e) {
logger.error("[AccessHbaseBolt.cleanup]" + e.getMessage());
}
}
}
The data currently stored in HBase:
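A quick way to eyeball the stored rows is a limited scan from the HBase shell (not from the original walkthrough):
hbase shell
scan 'accesslog2', {LIMIT => 5}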
The next bolt is the split bolt, which parses a raw access log String into an AccessLog object, assigning each field of the access log to the corresponding field of the class. The class also carries four extra fields, vulType, vulTypeId, msg and logType, which record which vulnerability rule the log triggered, the rule ID, the alert message, and the log type. (A sketch of the parsing step follows the class definition below.)
The AccessLog class:
package com.jiyufei.hlas.util;
import java.io.Serializable;
public class AccessLog implements Serializable{
private static final long serialVersionUID = 7078625027459628744L;
private String vulType;
private String vulTypeId;
private String msg;
private String logType;
private String clientIp;
private String clientUser;
private String time;
private String method;
private String url;
private String version;
private String requestBody;
private String status;
private String httpBytes;
private String requestTime;
private String referer;
private String userAgent;
//private String hostIp;
//private String indexName;
//private String business;
public String getVulType() {
return vulType;
}
public void setVulType(String vulType) {
this.vulType = vulType;
}
public String getVulTypeId() {
return vulTypeId;
}
public void setVulTypeId(String vulTypeId) {
this.vulTypeId = vulTypeId;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getLogType() {
return logType;
}
public void setLogType(String logType) {
this.logType = logType;
}
/*
public String getHostIp() {
return hostIp;
}
public void setHostIp(String hostIp) {
this.hostIp = hostIp;
}
*/
/*
public String getIndexName() {
return indexName;
}
public void setIndexName(String indexName) {
this.indexName = indexName;
}
public String getBusiness() {
return business;
}
public void setBusiness(String business) {
this.business = business;
}
*/
public String getClientIp() {
return clientIp;
}
public void setClientIp(String clientIp) {
this.clientIp = clientIp;
}
public String getClientUser() {
return clientUser;
}
public void setClientUser(String clientUser) {
this.clientUser = clientUser;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getRequestBody() {
return requestBody;
}
public void setRequestBody(String requestBody) {
this.requestBody = requestBody;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getHttpBytes() {
return httpBytes;
}
public void setHttpBytes(String httpBytes) {
this.httpBytes = httpBytes;
}
public String getRequestTime() {
return requestTime;
}
public void setRequestTime(String requestTime) {
this.requestTime = requestTime;
}
public String getReferer() {
return referer;
}
public void setReferer(String referer) {
this.referer = referer;
}
public String getUserAgent() {
return userAgent;
}
public void setUserAgent(String userAgent) {
this.userAgent = userAgent;
}
@Override
public String toString() {
return "AccessLog [vulType=" + vulType + ", vulTypeId=" + vulTypeId
+ ", msg=" + msg + ", logType=" + logType
+ ", clientIp=" + clientIp + ", clientUser="
+ clientUser + ", time=" + time + ", method=" + method
+ ", url=" + url + ", version=" + version + ", requestBody="
+ requestBody + ", status=" + status + ", httpBytes="
+ httpBytes + ", requestTime=" + requestTime + ", referer="
+ referer + ", userAgent=" + userAgent + "]";
}
}
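The split bolt itself is not listed in the post. A minimal sketch of the field extraction it would have to perform could look like this; the regex is derived from the sample access log line shown earlier, not taken from the real code, and the helper class is hypothetical.
package com.jiyufei.hlas.util;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper illustrating the parsing the split bolt performs.
public class AccessLogParser {
    private static final Pattern ACCESS_PATTERN = Pattern.compile(
            "^(\\S+) (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" \"([^\"]*)\" " +
            "(\\d+) (\\d+) (\\S+) \"([^\"]*)\" \"([^\"]*)\"$");

    public static AccessLog parse(String line) {
        Matcher m = ACCESS_PATTERN.matcher(line);
        if (!m.matches()) {
            return null;                  // caller decides how to handle unparseable lines
        }
        AccessLog log = new AccessLog();
        log.setClientIp(m.group(1));      // 200.110.75.247
        log.setClientUser(m.group(2));    // -
        log.setTime(m.group(3));          // 03/Jul/2017:10:56:34 +0800
        log.setMethod(m.group(4));        // POST
        log.setUrl(m.group(5));           // /social/getDropsById.htm
        log.setVersion(m.group(6));       // HTTP/1.1
        log.setRequestBody(m.group(7));   // id=9
        log.setStatus(m.group(8));        // 500
        log.setHttpBytes(m.group(9));     // 1483
        log.setRequestTime(m.group(10));  // 0.745
        log.setReferer(m.group(11));
        log.setUserAgent(m.group(12));
        log.setLogType("accesslog");
        return log;
    }
}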
The next four bolts perform the PoC checks; if a log line matches a PoC it is judged risky and written into ES. The SqlModule class holds the SQL injection rules and matches them against the url and requestBody fields (a sketch of it follows the bolt code below).
The AccessSqlInjectionBolt class:
package com.jiyufei.hlas.bolt;
import java.util.Map;
import com.jiyufei.hlas.module.SqlModule;
import com.jiyufei.hlas.util.AccessLog;
import com.jiyufei.hlas.util.ModuleUtil;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class AccessSqlInjectionBolt extends BaseRichBolt {
private static final long serialVersionUID = 8359035500983257762L;
private OutputCollector collector;
private SqlModule sqlModule;
private Integer status;
public void execute(Tuple tuple) {
AccessLog accessLog = (AccessLog)tuple.getValueByField("accesslog");
status = 0;
ModuleUtil sqlModuleUtil = null;
if(accessLog.getRequestBody().compareTo("-") != 0){
sqlModuleUtil = sqlModule.check(accessLog.getRequestBody());
if(sqlModuleUtil != null){
accessLog.setVulTypeId(String.valueOf(sqlModuleUtil.getVulTypeId()));
accessLog.setVulType("SQL注入");
accessLog.setMsg(sqlModuleUtil.getMsg());
status = 1;
}
}
if(status != 1){
sqlModuleUtil =sqlModule.check(accessLog.getUrl());
if(sqlModuleUtil != null){
accessLog.setVulTypeId(String.valueOf(sqlModuleUtil.getVulTypeId()));
accessLog.setVulType("SQL注入");
accessLog.setMsg(sqlModuleUtil.getMsg());
status = 1;
}else{
accessLog.setVulTypeId("0");   // was a duplicate setVulType("0") call; almost certainly meant vulTypeId
accessLog.setVulType("");
accessLog.setMsg("");
}
}
if(status == 1){
collector.emit("finalStream",tuple,new Values(accessLog));
collector.ack(tuple);
}else{
collector.emit("accesslogStream",tuple,new Values(accessLog));
collector.ack(tuple);
}
}
public void prepare(@SuppressWarnings("rawtypes") Map stormConfig, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.sqlModule = new SqlModule();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("accesslogStream",new Fields("accesslog"));
declarer.declareStream("finalStream",new Fields("accesslog"));
}
public void cleanup(){
//logFile.close();
}
}
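The SqlModule class is not included in the post either. Below is a hedged sketch of what its check() method might look like; the rule set and the ModuleUtil constructor used here are assumptions, and only the check(String) call and the getVulTypeId()/getMsg() accessors are implied by the bolt code above.
package com.jiyufei.hlas.module;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
import com.jiyufei.hlas.util.ModuleUtil;

public class SqlModule {
    private static final Map<Integer, Pattern> RULES = new LinkedHashMap<Integer, Pattern>();
    static {
        // a few illustrative SQL injection patterns; production rules would be broader and tuned
        RULES.put(1001, Pattern.compile("(?i)union\\s+select"));
        RULES.put(1002, Pattern.compile("(?i)\\b(and|or)\\b\\s+\\d+\\s*=\\s*\\d+"));
        RULES.put(1003, Pattern.compile("(?i)(sleep\\(|benchmark\\()"));
    }

    // returns a description of the first matching rule, or null when nothing matches
    public ModuleUtil check(String input) {
        for (Map.Entry<Integer, Pattern> rule : RULES.entrySet()) {
            if (rule.getValue().matcher(input).find()) {
                // ModuleUtil(int, String) constructor is assumed here
                return new ModuleUtil(rule.getKey(), "matched SQL injection rule " + rule.getKey());
            }
        }
        return null;
    }
}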
The class that writes into ES is shown below; it uses Jest.
package com.jiyufei.hlas.bolt;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Index;
import java.io.IOException;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.jiyufei.hlas.util.AccessLog;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
public class ElasticSearchBolt extends BaseRichBolt {
private static final String ES_IP = "http://192.168.32.32:9200";
private static final long serialVersionUID = 145040226237959431L;
private JestClient client;
private OutputCollector collector;
private static Logger logger = LoggerFactory.getLogger(ElasticSearchBolt.class);
private AccessLog accessLog;
public void execute(Tuple tuple) {
accessLog = (AccessLog)tuple.getValueByField("accesslog");
logger.info("[ElasticSearchBolt.execute]tuple:" + accessLog.toString());
Index index = new Index.Builder(accessLog).index("access-log").type(accessLog.getVulType()).build();
try {
client.execute(index);
collector.ack(tuple);
} catch (IOException e) {
logger.error("[ElasticSearchBolt.execute]" + e.getMessage());
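// note: on failure the tuple is neither acked nor failed here, so Storm only replays it
// from the spout after the message timeout expires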
}
}
public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
JestClientFactory jestClientFactory = new JestClientFactory();
jestClientFactory.setHttpClientConfig(new HttpClientConfig.Builder(ES_IP).multiThreaded(true).build());
client = jestClientFactory.getObject();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
Example of the main method:
BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot,id);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkServers = Arrays.asList(new String[]{"192.168.32.33","192.168.32.34","192.168.32.35","192.168.32.36"});
spoutConfig.zkPort = 2181;
spoutConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout,1);
/*
builder.setBolt(SPLIT_BOLT_ID, accessSplitBolt,1).shuffleGrouping(KAFKA_SPOUT_ID);
builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,1).shuffleGrouping(SPLIT_BOLT_ID,"accesslogStream");
*/
builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,2).shuffleGrouping(KAFKA_SPOUT_ID);
builder.setBolt(SPLIT_BOLT_ID, accessSplitBolt,1).shuffleGrouping(HBASE_BOLT_ID,"accesslogStream");
builder.setBolt(SQL_INJECTION_BOLT_ID, accessSqlInjectionBolt,1).shuffleGrouping(SPLIT_BOLT_ID,"accesslogStream");
builder.setBolt(XSS__BOLT_ID, accessXssBolt,1).shuffleGrouping(SQL_INJECTION_BOLT_ID,"accesslogStream");
builder.setBolt(LFI_BOLT_ID, accessLFIBolt,1).shuffleGrouping(XSS__BOLT_ID,"accesslogStream");
builder.setBolt(RFI_BOLT_ID, accessRFIBolt,1).shuffleGrouping(LFI_BOLT_ID,"accesslogStream");
//builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,1).shuffleGrouping(RFI_BOLT_ID,"accesslogStream");
//builder.setBolt(MYSQL_BOLT_ID,accessMysqlBolt,1).globalGrouping(RFI_BOLT_ID,"accesslogStream");
builder.setBolt("elasticsearch-bolt1", elasticSearchBolt).globalGrouping(SQL_INJECTION_BOLT_ID,"finalStream");
builder.setBolt("elasticsearch-bolt2", elasticSearchBolt).globalGrouping(XSS__BOLT_ID,"finalStream");
builder.setBolt("elasticsearch-bolt3", elasticSearchBolt).globalGrouping(LFI_BOLT_ID,"finalStream");
builder.setBolt("elasticsearch-bolt4", elasticSearchBolt).globalGrouping(RFI_BOLT_ID,"finalStream");
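The snippet above stops at wiring the bolts; submitting the topology would follow the usual Storm pattern (topology name and worker count here are placeholders):
// uses backtype.storm.Config and backtype.storm.StormSubmitter;
// submitTopology throws AlreadyAliveException / InvalidTopologyException, so main must declare or catch them
Config conf = new Config();
conf.setNumWorkers(2);
StormSubmitter.submitTopology("hlas-accesslog-topology", conf, builder.createTopology());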