文档章节

Jstorm源码阅读(1)—— topology 提交过程

纳兰清风
 纳兰清风
发布于 2017/02/15 20:09
字数 1225
阅读 140
收藏 1

Jstorm源码阅读(1)—— topology 提交过程

Client端

client端即我们使用命令

jstorm jar xxxx.jar xxxx.xxxx.xxxx args...

提交topology的过程,所以我们直接从StormSubmitter类的submitTopology方法开始看,核心代码如下:

    try {
        String serConf = Utils.to_json(stormConf);
        if (localNimbus != null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            try {
                if (topologyNameExists(client, conf, name)) {
                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                }
                submitJar(client, conf);
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if (opts != null) {
                    client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, path, serConf, topology);
                }
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch (InvalidTopologyException e) {
        LOG.warn("Topology submission exception", e);
        throw e;
    } catch (AlreadyAliveException e) {
        LOG.warn("Topology already alive exception", e);
        throw e;
    } catch (TopologyAssignException e) {
        LOG.warn("Failed to assign " + e.get_msg(), e);
        throw new RuntimeException(e);
    } catch (TException e) {
        LOG.warn("Failed to assign ", e);
        throw new RuntimeException(e);
    }

由于是集群模式,这里走else逻辑,首先根据jstorm路径下的配置文件创建NimbusClient对象,然后判断提交的topology的名字是否是唯一的,然后调用submitJar上传jar包。submitJar方法核心代码如下:

String localJar = System.getProperty("storm.jar");
path = client.getClient().beginFileUpload();
String[] pathCache = path.split("/");
String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
if (lib != null && lib.size() != 0) {
    for (String libName : lib) {
        String jarPath = path + "/lib/" + libName;
        client.getClient().beginLibUpload(jarPath);
        submitJar(conf, libPath.get(libName), jarPath, client);
    }
} else {
    if (localJar == null) {
        // no lib, no client jar
        throw new RuntimeException("No client app jar, please upload it");
    }
}
if (localJar != null) {
    submittedJar = submitJar(conf, localJar, uploadLocation, client);
} else {
    // no client jar, but with lib jar
    client.getClient().finishFileUpload(uploadLocation);
}

首先根据storm.jar属性获取到jar包的全路径。我们在调用jstorm命令时执行的其实是源码目录下的bin/jstorm.py文件,而jstorm.py里的jar函数就是我们在执行命令jstorm jar时调用的代码,可以看到里面在调用java程序时有"-Dstorm.jar=" + jarfile 这样的代码,所以在java中使用storm.jar能货渠道jar包的路径。

这里先向Nimbus服务器发起beginFileUpload请求,得到要上传的路径后,在submitJar中调用uploadChunk将jar包上传到服务器,文件传输完毕后,也是在submitJar方法中调用finishFileUpload结束上传jar包的过程i,返回到submitTopology方法中。调用submitTopology方法通知服务器做topology的后续处理工作。


Server端

服务端在启动NimbusServer的时候会实例化一个ServiceHandler类的对象,然后在启动thrift server时会把它作为参数传递进去,所有后续thrift server的调用都会回调到ServiceHandler类的方法中,所以我们直接看这个类中的submitTopology方法。

@Override
public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws TException {
    SubmitOptions options = new SubmitOptions(TopologyInitialStatus.ACTIVE);
    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
}

调用了submitTopologyWithOpts,首先是一些topology的合法性校验。

if (!Common.charValidate(topologyName)) {
    throw new InvalidTopologyException(topologyName + " is not a valid topology name");
}
checkTopologyActive(data, topologyName, false);

构造一些配置项

Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils.from_json(jsonConf);
if (serializedConf == null) {
    LOG.warn("Failed to serialized Configuration");
    throw new InvalidTopologyException("Failed to serialize topology configuration");
}
serializedConf.put(Config.TOPOLOGY_ID, topologyId);
serializedConf.put(Config.TOPOLOGY_NAME, topologyName);
Map<Object, Object> stormConf;
stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
LOG.info("Normalized configuration:" + stormConf);
Map<Object, Object> totalStormConf = new HashMap<Object, Object>(conf);
totalStormConf.putAll(stormConf);

标准化topology和进行一些校验

//标准化
StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true);
//校验ID、字段合法性,worker和acker数量合法性
Common.validate_basic(normalizedTopology, totalStormConf, topologyId);
//创建各种本地文件
setupStormCode(conf, topologyId, uploadedJarLocation, stormConf, normalizedTopology);

//在zk上为每个spout和bolt创建task信息
setupZkTaskInfo(conf, topologyId, stormClusterState);
//为topology创建一个分发事件,放到队列中,等待被处理。真正的分发是由TopologyAssign在其他线程中处理的。
makeAssignment(topologyName, topologyId, options.get_initial_status());
//向监控topology的线程发起一个开始事件,该事件由TopologyMetricsRunnable在其他线程中处理。
StartTopologyEvent startEvent = new StartTopologyEvent();
startEvent.clusterName = this.data.getClusterName();
startEvent.topologyId = topologyId;
startEvent.timestamp = System.currentTimeMillis();
startEvent.sampleRate = metricsSampleRate;
this.data.getMetricRunnable().pushEvent(startEvent);

ServiceHandler的工作到这里就结束了,后面就没它什么事了,接下来我们看看TopologyAssign是怎么处理发送过来的TopologyAssignEvent的。这个类继承自Runnable,是在NimbusServer初始化的时候启动的一个线程。在它的run方法里循环的从队列中读取并由doTopologyAssignment方法处理分发事件。核心代码在mkAssignment方法中:

String topologyId = event.getTopologyId();
LOG.info("Determining assignment for " + topologyId);
//创建TP分发上下文,里面封装了一些集群、task和component信息
TopologyAssignContext context = prepareTopologyAssign(event);
Set<ResourceWorkerSlot> assignments = null;
if (!StormConfig.local_mode(nimbusData.getConf())) {
    //获取调度器,执行任务的真正调度
    IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
    assignments = scheduler.assignTasks(context);
} else {
    assignments = mkLocalAssignment(context);
}
Assignment assignment = null;
if (assignments != null && assignments.size() > 0) {
...
assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);
StormClusterState stormClusterState = nimbusData.getStormClusterState();
//将调度器分配的结果设置到zk中
stormClusterState.set_assignment(topologyId, assignment);
...
}
return assignment;

调度器的部分就先不说了,总体来说一个topology的提交过程就算完事了,基本过程如下:

  1. client将jar包传给server,通知server提交了一个topology
  2. server做一些基本的校验后生成一个事件,发送给TopologyAssign
  3. TopologyAssign处理分发事件,并交给Scheduler来计算如何分配task和work到supervisor。
  4. 将task的分配结果写到zk,后续由supervisor订阅并执行task的启动和运行。

© 著作权归作者所有

纳兰清风

纳兰清风

粉丝 33
博文 36
码字总数 37100
作品 0
朝阳
程序员
私信 提问
JStorm的客户端提交问题

平时我们发布Topology,都是在后台用 ./jstorm jar xxxxx.jar com.xxxx.xxxTopolog arg0,arg1 来将Topology发布到storm上 现在我用程序直接去发布Topology,但老是出现问题: Topology发布的...

锋_行者
2015/10/25
947
1
JStorm 2.2.1 发布,分布式计算系统

JStorm 2.2.1 发布,更新内容如下: 新功能 Performance is improved by 200%~300%, compared to Release 2.1.1 and 0.9.8.1 in several testing scenarios, while 120%~200% compared to Fl......

淡漠悠然
2017/01/10
2.4K
2
Storm 和JStorm

关于流处理框架,在先前的文章汇总已经介绍过Strom,今天学习的是来自阿里的的流处理框架JStorm。简单的概述Storm就是:JStorm 比Storm更稳定,更强大,更快,Storm上跑的程序,一行代码不变...

止静
2014/08/30
9.6K
1
JStorm 2.1.1 发布,分布式计算系统

Storm 是一个类似Hadoop MapReduce的系统, 用户按照指定的接口实现一个任务,然后将这个任务递交给JStorm系统,Jstorm将这个任务跑起来,并且按7 * 24小时运行起来,一旦中间一个worker 发生...

淡漠悠然
2016/03/07
2.3K
6
jstorm集群模式下bolt不能访问文件

linux jstorm分布式环境下判断一个文件(绝对路径)是否存在 bolt报出异常: display-log-bolt:2-BoltException: java.io.IOException: No such file or directory at java.io.UnixFileSyst......

Alahr
2017/10/02
154
0

没有更多内容

加载失败,请刷新页面

加载更多

哪些情况下适合使用云服务器?

我们一直在说云服务器价格适中,具备弹性扩展机制,适合部署中小规模的网站或应用。那么云服务器到底适用于哪些情况呢?如果您需要经常原始计算能力,那么使用独立服务器就能满足需求,因为他...

云漫网络Ruan
今天
10
0
Java 中的 String 有没有长度限制

转载: https://juejin.im/post/5d53653f5188257315539f9a String是Java中很重要的一个数据类型,除了基本数据类型以外,String是被使用的最广泛的了,但是,关于String,其实还是有很多东西...

低至一折起
今天
22
0
OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
11
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
9
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部