文档章节

storm从入门到放弃教程(5)--深入理解Streams (数据流)

j
 java_龙
发布于 2017/04/19 10:12
字数 1800
阅读 627
收藏 20

概述

     上一篇【 storm开发环境搭建 】 博文连接:https://my.oschina.net/u/2342969/blog/878765

本篇会深入理解Streams,欢迎同志(此同志非彼同志)们通过私信/评论等方式共同学习了解.

     Streams是storm中一个核心的概念,它是在分布式并行处理和创建的无限序列元组,Streams通过给流元组中字段命名来定义,默认情况下,元组可以包含整型,长整型,短整型,字节,字符串,布尔型,双精度浮点型,单精度浮点型,字节数组,也可以自定义序列化类型。

      下面共同学习一下 Tuple(元组)、OutputFieldsDeclarer、 元组中动态类型以及序列器

Tuple(元组)

    Tuple是storm中主要的数据结构,是storm中使用的基本单元、元组。元组是一个值列表,其中,每个值可以是任意类型。动态类型的元组不需要被定义,元组有类似 getInteger 和getString的帮助方法,无须手动转换结果类型。

     storm需要知道如何序列化所有的值,默认情况下,storm知道如何序列化简单类型,比如字符串、字节数组,如果想使用复杂的对象,则需要注册实现一个该类型的序列器。

     在storm中tuple接口集成了Iuple接口 ,实现类为TupleImpl。

OutputFieldsDeclarer

     tuple的数据结构类似于map的键值对,其中键定义在OutputFieldsDeclarer方法的Fields对象中。

通过以下例子,可以帮助大家更好的理解:

//例2-2 src/main/java/bolts/WordNormalizer.java
package bolts;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class WordNormalizer implements IRichBolt {
    private OutputCollector collector;
    public void cleanup(){}
    /**
     * *bolt*从单词文件接收到文本行,并标准化它。
     * 文本行会全部转化成小写,并切分它,从中得到所有单词。
     */
    public void execute(Tuple input){
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word=word.toLowerCase();
                //发布这个单词
                List a = new ArrayList();
                a.add(input);
                collector.emit(a,new Values(word));
            }
        }
        //对元组做出应答
        collector.ack(input);
    }
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    /**
     * 这个*bolt*只会发布“word”域
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

    创建了发送一个字段(“word”)的Bolt,此时tuple的键为“word”,值为execute方法中发送的Values对象。

序列器

    本次介绍的storm0.6.0(含)及后续版本中如何使用序列器,storm在0.6.0之前使用不同的序列器,这里就不介绍老版本的。

     tuple可以由任意类型组合而成,因为storm是分布式的,所以它需要知道在task间如何序列化和反序列化数据的。

     storm使用Kryo进行序列化,Kryo是java开发中一个快速灵活序列器。默认情况下,storm可以序列化基础类型,比如字符串,字节,数组,ArrayList, HashMap, HashSet和 Clojure 集合类型,如果需要使用其他类型,需要自定义序列器。

动态类型

     在元组中没有对应类型的字段。在字段中放入对象和storm动态序列化数据,得到序列化接口前,我们了解一下为什么storm元组是动态类型。

      增加静态类型会大大增加storm API的复杂性, Hadoop  中,使用静态类型的键和值,在使用是需要大量的注释,对于hadoop API使用以及类型安全是一个不值得的负担,动态类型使用起来会很简单。

此外,storm 元组没有一个合理的方式使用静态类型,假如一个bolt订阅了多个流,那些流中元组会有不同类型传输在字段中。当一个bolt在execute方法接收元组,可以接收任何流的元组,就会有很多类型的元组。这样在一个bolt中,就需要为每个类型的tuple生命不同的方法订阅,显然,storm选择了简单方式,使用动态类型。

     最后,另外使用动态类型的理由是storm可以直接被 Clojure  和 JRuby  这类动态类型的语言使用。

自定义序列器

    综上所述,storm 使用Kryo 作为序列器。为了实现自定义序列器,就需要用Kryo注册一个新的序列器,

在Kryo的Github主页: https://github.com/EsotericSoftware/kryo,有更加详细的介绍,这里仅做一下简单介绍。

      增加自定义序列器,需要在拓扑配置中添加“ topology.kryo.register ”属性,它可以配置一组序列器列表,每个序列器可以选择一下两种方式之一:

  1. 用类名注册,在例子中,storm会使用 Kryo  的“FieldsSerializer”序列化类--这可能不是最好的方式,在Kryo文档中有很多种方式。
  2. 从一个类名映射到实现了“ com.esotericsoftware.kryo.Serializer ”的注册器。

例子如下:   

topology.kryo.register:
  - com.mycompany.CustomType1
  - com.mycompany.CustomType2: com.mycompany.serializer.CustomType2Serializer
  - com.mycompany.CustomType3

“com.mycompany.CustomType1“ 和“com.mycompany.CustomType3“ 使用“FieldsSerializer”序列化,反之,“com.mycompany.CustomType2“会使用”com.mycompany.serializer.CustomType2Serializer“ 序列化。

    storm提供了在拓扑配置中注册序列器的助手,Config类调用registerSerialization方法可以将一个序列器放入配置中。其中有一个高级设置“Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS”,如果把它设置为true,storm将会忽略在类路径无有效代码的序列器,否则,storm找不到序列器,将会排除异常。当在集群中运行了很多使用了不同序列器的拓扑,想通过“storm.yaml”文件为所有拓扑配置好序列器,它就非常有用的。

JAVA序列器

     当storm遇到一个没有序列器注册的类型,它可能会使用java序列器,如果此类型也无法被java序列器序列化,storm就会报出一个错误。

     需要注意的是,无论在CPU消耗方面还是序列化对象的大小, java序列器都是非常耗费资源的。故在生产上使用拓扑的话,强烈建议使用自定义序列器。java序列器在那里,是为了容易设计新的原型。

     通过设置“Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION”项为false,可以关闭java序列器。

特定组件序列化注册

     Storm 0.7.0  可以设置特殊组件配置,当然,如果一个组件定义一个序列化,这个序列器需要对其他bolt可用,否则其他bolt将无法接收那个组件的消息。

     一个拓扑被提交,一组序列器会被拓扑选择为所有组件发送消息使用,这是通过特殊组件序列化注册信息与普通组件序列化注册信息合并实现。当为同一个类注册了多个序列化时,序列器会任意选择一个。

     如果两个特定组件序列器有冲突,则会强制一个特定的类做序列器,只需在拓扑特定配置定义想要的序列器,拓扑特定配置优先于序列器注册的特定组件。

    

© 著作权归作者所有

j
粉丝 73
博文 102
码字总数 138333
作品 0
成都
程序员
私信 提问
加载中

评论(4)

j
java_龙 博主

引用来自“whaon”的评论

collector.emit(a,new Values(word));,这里为什么要加个a
collector.emit(new Values(word));而不这样?

@whaon 我IDE显示的原因,实际是word,没有a
whaon
whaon
collector.emit(a,new Values(word));,这里为什么要加个a
collector.emit(new Values(word));而不这样?
j
java_龙 博主

引用来自“symfx”的评论

这标题...
哈哈 可以被带入坑是吧😏
symfx
symfx
这标题...
聊聊storm trident的operations

序 本文主要研究一下storm trident的operations function filter projection Function storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Function.java Function定义了exe......

go4it
2018/11/04
21
0
Apache Storm简介及安装部署

Apache Storm是一个分布式的、可靠的、容错的实时数据流处理框架。它与Spark Streaming的最大区别在于它是逐个处理流式数据事件,而Spark Streaming是微批次处理,因此,它比Spark Streaming...

风火数据
2018/07/20
0
0
聊聊storm的messageTimeout

序 本文主要研究一下storm的messageTimeout TOPOLOGYMESSAGETIMEOUT_SECS storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java defaults.yaml中topology.enable.message.timeout......

go4it
2018/10/31
24
0
聊聊flink如何兼容StormTopology

序 本文主要研究一下flink如何兼容StormTopology 实例 这里使用FlinkLocalCluster.getLocalCluster()来创建或获取FlinkLocalCluster,之后调用FlinkLocalCluster.submitTopology来提交topol...

go4it
2018/11/23
71
0
Kafka Streams 剖析

1.概述   Kafka Streams 是一个用来处理流式数据的库,属于Java类库,它并不是一个流处理框架,和Storm,Spark Streaming这类流处理框架是明显不一样的。那这样一个库是做什么的,能应用到...

smartloli
2017/09/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
7
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
昨天
7
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
9
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
昨天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部