Storm集成Redis

原创
2016/01/20 14:03
阅读数 1.9K

一、Storm集成redis前的准备

    首先是需要安装storm和redis环境,具体安装方法可分别去http://storm.apache.org/http://redis.io查询,安装完成之后 需要准备运行所需要的jar包,除了storm本身lib下的jar包外,操作redis还需要guava-12.0.1、jedis-2.7.2、commons-pool2-2.3.jar、hamcrest-core-1.3.jar和storm-redis-0.10.0.jar,将其引入即可

二、代码

    具体的实例代码在storm-1.10.0的版本中自带,具体可以去下载storm的1.10.0的源码中自行查找,下面将wordcount的集成实例代码列下,spout/bolt的topology主要需要三个文件,分别为PersistentWordCount.java、WordSpout.java和WordCount.java,代码如下:(对trident感兴趣的,源码中也有实例程序,可自行查找学习)

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.redis.topology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
public class PersistentWordCount {
    private static final String WORD_SPOUT = "WORD_SPOUT";
    private static final String COUNT_BOLT = "COUNT_BOLT";
    private static final String STORE_BOLT = "STORE_BOLT";
    private static final String TEST_REDIS_HOST = "127.0.0.1";
    private static final int TEST_REDIS_PORT = 6379;
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        String host = TEST_REDIS_HOST;
        int port = TEST_REDIS_PORT;
        if (args.length >= 2) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(host).setPort(port).build();
        WordSpout spout = new WordSpout();
        WordCounter bolt = new WordCounter();
        RedisStoreMapper storeMapper = setupStoreMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
        // wordSpout ==> countBolt ==> RedisBolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(WORD_SPOUT, spout, 1);
        builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
        builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
        if (args.length == 2) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Thread.sleep(30000);
            cluster.killTopology("test");
            cluster.shutdown();
            System.exit(0);
        } else if (args.length == 3) {
            StormSubmitter.submitTopology(args[2], config, builder.createTopology());
        } else {
            System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
        }
    }
    private static RedisStoreMapper setupStoreMapper() {
        return new WordCountStoreMapper();
    }
    private static class WordCountStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "wordCount";
        public WordCountStoreMapper() {
            description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }
        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }
        @Override
        public String getKeyFromTuple(ITuple tuple) {
            return tuple.getStringByField("word");
        }
        @Override
        public String getValueFromTuple(ITuple tuple) {
            return tuple.getStringByField("count");
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.redis.topology;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
public class WordSpout implements IRichSpout {
    boolean isDistributed;
    SpoutOutputCollector collector;
    public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" };
    public WordSpout() {
        this(true);
    }
    public WordSpout(boolean isDistributed) {
        this.isDistributed = isDistributed;
    }
    public boolean isDistributed() {
        return this.isDistributed;
    }
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    public void close() {
    }
    public void nextTuple() {
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        this.collector.emit(new Values(word), UUID.randomUUID());
        Thread.yield();
    }
    public void ack(Object msgId) {
    }
    public void fail(Object msgId) {
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
    @Override
    public void activate() {
    }
    @Override
    public void deactivate() {
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.redis.topology;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.google.common.collect.Maps;
import java.util.Map;
import static backtype.storm.utils.Utils.tuple;
public class WordCounter implements IBasicBolt {
    private Map<String, Integer> wordCounter = Maps.newHashMap();
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context) {
    }
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        int count;
        if (wordCounter.containsKey(word)) {
            count = wordCounter.get(word) + 1;
            wordCounter.put(word, wordCounter.get(word) + 1);
        } else {
            count = 1;
        }
        wordCounter.put(word, count);
        collector.emit(new Values(word, String.valueOf(count)));
    }
    public void cleanup() {
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

三、查看结果

    运行PersistentWordCount,在命令行中键入

src/redis-cli

进入到redis的client,输入命令

HGETALL wordCount

即可查看到结果

展开阅读全文
打赏
1
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
1
分享
返回顶部
顶部