文档章节

实时日志分析平台搭建笔记(一)

季牧云
 季牧云
发布于 2017/07/03 14:31
字数 2518
阅读 123
收藏 1

    基于ELK构架的日志收集平台,很多公司都搭建好了,但ELK只是做到了收集存储,缺少了分析功能。博主作为信息安全从业人员,需要对所有收集的日志进行安全分析,给出处理结果,这样才算完成一个闭环。正好目前所在的公司也准备启动日志分析工作,所以最近研究了日志分析平台。日志分析平台主要目的是收集生产和办公系统中产生的各种日志(目前主要是access log和系统日志,可能后期还会包含防火墙等设备的日志),实时的做出分析,找出风险,及时预警。结合ELK以及同行老司机们的经验,目前暂时的构架如下,目前只是在完成了测试环境的简单测试demo,但还有很多未研究或者未解决的问题(例如如何将不同日志存入不同的kafka topic中),有待后续逐渐完善,写个博客作为笔记。

    主要思路是收集日志采用flume,日志可通过syslog等方式传输给flume,在flume中汇总后,存入kafka中,然后storm去读取kafka的日志,在storm中运行一些实时检测的安全规则(例如owasp top 10的一些内容项),如果检测到一条日志存在风险,就直接扔到es里,另外不管日志是否有风险,都会扔到Hbase中。spark上会运行一些基于统计的规则,spark运行完成后,也会将有问题的日志写入elasticsearch中,最后暂且使用kibana进行展示。以上各组建都采用集群方式进行运行,可有效避免单机故障。

    测试环境中目前采用accesslog进行测试,accesslog日志格式如下:

200.110.75.247 - [03/Jul/2017:10:56:34 +0800] "POST /social/getDropsById.htm HTTP/1.1" "id=9" 500 1483 0.745 "http://192.168.32.33/social/showDrops.htm?id=3" "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"

    flume-agent中采用的conf文件如下:

mysyslog.sources = s1
mysyslog.channels = c1
mysyslog.sinks = output36 output37
mysyslog.sinkgroups = g1

mysyslog.sources.s1.type = syslogudp
mysyslog.sources.s1.port = 5140
mysyslog.sources.s1.host = 192.168.32.33
mysyslog.sources.s1.channels = c1

mysyslog.channels.c1.type = memory
mysyslog.channels.c1.capacity = 500
mysyslog.channels.c1.transactionCapacity = 100

mysyslog.sinks.output36.type = avro
mysyslog.sinks.output36.hostname = 192.168.32.36
mysyslog.sinks.output36.port = 12888

mysyslog.sinks.output37.type = avro
mysyslog.sinks.output37.hostname = 192.168.32.37
mysyslog.sinks.output37.port = 12888

mysyslog.sinks.output36.channel = c1
mysyslog.sinks.output37.channel = c1


mysyslog.sinkgroups.g1.sinks = output36 output37
mysyslog.sinkgroups.g1.processor.type = load_balance
mysyslog.sinkgroups.g1.processor.backoff = true
mysyslog.sinkgroups.g1.processor.selector = random

flume-collector在测试环境中主要有两台机器,分别上192.168.32.36和192.168.32.37,其中一台的conf文件如下:

collector36.sources = avro-in
collector36.channels = mem-channel
collector36.sinks = kafka-out

# For each one of the sources, the type is defined
collector36.sources.avro-in.channels = mem-channel
collector36.sources.avro-in.type = avro
collector36.sources.avro-in.bind = 192.168.32.36
collector36.sources.avro-in.port = 12888


# Each sink's type must be defined
collector36.sinks.kafka-out.type = org.apache.flume.sink.kafka.KafkaSink
collector36.sinks.kafka-out.kafka.bootstrap.servers = 192.168.32.36:9092,192.168.32.37:9092
collector36.sinks.kafka-out.kafka.topic = nginx-accesslog
collector36.sinks.kafka-out.kafka.producer.acks = 1
collector36.sinks.kafka-out.channel = mem-channel


# Each channel's type is defined.
collector36.channels.mem-channel.type = memory

    flume主要是收集日志,flume本身就支持多种日志接收方式,例如常见的通过rsyslog发送日志。日志经过flume收集汇总后写入到kafka集群中。kafka、jstorm、hadoop这些集群需要zookeeper来管理,所以准备了四台机器搭建一个zookeeper集群。其中一台的配置文件如下

# The number of milliseconds of each tick
tickTime=3000
The number of ticks that the initial
synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/mydata
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.33=192.168.32.33:2888:3888
server.35=192.168.32.35:2888:3888
server.36=192.168.32.36:2888:3888

    在kafka中创建一个topic:nginx-accesslog,flume汇总的accesslog全部写入该topic中。kafka的主要目的是暂存所有的日志,随后,jstorm会去kafka读取需要的日志并进行处理。

bin/kafka-topics.sh --create --zookeeper 192.168.32.33:2181,192.168.32.34:2181,192.168.32.35:2181,192.168.32.36:2181/kafka --replication-factor 3 --partitions 4 --topic nginx-accesslog

    随后的jstorm和spark才是日志分析的关键所在。jstorm被定义为实时日志分析,每条日志会作为一个单独的个体被分析,所以在jstorm中主要是漏洞POC匹配的规则,例如sql注入,XSS等规则。而spark中主要运行统计分析规则,这些规则需要基于多条日志进行分析,例如3分钟内,某个IP访问量超过N次或者非200响应次数超过M次等。

    jstorm主要有3台服务器组成一个集群。目前运行一个demo程序,demo程序首先会把读取到的accesslog写入到hbase库中,随后依次检测SQL注入、XSS、远程目录包含、本地目录包含四大类漏洞,如果判定某条日志存在安全风险,则写入ES中。

目前是一条日志直接写入到hbase的一条记录,不做拆分。Hbase中的rowkey采用yyyy-MM-dd'T'HH:mm:ssZZ + i。AccessHbaseBolt代码如下:

package com.jiyufei.hlas.bolt;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/*
 *Created by jiyufei on 2017年3月27日 上午9:46:08 .
 */
public class AccessHbaseBolt extends BaseRichBolt {

	private static final long serialVersionUID = 4431861248787718777L;
	private final Logger logger = LoggerFactory.getLogger(this.getClass());
	private OutputCollector collector;
	private TableName tableName;
	private Connection connection;
	private String columnFamily = "access";
	private Table table;
	private Integer i;
	
	@Override
	public void execute(Tuple tuple) {
		String log = tuple.getString(0);
		String rowKey = null;
		Date time = null;
		SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ");
		SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss ZZ",Locale.ENGLISH);
		String pattern = "((\\d+\\.){3}\\d+)\\s(\\S+)\\s\\[(.+)\\]\\s(.*)";
		Matcher m = Pattern.compile(pattern).matcher(log);
		if(m.find()){
			try {
				time = sdf.parse(m.group(4));
				rowKey = sdf2.format(time);
			} catch (ParseException e1) {
				e1.printStackTrace();
			}
		}else{
			rowKey = sdf2.format(new Date());
		}
		Put put = new Put(Bytes.toBytes(rowKey + "|" + i));
		
		put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("info"), Bytes.toBytes(log));
		try {
			table.put(put);
		} catch (IOException e) {
			//e.printStackTrace();
			logger.error("[AccessHbaseBolt.execute]" + e.getMessage());
		}
		collector.emit("accesslogStream",tuple,new Values(log));
		collector.ack(tuple);
		if(i > 99999){
			i = 10000;
		}else{
			i++;
		}
	}

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
		
		this.collector = collector;
		tableName = TableName.valueOf("accesslog2");
		Configuration configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum", "192.168.32.33,192.168.32.34,192.168.32.35,192.168.32.36");
		configuration.set("hbase.zookeeper.property.clientPort", "2181");
		i = 10000;
		try {
			connection = ConnectionFactory.createConnection(configuration);
			Admin admin = connection.getAdmin();
			if(admin.tableExists(tableName)){
				System.out.println("table exists");
			}else{
				HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
				tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
				admin.createTable(tableDescriptor);
			}
			table = connection.getTable(tableName);
			//BufferedMutatorParams params = new BufferedMutatorParams(tableName);
			//mutator = connection.getBufferedMutator(params);
		} catch (IOException e) {
			//e.printStackTrace();
			logger.error("[AccessHbaseBolt.prepare]" + e.getMessage());
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("accesslogStream",new Fields("accesslog"));
	}

	public void cleanup(){
		try {
			table.close();
			connection.close();
		} catch (IOException e) {
			logger.error("[AccessHbaseBolt.cleanup]" + e.getMessage());
		}
	}
	
}

Hbase中目前的存储结果:

下一个bolt是split-bolt主要负责拆分一个String类型的accesslog为一个类AccessLog,会将accesslog各个字段赋值到AccessLog中的相应字段。该类中目前有vulType,vulTypeId,msg,logType四个字段分别表示该日志触发何种漏洞规则,规则编号,提示信息,日志类型。

AccessLog类

package com.jiyufei.hlas.util;

import java.io.Serializable;


public class AccessLog implements Serializable{
	
	private static final long serialVersionUID = 7078625027459628744L;
	
	private String vulType;
	private String vulTypeId;
	private String msg;
	private String logType;
	private String clientIp;	
	private String clientUser;
	private String time;
	private String method;	
	private String url;	
	private String version;
	private String requestBody; 
	private String status;	
	private String httpBytes;
	private String requestTime;
	private String referer;
	private String userAgent;
	
	//private String hostIp;
	//private String indexName;
	//private String business;
	
	public String getVulType() {
		return vulType;
	}
	public void setVulType(String vulType) {
		this.vulType = vulType;
	}
	public String getVulTypeId() {
		return vulTypeId;
	}
	public void setVulTypeId(String vulTypeId) {
		this.vulTypeId = vulTypeId;
	}
	public String getMsg() {
		return msg;
	}
	public void setMsg(String msg) {
		this.msg = msg;
	}
	public String getLogType() {
		return logType;
	}
	public void setLogType(String logType) {
		this.logType = logType;
	}
	/*
	public String getHostIp() {
		return hostIp;
	}
	public void setHostIp(String hostIp) {
		this.hostIp = hostIp;
	}
	*/
	/*
	public String getIndexName() {
		return indexName;
	}
	public void setIndexName(String indexName) {
		this.indexName = indexName;
	}
	public String getBusiness() {
		return business;
	}
	public void setBusiness(String business) {
		this.business = business;
	}
	 */
	public String getClientIp() {
		return clientIp;
	}
	public void setClientIp(String clientIp) {
		this.clientIp = clientIp;
	}
	public String getClientUser() {
		return clientUser;
	}
	public void setClientUser(String clientUser) {
		this.clientUser = clientUser;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getMethod() {
		return method;
	}
	public void setMethod(String method) {
		this.method = method;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getVersion() {
		return version;
	}
	public void setVersion(String version) {
		this.version = version;
	}
	public String getRequestBody() {
		return requestBody;
	}
	public void setRequestBody(String requestBody) {
		this.requestBody = requestBody;
	}
	public String getStatus() {
		return status;
	}
	public void setStatus(String status) {
		this.status = status;
	}
	public String getHttpBytes() {
		return httpBytes;
	}
	public void setHttpBytes(String httpBytes) {
		this.httpBytes = httpBytes;
	}
	public String getRequestTime() {
		return requestTime;
	}
	public void setRequestTime(String requestTime) {
		this.requestTime = requestTime;
	}
	public String getReferer() {
		return referer;
	}
	public void setReferer(String referer) {
		this.referer = referer;
	}
	public String getUserAgent() {
		return userAgent;
	}
	public void setUserAgent(String userAgent) {
		this.userAgent = userAgent;
	}
	@Override
	public String toString() {
		return "AccessLog [vulType=" + vulType + ", vulTypeId=" + vulTypeId
				+ ", msg=" + msg + ", logType=" + logType 
				+ ", clientIp=" + clientIp + ", clientUser="
				+ clientUser + ", time=" + time + ", method=" + method
				+ ", url=" + url + ", version=" + version + ", requestBody="
				+ requestBody + ", status=" + status + ", httpBytes="
				+ httpBytes + ", requestTime=" + requestTime + ", referer="
				+ referer + ", userAgent=" + userAgent + "]";
	}
	
	
	
}

后面四个bolt主要是进行POC检测,如果命中某个POC,则判定该日志有风险,会写入到ES中。在SqlModule类中主要存放一些规则,会将url和requestBody两个字段进行比较。

AccessSqlInjectionBolt类

package com.jiyufei.hlas.bolt;

import java.util.Map;

import com.jiyufei.hlas.module.SqlModule;
import com.jiyufei.hlas.util.AccessLog;
import com.jiyufei.hlas.util.ModuleUtil;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class AccessSqlInjectionBolt extends BaseRichBolt {

	private static final long serialVersionUID = 8359035500983257762L;
	private OutputCollector collector;
	private SqlModule sqlModule;
	private Integer status;
	
	public void execute(Tuple tuple) {
		AccessLog accessLog = (AccessLog)tuple.getValueByField("accesslog");
		status = 0;
		ModuleUtil sqlModuleUtil = null;
		if(accessLog.getRequestBody().compareTo("-") != 0){
			sqlModuleUtil = sqlModule.check(accessLog.getRequestBody());
			if(sqlModuleUtil != null){
				accessLog.setVulTypeId(String.valueOf(sqlModuleUtil.getVulTypeId()));
				accessLog.setVulType("SQL注入");
				accessLog.setMsg(sqlModuleUtil.getMsg());
				status = 1;
			}
		}
		if(status != 1){
			sqlModuleUtil =sqlModule.check(accessLog.getUrl());
			if(sqlModuleUtil != null){
				accessLog.setVulTypeId(String.valueOf(sqlModuleUtil.getVulTypeId()));
				accessLog.setVulType("SQL注入");
				accessLog.setMsg(sqlModuleUtil.getMsg());
				status = 1;
			}else{
				accessLog.setVulType("0");
				accessLog.setVulType("");
				accessLog.setMsg("");
			}
		}
		
		if(status == 1){			
			collector.emit("finalStream",tuple,new Values(accessLog));
			collector.ack(tuple);
		}else{
			collector.emit("accesslogStream",tuple,new Values(accessLog));	
			collector.ack(tuple);
		}
		
		
	}

	public void prepare(@SuppressWarnings("rawtypes") Map stormConfig, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		this.sqlModule = new SqlModule();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("accesslogStream",new Fields("accesslog"));
		declarer.declareStream("finalStream",new Fields("accesslog"));
	}

	public void cleanup(){
		//logFile.close();
	}
	
}

写入ES的类如下,使用jest。

package com.jiyufei.hlas.bolt;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Index;

import java.io.IOException;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jiyufei.hlas.util.AccessLog;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;


public class ElasticSearchBolt extends BaseRichBolt {

	private static final String ES_IP = "http://192.168.32.32:9200";
	private static final long serialVersionUID = 145040226237959431L;
	private JestClient client;
	private OutputCollector collector;
	private static Logger logger = LoggerFactory.getLogger(ElasticSearchBolt.class);
	private AccessLog accessLog;
	
	public void execute(Tuple tuple) {
		accessLog = (AccessLog)tuple.getValueByField("accesslog");
		logger.info("[ElasticSearchBolt.execute]tuple:" + accessLog.toString());
		
		Index index = new Index.Builder(accessLog).index("access-log").type(accessLog.getVulType()).build();
		try {
			
			client.execute(index);
			collector.ack(tuple);
		} catch (IOException e) {
			logger.error("[ElasticSearchBolt.execute]" + e.getMessage());
		}
	}

	public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		JestClientFactory jestClientFactory = new JestClientFactory();
		jestClientFactory.setHttpClientConfig(new HttpClientConfig.Builder(ES_IP).multiThreaded(true).build());
		client = jestClientFactory.getObject();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

}

main函数示例

        BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");
		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot,id);
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		
		spoutConfig.zkServers = Arrays.asList(new String[]{"192.168.32.33","192.168.32.34","192.168.32.35","192.168.32.36"});
		spoutConfig.zkPort = 2181;
		spoutConfig.forceFromStart = false;
		
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
		
		TopologyBuilder builder = new TopologyBuilder();
		
		builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout,1);
		/*
		builder.setBolt(SPLIT_BOLT_ID, accessSplitBolt,1).shuffleGrouping(KAFKA_SPOUT_ID);
		builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,1).shuffleGrouping(SPLIT_BOLT_ID,"accesslogStream");
		*/
		builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,2).shuffleGrouping(KAFKA_SPOUT_ID);
		builder.setBolt(SPLIT_BOLT_ID, accessSplitBolt,1).shuffleGrouping(HBASE_BOLT_ID,"accesslogStream");
		builder.setBolt(SQL_INJECTION_BOLT_ID, accessSqlInjectionBolt,1).shuffleGrouping(SPLIT_BOLT_ID,"accesslogStream");
		
		
		builder.setBolt(XSS__BOLT_ID, accessXssBolt,1).shuffleGrouping(SQL_INJECTION_BOLT_ID,"accesslogStream");
		builder.setBolt(LFI_BOLT_ID, accessLFIBolt,1).shuffleGrouping(XSS__BOLT_ID,"accesslogStream");
		builder.setBolt(RFI_BOLT_ID, accessRFIBolt,1).shuffleGrouping(LFI_BOLT_ID,"accesslogStream");
		//builder.setBolt(HBASE_BOLT_ID, accessHbaseBolt,1).shuffleGrouping(RFI_BOLT_ID,"accesslogStream");
		//builder.setBolt(MYSQL_BOLT_ID,accessMysqlBolt,1).globalGrouping(RFI_BOLT_ID,"accesslogStream");
		
		builder.setBolt("elasticsearch-bolt1", elasticSearchBolt).globalGrouping(SQL_INJECTION_BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt2", elasticSearchBolt).globalGrouping(XSS__BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt3", elasticSearchBolt).globalGrouping(LFI_BOLT_ID,"finalStream");
		builder.setBolt("elasticsearch-bolt4", elasticSearchBolt).globalGrouping(RFI_BOLT_ID,"finalStream");

 

© 著作权归作者所有

共有 人打赏支持
季牧云
粉丝 21
博文 26
码字总数 20788
作品 0
浦东
其他
加载中

评论(7)

季牧云
季牧云

引用来自“断崖逐梦”的评论

又没了,可不可以连续,求助
你有什么问题吗
断崖逐梦
断崖逐梦
又没了,可不可以连续,求助
季牧云
季牧云

引用来自“断崖逐梦”的评论

��
新地址:https://git.oschina.net/jiyufei/HlasExample.git
断崖逐梦
断崖逐梦
��
断崖逐梦
断崖逐梦

引用来自“季牧云”的评论

引用来自“断崖逐梦”的评论

分享一下代码吗
传到github了,https://github.com/anjiyufei/HlasExample

回复@季牧云 : 访问不了郁闷啊,麻烦提供可以访问的地址。
季牧云
季牧云

引用来自“断崖逐梦”的评论

分享一下代码吗
传到github了,https://github.com/anjiyufei/HlasExample
断崖逐梦
断崖逐梦
分享一下代码吗
基于Flume+Log4j+Kafka的日志采集架构方案

http://www.linuxidc.com/Linux/2016-05/131402.htm (一)kafka-jstorm集群实时日志分析 之 ---------kafka实时日志处理 ELK 实现 Java 分布式系统日志分析架构 ELK(ElasticSearch, Logstas...

cccyb
2016/12/12
42
0
2017年12月19-20日 阿里云 飞天/智能 云栖大会-北京峰会 会议笔记

2017年12月19-20日,在北京国家会议中心举办了云栖大会北京峰会,19日为Tech Insight,20日为主论坛和其他分论坛。场次很多,内容很丰富,自己的一些参会笔记整理如下: 20171219 上午 阿里云...

海洋的云
2017/12/21
0
0
Kafka是个奇葩!——Linkin论文学习笔记

是个消息中间件吗?那和市面上其他一堆堆的中间件例如ActiveMQ, RabbitMQ有什么区别? 答案只有一个: Kafka是个集群的消息中间件+存储,一个节点可以存储几T的数据! 为啥一个中间件需要存储...

难易
2014/08/26
0
9
从京东618数据井喷看大数据平台峰值处理制胜关键

作者介绍 京东集团618作战指挥中心,成员来自于京东各个技术体系,包括核心系统架构师、一线运维专家、科研学者等。近200位成员在618时共同努力,确保流量洪峰来临时系统安全、稳定、可靠,致...

博文视点
09/14
0
0
CTO详细讲解海量日志处理ELK

ELK实时日志分析平台之Elasticsearch简介 Elasticsearch是一个高度灵活的开源全文检索和分析引擎。它能够迅速(几乎是实时地)地存储、查找和分析大规模数据。通常被用在有复杂的搜索要求的系...

Java架构分享
05/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

职场老人谈:Linux学习分享

随着Linux系统慢慢渗透到生活的方方面面,对Linux的基本操作就越来越必要了。 我是一位软件测试工程师,初次接触Linux主要是因为需要配置Linux服务器。因为Linux系统是一套开源的操作系统,安...

linux-tao
16分钟前
1
0
Oracle chr() ascii()

函数简介 实用函数 chr() 和 ascii() chr() 函数将ASCII码转换为字符: ASCII码 –> 字符; ascii() 函数将字符转换为ASCII码: 字符 –> ASCII码; 在 Oracle 中 chr() 函数和 ascii() 是一对...

taadis
18分钟前
0
0
职场老人谈:Linux学习分享

随着Linux系统慢慢渗透到生活的方方面面,对Linux的基本操作就越来越必要了。 我是一位软件测试工程师,初次接触Linux主要是因为需要配置Linux服务器。因为Linux系统是一套开源的操作系统,安...

linuxprobe16
27分钟前
0
0
Confluence 6 Windows 中以服务方式自动重启的原因

针对长时间使用的 Confluence,我们推荐你配置 Confluence 自动随操作系统重启而启动。针对一些 Windows 的服务器,这意味着需要让 Confluence 以服务的方式运行。 有下面 2 种方式来以服务的...

honeymose
今天
3
0
day93-20180920-英语流利阅读-待学习

时尚之觞:外表光鲜靓丽,其实穷得要命 Lala 2018-09-20 1.今日导读 讲到时尚界,我们脑海里浮现的可能都是模特和设计师光鲜靓丽、从容潇洒的模样。可是,最近在法国出版的一本书却颠覆了我们...

飞鱼说编程
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部