文档章节

Storm 分布式RPC

 大胖和二胖
发布于 2016/11/10 14:57
字数 719
阅读 59
收藏 0

关于这部分的基本概念,大家自行上网上查资料去,这里我直接上例子。

首先需要在storm集群上把DRPC的环境准备好,在storm.yaml当中增加如下内容

 drpc.servers:
  - "192.168.1.118"

之后通过storm drpc启动分布式RPC服务。

之后,跟其他的topology并没有什么不同,我们需要写点代码,我这边直接从storm的例子当中找了个:

public class BasicDRPCTopology {
    public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String input = tuple.getString(1);
            collector.emit(new Values(tuple.getValue(0), input + "!"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }

    }

    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);

        Config conf = new Config();
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
    }
}

从main函数开始,简单解释一下:

首先new一个LinearDRPCTopologyBuilder对象,其中的参数【exclamation】就是我们在执行rpc调用时候的方法名。

之后我们加入一个自己的bolt,并行数量为3

之后用StormSubmitter把这个topology提交上去就行了。

代码完成之后,打一个jar包,用storm jar把topology提交到集群上。

客户端调用,非常简单

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);
        String result = client.execute("exclamation", "china");
        System.out.println(result);

到此为止,一个最简单的DRPC调用的工作已经完成了。

等等,还有点问题,LinearDRPCTopologyBuilder 这个东西是不建议使用的(我这里的版本是0.9.3)。

源码上有这么一行:

Trident subsumes the functionality provided by this class, so it's deprecated

大概意思就是trident这个东西已经包含了LinearDRPCTopologyBuilder 当中的功能。

trident是什么意思?翻译了一下,【三叉戟】,靠,看起来很牛逼的样子。必须试试。

那么上第二份代码:

public class TridentDRPCTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
    }

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();

        topology.newDRPCStream("word-count").
                each(new Fields("args"), new Split(), new Fields("word")).
                groupBy(new Fields("word")).
                aggregate(new One(), new Fields("one")).
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
        return topology.build();
    }

    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static class One implements CombinerAggregator<Integer> {
        @Override
        public Integer init(TridentTuple tuple) {
            return 1;
        }

        @Override
        public Integer combine(Integer val1, Integer val2) {
            return 1;
        }

        @Override
        public Integer zero() {
            return 1;
        }
    }
}

这个topology的功能要稍稍复杂一些,给出一句话,查一下一共有多少个词,当然了,不能重复计数。main函数当中非常简单,提交一个topology。而这个topology的构建过程是在buildTopology当中完成的。

        topology.newDRPCStream("word-count").
                each(new Fields("args"), new Split(), new Fields("word")).    //用空格分词
                groupBy(new Fields("word")).    //分组
                aggregate(new One(), new Fields("one")).    //给每组的数量设定为1
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));    //sum计算总和

这样的方式看起来跟spark当中对RDD的操作是有些像的。

好了,还是打包,提交。

然后是客户端测试:

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);
        String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
        System.out.println(result);

 

© 著作权归作者所有

粉丝 23
博文 69
码字总数 50842
作品 0
沈阳
架构师
私信 提问
Apache Storm 0.9.7 发布,分布式实时计算

Apache Storm 0.9.7 发布了,Apache Storm 的前身是 Twitter Storm 平台,目前已经归于 Apache 基金会管辖。 Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像...

开源中国股侠
2016/09/08
796
1
Apache Storm 1.2.3 发布,分布式实时计算

Apache Storm 1.2.3 发布了,更新内容如下: 新特性 [STORM-3233] - zookeeper 客户端升级到最新版本 (3.4.13) 改进 [STORM-3077] - Disruptor 升级至 3.3.11 [STORM-3083] - HikariCP 升级至...

xplanet
07/20
817
0
Storm同步调用之DRPC模型探讨

  摘要:Storm的编程模型是一个有向无环图,决定了storm的spout接收到外部系统的请求后,spout并不能得到bolt的处理结果并将结果返回给外部请求。所以也就决定了storm无法提供对外部系统的同...

刘洋intsmaze
2017/09/28
0
0
Apache Storm 1.0.5 发布,分布式实时计算

Apache Storm 1.0.4 已发布,Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像 Hadoop 一样实现实时批处理。Storm 很简单,可用于任意编程语言。Apache Storm ...

王练
2017/09/16
863
5
Apache Storm 0.10.2 发布,分布式实时计算

Apache Storm 0.10.2 发布了,Apache Storm 的前身是 Twitter Storm 平台,目前已经归于 Apache 基金会管辖。 Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像...

局长
2016/09/15
889
0

没有更多内容

加载失败,请刷新页面

加载更多

3_数组

3_数组

行者终成事
今天
7
0
经典系统设计面试题解析:如何设计TinyURL(二)

原文链接:https://www.educative.io/courses/grokking-the-system-design-interview/m2ygV4E81AR 编者注:本文以一道经典的系统设计面试题:《如何设计TinyURL》的参考答案和解析为例,帮助...

APEMESH
今天
7
0
使用logstash同步MySQL数据到ES

概述   在生成业务常有将MySQL数据同步到ES的需求,如果需要很高的定制化,往往需要开发同步程序用于处理数据。但没有特殊业务需求,官方提供的logstash就很有优势了。   在使用logstas...

zxiaofan666
今天
10
0
X-MSG-IM-分布式信令跟踪能力

经过一周多的鏖战, X-MSG-IM的分布式信令跟踪能力已基本具备, 特点是: 实时. 只有要RX/TX就会实时产生信令跟踪事件, 先入kafka, 再入influxdb待查. 同时提供实时sub/pub接口. 完备. 可以完整...

dev5
今天
7
0
OpenJDK之CyclicBarrier

OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下 图1 CyclicBarrier是Doug Lea在JDK1.5中引入的,作用就不详细描述了,主要有如下俩个方法使用: await()方法,如果当前线...

克虏伯
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部