文档章节

业务系统-kafka-Storm【日志本地化】 - 1 将日志文件打印到local

止静
 止静
发布于 2014/09/10 15:30
字数 1044
阅读 2837
收藏 5

阅读前提:

        1 : 您可能需要对  logback 日志系统有所了解

        2 :您可能需要对于 kafka 有初步的了解

        3:请代码查看之前,请您仔细参考系统的业务图解

       

    由于kafka本身自带了和『Hadoop』的接口,如果需要将kafka中的文件直接迁移到HDFS,请参看本ID的另外一篇博文:

        业务系统-kafka-Storm【日志本地化】 - 2 :直接通过kafka将日志传递到HDFS


    1: 一个正式环境系统的系统设计图解:

                

              通过kafka集群,在2个相同的topic之下,通过kafka-storm, he kafka-hadoop,2 个Consumer,针对同样的一份数据,我们分流了2个管道:

            其一: 实时通道

            其二:离线通道


       在日志本地化的过程之中,前期,由于日志的清洗,过滤的工作是放在Storm集群之中,也就是说,留存到本地locla的日志。是我们在Storm集群之中进行了清洗的数据。

      也就是:

            如下图所示:

             







      在kafka之中,通常而言,有如下的 代码 用来处理:

         在这里我们针对了2种日志,有两个Consumer用来处理

package com.mixbox.kafka.consumer;

public class logSave {

	public static void main(String[] args) throws Exception {

		Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit);
		visitlog.start();

		Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order);
		orderlog.start();

	}
}


     在这里,我们依据不同的原始字段,将不同的数据保存到不同的文件之中。

package com.mixbox.kafka.consumer;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * @author Yin Shuai
 */
public class Consumer_Thread extends Thread {

	// 在事实上我们会依据传递的topic名称,来生成不桐的记录机器
	// private Logger _log_order = LoggerFactory.getLogger("order");
	// private Logger _log_visit = LoggerFactory.getLogger("visit");

	private Logger _log = null;

	private final ConsumerConnector _consumer;
	private final String _topic;

	public Consumer_Thread(String topic) {

		_consumer = kafka.consumer.Consumer
				.createJavaConsumerConnector(createConsumerConfig());
		this._topic = topic;

		_log = LoggerFactory.getLogger(_topic);

		System.err.println("log的名称" + _topic);

	}

	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", KafkaProperties.zkConnect);
		// 在这里我们的组ID为logSave
		props.put("group.id", KafkaProperties.logSave);
		props.put("zookeeper.session.timeout.ms", "100000");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		return new ConsumerConfig(props);

	}

	public void run() {

		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(_topic, new Integer(1));

		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = _consumer
				.createMessageStreams(topicCountMap);

		for (KafkaStream<byte[], byte[]> kafkaStream : consumerMap.get(_topic)) {
			ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
			while (iterator.hasNext()) {
				MessageAndMetadata<byte[], byte[]> next = iterator.next();
				try {

					// 在这里我们分拆了一个Consumer 来处理visit日志
					logFile(next);
					System.out.println("message:"
							+ new String(next.message(), "utf-8"));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
			}
		}
	}

	private void logFile(MessageAndMetadata<byte[], byte[]> next)
			throws UnsupportedEncodingException {
		_log.info(new String(next.message(), "utf-8"));
	}

}



    一个简单的小tips:

        logback.xml  ,提醒您注意,这里的配置文件太过粗浅。如有需要,请自行填充。

        

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>

	<jmxConfigurator />
	<!-- 控制台输出日志 -->
	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">

		<!-- 过滤掉 TRACE 和 DEBUG 级别的日志 -->
		<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> -->
		<!-- <level>INFO</level> -->
		<!-- </filter> -->

		<!-- 按天来回滚,如果需要按小时来回滚,则设置为{yyyy-MM-dd_HH} -->
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>f:/opt/log/test.%d{yyyy-MM-dd}.log</fileNamePattern>
			<!-- 如果按天来回滚,则最大保存时间为1天,1天之前的都将被清理掉 -->
		</rollingPolicy>

		<!-- 日志输出格式 -->
		<layout class="ch.qos.logback.classic.PatternLayout">
			<pattern>
				%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n</pattern>
		</layout>
	</appender>



	<!-- 记录到日志 文件的滚动日志 -->
	<appender name="ERROR"
		class="ch.qos.logback.core.rolling.RollingFileAppender">

		<file>
			e:/logs/error/error.log
		</file>
		<filter class="ch.qos.logback.classic.filter.LevelFilter">
			<level>
				ERROR
			</level>
			<onMatch>ACCEPT</onMatch>
			<onMismatch>DENY</onMismatch>
		</filter>
		<!-- 定义每天生成一个日志文件 -->
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>e:/logs/yuanshi-%d{yyyy-MM-dd}.log</fileNamePattern>
			<MaxHistory>10</MaxHistory>
		</rollingPolicy>

		<!-- 日志样式 -->
		<layout class="ch.qos.logback.classic.PatternLayout">
			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n</pattern>
		</layout>
	</appender>


	<!-- 记录到日志 文件的滚动日志 -->
	<appender name="FILE"
		class="ch.qos.logback.core.rolling.RollingFileAppender">

		<file>E:\logs\file\file.log</file>

		<filter class="ch.qos.logback.classic.filter.LevelFilter">
			<level>INFO</level>
			<onMatch>ACCEPT</onMatch>
			<onMismatch>DENY</onMismatch>
		</filter>

		<!-- 定义每天生成一个日志文件 -->
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>e:/logs/venality-%d{yyyy-MM-dd}.log
			</fileNamePattern>
			<MaxHistory>10</MaxHistory>
		</rollingPolicy>

		<!-- 日志样式 -->
		<layout class="ch.qos.logback.classic.PatternLayout">
			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n</pattern>
		</layout>
	</appender>


	<appender name="visit"
	class="ch.qos.logback.core.rolling.RollingFileAppender">
		<File>
			E:\logs\visitlog\visit.log
		</File>
		<encoder>
			<pattern>%msg%n</pattern>
		</encoder>

		<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
			<level>INFO</level>
		</filter>
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>E:\logs\visit.log.%d{yyyy-MM-dd}
			</fileNamePattern>
		</rollingPolicy>
	</appender>
	<logger name="visit" additivity="false" level="INFO">
		<appender-ref ref="visit" />
	</logger>


	<appender name="order"
		class="ch.qos.logback.core.rolling.RollingFileAppender">
		<File>
			E:\logs\orderlog\order.log
		</File>
		<encoder>
			<pattern>%msg%n
			</pattern>
		</encoder>

		<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
			<level>INFO</level>
		</filter>
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>E:\logs\order.log.%d{yyyy-MM-dd}
			</fileNamePattern>
		</rollingPolicy>
	</appender>
	<logger name="order" additivity="false" level="INFO">
		<appender-ref ref="order" />
	</logger>


	<root level="DEBUG">
		<appender-ref ref="FILE" />
	</root>
</configuration>


© 著作权归作者所有

止静
粉丝 120
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
Flume+Kafka+Storm+Redis构建大数据实时处理系统

一、大数据处理的常用方法 之前在《采集→清洗→处理:基于MapReduce的离线数据分析》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通...

技术小能手
2018/07/09
0
0
Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中《大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是...

xpleaf
2018/04/16
0
0
大数据经典学习路线(及供参考)之 三

3.Storm实时计算部分阶段 实时课程分为两个部分:流式计算核心技术和流式计算计算案例实战。 1.流式计算核心技术 流式计算核心技术主要分为两个核心技术点:Storm和Kafka,学完此阶段能够掌握...

柯西带你学编程
2018/05/22
0
0
SODBASE CEP学习(四):类SQL语言EPL与Storm或jStorm集成

开发者社区活动,SODBASE产品的用户现在可以领礼品啦 Storm框架原本是设计用来做互联网短文本处理和一些统计工作的,是一种分布式流式计算框架。在一些场合,特别是在已经用了Storm架构以后,...

wishuhappyyear
2015/04/30
0
0
Kafka实战-Storm Cluster

1.概述   在《Kafka实战-实时日志统计流程》一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Sto...

smartloli
2015/06/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

2019年普通高校在川招生专业及名额介绍文科 带学费

2019年普通高校在川招生专业及名额介绍文科 带学费

asdtiang
10分钟前
0
0
springCloud配置中心config配置svn(踩坑记录)(基于consul)

新建一个config Server模块; 引入如下依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId></dependency>......

为何不可1995
10分钟前
0
0
mysql相关tips(持续更新)

1.字符集:utf8mb4 mysql 5.5.3之后出来的字符集,占用1-4个字节,最大占用的字节数为4.目前这个字段主要应用在(Emoji表情)。utf8mb4兼容utf8(1-3个字节),且比utf8能表示更多的字符。什...

lara_
10分钟前
0
0
微服务开源生态报告 No.1

从关注开源,到使用开源,再到参与开源贡献,越来越多的国内开发者通过开源技术来构建业务。 截止目前,Arthas / Dubbo / ChaosBalde / Nacos / RocketMQ / Seata / Sentinel / Spring Clou...

阿里云官方博客
13分钟前
1
0
MaxCompute 费用暴涨之存储压缩率降低导致SQL输入量变大

现象:同样的SQL,每天处理的数据行数差不多,但是费用突然暴涨甚至会翻数倍。 分析: 我们先明确MaxCompute SQL后付费的计费公式:一条SQL执行的费用=扫描输入量 ️ SQL复杂度 ️ 0.3(¥/GB...

zhaowei121
15分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部