Storm Component基本接口

原创
2017/08/14 09:31
阅读数 127

IComponent

IComponent接口是所有组件的接口。

主要包含两个方法:

  1. declareOutputFields:为拓扑的所有流组件生命输出模式。
  2. getComponentConfiguration:声明指定组件大的配置。只有"topology.*"配置的一个子集可以被覆盖。当使用TopologyBuilder构建拓扑是,组件配置可以被进一步覆盖。
package org.apache.storm.topology;
import java.io.Serializable;
import java.util.Map;
/**
 * Common methods for all possible components in a topology. This interface is used
 * when defining topologies using the Java API. 
 */
public interface IComponent extends Serializable {
    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);
    /**
     * Declare configuration specific to this component. Only a subset of the "topology.*" configs can
     * be overridden. The component configuration can be further overridden when constructing the 
     * topology using {@link TopologyBuilder}
     *
     */
    Map<String, Object> getComponentConfiguration();
}


ISpout

ISpout是实现Spout的核心接口。Spout负责提供消息给拓扑进行处理。Storm将跟踪基于Spout发射的元组产生的有向无环图。当Storm检测到有向无环图的每个元组已经成功被处理时,它将发送一个ack信息到Spout。

如果一个元组在配置的超时时间之前不能被完全处理,Storm将发送fail信息到Spout。

当一个Spout发送一个元组时,可以使用messageId来标记元组。消息id可以是任何类型。当Storm进行ack或者fail消息时,它可以使用messageId来识别是哪些元组。如果Spout漏掉了messageId,或者将它设置为null,那么Storm将不会跟踪信息,并且Spout也不会收到任何ack或者fail信息的回调。

Storm在同一个线程里执行ack()、fail()和nextTuple()方法。以为这ISpout的实现并不想需要担心这些方法之间的并发问题。然而,这也以为这ISpout的实现必须确保nextTuple()方法是非阻塞,否则nextTuple()方法可能会组织等待处理的ack()和fail()方法。

包含如下方法:

  1. open:open()方法在该组件的一个任务在集群的工作进程内被初始化时调用,提供了Spout执行所需要的环境。conf参数是这个Spout的Storm配置,提供给拓扑与这台主机的集群配置一起进行合并。Context参数可以用来获取关于这个任务在拓扑中位置信息,包括该任务的id、该任务的组件id、输入和输出信息等。  collector参数是收集器,用于从这个Spout发射元组。元组可以随时被发射,包括open()和close()方法。收集器是线程安全的,应该作为这个Spout对象的实例变量进行保存。
  2. close:close()方法当一个ISpout即将关闭时被调用。不能保证close()方法一定会被调用,因为Supervisor可以对集群的工作进程使用kill -9命令强制杀死进程命令。如果在本地模式下运行Storm,当拓扑被杀死的时候,可以保证close()方法一定会被调用。
  3. activate:activate()方法当Spout已经从失效模式中激活时被调用。该Spout的nextTuple()方法很快会调用。当使用Storm客户端操作拓扑时,Spout可以在失效状态之后变成激活模式。
  4. deactivate:当Spout已经失效时被调用。当Spout失效期间,nextTuple不会被调用。Spout将来可能会也可能不会被重新激活。
  5. nextTuple:当调用nextTuple()方法时,Storm要求Spout发射元组到输出收集器OutputCollecotr。nextTuple()方法应该是非阻塞的,所以,如果Spout没有元组可以发射,该方法应该返回。nextTuple()、ack()和fail()方法都在Spout任务的单一线程内紧密循环被调用。当没有元组可以发射时,可以让nextTuple()去sleep很短的时间,例如1毫秒,这样就不会浪费太多的CPU资源。
  6. ack:Storm已经断定该Spout发射的标识符为msgId的元组已经被完全处理时,会调用ack方法。通常情况下,ack()方法会将该信息移出队列以防止它被重发。
  7. fail:该Spout发射的标识符为msgId的元组未能被完全处理时,会调用fail()方法。通常情况下,fail方法会将消息放回队列中,并在稍后重发消息。
package org.apache.storm.spout;
import org.apache.storm.task.TopologyContext;
import java.util.Map;
import java.io.Serializable;
/**
 * ISpout is the core interface for implementing spouts. A Spout is responsible
 * for feeding messages into the topology for processing. For every tuple emitted by
 * a spout, Storm will track the (potentially very large) DAG of tuples generated
 * based on a tuple emitted by the spout. When Storm detects that every tuple in
 * that DAG has been successfully processed, it will send an ack message to the Spout.
 *
 * If a tuple fails to be fully processed within the configured timeout for the
 * topology (see {@link org.apache.storm.Config}), Storm will send a fail message to the spout
 * for the message.
 *
 * When a Spout emits a tuple, it can tag the tuple with a message id. The message id
 * can be any type. When Storm acks or fails a message, it will pass back to the
 * spout the same message id to identify which tuple it's referring to. If the spout leaves out
 * the message id, or sets it to null, then Storm will not track the message and the spout
 * will not receive any ack or fail callbacks for the message.
 *
 * Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor
 * of an ISpout does not need to worry about concurrency issues between those methods. However, it 
 * also means that an implementor must ensure that nextTuple is non-blocking: otherwise 
 * the method could block acks and fails that are pending to be processed.
 */
public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * This includes the:
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
    /**
     * Called when an ISpout is going to be shutdown. There is no guarentee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * The one context where close is guaranteed to be called is a topology is
     * killed when running Storm in local mode.
     */
    void close();
    
    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the 
     * `storm` client. 
     */
    void activate();
    
    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();
    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the 
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();
    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);
    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}


IRichSpout

IRichSpout继承了ISpout接口和IComponent接口。

package org.apache.storm.topology;
import org.apache.storm.spout.ISpout;
/**
 * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
 * to use to implement components of the topology.
 *
 */
public interface IRichSpout extends ISpout, IComponent {
}


IBolt

IBolt是实现Bolt的核心接口。IBolt表示一个以元组作为输入并生成元组作为输出的组件。IBolt可以完成过滤、连接、函数、聚合等任何功能。IBolt没有立即处理元组,可以保留元组以后再处理。

Bolt的生命周期如下:在客户端主机上创建IBolt对象,IBolt被序列化到拓扑(使用Java序列化)并提交到集群的主控节点(Nimbus)。然后supervisor启动工作进程(Worker)反序列化对象,调用对象上的prepare方法,然后开始处理元组。

如果你喜欢参数化一个IBolt,应该通过其构造函数设置参数并作为实例变量保存参数化状态,然后,实例变量会序列化,并发送给跨集群的每个任务来执行这个Bolt。如果使用Java来定义Bolt,应该使用IRichBolt接口,IRichBolt接口添加了使用Java TopologyBuilder API的必要方法。

IBolt如下方法:

  1. prepare:prepare()方法在该组件的一个任务是集群的工作进程内被初始化时调用,提供了Bolt执行时所需要的环境。topoConf参数是这个Bolt的Storm配置,提供给拓扑和这台主机上的集群配置一起进行合并。context参数可以用来获取关于这个任务在拓扑中的位置信息,包括该任务的id、该任务的组件id、输入输出信息等。collector参数是收集器,用于从这个Bolt发射元组。元组可以随社被发射,包括prepare()和cleanup()方法。收集器是线程安全的,应该作为这个Bolt对象的实例变量进行保存。
  2. execute:execute()方法处理一个输入元组。元组对象包括元组来自哪个component/stream/task的元数据。元组的值可以使用Tuple#getValue()进行访问。IBolt没有立即处理元组,而是完全的捕获一个元组并在以后进行处理,例如,做聚合或者连接计算。元组应该使用prepare方法提供的OutputCollector进行发射。使用OutputCollector在某种程度上要求所有输入元组是ack或者fail。否则,Storm将无法确定来自Spout的元组什么时候会处理完成。常规做法是,在executor方法结束是对输入元组调用ack方法,而IBasicBolt会自动处理该部分。input参数为被处理的输入元组。
  3. cleanup:cleanup方法当一个IBolt即将关闭时被调用。不能保证cleanup方法一定被调用,因为Supervisor可以对集群的工作进程使用kill -9命令强制杀死进程命令。如果在本地模式下运行Storm,当拓扑被杀死的时候,可以保证cleanup()方法一定会被调用。
package org.apache.storm.task;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
import java.io.Serializable;
/**
 * An IBolt represents a component that takes tuples as input and produces tuples
 * as output. An IBolt can do everything from filtering to joining to functions
 * to aggregations. It does not have to process a tuple immediately and may
 * hold onto tuples to process later.
 *
 * A bolt's lifecycle is as follows:
 *
 * IBolt object created on client machine. The IBolt is serialized into the topology
 * (using Java serialization) and submitted to the master machine of the cluster (Nimbus).
 * Nimbus then launches workers which deserialize the object, call prepare on it, and then
 * start processing tuples.
 *
 * If you want to parameterize an IBolt, you should set the parameters through its
 * constructor and save the parameterization state as instance variables (which will
 * then get serialized and shipped to every task executing this bolt across the cluster).
 *
 * When defining bolts in Java, you should use the IRichBolt interface which adds
 * necessary methods for using the Java TopologyBuilder API.
 */
public interface IBolt extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the bolt with the environment in which the bolt executes.
     *
     * This includes the:
     * 
     * @param topoConf The Storm configuration for this bolt. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The collector is thread-safe and should be saved as an instance variable of this bolt object.
     */
    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);
    /**
     * Process a single tuple of input. The Tuple object contains metadata on it
     * about which component/stream/task it came from. The values of the Tuple can
     * be accessed using Tuple#getValue. The IBolt does not have to process the Tuple
     * immediately. It is perfectly fine to hang onto a tuple and process it later
     * (for instance, to do an aggregation or join).
     *
     * Tuples should be emitted using the OutputCollector provided through the prepare method.
     * It is required that all input tuples are acked or failed at some point using the OutputCollector.
     * Otherwise, Storm will be unable to determine when tuples coming off the spouts
     * have been completed.
     *
     * For the common case of acking an input tuple at the end of the execute method,
     * see IBasicBolt which automates this.
     * 
     * @param input The input tuple to be processed.
     */
    void execute(Tuple input);
    /**
     * Called when an IBolt is going to be shutdown. There is no guarentee that cleanup
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * The one context where cleanup is guaranteed to be called is when a topology
     * is killed when running Storm in local mode.
     */
    void cleanup();
}

IRichBolt

IRichBolt继承了IBolt接口和IComponent接口。

 package org.apache.storm.topology;
import org.apache.storm.task.IBolt;
/**
 * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
 * to use to implement components of the topology.
 *
 */
public interface IRichBolt extends IBolt, IComponent {
}

IBasicBolt

IBasicBolt继承了IComponent接口。

IBasicBolt与IRichBolt具有一样的同名方法,但是IBasicBolt的execute方法会自动处理Acking机制。

package org.apache.storm.topology;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
public interface IBasicBolt extends IComponent {
    void prepare(Map<String, Object> topoConf, TopologyContext context);
    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     * 
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);
    void cleanup();
}
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部