文档章节

数据库中间件 MyCAT 源码分析 —— SQL ON MongoDB

芋道源码
 芋道源码
发布于 2017/07/20 01:32
字数 1613
阅读 4.2K
收藏 81

「深度学习福利」大神带你进阶工程师,立即查看>>>

wechat_mp

🙂🙂🙂关注微信公众号有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右


1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识
  2. 查询操作
  3. 插入操作
  4. 彩蛋,😈彩蛋,🙂彩蛋

建议你看过这两篇文章(非必须):

  1. 《MyCAT 源码分析 —— 【单库单表】插入》
  2. 《MyCAT 源码分析 —— 【单库单表】查询》

2. 主流程

  1. MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQLMongoDB操作 发送给 MongoDB Server
  2. MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。


Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

1、查询 MongoDB

// MongoSQLParser.java
public MongoData query() throws MongoSQLException {
   if (!(statement instanceof SQLSelectStatement)) {
       //return null;
       throw new IllegalArgumentException("not a query sql statement");
   }
   MongoData mongo = new MongoData();
   DBCursor c = null;
   SQLSelectStatement selectStmt = (SQLSelectStatement) statement;
   SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
   int icount = 0;
   if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {
       MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();

       BasicDBObject fields = new BasicDBObject();

       // 显示(返回)的字段
       for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {
           //System.out.println(item.toString());
           if (!(item.getExpr() instanceof SQLAllColumnExpr)) {
               if (item.getExpr() instanceof SQLAggregateExpr) {
                   SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();
                   if (expr.getMethodName().equals("COUNT")) { // TODO 待读:count(*)
                       icount = 1;
                       mongo.setField(getExprFieldName(expr), Types.BIGINT);
                   }
                   fields.put(getExprFieldName(expr), 1);
               } else {
                   fields.put(getFieldName(item), 1);
               }
           }

       }

       // 表名
       SQLTableSource table = mysqlSelectQuery.getFrom();
       DBCollection coll = this._db.getCollection(table.toString());
       mongo.setTable(table.toString());

       // WHERE
       SQLExpr expr = mysqlSelectQuery.getWhere();
       DBObject query = parserWhere(expr);

       // GROUP BY
       SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();
       BasicDBObject gbkey = new BasicDBObject();
       if (groupby != null) {
           for (SQLExpr gbexpr : groupby.getItems()) {
               if (gbexpr instanceof SQLIdentifierExpr) {
                   String name = ((SQLIdentifierExpr) gbexpr).getName();
                   gbkey.put(name, Integer.valueOf(1));
               }
           }
           icount = 2;
       }

       // SKIP / LIMIT
       int limitoff = 0;
       int limitnum = 0;
       if (mysqlSelectQuery.getLimit() != null) {
           limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
           limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
       }
       if (icount == 1) { // COUNT(*)
           mongo.setCount(coll.count(query));
       } else if (icount == 2) { // MapReduce
           BasicDBObject initial = new BasicDBObject();
           initial.put("num", 0);
           String reduce = "function (obj, prev) { " + "  prev.num++}";
           mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));
       } else {
           if ((limitoff > 0) || (limitnum > 0)) {
               c = coll.find(query, fields).skip(limitoff).limit(limitnum);
           } else {
               c = coll.find(query, fields);
           }

           // order by
           SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();
           if (orderby != null) {
               BasicDBObject order = new BasicDBObject();
               for (int i = 0; i < orderby.getItems().size(); i++) {
                   SQLSelectOrderByItem orderitem = orderby.getItems().get(i);
                   order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));
               }
               c.sort(order);
               // System.out.println(order);
           }
       }
       mongo.setCursor(c);
   }
   return mongo;
}

2、查询条件

// MongoSQLParser.java
private void parserWhere(SQLExpr aexpr, BasicDBObject o) {
   if (aexpr instanceof SQLBinaryOpExpr) {
       SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;
       SQLExpr exprL = expr.getLeft();
       if (!(exprL instanceof SQLBinaryOpExpr)) {
           if (expr.getOperator().getName().equals("=")) {
               o.put(exprL.toString(), getExpValue(expr.getRight()));
           } else {
               String op = "";
               if (expr.getOperator().getName().equals("<")) {
                   op = "$lt";
               } else if (expr.getOperator().getName().equals("<=")) {
                   op = "$lte";
               } else if (expr.getOperator().getName().equals(">")) {
                   op = "$gt";
               } else if (expr.getOperator().getName().equals(">=")) {
                   op = "$gte";
               } else if (expr.getOperator().getName().equals("!=")) {
                   op = "$ne";
               } else if (expr.getOperator().getName().equals("<>")) {
                   op = "$ne";
               }
               parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));
           }
       } else {
           if (expr.getOperator().getName().equals("AND")) {
               parserWhere(exprL, o);
               parserWhere(expr.getRight(), o);
           } else if (expr.getOperator().getName().equals("OR")) {
               orWhere(exprL, expr.getRight(), o);
           } else {
               throw new RuntimeException("Can't identify the operation of  of where");
           }
       }
   }
}

private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {
   BasicDBObject xo = new BasicDBObject();
   BasicDBObject yo = new BasicDBObject();
   parserWhere(exprL, xo);
   parserWhere(exprR, yo);
   ob.put("$or", new Object[]{xo, yo});
}

3、解析 MongoDB 数据

// MongoResultSet.java
public MongoResultSet(MongoData mongo, String schema) throws SQLException {
   this._cursor = mongo.getCursor();
   this._schema = schema;
   this._table = mongo.getTable();
   this.isSum = mongo.getCount() > 0;
   this._sum = mongo.getCount();
   this.isGroupBy = mongo.getType();

   if (this.isGroupBy) {
       dblist = mongo.getGrouyBys();
       this.isSum = true;
   }
   if (this._cursor != null) {
       select = _cursor.getKeysWanted().keySet().toArray(new String[0]);
       // 解析 fields
       if (this._cursor.hasNext()) {
           _cur = _cursor.next();
           if (_cur != null) {
               if (select.length == 0) {
                   SetFields(_cur.keySet());
               }
               _row = 1;
           }
       }
       // 设置 fields 类型
       if (select.length == 0) {
           select = new String[]{"_id"};
           SetFieldType(true);
       } else {
           SetFieldType(false);
       }
   } else {
       SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};
       SetFieldType(mongo.getFields());
   }
}
  • 当使用 SELECT * 查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

4、返回数据给 MySQL Client

// JDBCConnection.java
private void ouputResultSet(ServerConnection sc, String sql)
       throws SQLException {
   ResultSet rs = null;
   Statement stmt = null;

   try {
       stmt = con.createStatement();
       rs = stmt.executeQuery(sql);

       // header
       List<FieldPacket> fieldPks = new LinkedList<>();
       ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);
       int colunmCount = fieldPks.size();
       ByteBuffer byteBuf = sc.allocate();
       ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
       headerPkg.fieldCount = fieldPks.size();
       headerPkg.packetId = ++packetId;
       byteBuf = headerPkg.write(byteBuf, sc, true);
       byteBuf.flip();
       byte[] header = new byte[byteBuf.limit()];
       byteBuf.get(header);
       byteBuf.clear();
       List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
       for (FieldPacket curField : fieldPks) {
           curField.packetId = ++packetId;
           byteBuf = curField.write(byteBuf, sc, false);
           byteBuf.flip();
           byte[] field = new byte[byteBuf.limit()];
           byteBuf.get(field);
           byteBuf.clear();
           fields.add(field);
       }
       // header eof
       EOFPacket eofPckg = new EOFPacket();
       eofPckg.packetId = ++packetId;
       byteBuf = eofPckg.write(byteBuf, sc, false);
       byteBuf.flip();
       byte[] eof = new byte[byteBuf.limit()];
       byteBuf.get(eof);
       byteBuf.clear();
       this.respHandler.fieldEofResponse(header, fields, eof, this);

       // row
       while (rs.next()) {
           RowDataPacket curRow = new RowDataPacket(colunmCount);
           for (int i = 0; i < colunmCount; i++) {
               int j = i + 1;
               if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                   curRow.add(rs.getBytes(j));
               } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||
                       fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
                   // ensure that do not use scientific notation format
                   BigDecimal val = rs.getBigDecimal(j);
                   curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));
               } else {
                   curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
               }
           }
           curRow.packetId = ++packetId;
           byteBuf = curRow.write(byteBuf, sc, false);
           byteBuf.flip();
           byte[] row = new byte[byteBuf.limit()];
           byteBuf.get(row);
           byteBuf.clear();
           this.respHandler.rowResponse(row, this);
       }
       fieldPks.clear();
       // row eof
       eofPckg = new EOFPacket();
       eofPckg.packetId = ++packetId;
       byteBuf = eofPckg.write(byteBuf, sc, false);
       byteBuf.flip();
       eof = new byte[byteBuf.limit()];
       byteBuf.get(eof);
       sc.recycle(byteBuf);
       this.respHandler.rowEofResponse(eof, this);
   } finally {
       if (rs != null) {
           try {
               rs.close();
           } catch (SQLException e) {
           }
       }
       if (stmt != null) {
           try {
               stmt.close();
           } catch (SQLException e) {
           }
       }
   }
}

// MongoResultSet.java
@Override
public String getString(String columnLabel) throws SQLException {
   Object x = getObject(columnLabel);
   if (x == null) {
       return null;
   }
   return x.toString();
}
  • 当返回字段值是 Object 时,返回该对象.toString()。例如:
mysql> select * from user order by _id asc;
+--------------------------+------+-------------------------------+
| _id                      | name | profile                       |
+--------------------------+------+-------------------------------+
| 1                        | 123  | { "age" : 1 , "height" : 100} |

4. 插入操作

// MongoSQLParser.java
public int executeUpdate() throws MongoSQLException {
   if (statement instanceof SQLInsertStatement) {
       return InsertData((SQLInsertStatement) statement);
   }
   if (statement instanceof SQLUpdateStatement) {
       return UpData((SQLUpdateStatement) statement);
   }
   if (statement instanceof SQLDropTableStatement) {
       return dropTable((SQLDropTableStatement) statement);
   }
   if (statement instanceof SQLDeleteStatement) {
       return DeleteDate((SQLDeleteStatement) statement);
   }
   if (statement instanceof SQLCreateTableStatement) {
       return 1;
   }
   return 1;
}

private int InsertData(SQLInsertStatement state) {
   if (state.getValues().getValues().size() == 0) {
       throw new RuntimeException("number of  columns error");
   }
   if (state.getValues().getValues().size() != state.getColumns().size()) {
       throw new RuntimeException("number of values and columns have to match");
   }
   SQLTableSource table = state.getTableSource();
   BasicDBObject o = new BasicDBObject();
   int i = 0;
   for (SQLExpr col : state.getColumns()) {
       o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));
       i++;
   }
   DBCollection coll = this._db.getCollection(table.toString());
   coll.insert(o);
   return 1;
}

5. 彩蛋

老铁,看到这里,来一波微信公众号关注吧?!

wechat_mp

1、支持多 MongoDB ,并使用 MyCAT 进行分片。

MyCAT 配置:multi_mongodb

2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

MyCAT 配置:single_mongodb_mysql

3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

MyCAT 配置:single_mongodb

芋道源码

芋道源码

粉丝 347
博文 75
码字总数 193913
作品 0
徐汇
后端工程师
私信 提问
加载中
此博客有 18 条评论,请先登录后再查看。
用vertx实现高吞吐量的站点计数器

工具:vertx,redis,mongodb,log4j 源代码地址:https://github.com/jianglibo/visitrank 先看架构图: 如果你不熟悉vertx,请先google一下。我这里将vertx当作一个容器,上面所有的圆圈要...

jianglibo
2014/04/03
4.3K
3
Flappy Bird(安卓版)逆向分析(一)

更改每过一关的增长分数 反编译的步骤就不介绍了,我们直接来看反编译得到的文件夹 方法1:在smali目录下,我们看到org/andengine/,可以知晓游戏是由andengine引擎开发的。打开/res/raw/at...

enimey
2014/03/04
6.2K
18
beego API开发以及自动化文档

beego API开发以及自动化文档 beego1.3版本已经在上个星期发布了,但是还是有很多人不了解如何来进行开发,也是在一步一步的测试中开发,期间QQ群里面很多人都问我如何开发,我的业余时间实在...

astaxie
2014/06/25
2.7W
22
程序猿媛一:Android滑动翻页+区域点击事件

滑动翻页+区域点击事件 ViewPager+GrideView 声明:博文为原创,文章内容为,效果展示,思路阐述,及代码片段。文尾附注源码获取途径。 转载请保留原文出处“http://my.oschina.net/gluoyer...

花佟林雨月
2013/11/09
4.3K
1
数据库代码辅助工具--MaoCaiJun.Database

MaoCaiJun.DataBase 是一个用于 Microsoft Visual Studio 的数据库代码生成组件。它是基于 xml 文件的代码创建工具,支持sql2000,sql2005,sql2008,access, SQLite MaoCaiJun.Database 数据库...

mccj
2013/02/06
2.4K
1

没有更多内容

加载失败,请刷新页面

加载更多

红队之windows用户和组

目录 0x01 用户账户和组策略 0x02 Windows中的访问控制 0x03 安全标识符SID 0x04 用户账户控制(UAC) 用户帐户 用户帐户是对计算机用户身份的标识,本地用户帐户、密码存在本地计算机上,只...

黑白天安全团队
昨天
15
0
厉害了!百度智能云NIRO Pro智能机器人半年内连获三项产品设计大奖

短短半年内,百度智能云NIRO Pro智能机器人连获三项产品设计大奖,其中包括有“设计界奥斯卡”之称的德国红点奖,成功引领了全球助理机器人的工业设计和发展趋势风向标。红点奖评委这样评价,...

百度智能云
2019/12/04
5
0
StringBuider 在什么条件下、如何使用效率更高?

作者:后青春期的Keats cnblogs.com/keatsCoder/p/13212289.html 引言 都说 StringBuilder 在处理字符串拼接上效率要强于 String,但有时候我们的理解可能会存在一定的偏差。最近我在测试数据...

Object_Man
今天
11
0
发布更新|腾讯云 Serverless 产品动态 20200813

一、云函数 SCF + Ckafka 联合转储方案正式发布 发布时间: 2020-08-06 产品背景: SCF + Ckafka 联合转储方案可以帮忙用户节省使用与开发成本,用户可以将 Ckafka 消息转储同步转储至消息队...

腾讯云Serverless
47分钟前
5
0
如何正确强制执行Git推送? - How do I properly force a Git push?

问题: I've set up a remote non-bare "main" repo and cloned it to my computer. 我已经建立了一个远程的非裸露的“主”仓库,并将其克隆到我的计算机上。 I made some local changes, u......

javail
49分钟前
14
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部