文档章节

storm+kafka集成

杰仪
 杰仪
发布于 2017/06/20 10:58
字数 499
阅读 117
收藏 0

POM 

    <properties>
        <storm.version>1.1.0</storm.version>
        <kafka.version>0.10.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- 由于storm环境中有该jar,所以不用pack到最终的task.jar中 -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <scope>${provided.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${storm.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

toploploy

public class KafkaTopology {

    /**
     *  storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.boyoi.kafka.topology.KafkaTopology x2
     */
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new KafkaSpout<>(getKafkaSpoutConfig()),200);

        /**
         * 1)shuffleGrouping(随机分组)
         * 2)fieldsGrouping(按照字段分组,在这里即是同一个单词只能发送给一个Bolt)
         * 3)allGrouping(广播发送,即每一个Tuple,每一个Bolt都会收到)
         * 4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)
         * 5)noneGrouping(随机分派)
         * 6)directGrouping(直接分组,指定Tuple与Bolt的对应发送关系)
         * 7)Local or shuffle Grouping
         * 8)customGrouping (自定义的Grouping)
         */
        builder.setBolt("filter", new FilterBolt(), 200).shuffleGrouping("spout");

        builder.setBolt("analysis", new AnalysisBlot(), 200).shuffleGrouping("filter");

        builder.setBolt("HBase", new HBaseBlot(),200).shuffleGrouping("analysis");

        Config conf = new Config();
//        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(5);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("x2", conf, builder.createTopology());
            cluster.shutdown();
        }
    }

    /**
     * 获取kafka spout 配置
     */
    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder("192.168.1.9:9092,192.168.1.40:9092", "test2")
                                                                            .setGroupId("kafka")
                                                                            .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                                                                            .build();
    }

    private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();
    /**
     * Needs to be serializable
     */
    private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            return new Values(record.value());
        }
    }
}

Blot

public class FilterBolt extends BaseRichBolt{
    
    private OutputCollector outputCollector;

    /**
     * 初始化工作
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    /**
     * 执行逻辑,目的是过滤无用的字符串
     */
    @Override
    public void execute(Tuple tuple) {
        String value = tuple.getString(0);

        // 提交下一个
        if (null != value && !"".equals(value)){
            try {
                int val = Integer.parseInt(value);
                outputCollector.emit(new Values(val));
            }catch (Exception e){
                // ignore
                System.out.println(value + "不是数字!略过!!!");
            }

        }
        // 返回确认
        outputCollector.ack(tuple);
    }

    /**
     * 申明传入到一个Bolt的字段名称
     * 通过 input.getStringByField("str");input.getIntegerByField("int"); 获得指定的
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("value"));
    }
}

 

其它blot略

代码写好后。使用maven打成jar包。

放在集群Nimbus的某个目录下

storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.*.kafka.topology.KafkaTopology x2

 

© 著作权归作者所有

下一篇: storm集群安装
杰仪
粉丝 1
博文 52
码字总数 10231
作品 0
成都
私信 提问
大数据storm应该怎么学?

帮助大家在业务上面快速使用Storm,相信学完之后可以直接面对生成环境的问题,解决问题,优化结构。项目使用Storm0.9.1+Kafka0.8.1.1+Zookeeper锁+Memcached+mysql架构,从集群搭建到集群连通...

让往事随风
2016/04/19
69
0
emsite 全自动分布式开源框架发布 2.0.0 版本

hi,小伙伴们好, 平台简介: emsite框架是一个分布式的后台全自动快速开发框架,采用dubbo作为服务层框架,后台将集成单点登录、Auth2.0、storm+kafka消息处理系统、kafka+ flume+storm+hdfs...

em_aaron
2018/04/07
1K
2
storm+kafka集成简单应用

这两天公司要学习kafka,结合之前的storm,做了一个简单的集成,之前也参考了网上的例子一些例子,发现或多或少都有一些问题。所以自己做了一个。 这个是网上其他人遇到的问题,给摘录一下,...

choulanlan
2016/06/23
0
0
storm+kafka:WordCount程序

简单的输入输出做完了,来点复杂点儿的场景:从某个topic定于消息,然后根据空格分词,统计单词数量,然后将当前输入的单词数量推送到另一个topic。 首先规划需要用到的类: 从KafkaSpout接收...

choulanlan
2016/06/23
0
0
storm kafka 编程指南

一、原理及关键步骤介绍 storm中的storm-kafka组件提供了storm与kafka交互的所需的所有功能,请参考其官方文档:https://github.com/apache/storm/tree/master/external/storm-kafka#broker...

大数据之路
2012/06/27
653
0

没有更多内容

加载失败,请刷新页面

加载更多

计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
昨天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
昨天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部