JStorm Source Code Reading (1): The Topology Submission Process

Original
2017/02/15 20:09


Client side

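On the client side, a topology is submitted with the command

jstorm jar xxxx.jar xxxx.xxxx.xxxx args...

That command runs the main class in the jar, which builds the topology and calls StormSubmitter.submitTopology, so we can start reading directly from the submitTopology method of the StormSubmitter class. Its core code is as follows: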

    try {
        String serConf = Utils.to_json(stormConf);
        if (localNimbus != null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            try {
                if (topologyNameExists(client, conf, name)) {
                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                }
                submitJar(client, conf);
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if (opts != null) {
                    client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, path, serConf, topology);
                }
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch (InvalidTopologyException e) {
        LOG.warn("Topology submission exception", e);
        throw e;
    } catch (AlreadyAliveException e) {
        LOG.warn("Topology already alive exception", e);
        throw e;
    } catch (TopologyAssignException e) {
        LOG.warn("Failed to assign " + e.get_msg(), e);
        throw new RuntimeException(e);
    } catch (TException e) {
        LOG.warn("Failed to assign ", e);
        throw new RuntimeException(e);
    }

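Since we are running in cluster mode, the else branch is taken. It first creates a NimbusClient object from the configuration files under the JStorm directory, then checks that the name of the submitted topology is unique, and then calls submitJar to upload the jar. The core code of submitJar is as follows: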

String localJar = System.getProperty("storm.jar");
path = client.getClient().beginFileUpload();
String[] pathCache = path.split("/");
String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
if (lib != null && lib.size() != 0) {
    for (String libName : lib) {
        String jarPath = path + "/lib/" + libName;
        client.getClient().beginLibUpload(jarPath);
        submitJar(conf, libPath.get(libName), jarPath, client);
    }
} else {
    if (localJar == null) {
        // no lib, no client jar
        throw new RuntimeException("No client app jar, please upload it");
    }
}
if (localJar != null) {
    submittedJar = submitJar(conf, localJar, uploadLocation, client);
} else {
    // no client jar, but with lib jar
    client.getClient().finishFileUpload(uploadLocation);
}

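First, the full path of the jar is read from the storm.jar property. When we run the jstorm command, what actually executes is the bin/jstorm.py file in the source directory, and the jar function in jstorm.py is the code invoked by `jstorm jar`. When it launches the Java program it passes "-Dstorm.jar=" + jarfile on the command line, which is why the Java code can obtain the jar path from storm.jar.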
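To make the property handoff and the branch in submitJar concrete, here is a minimal sketch, assuming only what the snippet shows: the jar path arrives as the `storm.jar` system property, and submission fails only when there is neither an application jar nor any lib jar. The class `StormJarCheck` and the `Action` enum are hypothetical illustrations, not JStorm code, and the sketch ignores the lib-upload loop itself.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical mirror of the jar/lib branch in submitJar; not JStorm code.
public class StormJarCheck {

    // What the client would do next (illustrative names only).
    enum Action { UPLOAD_APP_JAR, FINISH_WITHOUT_APP_JAR }

    static Action decide(String localJar, List<String> libs) {
        boolean hasLibs = libs != null && !libs.isEmpty();
        if (!hasLibs && localJar == null) {
            // no lib, no client jar: same failure as in the original snippet
            throw new IllegalStateException("No client app jar, please upload it");
        }
        // An app jar is uploaded if present; otherwise only the lib jars were sent.
        return localJar != null ? Action.UPLOAD_APP_JAR : Action.FINISH_WITHOUT_APP_JAR;
    }

    public static void main(String[] args) {
        // jstorm.py starts the JVM with -Dstorm.jar=<jarfile>; here we set it by hand.
        System.setProperty("storm.jar", "/tmp/my-topology.jar");
        String localJar = System.getProperty("storm.jar");
        System.out.println(localJar + " -> " + decide(localJar, Collections.emptyList()));
    }
}
```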

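The client first sends a beginFileUpload request to the Nimbus server and gets back the path to upload to. submitJar then calls uploadChunk to send the jar to the server, and once the transfer is complete it also calls finishFileUpload to end the upload. Control then returns to submitTopology, which calls submitTopology (or submitTopologyWithOpts) on the Nimbus client to tell the server to carry out the rest of the topology processing.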
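The begin/chunk/finish protocol described above can be sketched with an in-memory server. This is a simplified illustration under stated assumptions: `FileUploadService` and `InMemoryServer` are hypothetical stand-ins that reuse the three call names from the text, not the real Thrift-generated Nimbus client, and the client here uploads straight to the returned location instead of deriving `uploadLocation` from it as the real code does.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ChunkedUpload {

    // Hypothetical stand-in for the three Nimbus calls named in the text.
    interface FileUploadService {
        String beginFileUpload();
        void uploadChunk(String location, ByteBuffer chunk);
        void finishFileUpload(String location);
    }

    // In-memory fake server: buffers chunks per location until finishFileUpload.
    static class InMemoryServer implements FileUploadService {
        private final Map<String, ByteArrayOutputStream> inProgress = new HashMap<>();
        final Map<String, byte[]> finished = new HashMap<>();
        int chunksReceived = 0;
        private int counter = 0;

        public String beginFileUpload() {
            String location = "/inbox/upload-" + (++counter); // hypothetical path layout
            inProgress.put(location, new ByteArrayOutputStream());
            return location;
        }

        public void uploadChunk(String location, ByteBuffer chunk) {
            ByteArrayOutputStream out = inProgress.get(location);
            if (out == null) throw new IllegalStateException("unknown upload " + location);
            byte[] bytes = new byte[chunk.remaining()];
            chunk.get(bytes);
            out.write(bytes, 0, bytes.length);
            chunksReceived++;
        }

        public void finishFileUpload(String location) {
            ByteArrayOutputStream out = inProgress.remove(location);
            if (out == null) throw new IllegalStateException("unknown upload " + location);
            finished.put(location, out.toByteArray());
        }
    }

    // Client side: begin, send fixed-size chunks, then finish.
    static String upload(FileUploadService server, byte[] jar, int chunkSize) {
        String location = server.beginFileUpload();
        for (int off = 0; off < jar.length; off += chunkSize) {
            int len = Math.min(chunkSize, jar.length - off);
            server.uploadChunk(location, ByteBuffer.wrap(jar, off, len));
        }
        server.finishFileUpload(location);
        return location;
    }

    public static void main(String[] args) {
        InMemoryServer server = new InMemoryServer();
        byte[] fakeJar = "not really a jar".getBytes();
        String loc = upload(server, fakeJar, 4);
        System.out.println(loc + " received " + server.chunksReceived + " chunks, intact="
                + Arrays.equals(fakeJar, server.finished.get(loc)));
    }
}
```

Chunking keeps each RPC payload bounded regardless of jar size; the server only publishes the file once `finishFileUpload` is called.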


Server side

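When NimbusServer starts, it instantiates a ServiceHandler object and passes it as a parameter when starting the Thrift server. Every subsequent call to the Thrift server is dispatched to a method of ServiceHandler, so we can go straight to the submitTopology method of this class.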

@Override
public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws TException {
    SubmitOptions options = new SubmitOptions(TopologyInitialStatus.ACTIVE);
    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
}

It delegates to submitTopologyWithOpts with an ACTIVE initial status. That method starts with some validity checks on the topology:

if (!Common.charValidate(topologyName)) {
    throw new InvalidTopologyException(topologyName + " is not a valid topology name");
}
checkTopologyActive(data, topologyName, false);
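The two checks above (name format, then "not already active") can be sketched as follows. The allowed character set is an assumption for illustration; the real `Common.charValidate` rule may differ, and the in-memory set of active names is a hypothetical stand-in for the cluster state that `checkTopologyActive` consults.

```java
import java.util.Set;
import java.util.regex.Pattern;

public class TopologyNameCheck {
    // Assumed rule: letters, digits, '-', '_' and '.'; the real Common.charValidate may differ.
    private static final Pattern VALID_NAME = Pattern.compile("[A-Za-z0-9_.\\-]+");

    static boolean charValidate(String name) {
        return name != null && VALID_NAME.matcher(name).matches();
    }

    // Name format first, then uniqueness among currently active topologies.
    static void validate(String name, Set<String> activeTopologies) {
        if (!charValidate(name)) {
            throw new IllegalArgumentException(name + " is not a valid topology name");
        }
        if (activeTopologies.contains(name)) {
            throw new IllegalStateException("Topology " + name + " is already alive");
        }
    }

    public static void main(String[] args) {
        Set<String> active = Set.of("wordcount");
        validate("click-stream_v2", active); // passes both checks
        try {
            validate("bad name!", active);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```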

Next, it builds the configuration:

Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils.from_json(jsonConf);
if (serializedConf == null) {
    LOG.warn("Failed to serialized Configuration");
    throw new InvalidTopologyException("Failed to serialize topology configuration");
}
serializedConf.put(Config.TOPOLOGY_ID, topologyId);
serializedConf.put(Config.TOPOLOGY_NAME, topologyName);
Map<Object, Object> stormConf;
stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
LOG.info("Normalized configuration:" + stormConf);
Map<Object, Object> totalStormConf = new HashMap<Object, Object>(conf);
totalStormConf.putAll(stormConf);
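The last two lines build `totalStormConf` by copying the Nimbus (cluster) configuration and then overlaying the normalized topology configuration, so on a key clash the topology's value wins and the cluster map itself is left untouched. A minimal sketch of that overlay, with made-up keys and values:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfMerge {
    // Same pattern as the snippet: copy the cluster conf, then overlay the topology conf.
    static Map<Object, Object> merge(Map<Object, Object> clusterConf, Map<Object, Object> stormConf) {
        Map<Object, Object> total = new HashMap<Object, Object>(clusterConf); // copy, do not mutate cluster conf
        total.putAll(stormConf); // topology-level values override cluster defaults on key clashes
        return total;
    }

    public static void main(String[] args) {
        Map<Object, Object> cluster = new HashMap<>();
        cluster.put("topology.workers", 1);          // example cluster default
        cluster.put("storm.zookeeper.port", 2181);   // example cluster-only setting

        Map<Object, Object> topo = new HashMap<>();
        topo.put("topology.workers", 4);             // example topology override
        topo.put("topology.name", "wordcount");

        Map<Object, Object> total = merge(cluster, topo);
        System.out.println(total.get("topology.workers") + " " + total.get("storm.zookeeper.port"));
    }
}
```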

Then it normalizes the topology, runs further checks, and hands the topology off for assignment:

// Normalize the topology
StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true);
// Validate IDs and fields, and check that the worker and acker counts are legal
Common.validate_basic(normalizedTopology, totalStormConf, topologyId);
// Create the various local files for the topology
setupStormCode(conf, topologyId, uploadedJarLocation, stormConf, normalizedTopology);

// Create task info in ZooKeeper for every spout and bolt
setupZkTaskInfo(conf, topologyId, stormClusterState);
// Create an assignment event for the topology and put it in the queue to await processing.
// The actual assignment is done by TopologyAssign in another thread.
makeAssignment(topologyName, topologyId, options.get_initial_status());
// Send a start event to the topology monitoring thread; TopologyMetricsRunnable handles it in another thread.
StartTopologyEvent startEvent = new StartTopologyEvent();
startEvent.clusterName = this.data.getClusterName();
startEvent.topologyId = topologyId;
startEvent.timestamp = System.currentTimeMillis();
startEvent.sampleRate = metricsSampleRate;
this.data.getMetricRunnable().pushEvent(startEvent);
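The comments on makeAssignment describe a producer/consumer handoff: the RPC thread only enqueues an event and returns, while a separate thread takes events from the queue and does the assignment. A minimal sketch of that pattern, assuming a `LinkedBlockingQueue`; `AssignEvent`, the `STOP` sentinel and the in-memory `assignments` map are hypothetical stand-ins for TopologyAssignEvent, the real thread lifecycle, and the ZooKeeper writes.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class AssignQueueSketch {

    // Hypothetical event type standing in for TopologyAssignEvent.
    static final class AssignEvent {
        final String topologyId;
        AssignEvent(String topologyId) { this.topologyId = topologyId; }
    }

    // Sentinel used only to stop the demo thread.
    static final AssignEvent STOP = new AssignEvent(null);

    final BlockingQueue<AssignEvent> queue = new LinkedBlockingQueue<>();
    // Stand-in for the assignments the real code writes to ZooKeeper.
    final Map<String, String> assignments = new ConcurrentHashMap<>();

    // Producer side (conceptually what makeAssignment does): enqueue and return immediately.
    void makeAssignment(String topologyId) {
        queue.add(new AssignEvent(topologyId));
    }

    // Consumer side (conceptually the TopologyAssign thread): take events in a loop.
    Thread startAssigner() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    AssignEvent event = queue.take(); // blocks until an event arrives
                    if (event == STOP) return;
                    assignments.put(event.topologyId, "assigned");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "topology-assign");
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        AssignQueueSketch nimbus = new AssignQueueSketch();
        Thread assigner = nimbus.startAssigner();
        nimbus.makeAssignment("topo-1");
        nimbus.queue.add(STOP);
        assigner.join();
        System.out.println(nimbus.assignments);
    }
}
```

Decoupling this way lets the Thrift call return quickly even when scheduling is slow, at the cost of the assignment becoming visible only some time after submitTopology returns.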

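This is where ServiceHandler's work ends; it plays no further part. Next, let's look at how TopologyAssign handles the TopologyAssignEvent it receives. This class implements Runnable and is started as a thread when NimbusServer initializes. Its run method loops, reading assignment events from the queue and handling each one with the doTopologyAssignment method. The core code is in the mkAssignment method: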

String topologyId = event.getTopologyId();
LOG.info("Determining assignment for " + topologyId);
// Build the topology assignment context, which wraps cluster, task and component information
TopologyAssignContext context = prepareTopologyAssign(event);
Set<ResourceWorkerSlot> assignments = null;
if (!StormConfig.local_mode(nimbusData.getConf())) {
    // Get the scheduler, which performs the actual task scheduling
    IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
    assignments = scheduler.assignTasks(context);
} else {
    assignments = mkLocalAssignment(context);
}
Assignment assignment = null;
if (assignments != null && assignments.size() > 0) {
    ...
    assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);
    StormClusterState stormClusterState = nimbusData.getStormClusterState();
    // Write the scheduler's assignment result to ZooKeeper
    stormClusterState.set_assignment(topologyId, assignment);
    ...
}
return assignment;

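I'll leave the scheduler aside for now. Overall, that completes the topology submission process, which can be summarized as follows:

  1. The client uploads the jar to the server and notifies the server that a topology has been submitted.
  2. After some basic validation, the server creates an assignment event and sends it to TopologyAssign.
  3. TopologyAssign handles the assignment event and delegates to the Scheduler, which computes how tasks and workers are assigned to supervisors.
  4. The task assignment result is written to ZooKeeper; the supervisors then subscribe to it and start and run the tasks.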
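Step 4 can be pictured with a small in-memory model: the assignment is a list of worker slots (node, port, tasks), it is published to a shared cluster state, and each supervisor reacts only to the slots placed on its own node. This is a hypothetical sketch: `ClusterState`, `WorkerSlot` and `Supervisor` are illustrative stand-ins, ZooKeeper watches and JStorm's actual supervisor synchronization are not modeled, and the listener is called synchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

public class AssignmentWatchSketch {

    // Hypothetical, simplified worker slot: node, port and the task ids it runs.
    static final class WorkerSlot {
        final String nodeId; final int port; final Set<Integer> tasks;
        WorkerSlot(String nodeId, int port, Set<Integer> tasks) {
            this.nodeId = nodeId; this.port = port; this.tasks = tasks;
        }
    }

    // In-memory stand-in for the assignment data kept in ZooKeeper.
    static final class ClusterState {
        private final Map<String, List<WorkerSlot>> assignments = new ConcurrentHashMap<>();
        private final List<BiConsumer<String, List<WorkerSlot>>> listeners = new CopyOnWriteArrayList<>();

        void subscribe(BiConsumer<String, List<WorkerSlot>> listener) { listeners.add(listener); }

        // Store the assignment, then notify every subscriber synchronously.
        void setAssignment(String topologyId, List<WorkerSlot> slots) {
            assignments.put(topologyId, slots);
            for (BiConsumer<String, List<WorkerSlot>> l : listeners) l.accept(topologyId, slots);
        }
    }

    // A supervisor records ("launches") only the slots assigned to its own node.
    static final class Supervisor {
        final String nodeId;
        final List<String> launched = new ArrayList<>();
        Supervisor(String nodeId, ClusterState state) {
            this.nodeId = nodeId;
            state.subscribe((topologyId, slots) -> {
                for (WorkerSlot s : slots) {
                    if (s.nodeId.equals(nodeId)) launched.add(topologyId + "@" + s.port);
                }
            });
        }
    }

    public static void main(String[] args) {
        ClusterState zk = new ClusterState();
        Supervisor a = new Supervisor("node-a", zk);
        Supervisor b = new Supervisor("node-b", zk);
        zk.setAssignment("topo-1", List.of(
                new WorkerSlot("node-a", 6800, Set.of(1, 2)),
                new WorkerSlot("node-b", 6800, Set.of(3)),
                new WorkerSlot("node-a", 6801, Set.of(4))));
        System.out.println(a.launched + " " + b.launched);
    }
}
```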