文档章节

flume+kafka+storm运行实例

 张欢19933
发布于 2016/02/04 11:33
字数 1528
阅读 1103
收藏 9

概述

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive或者mr来实现统计分析,但是对于实时的需求Hive和mr就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:
直接使用Storm的Topology对数据进行实时分析处理
整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理

flume+kafka+storm相结合,此时,flume作为数据来源收集数据,kafka作为消息队列,起缓冲作用,storm从kafka拉取数据分析处理。做软件开发的都知道模块化思想,这样设计的原因有两方面:
一方面是可以模块化,功能划分更加清晰,从“数据采集--数据接入--流式计算--数据输出/存储”

1).数据采集
负责从各节点上实时采集数据,选用cloudera的flume来实现
2).数据接入
由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka
3).流式计算
对采集到的数据进行实时分析,选用apache的storm
4).数据输出
对分析后的结果持久化,暂定用mysql
另一方面是模块化之后,假如当Storm挂掉了之后,数据采集和数据接入还是继续在跑着,数据不会丢失,storm起来之后可以继续进行流式计算;

数据来源flume

Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka。我们根据情况选择合适的source,这里我用的是exec,channel是memory,sink当然就是kafkasink。详细配置如下:

flume到kafka

flume到kafka的传输过程如下图:

kafka的配置跟之前搭建的没有什么改动。

测试flume到kafka

flume和kafka配置好以后,先启动flume集群,这里是后台运行:

flume-ng agent -n agent -c /usr/local/apache-flume-1.6.0-bin/conf  -f /usr/local/apache-flume-1.6.0-bin/conf/flume-test.conf -Dflume.root.logger=DEBUG,console &

然后启动zookeeper:

./zkServer.sh start

接着启动kafka集群,这里是后台运行:

./kafka-server-start.sh ../config/server.properties &

然后向监控的文件里输入数据:

echo 'hello world' >> topic-test.txt

接着在kafka集群上创建消费者,测试flume到kafka是否联通,当然也可以使用kafka监控工具查看:

我们可以事先创建好topic,当然我们也可以自动创建topic,设置kafka auto.create.topics.enable属性为true,默认就为true。

./kafka-console-consumer.sh  --zookeeper master:2181 --from-beginning --topic topic1
这边输出'hello world'则表明flume到kafka连接成功。

 storm读取kafka数据分析编程

首先搭建好storm集群,启动nimbus、supervisor、ui

然后topology编程,我这里是java编程的一个小例子:

主类

package com.kafka_storm;
import java.util.HashMap;
import java.util.Map;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
public class StormKafkaTopo {   
    public static void main(String[] args) throws Exception { 
     // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("master:2181");
        // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
        //这里需要注意的是,spout会根据config的后面两个参数在zookeeper上为每个kafka分区创建保存读取偏移的节点,如:/zkroot/topo/partition_0。
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout");
        
        // 配置KafkaBolt中的kafka.broker.properties(可以参考kafka java编程)
        Config conf = new Config();  
        Map<String, String> map = new HashMap<String, String>(); 
        // 配置Kafka broker地址       
        map.put("metadata.broker.list", "master:9092");
        // serializer.class为消息的序列化类
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // 配置KafkaBolt生成的topic
        conf.put("topic", "topic2");
         
        //默认情况下,spout下会发射域名为bytes的binary数据,如果有需要,可以通过设置schema进行修改。
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());  
        TopologyBuilder builder = new TopologyBuilder();   
        builder.setSpout("spout", new KafkaSpout(spoutConfig));  
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); 
        builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");        
 
        if (args != null && args.length > 0) {  
            conf.setNumWorkers(3);  
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
        } else {  
   
            LocalCluster cluster = new LocalCluster();  
            cluster.submitTopology("Topo", conf, builder.createTopology());  
            Utils.sleep(100000);  
            cluster.killTopology("Topo");  
            cluster.shutdown();  
        }  
    }  
}

消息处理

package com.kafka_storm;
import java.io.UnsupportedEncodingException;
import java.util.List;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * 使用KafkaSpout时需要子集实现Scheme接口,它主要负责从消息流中解析出需要的数据
 * @author lenovo
 *
 */
public class MessageScheme implements Scheme { 
     
    /* (non-Javadoc)
     * @see backtype.storm.spout.Scheme#deserialize(byte[])
     */
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8"); 
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {  
          
        }
        return null;
    }
     
     
    /* (non-Javadoc)
     * @see backtype.storm.spout.Scheme#getOutputFields()
     */
    public Fields getOutputFields() {
        // TODO Auto-generated method stub
        return new Fields("msg");  
    }  
}

bolt

package com.kafka_storm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SenqueceBolt extends BaseBasicBolt{
     
    /* (non-Javadoc)
     * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        // TODO Auto-generated method stub
         String word = (String) input.getValue(0);  
         String out = "I'm " + word +  "!";  
         System.out.println("out=" + out);
         collector.emit(new Values(out));
    }
     
    /* (non-Javadoc)
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

在集群上运行

我们要将引入的第三方包全部放到storm的lib包下面,包括kafka、zookeeper的,否则会报缺失jar包的错

storm jar StormKafkaDemo.jar com.kafka_storm.StormKafkaTopo StormKafkaDemo

开始总体测试:

向flume监控的文件输入数据,在storm的log日志里查看输出,当然我们也可以在kafka里查看,因为我将结果输出到kafka里了,topic为topic2。

日志里结果如下:

 

到此,flume+kafka+storm结合使用结束

 

© 著作权归作者所有

粉丝 46
博文 538
码字总数 247200
作品 0
海淀
私信 提问
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
693
1
【Strom篇】Flume+Kafaka+Strom整合完成信息记录

第一部分:流程分析 由flume收集客户端发送的信息,经过kafka集群消费者收集,然后给storm集群数据处理(数据清洗),最后再发给kafka集群收集。 第二部分:集群搭建 flume+kafka 一、配合f...

chenshi_2753
2018/05/06
0
0
#研发解决方案介绍#Recsys-Evaluate(推荐评测)

推荐系统可不仅仅是围着推荐算法打转 先明确一下,我们属于工业领域。很多在学术论文里行之有效的新特奇算法,在工业界是行不通的。当年我们做语义聚合时,分词、聚类、相似性计算、实体词识...

旁观者-郑昀
2014/12/17
150
1
#研发解决方案介绍#Recsys-Evaluate(推荐评测)

关键词:recsys、推荐评测、Evaluation of Recommender System、piwik、flume、kafka、storm、redis、mysql 本文档适用人员:研发 推荐系统可不仅仅是围着推荐算法打转 先明确一下,我们属于...

郭恩洲_OSC博客
2015/02/26
146
1
Flume+Kafka+storm流式计算——文件分组

如题,我用Flume+Kafka监控一个文件夹下的小文件输入,把各个数据流传输到storm集群进行数据计算。 由于每个小文件中内容有业务计算逻辑,所以我在flume传输到kafka时,修改了Kafkasink代码,...

诺诺
2016/06/20
489
0

没有更多内容

加载失败,请刷新页面

加载更多

vue vue-router beforeRouteEnter

本文转载于:专业的前端网站➬vue vue-router beforeRouteEnter beforeRouteEnter (to, from, next) { // 在渲染该组件的对应路由被 confirm 前调用 // 不!能!获取组件实例 `thi...

前端老手
46分钟前
4
0
Spring Boot 2 实战:结合 Docker 容器化

1. 前言 Docker 是啥?好处是啥? 什么现在如果作为一名开发你还不知道甚至没有用过我都感觉你落伍了。Docker 是一种虚拟化的容器技术,目的为了打造持续集成、版本控制、可移植性、隔离性和...

码农小胖哥
50分钟前
4
0
Linux 常用查询句柄的命令(Too many open files)

摘要 查询命令 进程占用的句柄总数 lsof -n | wc -l 查看哪一个进程占用的句柄最多 lsof -n|awk '{print $2}'|sort|uniq -c|sort -nr|more 第一列 是 占用句柄数; 第二列 是 进程号 PID;...

liangxiao
50分钟前
5
0
JavaScript权威指南笔记5

第五章、语句 0、概述 JS程序是一系列可执行语句的集合。 通过控制语句来改变语句的默认执行顺序。 控制语句:条件、循环和跳转(如return、break和throw) 2、复合语句 当多条语句被当做一条语...

_Somuns
56分钟前
5
0
vmware安装ubuntu18.04总是 panic -not syncing:corrupted stack end detected inside schedule

Vmware 安装ubuntu 总是卡着不动, 提示panic -not synciong 网上提示很多办法,都试了效果不佳, 找到了完美解决办法 你的兼容模式重新选一下为6.0,兼容性对硬件有要求的 即可实现 成功安装...

dragon_tech
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部