文档章节

数据库中间件 MyCAT 源码分析 —— SQL ON MongoDB

芋道源码
 芋道源码
发布于 2017/07/20 01:32
字数 1613
阅读 1976
收藏 81

wechat_mp

🙂🙂🙂关注微信公众号有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右


1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识
  2. 查询操作
  3. 插入操作
  4. 彩蛋,😈彩蛋,🙂彩蛋

建议你看过这两篇文章(非必须):

  1. 《MyCAT 源码分析 —— 【单库单表】插入》
  2. 《MyCAT 源码分析 —— 【单库单表】查询》

2. 主流程

  1. MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQLMongoDB操作 发送给 MongoDB Server
  2. MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。


Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

1、查询 MongoDB

// MongoSQLParser.java
public MongoData query() throws MongoSQLException {
   if (!(statement instanceof SQLSelectStatement)) {
       //return null;
       throw new IllegalArgumentException("not a query sql statement");
   }
   MongoData mongo = new MongoData();
   DBCursor c = null;
   SQLSelectStatement selectStmt = (SQLSelectStatement) statement;
   SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
   int icount = 0;
   if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {
       MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();

       BasicDBObject fields = new BasicDBObject();

       // 显示(返回)的字段
       for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {
           //System.out.println(item.toString());
           if (!(item.getExpr() instanceof SQLAllColumnExpr)) {
               if (item.getExpr() instanceof SQLAggregateExpr) {
                   SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();
                   if (expr.getMethodName().equals("COUNT")) { // TODO 待读:count(*)
                       icount = 1;
                       mongo.setField(getExprFieldName(expr), Types.BIGINT);
                   }
                   fields.put(getExprFieldName(expr), 1);
               } else {
                   fields.put(getFieldName(item), 1);
               }
           }

       }

       // 表名
       SQLTableSource table = mysqlSelectQuery.getFrom();
       DBCollection coll = this._db.getCollection(table.toString());
       mongo.setTable(table.toString());

       // WHERE
       SQLExpr expr = mysqlSelectQuery.getWhere();
       DBObject query = parserWhere(expr);

       // GROUP BY
       SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();
       BasicDBObject gbkey = new BasicDBObject();
       if (groupby != null) {
           for (SQLExpr gbexpr : groupby.getItems()) {
               if (gbexpr instanceof SQLIdentifierExpr) {
                   String name = ((SQLIdentifierExpr) gbexpr).getName();
                   gbkey.put(name, Integer.valueOf(1));
               }
           }
           icount = 2;
       }

       // SKIP / LIMIT
       int limitoff = 0;
       int limitnum = 0;
       if (mysqlSelectQuery.getLimit() != null) {
           limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
           limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
       }
       if (icount == 1) { // COUNT(*)
           mongo.setCount(coll.count(query));
       } else if (icount == 2) { // MapReduce
           BasicDBObject initial = new BasicDBObject();
           initial.put("num", 0);
           String reduce = "function (obj, prev) { " + "  prev.num++}";
           mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));
       } else {
           if ((limitoff > 0) || (limitnum > 0)) {
               c = coll.find(query, fields).skip(limitoff).limit(limitnum);
           } else {
               c = coll.find(query, fields);
           }

           // order by
           SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();
           if (orderby != null) {
               BasicDBObject order = new BasicDBObject();
               for (int i = 0; i < orderby.getItems().size(); i++) {
                   SQLSelectOrderByItem orderitem = orderby.getItems().get(i);
                   order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));
               }
               c.sort(order);
               // System.out.println(order);
           }
       }
       mongo.setCursor(c);
   }
   return mongo;
}

2、查询条件

// MongoSQLParser.java
private void parserWhere(SQLExpr aexpr, BasicDBObject o) {
   if (aexpr instanceof SQLBinaryOpExpr) {
       SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;
       SQLExpr exprL = expr.getLeft();
       if (!(exprL instanceof SQLBinaryOpExpr)) {
           if (expr.getOperator().getName().equals("=")) {
               o.put(exprL.toString(), getExpValue(expr.getRight()));
           } else {
               String op = "";
               if (expr.getOperator().getName().equals("<")) {
                   op = "$lt";
               } else if (expr.getOperator().getName().equals("<=")) {
                   op = "$lte";
               } else if (expr.getOperator().getName().equals(">")) {
                   op = "$gt";
               } else if (expr.getOperator().getName().equals(">=")) {
                   op = "$gte";
               } else if (expr.getOperator().getName().equals("!=")) {
                   op = "$ne";
               } else if (expr.getOperator().getName().equals("<>")) {
                   op = "$ne";
               }
               parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));
           }
       } else {
           if (expr.getOperator().getName().equals("AND")) {
               parserWhere(exprL, o);
               parserWhere(expr.getRight(), o);
           } else if (expr.getOperator().getName().equals("OR")) {
               orWhere(exprL, expr.getRight(), o);
           } else {
               throw new RuntimeException("Can't identify the operation of  of where");
           }
       }
   }
}

private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {
   BasicDBObject xo = new BasicDBObject();
   BasicDBObject yo = new BasicDBObject();
   parserWhere(exprL, xo);
   parserWhere(exprR, yo);
   ob.put("$or", new Object[]{xo, yo});
}

3、解析 MongoDB 数据

// MongoResultSet.java
public MongoResultSet(MongoData mongo, String schema) throws SQLException {
   this._cursor = mongo.getCursor();
   this._schema = schema;
   this._table = mongo.getTable();
   this.isSum = mongo.getCount() > 0;
   this._sum = mongo.getCount();
   this.isGroupBy = mongo.getType();

   if (this.isGroupBy) {
       dblist = mongo.getGrouyBys();
       this.isSum = true;
   }
   if (this._cursor != null) {
       select = _cursor.getKeysWanted().keySet().toArray(new String[0]);
       // 解析 fields
       if (this._cursor.hasNext()) {
           _cur = _cursor.next();
           if (_cur != null) {
               if (select.length == 0) {
                   SetFields(_cur.keySet());
               }
               _row = 1;
           }
       }
       // 设置 fields 类型
       if (select.length == 0) {
           select = new String[]{"_id"};
           SetFieldType(true);
       } else {
           SetFieldType(false);
       }
   } else {
       SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};
       SetFieldType(mongo.getFields());
   }
}
  • 当使用 SELECT * 查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

4、返回数据给 MySQL Client

// JDBCConnection.java
private void ouputResultSet(ServerConnection sc, String sql)
       throws SQLException {
   ResultSet rs = null;
   Statement stmt = null;

   try {
       stmt = con.createStatement();
       rs = stmt.executeQuery(sql);

       // header
       List<FieldPacket> fieldPks = new LinkedList<>();
       ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);
       int colunmCount = fieldPks.size();
       ByteBuffer byteBuf = sc.allocate();
       ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
       headerPkg.fieldCount = fieldPks.size();
       headerPkg.packetId = ++packetId;
       byteBuf = headerPkg.write(byteBuf, sc, true);
       byteBuf.flip();
       byte[] header = new byte[byteBuf.limit()];
       byteBuf.get(header);
       byteBuf.clear();
       List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
       for (FieldPacket curField : fieldPks) {
           curField.packetId = ++packetId;
           byteBuf = curField.write(byteBuf, sc, false);
           byteBuf.flip();
           byte[] field = new byte[byteBuf.limit()];
           byteBuf.get(field);
           byteBuf.clear();
           fields.add(field);
       }
       // header eof
       EOFPacket eofPckg = new EOFPacket();
       eofPckg.packetId = ++packetId;
       byteBuf = eofPckg.write(byteBuf, sc, false);
       byteBuf.flip();
       byte[] eof = new byte[byteBuf.limit()];
       byteBuf.get(eof);
       byteBuf.clear();
       this.respHandler.fieldEofResponse(header, fields, eof, this);

       // row
       while (rs.next()) {
           RowDataPacket curRow = new RowDataPacket(colunmCount);
           for (int i = 0; i < colunmCount; i++) {
               int j = i + 1;
               if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                   curRow.add(rs.getBytes(j));
               } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||
                       fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
                   // ensure that do not use scientific notation format
                   BigDecimal val = rs.getBigDecimal(j);
                   curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));
               } else {
                   curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
               }
           }
           curRow.packetId = ++packetId;
           byteBuf = curRow.write(byteBuf, sc, false);
           byteBuf.flip();
           byte[] row = new byte[byteBuf.limit()];
           byteBuf.get(row);
           byteBuf.clear();
           this.respHandler.rowResponse(row, this);
       }
       fieldPks.clear();
       // row eof
       eofPckg = new EOFPacket();
       eofPckg.packetId = ++packetId;
       byteBuf = eofPckg.write(byteBuf, sc, false);
       byteBuf.flip();
       eof = new byte[byteBuf.limit()];
       byteBuf.get(eof);
       sc.recycle(byteBuf);
       this.respHandler.rowEofResponse(eof, this);
   } finally {
       if (rs != null) {
           try {
               rs.close();
           } catch (SQLException e) {
           }
       }
       if (stmt != null) {
           try {
               stmt.close();
           } catch (SQLException e) {
           }
       }
   }
}

// MongoResultSet.java
@Override
public String getString(String columnLabel) throws SQLException {
   Object x = getObject(columnLabel);
   if (x == null) {
       return null;
   }
   return x.toString();
}
  • 当返回字段值是 Object 时,返回该对象.toString()。例如:
mysql> select * from user order by _id asc;
+--------------------------+------+-------------------------------+
| _id                      | name | profile                       |
+--------------------------+------+-------------------------------+
| 1                        | 123  | { "age" : 1 , "height" : 100} |

4. 插入操作

// MongoSQLParser.java
public int executeUpdate() throws MongoSQLException {
   if (statement instanceof SQLInsertStatement) {
       return InsertData((SQLInsertStatement) statement);
   }
   if (statement instanceof SQLUpdateStatement) {
       return UpData((SQLUpdateStatement) statement);
   }
   if (statement instanceof SQLDropTableStatement) {
       return dropTable((SQLDropTableStatement) statement);
   }
   if (statement instanceof SQLDeleteStatement) {
       return DeleteDate((SQLDeleteStatement) statement);
   }
   if (statement instanceof SQLCreateTableStatement) {
       return 1;
   }
   return 1;
}

private int InsertData(SQLInsertStatement state) {
   if (state.getValues().getValues().size() == 0) {
       throw new RuntimeException("number of  columns error");
   }
   if (state.getValues().getValues().size() != state.getColumns().size()) {
       throw new RuntimeException("number of values and columns have to match");
   }
   SQLTableSource table = state.getTableSource();
   BasicDBObject o = new BasicDBObject();
   int i = 0;
   for (SQLExpr col : state.getColumns()) {
       o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));
       i++;
   }
   DBCollection coll = this._db.getCollection(table.toString());
   coll.insert(o);
   return 1;
}

5. 彩蛋

老铁,看到这里,来一波微信公众号关注吧?!

wechat_mp

1、支持多 MongoDB ,并使用 MyCAT 进行分片。

MyCAT 配置:multi_mongodb

2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

MyCAT 配置:single_mongodb_mysql

3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

MyCAT 配置:single_mongodb

© 著作权归作者所有

共有 人打赏支持
芋道源码

芋道源码

粉丝 284
博文 74
码字总数 191189
作品 0
徐汇
后端工程师
私信 提问
加载中

评论(18)

奔跑的potato
奔跑的potato
我还是觉得node.js好用
fengyang
fengyang
不知道放到生产环境上如何
fengyang
fengyang
mycat对mongodb的支持有点弱吧,简单的语法没问题
芋道源码
芋道源码

引用来自“doge_刀戈”的评论

cool
刀哥好,求带 LOL 倔强青铜王者
doge_刀戈
doge_刀戈
cool
芋道源码
芋道源码

引用来自“ljxun”的评论

这种中间件的设计思想有问题,这种中间件应该是放在jdbc之前,通过配置去判断是使用jdbc或是其他,是使用sql语句,还是使用nosql语句。这样才可以模糊数据库的界限,通过解析sql语句去转化成其他语句,这种实现方式太复杂,要做到很全的话,要去重新实现一个sql语句解析器。
看设计的思路,MyCAT 本身的思路目前是“伪装”成 MySQL-Proxy
在 Solr、ES 这些中间件没出来的时候,有些公司是实现 SQL 来去访问 Lucene

MyCAT 目前基于 Druid 实现的语法解析器。

最近在学习 Sharding-JDBC 的源码,之后会出一篇 TA 的语法解析器实现的分享文章。

可以关注我的微信公众号哟。:bowtie:
芋道源码
芋道源码

引用来自“黎明伟”的评论

能够支持的SQL操作应该很有限,是吧
对的。基本是平行翻译的,MongoDB 有的操作,才能被翻译成 SQL
ljxun
ljxun
这种中间件的设计思想有问题,这种中间件应该是放在jdbc之前,通过配置去判断是使用jdbc或是其他,是使用sql语句,还是使用nosql语句。这样才可以模糊数据库的界限,通过解析sql语句去转化成其他语句,这种实现方式太复杂,要做到很全的话,要去重新实现一个sql语句解析器。
黎明伟
黎明伟
能够支持的SQL操作应该很有限,是吧
丁富贵
这东西是个好东西,可惜领头人掉钱眼里了,不知道还有没有心思在开源上,每天忙着卖书卖课程吹牛B,还觉得有竞争对手要谋害自己
MyCAT SQL ON MongoDB

概述 可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。 吼吼吼,让我们开启这段神奇的“旅途”。 本文主要分成四部分: 总体流程,让你有个整体...

wangchen1999
05/02
0
0
mongodb通过sql来查询

mongodb本身是不支持用sql进行查询。这是mongodb是nosql数据库。但是可以通过语法转换的方式来实现。记得mycat是有个黑科技的,就是可以查询mongodb。我就在git上查询了一下mycat的源代码。发...

miaojiangmin
05/23
0
0
mycat实现mysql分库分表

1. mycat介绍 MyCat发展到目前的版本,已经不是一个单纯的MySQL代理了,它的后端可以支持MySQL、SQL Server、Oracle、DB2、PostgreSQL等主流数据库,也支持MongoDB这种新型NoSQL方式的存储,...

红隐
05/06
0
0
Mycat学习实战-Mycat初识

Mycat学习实战-Mycat初识 Mycat学习实战-Mycat初识 1. Mycat是什么 2. Mycat与其他中间件的区别 3. Mycat能解决的问题 4. Mycat核心概念 5. Mycat文件夹以及文件介绍 1. Mycat是什么 Mycat是...

ygqygq2
06/29
0
0
mycat 1.6.5 for mysql 8分表攻略

如果你下载已经打包好的mycat 1.6.5压缩包.tar.gz是永远也连接不了mysql 8的,我们需要的是mycat 1.6.5的源代码Mycat-Server-1.6.zip的源代码,这个可以自行去下载.下载完之后,第一次加载需要加...

算法之名
10/24
0
0

没有更多内容

加载失败,请刷新页面

加载更多

浅谈“李氏代换”——从纪念金庸和斯坦李说起

李氏代换(LSP)简介 李氏代换是软件设计的一个原则,又名依赖倒转原则或依赖倒置原则,其衍生原则有接口分离原则等。该原则由Barbara Liskov于1988年提出。 该原则指出,程序中高级别的元素...

SamYjy
23分钟前
14
0
JavaScript实现在线websocket WSS测试工具 -toolfk程序员工具网

本文要推荐的[ToolFk]是一款程序员经常使用的线上免费测试工具箱,ToolFk 特色是专注于程序员日常的开发工具,不用安装任何软件,只要把内容贴上按一个执行按钮,就能获取到想要的内容结果。T...

toolfk
42分钟前
1
0
linux-scp 远程拷贝报错原因

刚拿到一台重装后的服务器,远程ssh都正常,但是一scp拷贝东西就报错: 本地确定是有scp命令的,而且如果是本地没有scp不会报后面那句lost connection,因此就是远程没有scp这个命令。因此在...

linuxprobe16
55分钟前
1
0
OSChina 周六乱弹 —— 谁小时候没当过熊孩子呀

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @小小编辑:推荐歌曲《行尸走肉》- amazarashi 《行尸走肉》- amazarashi 手机党少年们想听歌,请使劲儿戳(这里) @神话 :周五了,周末干啥...

小小编辑
今天
123
4
docker部署springboot项目

安装docker 菜鸟教程 springboot项目 maven依赖 <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001......

yimingkeji
今天
14
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部