storm+kafka集成

原创
2017/06/20 10:58
阅读数 521

POM 

    <properties>
        <storm.version>1.1.0</storm.version>
        <kafka.version>0.10.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- 由于storm环境中有该jar,所以不用pack到最终的task.jar中 -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${storm.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

Topology

public class KafkaTopology {

    /**
     * Topology entry point.
     *
     * <p>Submit to a cluster with:
     * {@code storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.boyoi.kafka.topology.KafkaTopology x2}
     *
     * <p>With no arguments the topology runs in an in-process {@link LocalCluster}
     * for local testing.
     */
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new KafkaSpout<>(getKafkaSpoutConfig()), 200);

        /*
         * Available stream groupings (shuffleGrouping is used below):
         * 1) shuffleGrouping  - tuples distributed randomly across tasks
         * 2) fieldsGrouping   - tuples with the same field value go to the same task
         * 3) allGrouping      - every tuple is broadcast to every task
         * 4) globalGrouping   - all tuples go to the task with the lowest task id
         * 5) noneGrouping     - currently equivalent to shuffle
         * 6) directGrouping   - the producer decides which task receives the tuple
         * 7) localOrShuffleGrouping - prefer tasks in the same worker process
         * 8) customGrouping   - user-defined grouping
         */
        builder.setBolt("filter", new FilterBolt(), 200).shuffleGrouping("spout");

        builder.setBolt("analysis", new AnalysisBlot(), 200).shuffleGrouping("filter");

        builder.setBolt("HBase", new HBaseBlot(), 200).shuffleGrouping("analysis");

        Config conf = new Config();
//        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(5);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("x2", conf, builder.createTopology());
            // FIX: the original shut the cluster down immediately after submission,
            // killing the topology before it could consume a single record. Let it
            // run for a while before tearing the local cluster down.
            try {
                Thread.sleep(60_000L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fall through to shutdown.
                Thread.currentThread().interrupt();
            }
            cluster.shutdown();
        }
    }

    /**
     * Builds the Kafka spout configuration: bootstrap servers, topic {@code test2},
     * consumer group {@code kafka}, and a translator that emits only the record
     * value as a single field named {@code str}.
     */
    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder("192.168.1.9:9092,192.168.1.40:9092", "test2")
                                                                            .setGroupId("kafka")
                                                                            .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                                                                            .build();
    }

    private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();

    /**
     * Maps a Kafka record to a single-value tuple containing only the record value.
     * Must be {@link Serializable} because it is shipped inside the spout config.
     */
    private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            return new Values(record.value());
        }
    }
}

Bolt

public class FilterBolt extends BaseRichBolt {

    // Bolts are serialized when the topology is submitted; pin the version.
    private static final long serialVersionUID = 1L;

    private OutputCollector outputCollector;

    /**
     * One-time initialization: keep the collector for use in {@link #execute}.
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    /**
     * Filters the incoming stream: forwards only values that parse as integers,
     * dropping empty strings and non-numeric input. Every tuple is acked,
     * including the dropped ones, so the spout does not replay them.
     */
    @Override
    public void execute(Tuple tuple) {
        String value = tuple.getString(0);

        if (null != value && !"".equals(value)) {
            try {
                int val = Integer.parseInt(value);
                outputCollector.emit(new Values(val));
            } catch (NumberFormatException e) {
                // FIX: only catch the parse failure; the original broad
                // catch (Exception) would also have swallowed emit() errors.
                // Non-numeric input is intentionally dropped.
                System.out.println(value + "不是数字!略过!!!");
            }
        }
        // Acknowledge regardless of whether the value was forwarded.
        outputCollector.ack(tuple);
    }

    /**
     * Declares the single output field {@code value} (the parsed integer).
     * Downstream bolts can read it via {@code input.getIntegerByField("value")}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("value"));
    }
}

 

其它 Bolt 略

代码写好后。使用maven打成jar包。

放在集群Nimbus的某个目录下

storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.*.kafka.topology.KafkaTopology x2

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部