mongodb内核源码实现、性能调优、最佳运维实践系列-Mongodb write写(增、删、改)模块设计与实现

Mongodb write写(增、删、改)模块源码实现     

       前面的《transport_layer网络传输层模块源码实现》和《command命令处理模块源码实现》详细的分析了mongodb内核网络数据收发过程以及命令解析处理的整个过程,本文将继续分析该系列的第三个子模块-《write写(增、删、改)模块源码实现》。

关于作者

      前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责数万亿级数据量文档数据库mongodb内核研发、性能优化及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》,Github账号地址:https://github.com/y123456yz

序言

       本文是oschina专栏《mongodb 源码实现、调优、最佳实践系列》的第19篇文章,其他文章可以参考如下链接:

Qcon-万亿级数据库 MongoDB 集群性能数十倍提升及机房多活容灾实践

Qcon 现代数据架构 -《万亿级数据库 MongoDB 集群性能数十倍提升优化实践》核心 17 问详细解答

百万级高并发 mongodb 集群性能数十倍提升优化实践 (上篇)

百万级高并发 mongodb 集群性能数十倍提升优化实践 (下篇)

Mongodb特定场景性能数十倍提升优化实践(记一次mongodb核心集群雪崩故障)

常用高并发网络线程模型设计及mongodb线程模型优化实践

为何要对开源mongodb数据库内核做二次开发

盘点 2020 | 我要为分布式数据库 mongodb 在国内影响力提升及推广做点事

 百万级代码量 mongodb 内核源码阅读经验分享

话题讨论 | mongodb 拥有十大核心优势,为何国内知名度远不如 mysql 高?

Mongodb 网络模块源码实现及性能极致设计体验

网络传输层模块实现二

网络传输层模块实现三

网络传输层模块实现四

command 命令处理模块源码实现一

command 命令处理模块源码实现二

mongodb 详细表级操作及详细时延统计实现原理 (快速定位表级时延抖动)

[图、文、码配合分析]-Mongodb write 写 (增、删、改) 模块设计与实现

Mongodb集群搭建一篇就够了-复制集模式、分片模式、带认证、不带认证等(带详细步骤说明)

  1.  write写模块与command命令处理模块衔接回顾

       上面两图是command命令处理模块的大体流程,最终经过command模块处理后,会执行对应的命令run接口,本文要分析的write模块也将从本入口入手。增、删、改三个最基本的写操作对应的命令入口如下表:

      mongodb内核write模块主要由如下目录代码实现:

      下面章节将分析增删改操作的详细内核实现流程,注意包括请求序列化解析存储、insert写入流程、update更新计划执行器、delete删除计划执行器等。

2. 增、删、改序列化解析及结构化统一存储

      本章节详细分析增、删、改三个操作的序列化解析及结构化统一存储核心实现过程。

2.1 增删改写入操作语法及其主要含义说明

  • insert插入语法及说明

      insert主要完成数据的写入操作,其命令语法如下:

1.{  
2.   insert: <collection>,  
3.   documents: [ <document>, <document>, <document>, ... ],  
4.   ordered: <boolean>,  
5.   writeConcern: { <write concern> },  
6.   bypassDocumentValidation: <boolean>  
7.} 

      insert操作主要由五个字段类型组成,具体字段功能说明如下:

  • update更新语法及说明

       update操作实现数据更新操作,其命令语法如下:

1.{  
2.   update: <collection>,  
3.   updates: [  
4.      { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>,  
5.        collation: <document>, arrayFilters: <array> },  
6.      { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>,  
7.        collation: <document>, arrayFilters: <array> },  
8.      { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>,  
9.        collation: <document>, arrayFilters: <array> },  
10.      ...  
11.   ],  
12.   ordered: <boolean>,  
13.   writeConcern: { <write concern> },  
14.   bypassDocumentValidation: <boolean>  
15.}  

      上述语法各字段功能说明如表:

  • delete更新语法及说明

      delete删除操作对应语法如下:

1.{  
2.   delete: <collection>,  
3.   deletes: [  
4.      { q : <query>, limit : <integer>, collation: <document> },  
5.      { q : <query>, limit : <integer>, collation: <document> },  
6.      { q : <query>, limit : <integer>, collation: <document> },  
7.      ...  
8.   ],  
9.   ordered: <boolean>,  
10.   writeConcern: { <write concern> }  
11.} 

      如上,delete语法各个字段功能说明如下:

2.2 增、删、改序列化解析

2.2.1 增、删、改核心数据结构

      从上面的insert、delete、update语法可以看出,这三个操作有一部分字段名是一样的,内核在代码实现的时候也重复利用了这一特定,把这部分成员抽象为公共类,不同的字段则在各自操作类中封装。

      最终,三个操作的字段信息通过公用类WriteCommandBase和各自私有类Insert、Update、Delete保持及解析封装。如下图所示:

      公共基类由WriteCommandBase类实现,如下:

1.class WriteCommandBase {    
2.public:  
3.   //基类接口  
4.   ......  
5.   //mongodb字段验证规则(schema validation)  
6.   bool _bypassDocumentValidation{false};  
7.   //一次对多条数据进行插入或者删除或者更新的时候,前面的数据操作失败,是否继续后面的操作  
8.   bool _ordered{true};  
9.   //事务相关,等4.2版本回头分析  
10.   boost::optional<std::vector<std::int32_t>> _stmtIds;  
11.}  

      Insert类包含WriteCommandBase类成员,同时包括Insert操作对应的私有成员信息,如下:

1.class Insert {    
2.public:  
3.    ......  
4.    //也就是db.collection  
5.    NamespaceString _nss;  
6.    //公共结构信息  
7.    WriteCommandBase _writeCommandBase;  
8.    //真正的文档在这里documents  
9.    std::vector<mongo::BSONObj> _documents;  
10.    //库信息  
11.    std::string _dbName;  
12.    //是否有documents  
13.}  

      delete删除操作对应Delete类核心成员信息如下:

1.class Delete {    
2.public:  
3.    ......  
4.    //DB.COLLECTION信息  
5.    NamespaceString _nss;  
6.    WriteCommandBase _writeCommandBase;  
7.    //具体的delete内容在这里  
8.    std::vector<DeleteOpEntry> _deletes;  
9.}  

      update更新操作对应的Update类核心成员信息如下:

1.class Update {   
2.public:  
3.      ......  
4.    //db.collection信息,也就是库.表信息  
5.    NamespaceString _nss;  
6.    WriteCommandBase _writeCommandBase;  
7.    //需要更新的具体内容在该成员中     
8.    std::vector<UpdateOpEntry> _updates;  
} 

      上面的类结构中,_documents_deletes_updates三个成员分别对应增、删、改操作的集体操作信息,都是数组类型,可以一次进行多条数据操作。

2.2.2 增、删、改解析过程

      增删改三个操作对应三个不同的类,由这三个类来完成各自操作的协议解析及封装,整体代码实现大同小异,本文只分析insert解析及封装过程,主要代码实现如下:

1.Insert Insert::parse(const IDLParserErrorContext& ctxt, const BSONObj& bsonObject) {  
2.    ......  
3.    //调用Insert::parseProtected  
4.    object.parseProtected(ctxt, bsonObject);  
5.    return object;  
6.}  
7.  
8.void Insert::parseProtected(...)  
9.{  
10.    //解析出insert类的对应成员信息  
11.    for (const auto& element :request.body) {  
12.        const auto fieldName = element.fieldNameStringData();  
13.  
14.        //解析bypassDocumentValidation信息  
15.        if (fieldName == kBypassDocumentValidationFieldName) {  
16.              ......  
17.        }  
18.        //解析ordered信息  
19.        else if (fieldName == kOrderedFieldName) {  
20.             ......  
21.        }  
22.        //解析stmtIds信息  
23.        else if (fieldName == kStmtIdsFieldName) {  
24.             ......  
25.        }  
26.        //解析需要插入的文档信息  
27.        else if (fieldName == kDocumentsFieldName) {  
28.           //解析的文档保持到_documents数组  
29.            _documents = std::move(values);  
30.        }  
31.        //解析db名  
32.        else if (fieldName == kDbNameFieldName) {  
33.            ......  
34.        }  
35.        ......  
36.    }  
37.    //从request中解析出_writeCommandBase基础成员内容  
38.    _writeCommandBase = WriteCommandBase::parse(ctxt, request.body);  
39.  
40.    ......  
41.    //根据db+collection构造出db.collection字符串  
42.    _nss = ctxt.parseNSCollectionRequired(_dbName, commandElement);  
43.} 

     和insert操作类似,update和delete操作的解析过程与insert流程一样比较简单,因此不在分析。

     最终,所有解析出的数据保存到各自类中,总结如下图所示:

      此外,增删改操作的序列化封装由write_ops_gen.cpp中的Insert::serialize()、Update::serialize()、Delete::serialize()完成,主要根据各自类完成Bson统一封装,整个实现过程比较简单,这里不在详细分析。

     增删改接口解析及序列化相关几个核心接口功能说明如下:

      注意:在insert、update、delete中还有如下一个细节,为何不见writeConcern相关成员存储?原因是writeConcern解析放到了外层runCommandImpl中通过setWriteConcern()保持到该请求对应得opCtx操作上下文中。

3. Insert数据写操作核心实现

      insert处理和command命令处理模块通过CmdInsert::runImpl()衔接,该接口代码实现如下:

1.//插入文档会走这里面  CmdInsert::runImpl  
2.void runImpl(...) final {  
3.    //从request中解析出write_ops::Insert类成员信息  
4.    const auto batch = InsertOp::parse(request);  
5.    const auto reply = performInserts(opCtx, batch);  
6.    ......  
7.} 

      InsertOp::parse()在前面章节已经分析,主要完成数据的统一解析存储。insert请求解析存储到write_ops::Insert类后,开始调用performInserts(...)处理。在该接口中完成如下流程:分批数据组装、批量数据写入、事务封装、写入存储引擎等。

3.1 数据分批组装

      由于inset一次可以插入多条数据,为了最大化满足性能要求,当写入数据很多的时候,mongodb内核通过把这些数据按照指定规则拆分到多个batch中,这样每个batch代表一批数据,然后进行统一处理。分批数据组装拆分过程核心代码实现如下:

1.//数据分批写入核心代码实现  
2.WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) {  
3.    .......  
4.    //写入数据成功后的会掉处理  
5.    //主要完成表级tps及时延统计  
6.    ON_BLOCK_EXIT([&] {  
7.    //performInserts执行完成后调用,记录执行结束时间   
8.        curOp.done();      
9.        //表级tps及时延统计  
10.        Top::get(opCtx->getServiceContext())  
11.            .record(...);  
12.  
13.    });  
14.  
15.    ......  
16.    size_t bytesInBatch = 0;  
17.    //batch数组  
18.    std::vector<InsertStatement> batch;   
19.    //默认64,可以通过db.adminCommand( { setParameter: 1, internalInsertMaxBatchSize:xx } )配置  
20.    const size_t maxBatchSize = internalInsertMaxBatchSize.load();  
21.    //当写入的数据小于64时,也就是一个batch即可一起处理  
22.    //batch最大限制为写入数据大于64或者batch中总字节数超过256K  
23.    batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize));  
24.    for (auto&& doc : wholeOp.getDocuments()) {  
25.    ......  
26.    //doc检查,例如是否嵌套过多,是否一个doc带有多个_id等  
27.        auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc);  
28.    //如果这个文档检测有异常,则跳过这个文档,进行下一个文档操作  
29.        if (!fixedDoc.isOK()) {   
30.            //啥也不做,直接忽略该doc  
31.        } else {  
32.            //事务相关,先忽略,以后会回头专门分析事务  
33.            const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);  
34.            ......  
35.        //把文档插入到batch数组  
36.            BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());  
37.            batch.emplace_back(stmtId, toInsert);  
38.            bytesInBatch += batch.back().doc.objsize();  
39.        //这里continue,就是为了把批量插入的文档组成到一个batch数组中,到达一定量一次性插入  
40.        //batch里面一次最多插入64个文档或者总字节数256K,则后续的数据拆分到下一个batch  
41.            if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes)  
42.                continue;  // Add more to batch before inserting.  
43.        }  
44.  
45.    //把本batch中的数据交由该接口统一处理  
46.        bool canContinue = insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out);  
47.    //清空batch,开始下一轮处理  
48.        batch.clear();     
49.        bytesInBatch = 0;  
50.    ......  
51.}  

      上面的代码可以总结为以下图形:

      说明,上面假设64条数据总大小不超过256KB的batch图,如果64条doc文档数据总大小超过256kb,这时候阀值则以总数据256K为限制。单个batch最大上限限制条件如下:

  • 最多64个doc文档数据。
  • 单个batch总数据长度不超过256Kb。

3.2 batch数据事务写入流程及其异常补偿机制

      一批数据通过分批拆分存入多个batch后,调用insertBatchAndHandleErrors()接口来完成单个batch的数据写入。整个batch数据写入可以在一个transaction事务完成,也可以一条数据一个事务来完成写入,具体核心代码实现如下:

1.bool insertBatchAndHandleErrors(...) {  
2.    ......  
3.    try {  
4.        //如果对应collection不存在则创建  
5.        acquireCollection(); //执行上面定义的函数  
6.        //如果collection不是固定capped集合,并且batch中数据大于一条  
7.        //则试着在一个事务中一次性写入所有的数据  
8.        if (!collection->getCollection()->isCapped() && batch.size() > 1) {    
9.            ......  
10.            //为什么这里没有检查返回值?默认全部成功? 实际上通过try catch获取到异常后,再后续改为一条一条插入  
11.            insertDocuments(opCtx, collection->getCollection(), batch.begin(), batch.end());  
12.            //insert统计计数及返回值赋值  
13.            globalOpCounters.gotInserts(batch.size());  
14.            ......  
15.            std::fill_n(std::back_inserter(out->results), batch.size(), std::move(result));  
16.            curOp.debug().ninserted += batch.size();  
17.            //一个事务写入多个doc成功,直接返回  
18.            return true;  
19.        }  
20.    } catch (const DBException&) { //批量写入失败,则后面一条一条的写  
21.        collection.reset();  
22.        //注意这里没有return,在后续一条一个事务写入  
23.    }  
24.  
25.    //这里循环解析batch,实现一条数据一个在一个事务中处理  
26.    for (auto it = batch.begin(); it != batch.end(); ++it) {  
27.        globalOpCounters.gotInsert(); //insert操作计数  
28.        try {  
29.            //log() << "yang test ............getNamespace().ns():" << wholeOp.getNamespace().ns();  
30.            //writeConflictRetry里面会执行{}中的函数体   
31.            writeConflictRetry(opCtx, "insert", wholeOp.getNamespace().ns(), [&] {  
32.                try {  
33.                    ......  
34.                    //把该条文档插入    
35.                    insertDocuments(opCtx, collection->getCollection(), it, it + 1);  
36.                    //统计计数处理  
37.                    SingleWriteResult result;  
38.                    result.setN(1);  
39.                    out->results.emplace_back(std::move(result));  
40.                    curOp.debug().ninserted++;  
41.                } catch (...) {  
42.                    ......
43.                }  
44.            });  
45.        } catch (const DBException& ex) {//写入异常  
46.            //注意这里,如果失败是否还可以继续后续数据的写入  
47.            bool canContinue =  
48.                handleError(opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out);  
49.            if (!canContinue)  
50.                return false; //注意这里直接退出循环,也就是本批次数据后续数据没有写入了  
51.        }  
52.    }  
53.  
54.    return true;  
55.}  

      一批batch数据(假设64条)写入过程,如果不是capped固定集合,则这64条数据首先放入一个transaction事务中完成写入。如果写入异常,则继续一个事务一条数据写入。数据放入事务执行流程如下:

1.void insertDocuments(OperationContext* opCtx,  
2.                     Collection* collection,  
3.                     std::vector<InsertStatement>::iterator begin,  
4.                     std::vector<InsertStatement>::iterator end)  
5.    //事务开始  
6.    WriteUnitOfWork wuow(opCtx);  
7.    ......  
8.    //把数组begin到end之间的所有doc文档数据放入该事务中  
9.    uassertStatusOK(collection->insertDocuments(  
10.        opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true));  
11.    //事务结束  
12.    wuow.commit(); //WriteUnitOfWork::commit  
13.} 

      到这里后,insert操作在write模块中的流程就结束了,后续的doc写入流程存储引擎将交由storage模块实现。

上面的核心代码分析可以总结为如下总结:

      当这个batch中的数据放入同一个事务执行失败后,则改为一条一个事务循环处理,如下图所示:

3.3 中间数据写入异常如何处理

       假设一个batch数据64条数据,如果第23条数据写入失败了,后续的第24-64条数据是否需要继续写入,这就是本章节需要分析的问题。mongodb内核实现的时候通过handleError()接口判断是否需要继续写入,该接口代码如下:

1.//前面数据写入失败,是否可以继续后续数据写入
2.bool handleError(...) {  
3.    ......  
4.  
5.    //判断是什么原因引起的异常,从而返回不同的值  
6.    //如果是isInterruption错误,直接返回true,意思是不需要后续数据写入  
7.    if (ErrorCodes::isInterruption(ex.code())) {  
8.        //如果是interrupt异常,则整批数据写失败,也就是不进行后续数据写入  
9.        throw;  // These have always failed the whole batch.  
10.    }  
11.  
12.    ......  
13.    //如果ordered为false则忽略这条写入失败的数据,继续后续数据写入  
14.    return !wholeOp.getOrdered();  
15.} 

      从上面的代码可以看出,只要出现以下异常情况,就不可继续后续数据insert写入操作了,如下:

  • Interruption错误:包括Interrupted、InterruptedAtShutdown、ExceededTimeLimit、InterruptedDueToReplStateChange四种异常,其他异常情况可以继续写入。
  • ordered参数配置为false: 如果该配置为false则遇到异常不继续处理后续doc写入。

      写入异常后是否继续写总结如下图所示:

3.4 后续

      通过前面的分析可以得出,mongodb内核把多条doc文档按照指定限制把文档封装到不同batch中,然后一个batch一个batch分批处理。最终,这些batch对应数据将会通过mongodb内核的storage存储模块来完成insert事务处理,最终在CollectionImpl::insertDocuments()实现。

      Insert写入流程核心接口调用关系图如下:

 说明:数据如何组装存入wiredtiger存储引擎将在后续《storage存储模块源码实现》中详细分析。

4. delete删除操作核心实现

      delete数据删除通过命令处理模块中的CmdDelete::runImpl(...) ->performDeletes接口完成和write写模块delete操作对接,下面我们分析该接口核心代码实现,如下:

1.WriteResult performDeletes(...)  
2.{  
3.    ......  
4.  
5.    //singleOp类型为DeleteOpEntry     write_ops::Delete::getDeletes  
6.    for (auto&& singleOp : wholeOp.getDeletes()) {  
7.        //事务相关,先跳过,以后相关章节专门分析  
8.        const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);  
9.        ...... 
10.
11.        //该函数接口执行完后执行该finishCurOp  
12.        //finishCurOp实现表级QPS及时延统计 本op操作的慢日志记录等  
13.        ON_BLOCK_EXIT([&] { finishCurOp(opCtx, &curOp); });  
14.        try {  
15.            lastOpFixer.startingOp();  
16.            out.results.emplace_back(  
17.                //该delete op操作真正执行在这里,singleOp类型为DeleteOpEntry  
18.                performSingleDeleteOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp));  
19.            lastOpFixer.finishedOpSuccessfully();  
20.        } catch (const DBException& ex) {  
21.            ......  
22.    }  
23.  
24.    return out;  
}  

      从上面代码分析可以看出,如果wholeOp携带有多个DeleteOpEntry(也就是singleOp )操作,则循环对singleOp 进行处理,这个处理过程由performSingleDeleteOp(...)接口实现,具体如下:

      performSingleDeleteOp(...)接口核心代码实现如下:

1.static SingleWriteResult performSingleDeleteOp(...) {  
2.    ......  
3.  
4.    //根据ns构造DeleteReques  
5.    //根据请求相关信息初始化赋值DeleteRequest  
6.    DeleteRequest request(ns);  
7.    request.setQuery(op.getQ());  
8.    request.setCollation(write_ops::collationOf(op));  
9.    request.setMulti(op.getMulti());  
10.    request.setYieldPolicy(PlanExecutor::YIELD_AUTO);  // ParsedDelete overrides this for $isolated.  
11.    request.setStmtId(stmtId);  
12.  
13.    //根据DeleteRequest构造ParsedDelete  
14.    ParsedDelete parsedDelete(opCtx, &request);  
15.    //从request解析出对应成员存入parsedDelete  
16.    uassertStatusOK(parsedDelete.parseRequest());  
17.    //检查该请求是否已经被kill掉了  
18.    opCtx->checkForInterrupt();  
19.  
20.    ......  
21.    //写必须走主节点判断及版本判断  
22.    assertCanWrite_inlock(opCtx, ns);  
23.  
24.    //从查询引擎中获取delete执行器  
25.    auto exec = uassertStatusOK(  
26.        getExecutorDelete(opCtx, &curOp.debug(), collection.getCollection(), &parsedDelete));  
27.  
28.    {  
29.        stdx::lock_guard<Client> lk(*opCtx->getClient());  
30.        CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));  
31.    }  
32.  
33.    //运行该执行器  
34.    uassertStatusOK(exec->executePlan());  
35.  
36.    //下面流程是记录各种统计信息  
37.    long long n = DeleteStage::getNumDeleted(*exec);  
38.    curOp.debug().ndeleted = n;  
39.  
40.    PlanSummaryStats summary;  
41.    //获取执行器运行过程中的各种统计信息  
42.    Explain::getSummaryStats(*exec, &summary);  
43.    if (collection.getCollection()) {  
44.        collection.getCollection()->infoCache()->notifyOfQuery(opCtx, summary.indexesUsed);  
45.    }  
46.    curOp.debug().setPlanSummaryMetrics(summary);  
47.    //统计信息序列化  
48.    if (curOp.shouldDBProfile()) {  
49.        BSONObjBuilder execStatsBob;  
50.        Explain::getWinningPlanStats(exec.get(), &execStatsBob);  
51.        curOp.debug().execStats = execStatsBob.obj();  
52.    }  
53.      
54.    ......  
55.    return result;  
56.}  

      该接口最核心的部分为获取delete执行器并运行,执行器由query查询引擎模块实现,因此getExecutorDelete(...)获取delete执行器及其运行过程具体实现流程将在后续《query查询引擎模块实现原理》章节详细分析,这里暂时跳过这一逻辑。write模块中delete操作主要接口调用流程如下:

5. update更新操作核心实现

       update数据更新操作过程和delete操作过程类似,这里不在累述,其核心接口调用流程如下图所示:

6. 下期预告

    下期将分析《storage存储模块源码实现》,storage模块分析完成后将分析mongodb最复杂的《query查询引擎源码实现》,敬请关注。

展开阅读全文
打赏
2
4 收藏
分享
加载中
打赏
0 评论
4 收藏
2
分享
返回顶部
顶部