文档章节

Storm 【开发细节】 - geting Start with Storm

止静
 止静
发布于 2014/06/25 14:24
字数 402
阅读 288
收藏 0
package test;

import java.io.IOException;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.copyFromClass.TestWordSpout;

import com.esotericsoftware.minlog.Log;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// 测试目的,在这里我们需要测试一下当前Spout 不断产生数据的过程
public class testWordSpoutTopology {

	public static class TestSimpleBolt extends BaseBasicBolt {

		@Override
		public void execute(Tuple input, BasicOutputCollector collector) {
			System.out.println(input.toString());
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			System.out.println("Method declare");
		}
	}
	

	public static void main(String[] args) throws IOException {

	
	        // 首先,我们必须建立一个新的TopologyBuilder
		TopologyBuilder builder = new TopologyBuilder();
		
		//其次,我们需要配置如下的组件: 1 Spout,2Bolt
		builder.setSpout("word-emit-byThread", new TestWordSpout());
		
		//在这个Spout之中,我们约定将 【word-emit-byThread】Spout组件 发射的元祖进行
		shuffleGrouping
		
		builder.setBolt("word-show", new TestSimpleBolt()).shuffleGrouping(
				"word-emit-byThread");
		Config config = new Config();
		config.setDebug(false);

		//最后进行本地提交
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("simple", config, builder.createTopology());
	}
}


以上,

testWordSpoutTopology

是我们运行的主类



package storm.copyFromClass;

import backtype.storm.Config;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



//
public class TestWordSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
        
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
        
    }
        
        
    // 发送
    public void nextTuple() {
        Utils.sleep(100);
	final String[] words = new String[] { "张兵", "吴哥", "仝志维", "前辈", "禅师"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }
    
    //在这里,我们没有进行ACK
    public void ack(Object msgId) {

    }

    //在这里,我们没有进行fail
    public void fail(Object msgId) {
        
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}


结果:

    

      请注意在这里,我们的Stream 默认的id为空





© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
Apache Slider + Storm

Apache Slider + Storm 系统环境 安装如下组件,部署可用环境 JDK 1.7.0_79 Apache Zookeeper 3.4.* Apache Zookeeper Apache Hadoop 2.6.* Apache Hadoop Apache Storm 0.9.4 Apache Storm......

Yulong_
2016/09/21
504
0
storm启动卡着不动,求高手

刚把storm集群部好,结果启动时卡着不动。 执行:./storm nimbus 提示: [root@WY_CentOS_100G bin]# ./storm nimbus Running: java -server -Dstorm.options= -Dstorm.home=/usr/local/src......

lja
2013/07/11
2.2K
1
storm client command

最近在研究实时日志分析,storm确实不错,以下是命令参数: storm help Syntax: storm jar topology-jar-path class 运行jar包中类的主函数和指定的参数 Commands: activate storm activate ...

China_OS
2014/02/22
1K
0
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
789
1
Apache Storm 1.2.3 发布,分布式实时计算

Apache Storm 1.2.3 发布了,更新内容如下: 新特性 [STORM-3233] - zookeeper 客户端升级到最新版本 (3.4.13) 改进 [STORM-3077] - Disruptor 升级至 3.3.11 [STORM-3083] - HikariCP 升级至...

xplanet
2019/07/20
868
0

没有更多内容

加载失败,请刷新页面

加载更多

2020,向死而生

或许2020年注定是非常艰难的一年,毕竟两个轮回前之的1900年,清廷过得也很艰难,义和团在北方闹得轰轰烈烈,紫禁城也被八国联军占领。次年(1901年)即签订了后世所谓的丧权辱国的辛丑条约,...

嘉树
16分钟前
3
0
git 常用配置

git config --global core.compression 0 git config --global http.lowSpeedLimit 0 git config --global http.lowSpeedTime 999999 git config --global http.postBuffer 524288000......

老码农008
16分钟前
6
0
Protel99SE WIN10系统下无法添加封装库的解决方法

Protel99SE WIN10系统下无法添加封装库的解决方法 Protel99SE这款PCB设计软件实在太古老了,导致与微软的最新操作系统有些功能不能兼容,比如WIN10系统下无法添加封装库;但是由于Protel99S...

demyar
17分钟前
3
0
大数据风控系统概述

为什么要做风控系统 不做的话,会有以下风险: 各种小号、垃圾账号泛滥 撞库攻击、盗号、毁号、拖库等 拉新 10w 留存率不到 5% 百万营销费用,却增加不了用户粘性 投票票数差距非常悬殊 各种...

大数据技术进阶
18分钟前
5
0
串口调试助手,VB6.0开发

1、为什么要自己开发一个串口调试助手 通常我们都是:在网上直接下载一个串口助手,可执行文件,直接使用,并无法得到其源码,在此我们提供了一个VB6.0开发的串口助手: (1)让你极速掌握串...

superman150
21分钟前
12
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部