文档章节

elasticsearch 插件开发 (四) 源代码分析

纳兰琴
 纳兰琴
发布于 2014/09/26 13:52
字数 1066
阅读 685
收藏 3

        elasticsearch 的 基础类,主要分成 Component (组件) 和 Module (模块)。

  1. 组件

    1. CloseableComponent                可关闭组件

    2. AbstractComponent                 可配置组件

    3. LifecycleComponent                 活动可关闭组件

    4. AbstractLifecycleComponent    可配置的活动可关闭组件 (。。。有点长)

  2. 模块

    1. Module                                    模块

    2. PreProcessModule                   预处理模块

    3. SpawnModules                       新模块

一.     AbstractPlugin 类 ,一个插件类要继承自它或者实现Plugin接口

插件类需要实现的Plugin接口,每个方法所对应的组件类型是

  • modules()        Module

  • services()        LifecycleComponent

  • indexModules()        Module

  • indexServices()        CloseableIndexComponent

  • shardModules()        Module

  • shardServices()        CloseableIndexComponent


二.     现在插件实现类,有了,那他是怎么被加载到整个系统里面的呢?那就要请出我们插件组的各个成员了。


  • PluginManager类,插件管理类,负责插件的安装,卸载,下载等工作。

  • PluginsHelper类,插件帮助类,负责列出环境下面所有的site插件。

  • PluginsService类,插件服务类,负责插件的加载,实例化和维护插件信息。


整个节点启动的时候,InternalNode的构造方法里加载配置文件以后,首先会实例化 PluginsService 类,还有PluginsModule模块 。

this.pluginsService = new PluginsService(tuple.v1(), tuple.v2());
this.settings = pluginsService.updatedSettings();
this.environment = tuple.v2();

CompressorFactory.configure(settings);

NodeEnvironment nodeEnvironment = new NodeEnvironment(this.settings, this.environment);

ModulesBuilder modules = new ModulesBuilder();
modules.add(new PluginsModule(settings, pluginsService));
modules.add(new SettingsModule(settings));
modules.add(new NodeModule(this));

PluginsService类的构造方法里,会开始加载插件类,从配置文件和Classpath里面,并且处理 plugin.mandatory 配置的强依赖插件,和模块引用

   public PluginsService(Settings settings, Environment environment) {
        super(settings);
        this.environment = environment;

        Map<String, Plugin> plugins = Maps.newHashMap();

        //首先,我们从配置文件加载,默认的插件类
        String[] defaultPluginsClasses = settings.getAsArray("plugin.types");
        for (String pluginClass : defaultPluginsClasses) {
            Plugin plugin = loadPlugin(pluginClass, settings);
            plugins.put(plugin.name(), plugin);
        }

        // 现在, 我们查找,所有的在ClassPath下面的插件
        loadPluginsIntoClassLoader();
        plugins.putAll(loadPluginsFromClasspath(settings)); //加载JVM插件
        Set<String> sitePlugins = PluginsHelper.sitePlugins(this.environment); //加载站点插件
        
        //强制依赖的插件,如果没有找到
        String[] mandatoryPlugins = settings.getAsArray("plugin.mandatory", null);
        if (mandatoryPlugins != null) {
            Set<String> missingPlugins = Sets.newHashSet();
            for (String mandatoryPlugin : mandatoryPlugins) {
                if (!plugins.containsKey(mandatoryPlugin) && !sitePlugins.contains(mandatoryPlugin) && !missingPlugins.contains(mandatoryPlugin)) {
                    missingPlugins.add(mandatoryPlugin);
                }
            }
            if (!missingPlugins.isEmpty()) {
            	//抛出异常,整个节点启动失败!
                throw new ElasticSearchException("Missing mandatory plugins [" + Strings.collectionToDelimitedString(missingPlugins, ", ") + "]");
            }
        }
        logger.info("loaded {}, sites {}", plugins.keySet(), sitePlugins);
        this.plugins = ImmutableMap.copyOf(plugins);

        //现在,所有插件都加载好了,处理插件实现类的 onModule 方法的引用 ,这里有 依赖注入的秘密。
        
        MapBuilder<Plugin, List<OnModuleReference>> onModuleReferences = MapBuilder.newMapBuilder();
        for (Plugin plugin : plugins.values()) {
            List<OnModuleReference> list = Lists.newArrayList();
            //....
        }
        this.onModuleReferences = onModuleReferences.immutableMap();
        this.refreshInterval = componentSettings.getAsTime("info_refresh_interval", TimeValue.timeValueSeconds(10));

    }

PluginsService.info() 方法,是插件信息类,NodeService 会调用 ,是节点信息类的一部分,就是REST接口 /nodes 返回的内容

//PluginInfo类的字段
static final class Fields {
        static final XContentBuilderString NAME = new XContentBuilderString("name");
        static final XContentBuilderString DESCRIPTION = new XContentBuilderString("description");
        static final XContentBuilderString URL = new XContentBuilderString("url");
        static final XContentBuilderString JVM = new XContentBuilderString("jvm");
        static final XContentBuilderString SITE = new XContentBuilderString("site");
}

InternalNode 会继续调用 modules.createInjector() 方法去实例化所有的模块。PluginsModule模块会去实例化和调用我们的插件

// 创建Plugin 类覆盖  modules 方法的模块
    @Override
    public Iterable<? extends Module> spawnModules() {
        List<Module> modules = Lists.newArrayList();
        Collection<Class<? extends Module>> modulesClasses = pluginsService.modules();
        for (Class<? extends Module> moduleClass : modulesClasses) {
            modules.add(createModule(moduleClass, settings));
        }
        modules.addAll(pluginsService.modules(settings));
        return modules;
    }
    // 处理Plugin类实现了 onModule 方法的类
    @Override
    public void processModule(Module module) {
        pluginsService.processModule(module);
    }

InternalNode 的 start()方法,就是节点启动的时候,会启动Plugin类覆盖了services方法的服务

 for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
            injector.getInstance(plugin).start();
 }

InternalIndicesService.createIndex() 方法,也就是创建索引的时候,会创建 Plugin类 覆盖了 indexModules() 的模块

InternalIndexService.createShard() 方法,创建分片的时候,会去创建Plugin类 覆盖了 shardModules() 的模块

同理删除索引和分片的时候,会销毁模块和关闭服务。也就是插件扩展的服务和模块是有3个生命周期的。

  • Global    节点级别

  • Index    索引级别

  • Shard    分片级别

三.     插件类有了,插件也被加载进系统了,那它是怎么扩展现有模块服务的,那些模块可以扩展,那些不可以?

可扩展的模块,一般都提供了 addXXX,registerXXX 等方法

//智能提示
public void onModule(SuggestModule suggestModule) {
    suggestModule.registerSuggester(MySuggester.class);
}
//REST
public void onModule(RestModule restModule) {
    restModule.addRestAction(MyRestAction.class);
}
//高亮
public void onModule(HighlightModule highlightModule) {
    highlightModule.registerHighlighter(MyHighlighter.class);
}

可替换的模块,一般是实现了SpawnModules接口的模块,比如DiscoveryModule

@Override
    public Iterable<? extends Module> spawnModules() {
        Class<? extends Module> defaultDiscoveryModule;
        if (settings.getAsBoolean("node.local", false)) {
            defaultDiscoveryModule = LocalDiscoveryModule.class;
        } else {
            defaultDiscoveryModule = ZenDiscoveryModule.class;
        }
        return ImmutableList.of(Modules.createModule(settings.getAsClass("discovery.type", defaultDiscoveryModule, "org.elasticsearch.discovery.", "DiscoveryModule"), settings));
}

根据配置项discovery.type来确定加载那个模块

不可以扩展或替换的组件,比如  Internal 开头的组件,InternalClusterService,InternalIndicesService    等是不可以替换的。


© 著作权归作者所有

纳兰琴
粉丝 51
博文 23
码字总数 13442
作品 0
杭州
高级程序员
私信 提问
加载中

评论(1)

timmy-duncan
timmy-duncan
整个插件开发及源码分析的总结很到位 79
基于Bro的应用层数据包识别工具

  拓扑介绍   应用识别系统将会通过bro识别应用并生成日志。ELK部署在远程端,用于收集,分析,存储和识别所有日志。BRO安装在IP为192.168.1.147的机器上,ELK安装在IP为192.168.1.142的...

FreeBuf
2018/08/13
0
0
Elastic 在年度用户大会 Elastic{ON} 2018 上发布众多新功能和技术预览

下载超过 2.25 亿次,Elastic 公开 X-Pack 源代码 旧金山 (Elastic{ON} 2018) – 2018 年 2 月 27 日 – Elastic,Elasticsearch 和 Elastic Stack背后的公司,今天宣布其产品累计下载次数达...

Medcl
2018/03/01
7
0
ES(elasticsearch)搜索引擎

ES(elasticsearch)搜索引擎 0、授人以渔,少走半年弯路! 死磕 Elasticsearch 方法论:普通程序员高效精进的 10 大狠招! 一、Elasitcsearch基础篇 1.1 Elasitcsearch基础认知 1、Elasticse...

Ocean_K
2018/09/11
2.3K
6
小白都会超详细--ELK日志管理平台搭建教程

目录 一、介绍 二、安装JDK 三、安装Elasticsearch 四、安装Logstash 五、安装Kibana 六、Kibana简单使用 系统环境:CentOS Linux release 7.4.1708 (Core) 当前问题状况 开发人员不能登录线...

渣渣辉
2018/07/15
0
0
ElasticSearch+kibana+logstash集群部署

ELK原理与介绍   ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用...

若此生无缘
2018/08/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

RxJava进行单元测试的方式

@Test public void completeTask_retrievedTaskIsComplete() { // Given a new task in the persistent repository final Task newTask = new Task(TITLE, ""); ......

SuShine
26分钟前
5
0
正则表达式大全

检验手机号码 # 要求:手机号码必须为11位数字,以1开头,第二位为1或5或8。import redef verify_mobile(): mob = input("请输入手机号码:") ret = re.match(r"1[358]\d{9}", m......

彩色泡泡糖
30分钟前
5
0
QT之border-image属性

一、border-image的兼容性 border-image可以说是CSS3中的一员大将,将来一定会大放光彩,其应用潜力真的是非常的惊人。可惜目前支持的浏览器有限,仅Firefox3.5,chrome浏览器,Safari3+支持...

shzwork
31分钟前
6
0
Kubernetes Operator简易教程

1. 安装operator-sdk //安装 operator-sdk$ apt-get install operator-sdk.....$ operator-sdk versionoperator-sdk version: v0.7.0$ go versiongo version go1.11.4 darwin/amd64 2......

Robotcl_Blog
31分钟前
5
0
再谈DAG任务分解和Shuffle RDD

1、DagScheduler分析 DagScheduler功能主要是负责RDD的各个stage的分解和任务提交。Stage分解是从触发任务调度过程的finalStage开始倒推寻找父stage,如果父stage没有提交任务则循环提交缺失...

守望者之父
37分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部