文档章节

elasticsearch 源代码分析之引擎

纳兰琴
 纳兰琴
发布于 2015/01/06 17:15
字数 1237
阅读 1164
收藏 1

        Engine是ES最接近神Lucene的地方,是对Lucene分布式环境访问的一层封装。这个类的接口,是个命令模式,所以很自然的实现了操作日志 translog。引擎旧版本的实现类叫RobinEngine,新版本改名了,而且加了几个版本类型。不过这对我们分析源码影响不大。它主要的2个内容是 操作日志数据版本。亮点是锁的运用。我们这里不罗列代码了,就从 封装并发  2个角度看下代码。

启动引擎

        既然是引擎,开始讨论之前,先启动了它再说。ES是用guice管理的实例。我们为了直观一点,就直接New了。

public Engine createEngine() throws IOException {
		 	Index index=new Index("index");
		 	
		 	ShardId shardId = new ShardId(index, 1);
	    	
		 	ThreadPool threadPool = new ThreadPool();
		 	
		 	CodecService cs = new CodecService(shardId.index());
		 	
	        AnalysisService as = new AnalysisService(shardId.index());
	        
	        SimilarityService ss = new SimilarityService(shardId.index());
		 	
		 	Translog translog = new FsTranslog(shardId, EMPTY_SETTINGS, new File("c:/fs-translog"));
		 	
		 	DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
		 	
	        Store store =  new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService));
	       
	        SnapshotDeletionPolicy sdp = new SnapshotDeletionPolicy( new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS));
	        
	        MergeSchedulerProvider<?> scp = new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
	        
	        MergePolicyProvider<?> mpp = new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(index, EMPTY_SETTINGS));
	        
	        IndexSettingsService iss = new IndexSettingsService(shardId.index(), EMPTY_SETTINGS);
	        
	        ShardIndexingService sis = new ShardIndexingService(shardId, EMPTY_SETTINGS,new ShardSlowLogIndexingService(shardId,EMPTY_SETTINGS, iss));
	        
	        Engine engine =  new RobinEngine(shardId,EMPTY_SETTINGS,threadPool,iss,sis,null,store, sdp, translog,mpp, scp,as, ss,cs);
	        
	        return engine;
	}


对Lucene的封装

        封装后,其实就是用JSON语法查询,返回JSON内容。这个在引擎这个层面,还没实现。回想下Lucene的CURD。我们再看看ES引擎的CURD是什么样的。首先对Document封装了下,叫ParsedDocument

private ParsedDocument createParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl,Analyzer analyzer, BytesReference source, boolean mappingsModified) {
	        Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
	        Field versionField = new NumericDocValuesField("_version", 0);
	        Document document=new Document();
	        document.add(uidField);
	        document.add(versionField);
	        document.add(new Field("_source",source.toBytes()));
	        document.add(new TextField("name", "myname", Field.Store.NO));
	        return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), analyzer, source, mappingsModified);
	}

Engine engine = createEngine();
engine.start();
String json="{\"name\":\"myname\"}";
BytesReference source = new BytesArray(json.getBytes());
ParsedDocument doc = createParsedDocument("2", "myid", "mytype", null, -1, -1, Lucene.STANDARD_ANALYZER, source, false);
//增加
Engine.Create create = new Engine.Create(null, newUid("2"), doc);
engine.create(create);
create.version(2);
create.versionType(VersionType.EXTERNAL);
engine.create(create);
//删除
Engine.Delete delete = new Engine.Delete("mytype", "myid", newUid("2"));
engine.delete(delete);
//修改类似增加,省略了
//查询
TopDocs result=null;
try {
    Query query=new MatchAllDocsQuery();
    result=engine.searcher().searcher().search(query,10);
    System.out.println(result.totalHits);
} catch (Exception e) {
    e.printStackTrace();
}
//获取
Engine.GetResult gr = engine.get(new Engine.Get(true, newUid("1")));
System.out.println(gr.source().source.toUtf8());

从用法上来看这个小家伙,也没那么厉害啊。没关系,到后来他会越来越厉害,直到最后成长成高大上的ES。


query和Get的比较

query的流程我们是清楚的,那Lucene没有的这个Get操作是什么样的逻辑呢? 我们看下

最理想的情况下,直接从translog里,返回数据。否则会根据UID在TermsEnum里定位这个词条(二分查找?),然后根据指针从fdt文件里,返回内容。

就说如果把query分,query和fatch 2个阶段的话,他是前面的阶段不一样的。比起termQuery少了一些步骤。

//我简化了代码,演示代码
for(AtomicReaderContext context : reader.leaves()){
			try {
				Terms terms=context.reader().terms("brand");
				TermsEnum te= terms.iterator(null);
				BytesRef br=new BytesRef("陌".getBytes());
				
				if(te.seekExact(br,false)){
					DocsEnum docs = te.docs(null, null);
					for (int d = docs.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = docs.nextDoc()) {
						System.out.println(reader.document(d).getBinaryValue("_source").utf8ToString());
		            }
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}

refresh和Flush的区别

refresh调用的就是Lucene的 searcherManager.maybeRefresh() ,Flush的话分,3种情况

static class Flush {

        public static enum Type {
            /**
             * 创建一个新的Writer
             */
            NEW_WRITER,
            /**
             * 提交writer
             */
            COMMIT,
            /**
             * 提交translog.
             */
            COMMIT_TRANSLOG
        }

refresh的话更轻量一点。他默认会自动刷新

@Override
    public TimeValue defaultRefreshInterval() {
        return new TimeValue(1, TimeUnit.SECONDS);
    }


并发控制

它每一个操作的逻辑都差不多,我们选一个创建看下。引擎是整个ES用锁用的最频繁的地方,一层套一层的用啊,不出事,也是怪事一件。

    @Override
    public void create(Create create) throws EngineException {
        rwl.readLock().lock();
        try {
            IndexWriter writer = this.indexWriter;
            if (writer == null) {
                throw new EngineClosedException(shardId, failedEngine);
            }
            innerCreate(create, writer);
            dirty = true;
            possibleMergeNeeded = true;
            flushNeeded = true;
        } catch (IOException e) {
            throw new CreateFailedEngineException(shardId, create, e);
        } catch (OutOfMemoryError e) {
            failEngine(e);
            throw new CreateFailedEngineException(shardId, create, e);
        } catch (IllegalStateException e) {
            if (e.getMessage().contains("OutOfMemoryError")) {
                failEngine(e);
            }
            throw new CreateFailedEngineException(shardId, create, e);
        } finally {
            rwl.readLock().unlock();
        }
    }

    private void innerCreate(Create create, IndexWriter writer) throws IOException {
        synchronized (dirtyLock(create.uid())) {
            //....省略下数据版本的验证,不在这里讲
            if (create.docs().size() > 1) {
                writer.addDocuments(create.docs(), create.analyzer());
            } else {
                writer.addDocument(create.docs().get(0), create.analyzer());
            }
            Translog.Location translogLocation = translog.add(new Translog.Create(create));

            //versionMap.put(versionKey, new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));

            indexingService.postCreateUnderLock(create);
        }
    }

首先是个读写锁。很多操作都加读锁了,写锁只有启动关闭引擎,recover-phase3 ,NEW_WRITER类型的flush的时候才加的。意思就是。。。在我启动关闭引擎,数据恢复和重新创建indexWriter的时候,是不能干任何事情的。接下来是个对象锁。UID加锁,用的锁分段技术,就是ConcurrentHashMap的原理。减少了大量锁对象的创建。要知道UID可是个海量对象啊。这里要是用个String,分分钟就OO吧。

private final Object[] dirtyLocks;
this.dirtyLocks = new Object[indexConcurrency * 50]; // 默认最多会有8*50个锁对象...
for (int i = 0; i < dirtyLocks.length; i++) {
        dirtyLocks[i] = new Object();
}
        
private Object dirtyLock(BytesRef uid) {
        int hash = DjbHashFunction.DJB_HASH(uid.bytes, uid.offset, uid.length);
        // abs returns Integer.MIN_VALUE, so we need to protect against it...
        if (hash == Integer.MIN_VALUE) {
            hash = 0;
        }
        return dirtyLocks[Math.abs(hash) % dirtyLocks.length];
}


© 著作权归作者所有

共有 人打赏支持
纳兰琴
粉丝 49
博文 23
码字总数 13442
作品 0
杭州
高级程序员
私信 提问
Elastic 在年度用户大会 Elastic{ON} 2018 上发布众多新功能和技术预览

下载超过 2.25 亿次,Elastic 公开 X-Pack 源代码 旧金山 (Elastic{ON} 2018) – 2018 年 2 月 27 日 – Elastic,Elasticsearch 和 Elastic Stack背后的公司,今天宣布其产品累计下载次数达...

Medcl
03/01
0
0
centos 7( linux )下安装elasticsearch教程

目录 概述 环境准备 elaticsearch简介 安装elasticsearch 彩蛋 概述 很久没有写博客了,最近在做全文检索的项目,发现elasticsearch踩了不少坑,百度点进去又是坑,在此记录一下自己的踩坑历程。...

java_龙
10/15
0
0
简单搭建 ELK + OpenWAF 环境

ELK 是比较火的开源日志分析系统 文章主要介绍,ELK 的 docker 部署及与 OpenWAF 的结合 OpenWAF简介 OpenWAF是第一个全方位开源的Web应用防护系统(WAF),他基于nginx_lua API分析HTTP请求...

温柔魔君
2017/07/05
0
0
当ES赶超Redis,这份ES进修攻略不容错过!

从4月DB-Engines最新发布的全球数据库排名中,我们赫然发现ElasticSearch逆袭超越了Redis,从原先的第9名上升至第8名,而Redis则落后一名,排在了其后。 事实上,这场逆袭并不算太让人意外。...

DBAplus社群
04/15
0
0
基于弹性堆栈(ELK堆栈)的日志分析、存储及展示

ELK简介 “ELK”是三个开源项目的首字母缩写:Elasticsearch,Logstash和Kibana。Elasticsearch是一个搜索和分析引擎。Logstash是一个服务器端数据处理管道,它同时从多个源中提取数据,对其...

cchenyz
08/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

闲话高并发的那些神话,看京东架构师如何把它拉下神坛

高并发也算是这几年的热门词汇了,尤其在互联网圈,开口不聊个高并发问题,都不好意思出门。高并发有那么邪乎吗?动不动就千万并发、亿级流量,听上去的确挺吓人。但仔细想想,这么大的并发与...

James-
13分钟前
1
0
Emacs 系列:让我们拥抱 Emacs 和 org 模式

导读 我必须承认,在使用了几十年的 vim 后, 我被 Emacs 吸引了。长期以来,我一直对如何组织安排事情感到沮丧。我也有用过 GTD 和 ZTD 之类的方法,但是像邮件或是大型文件这样的事务真的很...

问题终结者
14分钟前
1
0
解析Node.js通过axios实现网络请求

本次给大家分享一篇node.js通过axios实现网络请求的方法,写的十分的全面细致,具有一定的参考价值,对此有需要的朋友可以参考学习下。如有不足之处,欢迎批评指正。 1、使用Npm 下载axios n...

前端攻城老湿
27分钟前
4
0
深入浅出之React-redux中connect的装饰器用法@connect

这篇文章主要介绍了react-redux中connect的装饰器用法@connect详解,写的十分的全面细致,具有一定的参考价值,对此有需要的朋友可以参考学习下。如有不足之处,欢迎批评指正。 通常我们需要一...

前端攻城小牛
28分钟前
2
0
详解css BEM书写规范

BEM是基于组件的web开发方法。其思想是将用户界面分隔为独立的块,从而使开发复杂的UI界面变得更简单和快,且不需要粘贴复制便可复用现有代码。BEM由Block、Element、Modifier组成。选择器里...

前端小攻略
43分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部