Storm-编程入门
Storm-编程入门
如风达 发表于3年前
Storm-编程入门
  • 发表于 3年前
  • 阅读 45
  • 收藏 1
  • 点赞 0
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   


一 编程接口

 

 

 



Spout 接口

 

Spout组件的实现可以通过继承BaseRichSpout类或者其他*Spout类来完成,也可以通过实现IRichSpout接口来实现。需要根据情况实方法有:

 

open方法

 

当一个Task被初始化的时候会调用此open方法。一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。示例如下:

 

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {   

          _collector = collector;  

}  

declareOutputFields方法

 

此方法用于声明当前SpoutTuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的,其中参数为域名。示例如下:

 

public void declareOutputFields(OutputFieldsDeclarer declarer) {  

        declarer.declare(new Fields("word"));  

}

getComponentConfiguration方法

 

此方法定义在BaseComponent类内,用于声明针对当前组件的特殊的Configuration配置。示例如下:

 

public Map<String, Object> getComponentConfiguration() {  

         if(!_isDistributed) {  

           Map<String, Object> ret = new HashMap<String, Object>();  

           ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3);  

           return ret;  

         } else {  

             return null;  

         }  

 }  

这里便是设置了Topology中当前Component的线程数量上限。

 

nextTuple方法

 

这是Spout类中最重要的一个方法。发射一个TupleTopology都是通过这个方法来实现的。示例如下:

 

public void nextTuple() {  

     Utils.sleep(100);  

     final String[] words = new String[] {"twitter","facebook","google"};  

     final Random rand = new Random();  

     final String word = words[rand.nextInt(words.length)];  

     _collector.emit(new Values(word));  

}  

这里便是从一个数组中随机选取一个单词作为Tuple,然后通过_collector发送到Topology

 

另外,除了上述几个方法之外,还有ackfailclose方法等。Storm在监测到一个Tuple被成功处理之后会调用ack方法,处理失败会调用fail方法,这两个方法在BaseRichSpout类中已经被隐式的实现了。

 

Bolts 接口

 

Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口来完成。Bolt类需要实现的主要方法有:

 

prepare方法

 

此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。示例如下:

 

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {         

         _collector = collector;  

注:BoltTuple的发送可以在prepare方法中、execute方法中、cleanup等方法中进行,一般都是些在execute中。

 

declareOutputFields 方法

 

用于声明当前Bolt发送的Tuple中包含的字段,和Spout中类似。示例如下:

 

public void declareOutputFields(OutputFieldsDeclarer declarer) {  

       declarer.declare(new Fields("obj", "count", "length"));  

}  

此例说明当前Bolt类发送的Tuple包含了三个字段:"obj", "count", "length"

 

getComponentConfiguration方法

 

Spout类一样,在Bolt中也可以有getComponentConfiguration方法。示例如下:

 

public Map<String, Object> getComponentConfiguration() {  

         Map<String, Object> conf = new HashMap<String, Object>();  

         conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);  

         return conf;  

此例定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用这个系统的组件的特性来完成。

 

execute方法

 

这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,有两种情况,一种是emit方法中有两个参数,另一个种是有一个参数。

 

(1) emit有一个参数:此唯一的参数是发送到下游BoltTuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。

 

(2) emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。

 

这两种情况要根据自己的场景来确定。示例如下:

 

public void execute(Tuple tuple) {  

          _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));  

          _collector.ack(tuple);  

}  

注:输入Tuple一般在最后一行被ack

 

public void execute(Tuple tuple) {  

        _collector.emit(new Values(tuple.getString(0) + "!!!"));  

 } 

此外还有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法类似,都是在当前Component关闭时调用,但是针对实时计算来说,除非一些特殊的场景要求以外,这两个方法一般都很少用到。

 

注:cleanup方法在bolt被关闭的时候调用, 它应该清理所有被打开的资源。但是集群不保证这个方法一定会被执行。比如执行task的机器down掉了,那么根本就没有办法来调用那个方法。cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), 并且你想在关闭一些topology的时候避免资源泄漏。

 

有几点需要说明的地方:

 

1.每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。

 

2.open方法、prepare方法的调用是多次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,即负责运行组件中的task的线程的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor运行的时候调用一次。相当于一个线程的构造方法。

 

3.nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发射TupleBoltexecute不断的接收Tuple进行处理。只有这样不断地运行,才会产生无界的Tuple流,体现实时性。

 

4.在提交了一个topology之后,Storm就会创建spout/bolt实例并进行序列化。之后,将序列化的component发送给所有的任务所在的机器(Supervisor节点),在每一个任务上反序列化component

 

5. SpoutBolt之间、BoltBolt之间的通信,是通过zeroMQ的消息队列实现的。

 

二 作业的提交

 

下面的代码展示了以本地运行方式提交一个Topology作业

 

//Topology definition  

   TopologyBuilder builder = new TopologyBuilder();  

   builder.setSpout("word-reader",new WordReader());  

   builder.setBolt("word-normalizer", new WordNormalizer())  

     .shuffleGrouping("word-reader");  

   builder.setBolt("word-counter", new WordCount(),1)  

     .fieldsGrouping("word-normalizer", new Fields("word"));  

  

  //Configuration  

    Config conf = new Config();  

   conf.put("wordsFile", args[0]);  

   conf.setDebug(true);  

  

  //Topology run  

   conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);  

   LocalCluster cluster = new LocalCluster();  

   cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());  

   Thread.sleep(2000);  

   cluster.shutdown(); 

此例中的builderTopologyBuilder对象,通过它的createTopology方法可以创建一个Topology对象,同时此builder还要定义当前Topology中用到的SpoutBolt对象,分别通过setSpout方法和setBolt方法来完成。

 

setSpout方法和setBolt方法中的第一个参数是当前的Component组件的StreamID号;第二个参数是具体的Component实现类的构造;第三个参数是当前Component的并行执行的线程数目,Storm会根据这个数字的累加和来确定TopologyTask数目。

 

通过一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是一样的。通过调用submitTopology方法来提交topology, 它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology本身。

 

下面对workerexecutor以及task做一下说明:

 

worker:每个worker都属于一个特定的Topology,每个Supervisor节点的worker可以有多个,每个worker使用一个单独的端口,它对Topology中的每个component运行一个或者多个executor线程来提供task的运行服务。其数目可以通过设置yaml中的topology.workers属性以及在代码中通过ConfigsetNumWorkers方法设定。

 

executor:产生于worker进程内部的线程,会执行同一个component的一个或者多个task。 其数目可以在Topology的入口类中setBoltsetSpout方法的最后一个参数指定,不指定的话,默认为1

 

task:实际的数据处理由task完成,在Topology的生命周期中,每个组件的task数目是不会发生变化的,而executor的数目却不一定。executor数目小于等于task的数目,默认情况下,二者是相等的。在代码中通过TopologyBuildersetNumTasks方法设定具体某个组件的task数目;

 

有几点需要说明的地方:

 

1.Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件;

 

2.在设定Topology所关联的SpoutsBolts时,可以同时设置当前SpoutBoltexecutor数目和task数目,默认情况下,一个Topologytask的总和是和executor的总和一致的。之后,系统根据worker的数目,尽量平均的分配这些task的执行。worker在哪个supervisor节点上运行是由storm本身决定的;

 

3. 任务分配好之后,Nimbes节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的所有worker进程的心跳信息;

 

4. Supervisor节点会不断的轮询zookeeper集群,在zookeeperassignments节点中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行;

 

5.一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是无界的。最后一步会不间断的执行,除非手动结束Topology

 

6.通过在Nimbus节点利用如下命令来终止一个Topology的运行:storm kill topologyName kill之后,可以通过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息之后,此Topology就会彻底消失了。

 

三 分组策略

 

1.shuffleGrouping 随机分组

 

builder.setBolt("word-normalizer", new WordNormalizer())

           .shuffleGrouping("word-reader");

它只有一个参数(数据源组件),并且数据源会向随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。

 

2.fieldsGrouping 域数据流组

 

builder.setBolt("word-counter", new WordCounter(),2)

           .fieldsGrouping("word-normalizer", new Fields("word"));

域数据流组允许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt

 

在域数据流组中的所有域集合必须存在于数据源的域声明中

 

3.allGrouping 全部数据流组

 

builder.setBolt("word-counter", new WordCounter(),2)

           .fieldsGroupint("word-normalizer",new Fields("word"))

           .allGrouping("signals-spout","signals");

全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向所有bolts发送信号。比如,你要刷新缓存,你可以向所有的bolts发送一个刷新缓存信号。

 

4.customGrouping 自定义数据流组

 

builder.setBolt("word-normalizer", new WordNormalizer())

           .customGrouping("word-reader", new ModuleGrouping());

5.directGrouping 直接数据流组

 

builder.setBolt("word-counter", new WordCounter(),2)

           .directGrouping("word-normalizer");

这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。

 

6.全局数据流组

 

全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。

 

四 配置选项

 

在运行Topology之前,可以通过一些参数的配置来调节运行时的状态,参数的配置是通过Storm框架部署目录下的conf/storm.yaml文件来完成的。在此文件中可以配置运行时的Storm本地目录路径、运行时Worker的数目等。

 

在代码中,也可以设置Config的一些参数,但是优先级是不同的,不同位置配置Config参数的优先级顺序为:

 

default.yaml < storm.yaml <Topology内部的configuration <内部组件的special configuration < 外部组件的special configuration

 

storm.yaml中常用的几个选项为:

共有 人打赏支持
粉丝 8
博文 127
码字总数 19969
评论 (0)
×
如风达
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: