文档章节

storm shell命令源码分析-shell_submission.clj

Adel
 Adel
发布于 2016/04/13 17:43
字数 1372
阅读 29
收藏 0

当我们在shell里执行storm shell命令是会调用shell_submission.clj里的main函数。shell_submission.clj如下:

shell_submission.clj

;; ns函数声明一个命名空间“backtype.storm.command.shell-submission”

(ns backtype.storm.command.shell-submission

  ;; :import引用backtype.storm中的StormSubmitter类

  (:import [backtype.storm StormSubmitter])

  ;; :use引用backtype.storm thrift命名空间中的thrift、util、config和log,并建立连接,这样调用thrift、util、config和log中函数就可以直接使用函数名称不需要加完全限定名

  (:use [backtype.storm thrift util config log])

  ;; :require引用clojure.string,并使用别名str代替完全限定名clojure.string

  (:require [clojure.string :as str])

  ;; :gen-class生成java类

  (:gen-class))

  ;; storm shell命令所执行的main函数,gen-class的默认前缀"-",-main函数可以看成public函数,^String是类型提示符,用于声明参数tmpjarpath是一个字符串,-main函数可以接受多个实参,第一个参数赋值给tmpjarpath,其他参数全部保存在args中,args一个"序列"

  (defn -main [^String tmpjarpath & args]

  ;; conf绑定集群配置信息map,read-storm-config函数定义在backtype.storm.config命令空间,用于读取集群配置信息,返回包含集群配置信息的map,read-storm-config函数参见其定义部分

  (let [conf (read-storm-config)

        ;; 从集群配置信息中获取nimbus主机

        host (conf NIMBUS-HOST)

        ;; 从集群配置信息中获取nimbus thrift server的端口

        port (conf NIMBUS-THRIFT-PORT)

        ;; 调用StormSubmitter类的静态方法submitJar,将tmpjarpath所标识的jar文件上传到nimbus服务器上,jarpath保存jar文件在nimbus服务器上的路径,submitJar方法参见其定义部分

        jarpath (StormSubmitter/submitJar conf tmpjarpath)

        ;; concat函数将[host port jarpath]和args进行合并,并保存在args中

        args (concat args [host port jarpath])]

;; str/join将args中的参数用空格进行连接后,作为参数传递给exec-command!函数,执行jar文件中的main方法

(exec-command! (str/join " " args))

))

当Clojure源文件做为脚本执行时,它们将在运行时被编译为java字节码。它们同样可以提前编译为java字节码(AOT编译)。这改善了Clojure应用的启动时间,并生产了可以运用于java中的.class文件。如果编译过的命名空间中拥有一个叫做-main的函数,那么它就能够作为一个Java应用运行。命令行参数会作为参数传递给这个函数。

read-storm-config函数

;; read-storm-config函数调用Utils类的readStormConfig方法读取集群配置信息,并过滤掉信息中非法配置

(defn read-storm-config

  []

  ;; readStormConfig方法参见其定义部分

  (let [conf (clojurify-structure (Utils/readStormConfig))]

    ;; 调用validate-configs-with-schemas函数验证配置信息的正确性并删除不正确的配置信息

    (validate-configs-with-schemas conf)

    conf))

readStormConfig方法

public static Map readStormConfig() {

    // 调用readDefaultConfig方法从defaults.yaml配置文件读取集群默认配置信息存入一个map对象ret中

    Map ret = readDefaultConfig();

    // confFile保存系统变量"storm.conf.file"的值,系统变量"storm.conf.file"保存了用户自定义配置文件的路径

    String confFile = System.getProperty("storm.conf.file");

    Map storm;

    // 如果没有用户自定义配置文件,那么调用findAndReadConfigFile方法读取"storm.yaml"配置文件,将配置信息保存在storm中,否则读取用户自定义配置文件

    if (confFile==null || confFile.equals("")) {

        storm = findAndReadConfigFile("storm.yaml", false);

    } else {

        storm = findAndReadConfigFile(confFile, true);

    }

    // 将"storm.yaml"配置文件或用户自定义的配置文件信息覆盖添加到默认配置信息ret中

    ret.putAll(storm);

    // 读取命令行提供的配置信息,并覆盖添加到之前的map对象中

    ret.putAll(readCommandLineOpts());

    // 返回保存了配置信息的map对象

    return ret;

}

submitJar方法
submitJar方法调用了StormSubmitter类的重载方法submitJar

/**

 * Submit jar file

 * @param conf the topology-specific configuration. See 
{@link
 Config}.

 * @param localJar file path of the jar file to submit

 * @return the remote location of the submitted jar

 */

public static String submitJar(Map conf, String localJar) {

    return submitJar(conf, localJar, null);

}

重载方法submitJar
submitJar通过thrift client调用nimbus thrift server中的beginFileUpload函数获取目标路径,然后将jar上传到nimbus的目标路径上

/**
 * Submit jar file
 * @param conf the topology-specific configuration. See {@link Config}.
 * @param localJar file path of the jar file to submit
 * @param listener progress listener to track the jar file upload
 * @return the remote location of the submitted jar
 */
public static String submitJar(Map conf, String localJar, ProgressListener listener) {
    if (localJar == null) {
        throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
    }
            // 创建nimbus client
    NimbusClient client = NimbusClient.getConfiguredClient(conf);
    try {
            // 调用nimbus thrift server中的beginFileUpload函数获取目标路径,beginFileUpload函数参见其定义部分
        String uploadLocation = client.getClient().beginFileUpload();
        LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
        BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
        long totalSize = new File(localJar).length();
        if (listener != null) {
            listener.onStart(localJar, uploadLocation, totalSize);
        }
        long bytesUploaded = 0;
        while(true) {
            byte[] toSubmit = is.read();
            bytesUploaded += toSubmit.length;
            if (listener != null) {
                listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
            }
            if(toSubmit.length==0) break;
            // 调用nimbus thrift server中的uploadChunk函数将jar文件上传nimbus服务器,uploadChunk函数参见其定义部分
            client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
        }
        // 调用nimbus thrift server中的finishFileUpload完成jar文件上传,finishFileUpload函数参见其定义部分
        client.getClient().finishFileUpload(uploadLocation);
        if (listener != null) {
            listener.onCompleted(localJar, uploadLocation, totalSize);
        }
        LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
        // 返回jar文件上传nimbus的路径
        return uploadLocation;
    } catch(Exception e) {
        throw new RuntimeException(e);            
    } finally {
        client.close();
    }
}

beginFileUpload函数

(beginFileUpload [this]

    ;; fileloc就是jar上传到nimbus上的目录"{storm.local.dir}/nimubs/inbox/stormjar-(uuid).jar",storm.local.dir是在配置信息中设置的

    (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]

      ;; (:uploaders nimbus)获取nimbus元数据中的TimeCacheMap,关于TimeCacheMap将在以后博客详细分析,将fileloc及其对应的FileOutputStream放入TimeCacheMap

      (.put (:uploaders nimbus)

            fileloc

            (Channels/newChannel (FileOutputStream. fileloc)))

      (log-message "Uploading file from client to " fileloc)

      ;; 返回上传路径

      fileloc

      ))

shell_submission.clj就分析到这里了,分析过程只列举了一些重要的函数,还有一些辅助函数没有列出,感兴趣的可以自己查看下。

本文转载自:https://segmentfault.com/a/1190000000658810

Adel
粉丝 10
博文 71
码字总数 61751
作品 0
海淀
程序员
私信 提问
storm client command

最近在研究实时日志分析,storm确实不错,以下是命令参数: storm help Syntax: storm jar topology-jar-path class 运行jar包中类的主函数和指定的参数 Commands: activate storm activate ...

China_OS
2014/02/22
1K
0
Apache Storm 1.0.5 发布,分布式实时计算

Apache Storm 1.0.4 已发布,Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像 Hadoop 一样实现实时批处理。Storm 很简单,可用于任意编程语言。Apache Storm ...

王练
2017/09/16
856
5
Spark cluster 部署

Spark 框架 Spark与Storm的对比 对于Storm来说: 1、建议在那种需要纯实时,不能忍受1秒以上延迟的场景下使用,比如实时金融系统,要求纯实时进行金融交易和分析 2、此外,如果对于实时计算的...

meteor_hy
2018/06/27
0
0
windows 安装 storm 及 eclipse 调试 TopN 实例

一:安装JDK 下载地址:地址一 地址二 配置Java环境变量 JAVAHOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考): D:javajdk1.8 %JAVAHOME%/bin;%JAVAHOME%/jre/bin ....

大数据之路
2012/06/08
311
1
Storm概念讲解和工作原理介绍

Strom的结构 Storm与传统关系型数据库 传统关系型数据库是先存后计算,而storm则是先算后存,甚至不存 传统关系型数据库很难部署实时计算,只能部署定时任务统计分析窗口数据 关系型数据库重...

张超
2015/04/26
2.8K
0

没有更多内容

加载失败,请刷新页面

加载更多

Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0
轻量级 HTTP(s) 代理 TinyProxy

CentOS 下安装 TinyProxy yum install -y tinyproxy 启动、停止、重启 # 启动service tinyproxy start# 停止service tinyproxy stop# 重启service tinyproxy restart 相关配置 默认...

Anoyi
昨天
2
0
Linux创建yum仓库

第一步、搞定自己的光盘 #创建文件夹 mkdir -p /media/cdrom #挂载光盘 mount /dev/cdrom /media/cdrom #编辑配置文件使其永久生效 vim /etc/fstab 第二步,编辑yun源 vim /ect yum.repos.d...

究极小怪兽zzz
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部