文档章节

storm翻译(3)Distributed RPC(分布式远程调用)

岩之有理
 岩之有理
发布于 2015/02/25 16:47
字数 2178
阅读 347
收藏 4
点赞 0
评论 0

原文地址:http://storm.apache.org/documentation/Distributed-RPC.html

分布式RPC的目的是在storm进行大量的实时计算时,能够并行的调用storm上的函数。Storm topology可以将函数参数作为输入Stream,并且将被调用方法产生的结果作为返回发送出去。

与其说DRPCstorm的一个特点,不如说它只是storm基本概念如steamsspoutsboltstopologies的一种表达方式。DRPC可以独立于storm作为一个库发布,但在和storm捆绑在一起时将会非常有用。

高级概述

分布式RPC被“DRPC server”实现(storm包中已经有了对应的实例)。DRPC server协调接收一个RPC请求并将这个请求发送给storm topology,然后接收storm topology算出的结果,再将结果发送给等待中的客户端。从客户端的视角来看,分布式RPC调用过程跟普通的RPC调用过程一样。举例:这里有一个客户端调用“reach”方法,输入参数是http://twitter.com,然后得到计算结果的例子。

DRPCClient client = new DRPCClient("drpc-host", 3772);

String result = client.execute("reach", "http://twitter.com");

这个DRPC的工作流可以描述为:

                 

一个客户端向DRPC Server发送了想要调用的方法名称和方法参数。实现了这个方法的topology用一个DRPCSpoutDRPC Server接收了函数调用Stream。每个远程方法在DRPC Server上都有一个唯一的IDTopology计算出结果之后,使用一个ReturnResultsbolt连接DPRC Server后,将结果交给它。DRPC Server根据方法ID匹配出结果,然后唤醒等待的客户端,将结果发送给客户端

LinearDRPCTopologyBuilder(线性)

Storm中有一个LinearDRPCTopologyBuilder topology 生成,已经自动实现了DPRC调用过程中的绝大部分。包括:

1:配置spout

2:将结果返回给DPRC Server

3:为bolt提供了简单的tuple之间的聚合操作

让我们看一个简单的例子,这里有一个DPRC topology的实现,他可以给输入的参数后面添加一个“!”。

public static class ExclaimBolt extends BaseBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {

        String input = tuple.getString(1);

        collector.emit(new Values(tuple.getValue(0), input + "!"));

    }

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "result"));

    }

}

 

public static void main(String[] args) throws Exception {

    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");

    builder.addBolt(new ExclaimBolt(), 3);

// ...

}

就像我们看到的,代码非常简单。当我们创建一个LinearDRPCTopologyBuilder时,你告诉它实现了DRPC功能的topology的名字。一个DRPC server可以配置很多topology名称,这些名称不能重复。第一个bolt不在代码中,应该是LinearDRPCTopologyBuilder内部的译者注)会接受2tuples,第一个属性是请求id,第二个是方法的参数。LinearDRPCTopologyBuilder中最后一个bolt返回了2tuples的输出StreamStream的格式为[id, result],当然了,过程中产生的所有tuple的第一个属性都是请求id

在这个例子中,ExclaimBolt只是简单的在tuple的第二种属性上添加了一个“!”。LinearDRPCTopologyBuilder完成了连接DRPC server并返回结果的其他过程。

本地DRPC模式

DRPC可以在本地运行,例子:

LocalDRPC drpc = new LocalDRPC();

LocalCluster cluster = new LocalCluster();

 

cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

 

System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));

 

cluster.shutdown();

drpc.shutdown();

第一步,创建一个LocalDRPC对象。这个队形模拟DPRC server的运行,就像LocalCluster模拟集群运行一样。然后你创建一个LocalCluster在本地运行topologyLinearDRPCTopologyBuilder有独立的方法分布创建本地topology和远程topology。本地运行的LocalDRPC不需要绑定端口,因为在本地topology不需要端口来传递对象。这就是为什么createLocalTopologyLocalDRPC作为输入。

启动之后,你就可以看到DRPC调用过程在本地执行。

远程DRPC模式

在实际的集群上使用DRPC也非常简单,只需要三步:

1:启动DRPC server

2:配置DRPC server地址

3:将DRPC topology提交到storm集群

启动DRPC server就行启动NimbusUI一样简单
bin/storm drpc

接下来,你需要为storm集群配置DRPC的地址,才能DRPCSpout让知道在哪里读取方法调用。可以在storm.yaml中配置或者通过topology配置。在storm.yaml中配置如下

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

最后,你要通过StormSubmitter启动DPRC topology,就想启动任何topology一样。远程模式运行上面的例子你可以:

StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology用来为storm集群创建topology

一个更复杂的例子

上面那个感叹号例子用来熟悉DPRC的概念还是过于简单。让我们看一个更复杂的例子,一个真正需要通过并行运行storm来计算的DRPC方法。例子就是计算Twitter上的URLreach

URLreach就是这个URL暴漏给多少用户。为了计算reach,你需要:

1:获取这个URL的所有twitter

2:获取这些twitterfollower

3:去掉重复的follower

4:计算每个URLfollower

一个真实的计算需要数千次的数据库交互和上百万的flowwer记录的计算。这是非常非常碉堡的计算。但正如你所看到的,基于storm实现这个功能却非常的简单。在单台机器上,reach要花上数分钟来计算;但是在storm集群上,你可以在数秒钟就计算出最难算的URLreach

storm-starter上有一个reach topologyhere)样例,这里告诉你如何定义一个reach topology

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");

builder.addBolt(new GetTweeters(), 3);

builder.addBolt(new GetFollowers(), 12)

        .shuffleGrouping();

builder.addBolt(new PartialUniquer(), 6)

        .fieldsGrouping(new Fields("id", "follower"));

builder.addBolt(new CountAggregator(), 2)

.fieldsGrouping(new Fields("id"));

这个topology计算一共需要四步:

1:GetTweeters获取了tweeted这个URL的用户。它将输入stream[id, url]转换成了一个输出Stream[id, tweeter]。每一个URL映射了多个用户tuple

2:获取了每一个tweeterfollowers(粉丝)。它将输入stream[id, tweeter]转换成了一个输出Stream [id, follower]。经过这个过程,由于一个follower是同时tweet了同一个URL的多个用户的粉丝,就会产生一些重复的follower tuple

3.PartialUniquerfollower Stream按照followerid分组,保证同一个follower会进入同一个task。所以每一个PartialUniquer task都会接收到相互独立的follower集合。当PartialUniquer接收了所有根据request id分配给它的follower tuples,它就会将去重之后的follower集合的数量发射出去。

4.最终,CountAggregator接收了每一个PartialUniquer task发射的数量,并且通过计算总和来完成reach的计算过程。

下面来看一下PartialUniquer bolt

public class PartialUniquer extends BaseBatchBolt {

    BatchOutputCollector _collector;

    Object _id;

    Set<String> _followers = new HashSet<String>();

 

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {

        _collector = collector;

        _id = id;

    }

 

    @Override

    public void execute(Tuple tuple) {

        _followers.add(tuple.getString(1));

    }

 

    @Override

    public void finishBatch() {

        _collector.emit(new Values(_id, _followers.size()));

    }

 

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "partial-count"));

    }

}

PartialUniquer通过继承BaseBatchBolt实现了IBatchBolt接口。批处理bolt提供了一流的API可以将批量的tuples当作一个批次来处理。每一个request id都会创建一个batch bolt实例,而storm负责在适当的时候清理这些实例。

每当PartialUniquerexecute方法中接收到一个follower tuple,它就将follower放到request id对应的set集合中。

当发射到这个bolt的所有tuples都被处理之后,batch bolts中的finishBatch方法将会被调用。 PartialUniquer发射了一个包含follower数量的tuple

在后台,CoordinatedBolt用来判断bolt是否全部接受了指定request id的所有tupleCoordinatedBolt利用直接Stream来管理这次协调。

这个topology的剩余部分就显而易见了。就像你看到的,reach计算的每一步都是并行计算,而且实现DRPC topology是多么的简单。

Non-linear(非线性)DRPC topologies

LinearDRPCTopologyBuilder只能处理线下的DRPC topologies:整个计算可以分割为多个独立的顺序步骤。它很难处理包含有bolt分支和bolt合并的复杂topology。目前,为了实现复杂的功能,你只能通过直接使用CoordinatedBolt

LinearDRPCTopologyBuilder如何工作

DRPCSpout发射了[args, return-info].return-info中包含DRPC的地址和端口,就像DRPCid一样。

构建一个topology包含:

  • DRPCSpout

  • PrepareRequest(准备请求:产生一个request id并为return-info和参数分别创建一个Stream

  • CoordinatedBolt wrappers and direct groupingsCoordinatedBolt封装和直接分组)

  • JoinResult(将result-info加入结果)

  • ReturnResult(连接DRPC server 并返回结果)

LinearDRPCTopologyBuilder是一个非常好storm高级抽象的例子

高级

KeyedFairBolt封装同时处理多个请求

如何直接使用CoordinatedBolt

译者:需要详细了解CoordinatedBolt,个人推荐徐明明的一个博客http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/



© 著作权归作者所有

共有 人打赏支持
岩之有理
粉丝 7
博文 18
码字总数 6134
作品 0
徐汇
高级程序员
Apache Storm 1.1.3 和 1.2.2 发布,分布式实时计算

Apache Storm 1.1.3 和 1.2.2 已发布,这是一个常规维护版本,其中包含许多重要的错误修复,可以提高 Storm 的性能,稳定性和容错能力。建议以前版本的用户升级到最新版本。 更新内容较多,详...

局长 ⋅ 06/06 ⋅ 0

大数据Storm相比于Spark、Hadoop有哪些优势(摘录)

一、可能很多初学大数据的伙伴不知道strom是什么,先给大家介绍一下strom: 分布式实时计算系统,storm对于实时计算的意义类似于hadoop对于批处理的意义。 storm的适用场景。 流数据处理。S...

风火数据 ⋅ 06/01 ⋅ 0

Kafka实战-Storm Cluster

1.概述   在《Kafka实战-实时日志统计流程》一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Sto...

smartloli ⋅ 2015/06/18 ⋅ 0

年薪40万的大数据工程师是如何安装Strom

Strom集群的安装配置 主机规划 一、准备服务器 l 关闭防火墙 chkconfig iptables off && setenforce 0 l 创建用户 groupadd hadoop && useradd hadoop  && usermod -a -G hadoop hadoop l ......

爱尚实训 ⋅ 04/23 ⋅ 0

[HCNA Cloud]FusionInsight架构与原理

大数据是指无法再一定时间内用常规软件工具对其内容进行抓取、管理和处理的数据集合。 Yarn是Hadoop2.0中的资源管理系统,它是一个通用的资源管理模块,可为各类应用程序进行资源管理和调度。...

Grodd ⋅ 04/25 ⋅ 0

大数据经典学习路线(及供参考)之 三

3.Storm实时计算部分阶段 实时课程分为两个部分:流式计算核心技术和流式计算计算案例实战。 1.流式计算核心技术 流式计算核心技术主要分为两个核心技术点:Storm和Kafka,学完此阶段能够掌握...

柯西带你学编程 ⋅ 05/22 ⋅ 0

大数据学习之(Storm)-原理详解!

角色 Client client的主要作用是提交topology到集群 Worker Worker是运行在Supervisor节点上的一个独立的JVM进程,主要作用是运行topology,一个topology可以包含多个worker,但一个worker只...

qq5af153121eb2c ⋅ 05/08 ⋅ 0

Storm笔记整理(一):简介与设计思想

[TOC] 实时计算概述 有别于传统的离线批处理操作(对很多数据的集合进行的操作),实时处理,说白就是针对一条一条的数据/记录进行操作,所有的这些操作进行一个汇总(截止到目前为止的所有的统...

xpleaf ⋅ 04/12 ⋅ 0

Storm笔记整理(三):Storm集群安装部署与Topology作业提交

[TOC] Storm分布式集群安装部署 概述 Storm集群表面类似Hadoop集群。但在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是”topologies”。”Jobs”和”topologies”是大不同的,...

xpleaf ⋅ 04/13 ⋅ 0

Storm笔记整理(二):Storm本地开发案例—总和计算与单词统计

[TOC] 概述 在Strom的API中提供了对象,这样在不用搭建Storm环境或者Storm集群的情况下也能够开发Storm的程序,非常方便。 基于Maven构建工程项目,其所需要的依赖如下: Storm本地开发案例1...

xpleaf ⋅ 04/12 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

zblog2.3版本的asp系统是否可以超越卢松松博客的流量[图]

最近访问zblog官网,发现zlbog-asp2.3版本已经进入测试阶段了,虽然正式版还没有发布,想必也不久了。那么作为aps纵横江湖十多年的今天,blog2.2版本应该已经成熟了,为什么还要发布这个2.3...

原创小博客 ⋅ 40分钟前 ⋅ 0

聊聊spring cloud的HystrixCircuitBreakerConfiguration

序 本文主要研究一下spring cloud的HystrixCircuitBreakerConfiguration HystrixCircuitBreakerConfiguration spring-cloud-netflix-core-2.0.0.RELEASE-sources.jar!/org/springframework/......

go4it ⋅ 今天 ⋅ 0

二分查找

二分查找,也称折半查找、二分搜索,是一种在有序数组中查找某一特定元素的搜索算法。搜素过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜素过程结束;如果某一特定元素大于...

人觉非常君 ⋅ 今天 ⋅ 0

VS中使用X64汇编

需要注意的是,在X86项目中,可以使用__asm{}来嵌入汇编代码,但是在X64项目中,再也不能使用__asm{}来编写嵌入式汇编程序了,必须使用专门的.asm汇编文件来编写相应的汇编代码,然后在其它地...

simpower ⋅ 今天 ⋅ 0

ThreadPoolExecutor

ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, ......

4rnold ⋅ 昨天 ⋅ 0

Java正无穷大、负无穷大以及NaN

问题来源:用Java代码写了一个计算公式,包含除法和对数和取反,在页面上出现了-infinity,不知道这是什么问题,网上找答案才明白意思是负的无穷大。 思考:为什么会出现这种情况呢?这是哪里...

young_chen ⋅ 昨天 ⋅ 0

前台对中文编码,后台解码

前台:encodeURI(sbzt) 后台:String param = URLDecoder.decode(sbzt,"UTF-8");

west_coast ⋅ 昨天 ⋅ 0

实验楼—MySQL基础课程-挑战3实验报告

按照文档要求创建数据库 sudo sercice mysql startwget http://labfile.oss.aliyuncs.com/courses/9/createdb2.sqlvim /home/shiyanlou/createdb2.sql#查看下数据库代码 代码创建了grade......

zhangjin7 ⋅ 昨天 ⋅ 0

一起读书《深入浅出nodejs》-node模块机制

node 模块机制 前言 说到node,就不免得提到JavaScript。JavaScript自诞生以来,经历了工具类库、组件库、前端框架、前端应用的变迁。通过无数开发人员的努力,JavaScript不断被类聚和抽象,...

小草先森 ⋅ 昨天 ⋅ 0

Java桌球小游戏

其实算不上一个游戏,就是两张图片,不停的重画,改变ball图片的位置。一个左右直线碰撞的,一个有角度碰撞的。 左右直线碰撞 package com.bjsxt.test;import javax.swing.*;import j...

森林之下 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部