
Notes on Building a Real-Time Log Analysis Platform (Part 1)

季牧云
Published 2017/07/03 14:31

    Many companies have already built ELK-based log collection platforms, but ELK by itself only handles collection and storage and lacks an analysis layer. As an information security practitioner, I need to run security analysis on all the collected logs and produce handling results; only then is the loop closed. My current company also happens to be starting its log analysis effort, so I have been studying log analysis platforms recently. The platform's main goal is to collect the various logs produced by production and office systems (currently mainly access logs and system logs; firewall and other device logs may be added later), analyze them in real time, find risks and raise alerts promptly. Combining ELK with the experience of more seasoned peers, the tentative architecture is as follows. So far only a simple demo has been run in the test environment, and many questions remain unexplored or unsolved (for example, how to write different kinds of logs into different Kafka topics); these will be filled in gradually, and this post serves as my working notes.

    The main idea: Flume is used for log collection, and logs can be shipped to Flume via syslog and similar mechanisms. After being aggregated in Flume, they are written into Kafka; Storm then reads the logs from Kafka and runs a set of real-time security detection rules (for example checks covering some OWASP Top 10 items). If a log line is found to be risky, it is written straight into Elasticsearch; regardless of whether it is risky, every log line is also written into HBase. Spark runs statistics-based rules, and when a Spark job finishes it likewise writes the problematic logs into Elasticsearch. For now Kibana is used for display. All of the components above run as clusters, which avoids single-machine failures.

    The test environment currently uses access logs for testing; the access log format is as follows:

200.110.75.247 - [03/Jul/2017:10:56:34 +0800] "POST /social/getDropsById.htm HTTP/1.1" "id=9" 500 1483 0.745 "http://192.168.32.33/social/showDrops.htm?id=3" "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"

    The conf file used on the flume-agent is as follows:

mysyslog.sources = s1
mysyslog.channels = c1
mysyslog.sinks = output36 output37
mysyslog.sinkgroups = g1

mysyslog.sources.s1.type = syslogudp
mysyslog.sources.s1.port = 5140
mysyslog.sources.s1.host = 192.168.32.33
mysyslog.sources.s1.channels = c1

mysyslog.channels.c1.type = memory
mysyslog.channels.c1.capacity = 500
mysyslog.channels.c1.transactionCapacity = 100

mysyslog.sinks.output36.type = avro
mysyslog.sinks.output36.hostname = 192.168.32.36
mysyslog.sinks.output36.port = 12888

mysyslog.sinks.output37.type = avro
mysyslog.sinks.output37.hostname = 192.168.32.37
mysyslog.sinks.output37.port = 12888

mysyslog.sinks.output36.channel = c1
mysyslog.sinks.output37.channel = c1


mysyslog.sinkgroups.g1.sinks = output36 output37
mysyslog.sinkgroups.g1.processor.type = load_balance
mysyslog.sinkgroups.g1.processor.backoff = true
mysyslog.sinkgroups.g1.processor.selector = random

The flume-collector tier in the test environment consists of two machines, 192.168.32.36 and 192.168.32.37; the conf file on one of them is as follows:

collector36.sources = avro-in
collector36.channels = mem-channel
collector36.sinks = kafka-out

# For each one of the sources, the type is defined
collector36.sources.avro-in.channels = mem-channel
collector36.sources.avro-in.type = avro
collector36.sources.avro-in.bind = 192.168.32.36
collector36.sources.avro-in.port = 12888


# Each sink's type must be defined
collector36.sinks.kafka-out.type = org.apache.flume.sink.kafka.KafkaSink
collector36.sinks.kafka-out.kafka.bootstrap.servers = 192.168.32.36:9092,192.168.32.37:9092
collector36.sinks.kafka-out.kafka.topic = nginx-accesslog
collector36.sinks.kafka-out.kafka.producer.acks = 1
collector36.sinks.kafka-out.channel = mem-channel


# Each channel's type is defined.
collector36.channels.mem-channel.type = memory
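
One of the open questions mentioned at the top is how to route different kinds of logs into different Kafka topics. A possible direction, shown here only as an untested sketch, is Flume's multiplexing channel selector on the collector: events carrying a header (which would have to be set upstream by the source or an interceptor; the header name, values, channel names and the second topic below are all assumptions) are steered into separate channels, each drained by its own Kafka sink with its own kafka.topic.

# Sketch only: route events by a "logtype" header to different Kafka topics
collector36.sources.avro-in.selector.type = multiplexing
collector36.sources.avro-in.selector.header = logtype
collector36.sources.avro-in.selector.mapping.access = access-channel
collector36.sources.avro-in.selector.mapping.syslog = syslog-channel
collector36.sources.avro-in.selector.default = access-channel

collector36.sinks.kafka-access.type = org.apache.flume.sink.kafka.KafkaSink
collector36.sinks.kafka-access.kafka.topic = nginx-accesslog
collector36.sinks.kafka-access.channel = access-channel

collector36.sinks.kafka-syslog.type = org.apache.flume.sink.kafka.KafkaSink
collector36.sinks.kafka-syslog.kafka.topic = system-syslog
collector36.sinks.kafka-syslog.channel = syslog-channel

The collector's sources/channels/sinks name lists, the channel definitions and the kafka.bootstrap.servers lines would of course need to be extended to match.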

    Flume's role is mainly log collection, and Flume itself supports many ways of receiving logs, for example the common case of sending logs via rsyslog. After being collected and aggregated by Flume, the logs are written into the Kafka cluster. The Kafka, JStorm and Hadoop clusters need ZooKeeper for coordination, so four machines were set aside to build a ZooKeeper cluster. The configuration file on one of them is shown below.

# The number of milliseconds of each tick
tickTime=3000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/mydata
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.33=192.168.32.33:2888:3888
server.35=192.168.32.35:2888:3888
server.36=192.168.32.36:2888:3888
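
Each server.N id above has to match a myid file under dataDir on that node, otherwise the ensemble will not form; for example, on 192.168.32.33:

echo 33 > /opt/zookeeper/mydata/myid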

    Create a topic named nginx-accesslog in Kafka; all the access logs aggregated by Flume are written into this topic. Kafka's main purpose is to buffer all the logs; JStorm then reads the logs it needs from Kafka and processes them.

bin/kafka-topics.sh --create --zookeeper 192.168.32.33:2181,192.168.32.34:2181,192.168.32.35:2181,192.168.32.36:2181/kafka --replication-factor 3 --partitions 4 --topic nginx-accesslog
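
After creating the topic, the partition and replica assignment can be checked with --describe, and the logs arriving from Flume can be spot-checked with the console consumer (both commands below assume the same /kafka chroot as above):

bin/kafka-topics.sh --describe --zookeeper 192.168.32.33:2181/kafka --topic nginx-accesslog
bin/kafka-console-consumer.sh --zookeeper 192.168.32.33:2181/kafka --topic nginx-accesslog --from-beginning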

    The JStorm and Spark stages that follow are where the real log analysis happens. JStorm is positioned as the real-time analysis layer: each log line is analyzed as an individual event, so JStorm mainly runs vulnerability PoC matching rules such as SQL injection and XSS. Spark runs statistics-based rules that need to look across multiple log lines, for example "within 3 minutes a given IP made more than N requests" or "produced more than M non-200 responses".
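
The Spark side is not implemented yet; the snippet below is only a rough illustration, written against the Spark Streaming Java API, of the kind of windowed rule described above. The topic, ZooKeeper address, group id, window sizes and the 1000-request threshold are all assumptions, and a real job would write the flagged IPs into Elasticsearch rather than print them.

import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class IpRateRuleSketch {
	public static void main(String[] args) throws Exception {
		SparkConf conf = new SparkConf().setAppName("hlas-ip-rate-rule");
		// one micro-batch per minute
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(1));

		// read the raw access logs from the same Kafka topic the Flume collectors write to
		Map<String, Integer> topics = Collections.singletonMap("nginx-accesslog", 1);
		JavaDStream<String> lines = KafkaUtils
				.createStream(jssc, "192.168.32.33:2181/kafka", "spark-rules", topics)
				.map(t -> t._2());

		// count requests per client IP over a sliding 3-minute window, evaluated every minute
		JavaPairDStream<String, Integer> ipCounts = lines
				.mapToPair(line -> new Tuple2<String, Integer>(line.split(" ")[0], 1))
				.reduceByKeyAndWindow((a, b) -> a + b, Durations.minutes(3), Durations.minutes(1));

		// flag IPs above an assumed threshold; a real job would index these into ES
		ipCounts.filter(t -> t._2() > 1000).print();

		jssc.start();
		jssc.awaitTermination();
	}
}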

    The JStorm cluster consists of 3 servers. It currently runs a demo topology: the demo first writes every access log it reads into HBase, then checks it in turn against four vulnerability categories, SQL injection, XSS, remote file inclusion and local file inclusion. If a log line is judged to carry a security risk, it is written into ES.

For now each log line is written into HBase as a single record, without being split into fields. The HBase rowkey is the timestamp formatted as yyyy-MM-dd'T'HH:mm:ssZZ plus a counter i. The AccessHbaseBolt code is as follows:

package com.jiyufei.hlas.bolt;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/*
 * Created by jiyufei on 2017-03-27 09:46:08.
 */
public class AccessHbaseBolt extends BaseRichBolt {

	private static final long serialVersionUID = 4431861248787718777L;
	private final Logger logger = LoggerFactory.getLogger(this.getClass());
	private OutputCollector collector;
	private TableName tableName;
	private Connection connection;
	private String columnFamily = "access";
	private Table table;
	private Integer i;
	
	@Override
	public void execute(Tuple tuple) {
		String log = tuple.getString(0);
		String rowKey = null;
		Date time = null;
		SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ");
		SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss ZZ",Locale.ENGLISH);
		String pattern = "((\\d+\\.){3}\\d+)\\s(\\S+)\\s\\[(.+)\\]\\s(.*)";
		Matcher m = Pattern.compile(pattern).matcher(log);
		if(m.find()){
			try {
				time = sdf.parse(m.group(4));
				rowKey = sdf2.format(time);
			} catch (ParseException e1) {
				e1.printStackTrace();
			}
		}else{
			rowKey = sdf2.format(new Date());
		}
		Put put = new Put(Bytes.toBytes(rowKey + "|" + i));
		
		put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("info"), Bytes.toBytes(log));
		try {
			table.put(put);
		} catch (IOException e) {
			//e.printStackTrace();
			logger.error("[AccessHbaseBolt.execute]" + e.getMessage());
		}
		collector.emit("accesslogStream",tuple,new Values(log));
		collector.ack(tuple);
		if(i > 99999){
			i = 10000;
		}else{
			i++;
		}
	}

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
		
		this.collector = collector;
		tableName = TableName.valueOf("accesslog2");
		Configuration configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum", "192.168.32.33,192.168.32.34,192.168.32.35,192.168.32.36");
		configuration.set("hbase.zookeeper.property.clientPort", "2181");
		i = 10000;
		try {
			connection = ConnectionFactory.createConnection(configuration);
			Admin admin = connection.getAdmin();
			if(admin.tableExists(tableName)){
				System.out.println("table exists");
			}else{
				HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
				tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
				admin.createTable(tableDescriptor);
			}
			table = connection.getTable(tableName);
			//BufferedMutatorParams params = new BufferedMutatorParams(tableName);
			//mutator = connection.getBufferedMutator(params);
		} catch (IOException e) {
			//e.printStackTrace();
			logger.error("[AccessHbaseBolt.prepare]" + e.getMessage());
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("accesslogStream",new Fields("accesslog"));
	}

	public void cleanup(){
		try {
			table.close();
			connection.close();
		} catch (IOException e) {
			logger.error("[AccessHbaseBolt.cleanup]" + e.getMessage());
		}
	}
	
}

The current storage result in HBase (screenshot omitted):

The next bolt, split-bolt, is responsible for splitting the access log String into an AccessLog object, assigning each access log field to the corresponding field of AccessLog. The class currently has four extra fields, vulType, vulTypeId, msg and logType, which record which vulnerability rule the log triggered, the rule id, the alert message and the log type respectively. (A sketch of what this parsing step might look like follows the AccessLog listing below.)

The AccessLog class:

package com.jiyufei.hlas.util;

import java.io.Serializable;


public class AccessLog implements Serializable{
	
	private static final long serialVersionUID = 7078625027459628744L;
	
	private String vulType;
	private String vulTypeId;
	private String msg;
	private String logType;
	private String clientIp;	
	private String clientUser;
	private String time;
	private String method;	
	private String url;	
	private String version;
	private String requestBody; 
	private String status;	
	private String httpBytes;
	private String requestTime;
	private String referer;
	private String userAgent;
	
	//private String hostIp;
	//private String indexName;
	//private String business;
	
	public String getVulType() {
		return vulType;
	}
	public void setVulType(String vulType) {
		this.vulType = vulType;
	}
	public String getVulTypeId() {
		return vulTypeId;
	}
	public void setVulTypeId(String vulTypeId) {
		this.vulTypeId = vulTypeId;
	}
	public String getMsg() {
		return msg;
	}
	public void setMsg(String msg) {
		this.msg = msg;
	}
	public String getLogType() {
		return logType;
	}
	public void setLogType(String logType) {
		this.logType = logType;
	}
	/*
	public String getHostIp() {
		return hostIp;
	}
	public void setHostIp(String hostIp) {
		this.hostIp = hostIp;
	}
	*/
	/*
	public String getIndexName() {
		return indexName;
	}
	public void setIndexName(String indexName) {
		this.indexName = indexName;
	}
	public String getBusiness() {
		return business;
	}
	public void setBusiness(String business) {
		this.business = business;
	}
	 */
	public String getClientIp() {
		return clientIp;
	}
	public void setClientIp(String clientIp) {
		this.clientIp = clientIp;
	}
	public String getClientUser() {
		return clientUser;
	}
	public void setClientUser(String clientUser) {
		this.clientUser = clientUser;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getMethod() {
		return method;
	}
	public void setMethod(String method) {
		this.method = method;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getVersion() {
		return version;
	}
	public void setVersion(String version) {
		this.version = version;
	}
	public String getRequestBody() {
		return requestBody;
	}
	public void setRequestBody(String requestBody) {
		this.requestBody = requestBody;
	}
	public String getStatus() {
		return status;
	}
	public void setStatus(String status) {
		this.status = status;
	}
	public String getHttpBytes() {
		return httpBytes;
	}
	public void setHttpBytes(String httpBytes) {
		this.httpBytes = httpBytes;
	}
	public String getRequestTime() {
		return requestTime;
	}
	public void setRequestTime(String requestTime) {
		this.requestTime = requestTime;
	}
	public String getReferer() {
		return referer;
	}
	public void setReferer(String referer) {
		this.referer = referer;
	}
	public String getUserAgent() {
		return userAgent;
	}
	public void setUserAgent(String userAgent) {
		this.userAgent = userAgent;
	}
	@Override
	public String toString() {
		return "AccessLog [vulType=" + vulType + ", vulTypeId=" + vulTypeId
				+ ", msg=" + msg + ", logType=" + logType 
				+ ", clientIp=" + clientIp + ", clientUser="
				+ clientUser + ", time=" + time + ", method=" + method
				+ ", url=" + url + ", version=" + version + ", requestBody="
				+ requestBody + ", status=" + status + ", httpBytes="
				+ httpBytes + ", requestTime=" + requestTime + ", referer="
				+ referer + ", userAgent=" + userAgent + "]";
	}
	
	
	
}
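
The actual AccessSplitBolt is not listed in these notes, so here is only a sketch of what the splitting step could look like; the regex and field mapping are guessed from the sample access log line above, and the real implementation may well differ.

package com.jiyufei.hlas.bolt;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.jiyufei.hlas.util.AccessLog;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AccessSplitBoltSketch extends BaseRichBolt {

	private static final long serialVersionUID = 1L;
	// one capture group per field of the sample access log format shown earlier
	private static final Pattern LOG_PATTERN = Pattern.compile(
			"^(\\S+)\\s(\\S+)\\s\\[([^\\]]+)\\]\\s\"(\\S+)\\s(\\S+)\\s(\\S+)\"\\s\"([^\"]*)\"\\s(\\d+)\\s(\\d+)\\s(\\S+)\\s\"([^\"]*)\"\\s\"([^\"]*)\"$");
	private OutputCollector collector;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		String log = tuple.getStringByField("accesslog");
		Matcher m = LOG_PATTERN.matcher(log);
		if (m.find()) {
			AccessLog accessLog = new AccessLog();
			accessLog.setLogType("accesslog");
			accessLog.setClientIp(m.group(1));
			accessLog.setClientUser(m.group(2));
			accessLog.setTime(m.group(3));
			accessLog.setMethod(m.group(4));
			accessLog.setUrl(m.group(5));
			accessLog.setVersion(m.group(6));
			accessLog.setRequestBody(m.group(7));
			accessLog.setStatus(m.group(8));
			accessLog.setHttpBytes(m.group(9));
			accessLog.setRequestTime(m.group(10));
			accessLog.setReferer(m.group(11));
			accessLog.setUserAgent(m.group(12));
			collector.emit("accesslogStream", tuple, new Values(accessLog));
		}
		// lines that do not match the expected format are simply acked and dropped here
		collector.ack(tuple);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("accesslogStream", new Fields("accesslog"));
	}
}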

The next four bolts perform the PoC checks; if a log line matches any PoC, it is judged risky and written into ES. The SqlModule class mainly holds the rules, and both the url and requestBody fields are checked against them.
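
SqlModule and ModuleUtil are not listed in these notes either. Judging from how AccessSqlInjectionBolt (below) calls them, check() returns null when nothing matches and otherwise a ModuleUtil carrying a rule id and a message. A minimal sketch under those assumptions follows; the regexes are placeholders rather than the real rule set, and the ModuleUtil(int, String) constructor is itself an assumption.

package com.jiyufei.hlas.module;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

import com.jiyufei.hlas.util.ModuleUtil;

public class SqlModule {

	// ruleId -> case-insensitive pattern; these regexes are illustrative placeholders
	private final Map<Integer, Pattern> rules = new LinkedHashMap<Integer, Pattern>();

	public SqlModule() {
		rules.put(1001, Pattern.compile("union\\s+select", Pattern.CASE_INSENSITIVE));
		rules.put(1002, Pattern.compile("\\b(and|or)\\b\\s+\\d+\\s*=\\s*\\d+", Pattern.CASE_INSENSITIVE));
		rules.put(1003, Pattern.compile("sleep\\s*\\(", Pattern.CASE_INSENSITIVE));
	}

	// returns the first matching rule as a ModuleUtil, or null when nothing matches
	// (the ModuleUtil(int vulTypeId, String msg) constructor is assumed here)
	public ModuleUtil check(String input) {
		if (input == null) {
			return null;
		}
		for (Map.Entry<Integer, Pattern> rule : rules.entrySet()) {
			if (rule.getValue().matcher(input).find()) {
				return new ModuleUtil(rule.getKey(), "SQL injection rule " + rule.getKey() + " matched");
			}
		}
		return null;
	}
}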

The AccessSqlInjectionBolt class:

package com.jiyufei.hlas.bolt;

import java.util.Map;

import com.jiyufei.hlas.module.SqlModule;
import com.jiyufei.hlas.util.AccessLog;
import com.jiyufei.hlas.util.ModuleUtil;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class AccessSqlInjectionBolt extends BaseRichBolt {

	private static final long serialVersionUID = 8359035500983257762L;
	private OutputCollector collector;
	private SqlModule sqlModule;
	private Integer status;
	
	public void execute(Tuple tuple) {
		AccessLog accessLog = (AccessLog)tuple.getValueByField("accesslog");
		status = 0;
		ModuleUtil sqlModuleUtil = null;
		if(accessLog.getRequestBody().compareTo("-") != 0){
			sqlModuleUtil = sqlModule.check(accessLog.getRequestBody());
			if(sqlModuleUtil != null){
				accessLog.setVulTypeId(String.valueOf(sqlModuleUtil.getVulTypeId()));
				accessLog.setVulType("SQL注入");
				accessLog.setMsg(sqlModuleUtil.getMsg());
				status = 1;
			}
		}
		if(status != 1){
			sqlModuleUtil =sqlModule.check(accessLog.getUrl());
			if(sqlModuleUtil != null){
				accessLog.setVulTypeId(String.valueOf(sqlModuleUtil.getVulTypeId()));
				accessLog.setVulType("SQL注入");
				accessLog.setMsg(sqlModuleUtil.getMsg());
				status = 1;
			}else{
				accessLog.setVulType("0");
				accessLog.setVulType("");
				accessLog.setMsg("");
			}
		}
		
		if(status == 1){			
			collector.emit("finalStream",tuple,new Values(accessLog));
			collector.ack(tuple);
		}else{
			collector.emit("accesslogStream",tuple,new Values(accessLog));	
			collector.ack(tuple);
		}
		
		
	}

	public void prepare(@SuppressWarnings("rawtypes") Map stormConfig, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		this.sqlModule = new SqlModule();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("accesslogStream",new Fields("accesslog"));
		declarer.declareStream("finalStream",new Fields("accesslog"));
	}

	public void cleanup(){
		//logFile.close();
	}
	
}

The bolt that writes into ES is shown below; it uses Jest.

package com.jiyufei.hlas.bolt;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Index;

import java.io.IOException;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jiyufei.hlas.util.AccessLog;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;


public class ElasticSearchBolt extends BaseRichBolt {

	private static final String ES_IP = "http://192.168.32.32:9200";
	private static final long serialVersionUID = 145040226237959431L;
	private JestClient client;
	private OutputCollector collector;
	private static Logger logger = LoggerFactory.getLogger(ElasticSearchBolt.class);
	private AccessLog accessLog;
	
	public void execute(Tuple tuple) {
		accessLog = (AccessLog)tuple.getValueByField("accesslog");
		logger.info("[ElasticSearchBolt.execute]tuple:" + accessLog.toString());
		
		Index index = new Index.Builder(accessLog).index("access-log").type(accessLog.getVulType()).build();
		try {
			
			client.execute(index);
			collector.ack(tuple);
		} catch (IOException e) {
			logger.error("[ElasticSearchBolt.execute]" + e.getMessage());
			// fail the tuple explicitly so it is replayed instead of waiting for the timeout
			collector.fail(tuple);
		}
	}

	public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		JestClientFactory jestClientFactory = new JestClientFactory();
		jestClientFactory.setHttpClientConfig(new HttpClientConfig.Builder(ES_IP).multiThreaded(true).build());
		client = jestClientFactory.getObject();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

}

Excerpt from the main method:

        BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");
		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot,id);
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		
		spoutConfig.zkServers = Arrays.asList(new String[]{"192.168.32.33","192.168.32.34","192.168.32.35","192.168.32.36"});
		spoutConfig.zkPort = 2181;
		spoutConfig.forceFromStart = false;
		
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
		
		TopologyBuilder builder = new TopologyBuilder();
		
		builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout,1);
		/*
		builder.setBolt(SPLIT_BOLT_ID, accessSplitBolt,1).shuffleGrouping(KAFKA_SPOUT_ID);
		builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,1).shuffleGrouping(SPLIT_BOLT_ID,"accesslogStream");
		*/
		builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,2).shuffleGrouping(KAFKA_SPOUT_ID);
		builder.setBolt(SPLIT_BOLT_ID, accessSplitBolt,1).shuffleGrouping(HBASE_BOLT_ID,"accesslogStream");
		builder.setBolt(SQL_INJECTION_BOLT_ID, accessSqlInjectionBolt,1).shuffleGrouping(SPLIT_BOLT_ID,"accesslogStream");
		
		
		builder.setBolt(XSS__BOLT_ID, accessXssBolt,1).shuffleGrouping(SQL_INJECTION_BOLT_ID,"accesslogStream");
		builder.setBolt(LFI_BOLT_ID, accessLFIBolt,1).shuffleGrouping(XSS__BOLT_ID,"accesslogStream");
		builder.setBolt(RFI_BOLT_ID, accessRFIBolt,1).shuffleGrouping(LFI_BOLT_ID,"accesslogStream");
		//builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,1).shuffleGrouping(RFI_BOLT_ID,"accesslogStream");
		//builder.setBolt(MYSQL_BOLT_ID,accessMysqlBolt,1).globalGrouping(RFI_BOLT_ID,"accesslogStream");
		
		builder.setBolt("elasticsearch-bolt1", elasticSearchBolt).globalGrouping(SQL_INJECTION_BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt2", elasticSearchBolt).globalGrouping(XSS__BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt3", elasticSearchBolt).globalGrouping(LFI_BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt4", elasticSearchBolt).globalGrouping(RFI_BOLT_ID,"finalStream");

 
