elasticsearch 插件开发 (四) 源代码分析

原创
2014/09/26 13:52
阅读数 786

        elasticsearch 的 基础类,主要分成 Component (组件) 和 Module (模块)。

  1. 组件

    1. CloseableComponent                可关闭组件

    2. AbstractComponent                 可配置组件

    3. LifecycleComponent                 活动可关闭组件

    4. AbstractLifecycleComponent    可配置的活动可关闭组件 (。。。有点长)

  2. 模块

    1. Module                                    模块

    2. PreProcessModule                   预处理模块

    3. SpawnModules                       新模块

一.     AbstractPlugin 类 ,一个插件类要继承自它或者实现Plugin接口

插件类需要实现的Plugin接口,每个方法所对应的组件类型是

  • modules()        Module

  • services()        LifecycleComponent

  • indexModules()        Module

  • indexServices()        CloseableIndexComponent

  • shardModules()        Module

  • shardServices()        CloseableIndexComponent


二.     现在插件实现类,有了,那他是怎么被加载到整个系统里面的呢?那就要请出我们插件组的各个成员了。


  • PluginManager类,插件管理类,负责插件的安装,卸载,下载等工作。

  • PluginsHelper类,插件帮助类,负责列出环境下面所有的site插件。

  • PluginsService类,插件服务类,负责插件的加载,实例化和维护插件信息。


整个节点启动的时候,InternalNode的构造方法里加载配置文件以后,首先会实例化 PluginsService 类,还有PluginsModule模块 。

this.pluginsService = new PluginsService(tuple.v1(), tuple.v2());
this.settings = pluginsService.updatedSettings();
this.environment = tuple.v2();

CompressorFactory.configure(settings);

NodeEnvironment nodeEnvironment = new NodeEnvironment(this.settings, this.environment);

ModulesBuilder modules = new ModulesBuilder();
modules.add(new PluginsModule(settings, pluginsService));
modules.add(new SettingsModule(settings));
modules.add(new NodeModule(this));

PluginsService类的构造方法里,会开始加载插件类,从配置文件和Classpath里面,并且处理 plugin.mandatory 配置的强依赖插件,和模块引用

   public PluginsService(Settings settings, Environment environment) {
        super(settings);
        this.environment = environment;

        Map<String, Plugin> plugins = Maps.newHashMap();

        //首先,我们从配置文件加载,默认的插件类
        String[] defaultPluginsClasses = settings.getAsArray("plugin.types");
        for (String pluginClass : defaultPluginsClasses) {
            Plugin plugin = loadPlugin(pluginClass, settings);
            plugins.put(plugin.name(), plugin);
        }

        // 现在, 我们查找,所有的在ClassPath下面的插件
        loadPluginsIntoClassLoader();
        plugins.putAll(loadPluginsFromClasspath(settings)); //加载JVM插件
        Set<String> sitePlugins = PluginsHelper.sitePlugins(this.environment); //加载站点插件
        
        //强制依赖的插件,如果没有找到
        String[] mandatoryPlugins = settings.getAsArray("plugin.mandatory", null);
        if (mandatoryPlugins != null) {
            Set<String> missingPlugins = Sets.newHashSet();
            for (String mandatoryPlugin : mandatoryPlugins) {
                if (!plugins.containsKey(mandatoryPlugin) && !sitePlugins.contains(mandatoryPlugin) && !missingPlugins.contains(mandatoryPlugin)) {
                    missingPlugins.add(mandatoryPlugin);
                }
            }
            if (!missingPlugins.isEmpty()) {
            	//抛出异常,整个节点启动失败!
                throw new ElasticSearchException("Missing mandatory plugins [" + Strings.collectionToDelimitedString(missingPlugins, ", ") + "]");
            }
        }
        logger.info("loaded {}, sites {}", plugins.keySet(), sitePlugins);
        this.plugins = ImmutableMap.copyOf(plugins);

        //现在,所有插件都加载好了,处理插件实现类的 onModule 方法的引用 ,这里有 依赖注入的秘密。
        
        MapBuilder<Plugin, List<OnModuleReference>> onModuleReferences = MapBuilder.newMapBuilder();
        for (Plugin plugin : plugins.values()) {
            List<OnModuleReference> list = Lists.newArrayList();
            //....
        }
        this.onModuleReferences = onModuleReferences.immutableMap();
        this.refreshInterval = componentSettings.getAsTime("info_refresh_interval", TimeValue.timeValueSeconds(10));

    }

PluginsService.info() 方法,是插件信息类,NodeService 会调用 ,是节点信息类的一部分,就是REST接口 /nodes 返回的内容

//PluginInfo类的字段
static final class Fields {
        static final XContentBuilderString NAME = new XContentBuilderString("name");
        static final XContentBuilderString DESCRIPTION = new XContentBuilderString("description");
        static final XContentBuilderString URL = new XContentBuilderString("url");
        static final XContentBuilderString JVM = new XContentBuilderString("jvm");
        static final XContentBuilderString SITE = new XContentBuilderString("site");
}

InternalNode 会继续调用 modules.createInjector() 方法去实例化所有的模块。PluginsModule模块会去实例化和调用我们的插件

// 创建Plugin 类覆盖  modules 方法的模块
    @Override
    public Iterable<? extends Module> spawnModules() {
        List<Module> modules = Lists.newArrayList();
        Collection<Class<? extends Module>> modulesClasses = pluginsService.modules();
        for (Class<? extends Module> moduleClass : modulesClasses) {
            modules.add(createModule(moduleClass, settings));
        }
        modules.addAll(pluginsService.modules(settings));
        return modules;
    }
    // 处理Plugin类实现了 onModule 方法的类
    @Override
    public void processModule(Module module) {
        pluginsService.processModule(module);
    }

InternalNode 的 start()方法,就是节点启动的时候,会启动Plugin类覆盖了services方法的服务

 for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
            injector.getInstance(plugin).start();
 }

InternalIndicesService.createIndex() 方法,也就是创建索引的时候,会创建 Plugin类 覆盖了 indexModules() 的模块

InternalIndexService.createShard() 方法,创建分片的时候,会去创建Plugin类 覆盖了 shardModules() 的模块

同理删除索引和分片的时候,会销毁模块和关闭服务。也就是插件扩展的服务和模块是有3个生命周期的。

  • Global    节点级别

  • Index    索引级别

  • Shard    分片级别

三.     插件类有了,插件也被加载进系统了,那它是怎么扩展现有模块服务的,那些模块可以扩展,那些不可以?

可扩展的模块,一般都提供了 addXXX,registerXXX 等方法

//智能提示
public void onModule(SuggestModule suggestModule) {
    suggestModule.registerSuggester(MySuggester.class);
}
//REST
public void onModule(RestModule restModule) {
    restModule.addRestAction(MyRestAction.class);
}
//高亮
public void onModule(HighlightModule highlightModule) {
    highlightModule.registerHighlighter(MyHighlighter.class);
}

可替换的模块,一般是实现了SpawnModules接口的模块,比如DiscoveryModule

@Override
    public Iterable<? extends Module> spawnModules() {
        Class<? extends Module> defaultDiscoveryModule;
        if (settings.getAsBoolean("node.local", false)) {
            defaultDiscoveryModule = LocalDiscoveryModule.class;
        } else {
            defaultDiscoveryModule = ZenDiscoveryModule.class;
        }
        return ImmutableList.of(Modules.createModule(settings.getAsClass("discovery.type", defaultDiscoveryModule, "org.elasticsearch.discovery.", "DiscoveryModule"), settings));
}

根据配置项discovery.type来确定加载那个模块

不可以扩展或替换的组件,比如  Internal 开头的组件,InternalClusterService,InternalIndicesService    等是不可以替换的。


展开阅读全文
打赏
0
3 收藏
分享
加载中
整个插件开发及源码分析的总结很到位 79
2015/05/12 17:20
回复
举报
更多评论
打赏
1 评论
3 收藏
0
分享
返回顶部
顶部