文档章节

数据库中间件 MyCAT源码分析 —— 跨库两表Join

芋道源码
 芋道源码
发布于 2017/07/13 21:46
字数 2064
阅读 1583
收藏 87
点赞 3
评论 1

wechat_mp

🙂🙂🙂关注微信公众号有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右


1. 概述

MyCAT 支持跨库表 Join,目前版本仅支持跨库表 Join。虽然如此,已经能够满足我们大部分的业务场景。况且,Join 过多的表可能带来的性能问题也是很麻烦的。

本文主要分享:

  1. 整体流程、调用顺序图
  2. 核心代码的分析

前置阅读:《MyCAT 源码分析 —— 【单库单表】查询》

OK,Let's Go。

2. 主流程

当执行跨库两表 Join SQL 时,经历的大体流程如下:

SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ ${SQL}RouteService#route(...) 解析注解 mycat:catlet 后,路由给 HintCatletHandler 作进一步处理。

HintCatletHandler 获取注解对应的 Catlet 实现类,io.mycat.catlets.ShareJoin 就是其中一种实现(目前也只有这一种实现),提供了跨库两表 Join 的功能。从类命名上看,ShareJoin 很大可能性后续会提供完整的跨库多表的 Join 功能。

核心代码如下:

// HintCatletHandler.java
public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,
                           int sqlType, String realSQL, String charset, ServerConnection sc,
                           LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap)
       throws SQLNonTransientException {
   String cateletClass = hintSQLValue;
   if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL);
   }
   try {
       Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);
       catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);
       catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));
   } catch (Exception e) {
       LOGGER.warn("catlet error " + e);
       throw new SQLNonTransientException(e);
   }
   return null;
}

3. ShareJoin

目前支持跨库表 Join。ShareJoin 将 SQL 拆分成左表 SQL 和 右表 SQL,发送给各数据节点执行,汇总数据结果进行合后返回。

伪代码如下:

// SELECT u.id, o.id FROM t_order o 
// INNER JOIN t_user u ON o.uid = u.id
// 【顺序】查询左表
String leftSQL = "SELECT o.id, u.id FROM t_order o";
List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);
// 【并行】查询右表
String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";
for (dn : dns) { // 此处是并行执行,使用回调逻辑
    for (rightRecord : dn.select(rightSQL)) { // 查询右表
        // 合并结果
        for (leftRecord : leftList) {
            if (leftRecord.uid == rightRecord.id) {
                write(leftRecord + leftRecord.uid 拼接结果);
            }
        }
    }
} 

实际情况会更加复杂,我们接下来一点点往下看。

3.1 JoinParser

JoinParser 负责对 SQL 进行解析。整体流程如下:

举个例子,/*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 解析后,TableFilter 结果如下:

  • tName :表名
  • tAlia :表自定义命名
  • where :过滤条件
  • order :排序条件
  • parenTable :左连接的 Join 的表名。t_user表 在 join属性 的 parenTable 为 "o",即 t_order
  • joinParentkey :左连接的 Join 字段
  • joinKey :join 字段。t_user表 在 join属性 为 id
  • join :子 tableFilter。即,该表连接的右边的表。
  • parent :和 join属性 相对。

看到此处,大家可能有疑问,为什么要把 SQL 解析成 TableFilterJoinParser 根据 TableFilter 生成数据节点执行 SQL。代码如下:

// TableFilter.java
public String getSQL() {
   String sql = "";
   // fields
   for (Entry<String, String> entry : fieldAliasMap.entrySet()) {
       String key = entry.getKey();
       String val = entry.getValue();
       if (val == null) {
           sql = unionsql(sql, getFieldfrom(key), ",");
       } else {
           sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");
       }
   }
   // where
   if (parent == null) {    // on/where 等于号左边的表
       String parentJoinKey = getJoinKey(true);
       // fix sharejoin bug:
       // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:
       // 原因是左表的select列没有包含 join 列,在获取结果时报上面的错误
       if (sql != null && parentJoinKey != null &&
               !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {
           sql += ", " + parentJoinKey;
       }
       sql = "select " + sql + " from " + tName;
       if (!(where.trim().equals(""))) {
           sql += " where " + where.trim();
       }
   } else {    // on/where 等于号右边边的表
       if (allField) {
           sql = "select " + sql + " from " + tName;
       } else {
           sql = unionField("select " + joinKey, sql, ",");
           sql = sql + " from " + tName;
           //sql="select "+joinKey+","+sql+" from "+tName;
       }
       if (!(where.trim().equals(""))) {
           sql += " where " + where.trim() + " and (" + joinKey + " in %s )";
       } else {
           sql += " where " + joinKey + " in %s ";
       }
   }
   // order
   if (!(order.trim().equals(""))) {
       sql += " order by " + order.trim();
   }
   // limit
   if (parent == null) {
       if ((rowCount > 0) && (offset > 0)) {
           sql += " limit" + offset + "," + rowCount;
       } else {
           if (rowCount > 0) {
               sql += " limit " + rowCount;
           }
       }
   }
   return sql;
}
  • parent 为空时,即on/where 等于号左边的表。例如:select id, uid from t_order
  • parent 不为空时,即on/where 等于号右边的表。例如:select id, username from t_user where id in (1, 2, 3)

3.2 ShareJoin.processSQL(...)

当 SQL 解析完后,生成左边的表执行的 SQL,发送给对应的数据节点查询数据。大体流程如下:

当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 时, sql = getSql() 的返回结果为 select id, uid from t_order

生成左边的表执行的 SQL 后,顺序顺序顺序发送给对应的数据节点查询数据。具体顺序查询是怎么实现的,我们来看下章 BatchSQLJob

3.3 BatchSQLJob

EngineCtxBatchSQLJob 封装,提供上层两个方法:

  1. executeNativeSQLSequnceJob :顺序(非并发)在每个数据节点执行SQL任务
  2. executeNativeSQLParallJob :并发在每个数据节点执行SQL任务

核心代码如下:

// EngineCtx.java
public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,
		SQLJobHandler jobHandler) {
	for (String dataNode : dataNodes) {
		SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
				jobHandler, this);
		bachJob.addJob(job, false);
	}
}

public void executeNativeSQLParallJob(String[] dataNodes, String sql,
		SQLJobHandler jobHandler) {
	for (String dataNode : dataNodes) {
		SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
				jobHandler, this);
		bachJob.addJob(job, true);
	}
}	

BatchSQLJob 通过执行中任务列表待执行任务列表来实现顺序/并发执行任务。核心代码如下:

// BatchSQLJob.java
/**
* 执行中任务列表
*/
private ConcurrentHashMap<Integer, SQLJob> runningJobs = new ConcurrentHashMap<Integer, SQLJob>();
/**
* 待执行任务列表
*/
private ConcurrentLinkedQueue<SQLJob> waitingJobs = new ConcurrentLinkedQueue<SQLJob>();

public void addJob(SQLJob newJob, boolean parallExecute) {
   if (parallExecute) {
       runJob(newJob);
   } else {
       waitingJobs.offer(newJob);
       if (runningJobs.isEmpty()) { // 若无正在执行中的任务,则从等待队列里获取任务进行执行。
           SQLJob job = waitingJobs.poll();
           if (job != null) {
               runJob(job);
           }
       }
   }
}

public boolean jobFinished(SQLJob sqlJob) {
	runningJobs.remove(sqlJob.getId());
	SQLJob job = waitingJobs.poll();
	if (job != null) {
		runJob(job);
		return false;
	} else {
		if (noMoreJobInput) {
			return runningJobs.isEmpty() && waitingJobs.isEmpty();
		} else {
			return false;
		}
	}
}
  • 顺序执行时,当 runningJobs 存在执行中的任务时,#addJob(...) 时,不立即执行,添加到 waitingJobs。当 SQLJob 完成时,顺序调用下一个任务。
  • 并发执行时,#addJob(...) 时,立即执行。

SQLJob SQL 异步执行任务。其 jobHandler(SQLJobHandler) 属性,在 SQL 执行有返回结果时,会进行回调,从而实现异步执行。

ShareJoin 里,SQLJobHandler 有两个实现:ShareDBJoinHandlerShareRowOutPutDataHandler。前者,左边的表执行的 SQL 回调;后者,右边的表执行的 SQL 回调。

3.4 ShareDBJoinHandler

ShareDBJoinHandler左边的表执行的 SQL 回调。流程如下:

  • #fieldEofResponse(...) :接收数据节点返回的 fields,放入内存。
  • #rowResponse(...) :接收数据节点返回的 row,放入内存。
  • #rowEofResponse(...) :接收完一个数据节点返回所有的 row。当所有数据节点都完成 SQL 执行时,提交右边的表执行的 SQL 任务,并行执行,即图中**#createQryJob(...)**。

当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 时, sql = getChildSQL() 的返回结果为 select id, username from t_user where id in (1, 2, 3)

核心代码如下:

// ShareJoin.java
private void createQryJob(int batchSize) {
   int count = 0;
   Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();
   String theId = null;
   StringBuilder sb = new StringBuilder().append('(');
   String svalue = "";
   for (Map.Entry<String, String> e : ids.entrySet()) {
       theId = e.getKey();
       byte[] rowbyte = rows.remove(theId);
       if (rowbyte != null) {
           batchRows.put(theId, rowbyte);
       }
       if (!svalue.equals(e.getValue())) {
           if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING
                   || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 为varchar
               sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')
           } else { // 默认joinkey为int/long
               sb.append(e.getValue()).append(','); // (1,2,3)
           }
       }
       svalue = e.getValue();
       if (count++ > batchSize) {
           break;
       }
   }
   if (count == 0) {
       return;
   }
   jointTableIsData = true;
   sb.deleteCharAt(sb.length() - 1).append(')');
   String sql = String.format(joinParser.getChildSQL(), sb);
   getRoute(sql);
   ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));
}

3.5 ShareRowOutPutDataHandler

ShareRowOutPutDataHandler右边的表执行的 SQL 回调。流程如下:

  • #fieldEofResponse(...) :接收数据节点返回的 fields,返回 header 给 MySQL Client。
  • #rowResponse(...) :接收数据节点返回的 row,匹配左表的记录,返回合并后返回的 row 给 MySQL Client。
  • #rowEofResponse(...) :当所有 row 都返回完后,返回 eof 给 MySQL Client。

核心代码如下:

// ShareRowOutPutDataHandler.java
public boolean onRowData(String dataNode, byte[] rowData) {
   RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);
   //拷贝一份batchRows
   Map<String, byte[]> batchRowsCopy = new ConcurrentHashMap<String, byte[]>();
   batchRowsCopy.putAll(arows);
   // 获取Id字段,
   String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));
   // 查找ID对应的A表的记录
   byte[] arow = getRow(batchRowsCopy, id, joinL);
   while (arow != null) {
       RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());
       for (int i = 1; i < rowDataPkgold.fieldCount; i++) {
           // 设置b.name 字段
           byte[] bname = rowDataPkgold.fieldValues.get(i);
           rowDataPkg.add(bname);
           rowDataPkg.addFieldCount(1);
       }
       // huangyiming add
       MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();
       if (null == middlerResultHandler) {
           ctx.writeRow(rowDataPkg);
       } else {
           if (middlerResultHandler instanceof MiddlerQueryResultHandler) {
               byte[] columnData = rowDataPkg.fieldValues.get(0);
               if (columnData != null && columnData.length > 0) {
                   String rowValue = new String(columnData);
                   middlerResultHandler.add(rowValue);
               }
               //}
           }

       }
       arow = getRow(batchRowsCopy, id, joinL);
   }
   return false;
}

4. 彩蛋

如下是本文涉及到的核心类,有兴趣的同学可以翻一翻。

ShareJoin 另外不支持的功能:

  1. 只支持 inner join,不支持 left join、right join 等等连接。
  2. 不支持 order by。
  3. 不支持 group by 以及 相关聚合函数。
  4. 即使 join 左表的字段未声明为返回 fields 也会返回。

恩,MyCAT 弱XA 源码继续走起!

© 著作权归作者所有

共有 人打赏支持
芋道源码

芋道源码

粉丝 249
博文 71
码字总数 191189
作品 0
徐汇
后端工程师
加载中

评论(1)

Ryan-瑞恩
Ryan-瑞恩
:+1:
MySQL多数据源笔记4-Mycat中间件实战

Mycat 是数据库中间件,就是介于数据库与应用之间,进行数据处理与交互的中间服 务。由于前面讲的对数据进行分片处理之后,从原有的一个库,被切分为多个分片数据库,所有的分片数据库集 群构...

狂小白 ⋅ 03/18 ⋅ 0

数据库中间件01-认识mycat

简述 Mycat是国产的一套免费开源的分布式数据库中间件。想必做开发或者运维的朋友对中间件这个词应该是比较熟悉了,我们见到过java中间件,消息中间件等等,这里又来了一个数据库中间件。那么...

细节探索者 ⋅ 05/11 ⋅ 0

Mysql数据库周末学习之Mycat全局表 mysql DBA

Mysql数据库业务中有些数据类似于数据字典,比如配置文件的配置,常用业务的配置或者数据量不大很少变动的表,这些表往往不是特别大,但是大部分的业务场景都会用到,那么这种表适合于Mycat...

mo默瑶 ⋅ 05/30 ⋅ 0

Mysql数据库开发 mycat全局表配置简析

Mysql数据库业务中有些数据类似于数据字典,比如配置文件的配置,常用业务的配置或者数据量不大很少变动的表,这些表往往不是特别大,但是大部分的业务场景都会用到,那么这种表适合于Mycat...

老男孩Linux培训 ⋅ 05/30 ⋅ 0

mycat实现mysql分库分表

1. mycat介绍 MyCat发展到目前的版本,已经不是一个单纯的MySQL代理了,它的后端可以支持MySQL、SQL Server、Oracle、DB2、PostgreSQL等主流数据库,也支持MongoDB这种新型NoSQL方式的存储,...

红隐 ⋅ 05/06 ⋅ 0

Mycat读写分离笔记Windows

Mycat读写分离笔记Windows 自己搭了一个运用Mycat中间件搭建了一个读写分离的demo,昨晚还在奋战当中,因为连接mycat的时候老是报错:No Mycat DataBases selected.尝试了很多的方法,然后还...

推荐码发放 ⋅ 04/29 ⋅ 0

Mycat读写分离以及拆库拆表综合实验2:部署配置mycat读写分离与拆库拆表

部署 mycat mycat基于java运行,检查java版本 配置java变量 mycat下载地址 大致架构图 核心概念 schema:由多个数据库组成的逻辑数据库 datanode:将数据切分后,一个大表被分配到不同的分片数...

Mathology ⋅ 05/24 ⋅ 0

高性能Mysql中间组件Mycat安装配置

高性能Mysql中间组件Mycat安装配置 由于公司项目需要使用到读写分离,但是官方版本的Mysql-proxy读写分离组件太不稳定且有很多BUG,最终决定使用Mycat作为中间组件,其实读写分离只是Mycat中...

幸运券发放 ⋅ 05/02 ⋅ 0

myCat schema.xml详解

schema.xml 是mycat重要的配置文件之一,管理者mycat的逻辑库、表、分片规则、DataNode以及DataSource。想要了解使用mycat就要弄清楚schema里面的标签含义,下面将介绍一下schema里面的详细内...

WJXing ⋅ 05/22 ⋅ 0

MyCAT SQL ON MongoDB

概述 可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。 吼吼吼,让我们开启这段神奇的“旅途”。 本文主要分成四部分: 总体流程,让你有个整体...

wangchen1999 ⋅ 05/02 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Thrift RPC实战(二) Thrift 网络服务模型

TServer类层次体系 TSimpleServer/TThreadPoolServer是阻塞服务模型 TNonblockingServer/THsHaServer/TThreadedSelectotServer是非阻塞服务模型(NIO) 1 TServer抽象类的定义 内部静态类Args的...

lemonLove ⋅ 15分钟前 ⋅ 0

vim命令用法

第五章 vim命令 vim和vi几乎是一样的,唯一的区别就是当编辑一个文本时,使用vi不会显示颜色,而使用vim会显示颜色。 vim有三个模式:一般模式,编辑模式,命令模式。 系统最小化安装时没有安...

弓正 ⋅ 17分钟前 ⋅ 0

MyBatis源码解读之配置

1. 目的 本文主要介绍MyBatis配置文件解析,通过源码解读mybatis-config.xml(官方默认命名)、Mapper.xml 与Java对象的映射。 2. MyBatis结构 查看大图 MyBatis结构图,原图实在太模糊了,所以...

无忌 ⋅ 20分钟前 ⋅ 0

Ignite的jdbc与网格的连接方式的查询性能对比

环境: 数据量100万 Ignite2.5 Windows10 8g jdbc方式连接 import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; i......

仔仔1993 ⋅ 35分钟前 ⋅ 0

收集自网络的wordpress 分页导航的代码教程(全网最全版)

wordpress 分页导航是用来切换文章的一个功能,添加了 wordpress 分页导航后,用户即可自由到达指定的页面数浏览分类文章,而这样的一个很简单功能却有很多朋友在用插件:WP-PageNavi,插件的...

Rhymo-Wu ⋅ 51分钟前 ⋅ 0

微服务 WildFly Swarm 入门

Hello World 就像前面章节中的其他框架一样,我们希望添加一些基本的 Hello-world 功能,然后在其上逐步添加更多的功能。让我们从在我们的项目中创建一个 HolaResources 开始。您可以使用您的...

woshixin ⋅ 58分钟前 ⋅ 0

Maven的安装和Eclipse的配置

1. 下载Maven 下载地址 2. 解压压缩包,放到自己习惯的硬盘中 此处我将其放到了 D:\Tools 目录下。 3. 配置环境变量 右键此电脑 -> 属性 -> 高级系统设置 -> 环境变量。 在系统变量中新建,变...

影狼 ⋅ 今天 ⋅ 0

python pip使用国内镜像的方法

国内源 清华:https://pypi.tuna.tsinghua.edu.cn/simple 阿里云:http://mirrors.aliyun.com/pypi/simple/ 中国科技大学 https://pypi.mirrors.ustc.edu.cn/simple/ 华中理工大学:http://......

良言 ⋅ 今天 ⋅ 0

对于url变化的spa应该如何使用微信jssdk

使用vue单页面碰上微信jssdk config验证失败的坑。第一次成功 之后切换页面全部失败,找到了解决方法,第一次验证成功后保存验证信息 切换页面时验证信息直接拿来用,加一个wx.error() 失败时...

孙冠峰 ⋅ 今天 ⋅ 0

Spring Cloud Gateway 一般集成

SCF发布,带来很多新东西,不过少了点教程,打开方式又和以前的不一样,比如这个SCG,压根就没有入门指导,所以这里写一个,以备后用。 一、集成 pom.xml <dependency> <groupI...

kut ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部