文档章节

Flume-ng+kafka+storm+hbase 整合实例

西二旗之猫
 西二旗之猫
发布于 2017/03/17 18:40
字数 1721
阅读 1151
收藏 1

写在前面

        flume 版本:1.6.0 ,  kafka 版本:0.9.0,  storm 版本:1.0.2,  hbase 版本:1.2.0

        处理流程:整个流程为flume监听一个端口号随机发送字符串,经过kafka发送给storm,storm进行切割句子为单词并且统计单词个数,并且实时存入hbase.

         具体的流程:flume sources.type采用avro模式监听端口44444,sink发送数据给kafka,kafka创建topic-->fkshTopic,storm使用自带的kafkaSpout消费消息,切割字符串,统计单词个数,存入hbase.

 

配置flume-ng

         配置在flume-ng/conf 下新建配置文件 flume-conf-kafka.properties 内容如下

#单节点Flume配置  flume + kafka
#命名Agent a1的组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
#描述/配置Source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
 
#描述Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink 
a1.sinks.k1.topic = fkshTopic 
a1.sinks.k1.brokerList = 172.17.201.142:6667,172.17.201.70:6667 
a1.sinks.k1.requiredAcks = 1 
a1.sinks.k1.batchSize = 20 
a1.sinks.k1.channel = c1 

 
#描述内存Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
 
#为Channle绑定Source和Sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

解释:

     sources type类型为:avro

     bind:本机

     port:监听的端口号

     sinks type: kafkaSink

     topic: fkshTopic ---> 消息队列的topic的名字

     brokerlist:kafka集群的broker 的主机和端口号

启动flume-ng

 cd /flume-ng/bin

执行

./flume-ng agent --conf ../conf --conf-file ../conf/flume-conf-kafka.properties --name a1 -Dflume.root.logger=INFO,console

编写 flume 模拟数据程序

package leap.flume;

import org.apache.flume.api.RpcClient;

import java.util.Random;

/**
 * Created by FromX on 2017/3/15.
 */
public class FlumeRpcApp {


    public static void main(String[] args) {

        RpcClient client = FlumeRPCClient.createClient("172.17.201.142", 44444);
        Random _rand = new Random();
        //随机发送句子

        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"
                ,"难道 没有 中文 吗"};

        for (int i=0;i<100;i++){
            String data =sentences[_rand.nextInt(sentences.length)];
            FlumeRPCClient.sendData(client,data);
            System.out.println("sendData--------->"+data);
        }
        client.close();

    }
}
package leap.flume;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

/**
 * Created by FromX on 2017/3/15.
 * Flume RPC 远程调用 客户端
 */
public class FlumeRPCClient {
    /**
     * 创建一个默认的flume rpc client
     * @param hostname
     * @param port
     * @return
     */
    public static RpcClient createClient(String hostname, Integer port) {

//      Use the following method to create a thrift client (instead of the above line):
//      return RpcClientFactory.getThriftInstance(hostname, port);
        return RpcClientFactory.getDefaultInstance(hostname, port);

    }

    /**
     * 关闭客户端
     * @param client
     */
    public static void closeClient(RpcClient client) {
        client.close();
    }

    /**
     * 发送 数据到flume  如果发生异常 关闭客户端
     * @param client
     * @param data
     */
    public static void sendData(RpcClient client,String data) {

        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
        }
    }



}

运行效果:

解释:向 flume-ng agent 所在的主机的44444端口随机发送字符串

启动一个kafka consumer 

这是一个可以选操作,目的是为了测试 flume-ng+kafka 是否成功,

./kafka-console-consumer.sh --zookeeper 172.17.201.142:2181,172.17.200.158:2181,172.17.201.15:2181 --from-beginning --topic flumeTopic

运行效果:

Storm程序进行 wordcount

如果对storm编程的基本概念不了解,

请参考:https://my.oschina.net/u/2969788/blog/859910

TopologyFksh

package leap.storm.fksh;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Created by FromX on 2017/3/15.
 * 处理流程: flume--> kafka --> storm--> hbase
 */
public class TopologyFksh {


    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException, InterruptedException {

        TopologyBuilder builder = new TopologyBuilder();

        // config kafka spout   kafkaspout topic 的名称  这个topic名称可以在 flume的配置文件中
        String topic = "fkshTopic";
        ZkHosts zkHosts = new ZkHosts("c1.wb3.com:2181,c2.wb4.com:2181,m1.wb1.com:2181");
        //设置偏移量  每次不会从头开始读取 消息队列 而会从 上次读取的结尾读取 /fkshtest 目录用来存储偏移量 fkshApp 对应一个应用
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/fkshtest", "fkshApp");
        List<String> zkServers = new ArrayList<String>();
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        //超时时间 100分钟
        spoutConfig.socketTimeoutMs = 60 * 1000 * 100;
        // 定义输出格式为String
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        //创建  storm自带的kafkaspout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);


        // set kafka spout  启用2个线程
        builder.setSpout("kafka-spout", kafkaSpout, 2);

        // set bolt   启动3个线程 随机分发
        builder.setBolt("line-spilt", new BoltSpilt(), 3).shuffleGrouping("kafka-spout");

        //  统计单词  fieldsGrouping  2线程  word 分组 分发
        builder.setBolt("word-counter", new BoltWordCount(), 2)
                .fieldsGrouping("line-spilt", new Fields("word"));

        /**
         * 提交任务配置
         */
        Config conf = new Config();
        conf.setDebug(false);

        // set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "172.17.201.142:6667,172.17.201.70:6667");
        //ack机智 配置
        props.put("request.required.acks", "1"); // 0  1 -1
        props.put("serializer.class", "kafka.serializer.StringEnscoder");
        conf.put("kafka.broker.properties", props);

        //如果有任务id参数 则进行集群提交
        if (args != null && args.length > 0) {
            conf.setNumWorkers(5);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        }
        //如果没有参数  默认本地提交任务
        else {
            //设置任务最大进程数
            conf.setMaxTaskParallelism(3);
            //创建本地集群
            LocalCluster cluster = new LocalCluster();
            //提交任务 任务id word-count
            cluster.submitTopology("word-count", conf, builder.createTopology());
            // 睡眠 10000*6毫秒后手动结停止本地集权
            Thread.sleep(10000 * 6);
            cluster.shutdown();
        }

    }
}

解释 :

      kafkaspout: 使用storm提供的 kafkaspout来消费kafka消息,其中构造kafkaspout的参数为 zookeeper集群,端口号,偏移量,应用名称,应用名称随便起不重复就行,偏移量用来记录kafka消息队列的消费情况,不会每次都重新消费.

        任务提交:任务提交的时候,需要特别注意在配置项, props里面需要制定 kafka的brokerlist

        BoltSpilt:用来切割字符串为单词发送给下游

        BoltWordCount:统计单词个数并且存入hbase

BoltSpilt

package leap.storm.fksh;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Created by FromX on 2017/3/15.
 * 切割 单句为 单词
 */
public class BoltSpilt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                // Emit the word
                List a = new ArrayList();
                a.add(input);
                collector.emit(a, new Values(word));
            }
        }
        //确认成功处理一个tuple
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

BoltWordCount

package leap.storm.fksh;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Created by FromX on 2017/3/15.
 * <p>
 * 统计单词数 并且存入hbase
 */
public class BoltWordCount extends BaseRichBolt {

    private OutputCollector collector;

    //参数设置 hbase 连接参数
    private static Configuration conf;

    private static HTable table;

    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * 初始化  collector 和hbase连接
     * @param stormConf
     * @param context
     * @param collector
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "172.17.201.142:2181,172.17.200.158:2181,172.17.201.15:2181");
        conf.set("hbase.master", "172.17.201.142" + ":60000");
        conf.set("zookeeper.znode.parent", "/hbase-unsecure");
        // 表名
        String tableName = "stormTable";
        try {
            table = new HTable(conf, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 统计并且存入hbase
     *
     * @param tuple
     */
    @Override
    public void execute(Tuple tuple) {


        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);

        //行key 随机生成一个
        UUID uuid = UUID.randomUUID();
        String rowKey = uuid.toString();
        //列族
        String columnFamily = "WCFy";
        //标识符
        String identifier = "WC";
        Put p1 = new Put(Bytes.toBytes(rowKey));
        p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(identifier), Bytes.toBytes(counts.toString()));
        try {
            table.put(p1);
        } catch (IOException e) {
            e.printStackTrace();
        }

        collector.emit(new Values(word, count));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("resoult"));
    }
}

解释:

  初始化: prepare方法中初始化了 hbase的连接 

  注意: 务必预先 建好 hbase的表 建表语句      create "stormTable","WCFy"

提交storm任务

将TopologyFksh 达成jar 提交到storm集群即可

./storm jar poc.jar leap.storm.fksh.TopologyFksh fksh

关于如何打包参考:https://my.oschina.net/u/2969788/blog/859910

提交完任务之后 可以在 storm ui 中查看 任务

重新运行--FlumeRpcApp--main  产生随机字符串

在 storm ui中可以看到 tupl 元组的 emit情况

hbase中查看结果

scan "stormTable"

结果截图

 

源码参考:https://git.oschina.net/z2q_m/poc/tree/branch1

© 著作权归作者所有

西二旗之猫
粉丝 7
博文 54
码字总数 39081
作品 0
私信 提问
SparkStreaming整合flume

SparkStreaming整合flume 在实际开发中push会丢数据,因为push是由flume将数据发给程序,程序出错,丢失数据。所以不会使用不做讲解,这里讲解poll,拉去flume的数据,保证数据不丢失。 1.首...

强行快乐~
07/22
0
0
Apache Flume 1.6.0 发布,日志服务器

Apache Flume 1.6.0 发布,此版本现已提供下载: http://flume.apache.org/download.html 更新内容: ** Bug 修复 [FLUME-1793] - Unit test TestElasticSearchLogStashEventSerializer fail......

oschina
2015/06/03
3.1K
2
Apache Flume 1.5.0 发布,日志服务器

Apache Flume 1.5.0 发布,Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 改进内容包括...

oschina
2014/05/22
2.8K
4
Apache Flume 1.9.0 发布,日志服务器

Apache Flume 1.9.0 发布了,Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 本次更新如...

程六金
01/10
1K
4
Apache Flume 1.7.0 发布,日志服务器

Apache Flume 1.7.0 发布了,Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 本次更新如...

局长
2016/10/19
2.1K
3

没有更多内容

加载失败,请刷新页面

加载更多

skywalking(容器部署)

skywalking(容器部署) 标签(空格分隔): APM [toc] 1. Elasticsearch SkywalkingElasticsearch 5.X(部分功能报错、拓扑图不显示) Skywalking需要Elasticsearch 6.X docker network create......

JUKE
21分钟前
4
0
解决Unable to find a single main class from the following candidates [xxx,xxx]

一、问题描述 1.1 开发环境配置 pom.xml <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><!--一定要对上springboot版本号,因......

TeddyIH
22分钟前
4
0
Dubbo服务限制大数据传输抛Data length too large: 13055248, max payload: 8388608解决方案

当dubbo服务提供者向消费层传输大数据容量数据时,会受到Dubbo的限制,报类似如下异常: 2019-08-23 11:04:31.711 [ DubboServerHandler-XX.XX.XX.XXX:20880-thread-87] - [ ERROR ] [com.al...

huangkejie
25分钟前
4
0
HashMap和ConcurrentHashMap的区别

为了线程安全,ConcurrentHashMap 引入了一个 “分段锁” 的概念。具体可以理解把一个大的 map 拆分成 N 个小的 Map 。最后再根据 key.hashcode( )来决定放到哪一个 hashmap 中去。 hashmap ...

Garphy
25分钟前
3
0
购买SSL证书需要注意哪些问题

为了保障网站的基本安全,为网站部署SSL证书,已经是一种常态了。各大浏览器对于安装了SSL证书的网站会更友好,并且不会发出“不安全”的提示。部署SSL证书之前首先得去给网站购买一个SSL证书...

安信证书
55分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部